Repository: ignite Updated Branches: refs/heads/master 7a5aa7c6b -> 4b351b6bf
IGNITE-8666: Add the ability to filter data during dataset creation. This closes #4101 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4b351b6b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4b351b6b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4b351b6b Branch: refs/heads/master Commit: 4b351b6bfb80cff534e37367cdc229de63d16e26 Parents: 7a5aa7c Author: Anton Dmitriev <[email protected]> Authored: Fri Jun 1 17:43:43 2018 +0300 Committer: Yury Babak <[email protected]> Committed: Fri Jun 1 17:43:43 2018 +0300 ---------------------------------------------------------------------- .../clustering/KMeansClusterizationExample.java | 8 +- .../examples/ml/knn/KNNRegressionExample.java | 4 +- .../ml/preprocessing/BinarizationExample.java | 1 - .../ml/preprocessing/ImputingExample.java | 1 - .../dataset/impl/cache/CacheBasedDataset.java | 10 ++- .../impl/cache/CacheBasedDatasetBuilder.java | 22 ++++- .../dataset/impl/cache/util/ComputeUtils.java | 76 +++++++++++++--- ...eratorWithConcurrentModificationChecker.java | 80 +++++++++++++++++ .../impl/cache/util/UpstreamCursorAdapter.java | 68 --------------- .../dataset/impl/local/LocalDatasetBuilder.java | 38 ++++++-- .../ml/knn/regression/KNNRegressionTrainer.java | 3 +- .../cache/CacheBasedDatasetBuilderTest.java | 39 +++++++++ .../impl/cache/util/ComputeUtilsTest.java | 2 + ...orWithConcurrentModificationCheckerTest.java | 91 ++++++++++++++++++++ .../impl/local/LocalDatasetBuilderTest.java | 38 ++++++++ .../imputing/ImputerTrainerTest.java | 3 - 16 files changed, 383 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/examples/src/main/java/org/apache/ignite/examples/ml/clustering/KMeansClusterizationExample.java ---------------------------------------------------------------------- diff --git 
a/examples/src/main/java/org/apache/ignite/examples/ml/clustering/KMeansClusterizationExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/clustering/KMeansClusterizationExample.java index 8825ebb..cb140d4 100644 --- a/examples/src/main/java/org/apache/ignite/examples/ml/clustering/KMeansClusterizationExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/ml/clustering/KMeansClusterizationExample.java @@ -27,12 +27,11 @@ import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.ml.dataset.impl.cache.CacheBasedDatasetBuilder; +import org.apache.ignite.ml.clustering.kmeans.KMeansModel; +import org.apache.ignite.ml.clustering.kmeans.KMeansTrainer; import org.apache.ignite.ml.knn.classification.KNNClassificationTrainer; import org.apache.ignite.ml.math.Tracer; import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector; -import org.apache.ignite.ml.clustering.kmeans.KMeansModel; -import org.apache.ignite.ml.clustering.kmeans.KMeansTrainer; import org.apache.ignite.thread.IgniteThread; /** @@ -57,7 +56,8 @@ public class KMeansClusterizationExample { .withSeed(7867L); KMeansModel mdl = trainer.fit( - new CacheBasedDatasetBuilder<>(ignite, dataCache), + ignite, + dataCache, (k, v) -> Arrays.copyOfRange(v, 1, v.length), (k, v) -> v[0] ); http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/examples/src/main/java/org/apache/ignite/examples/ml/knn/KNNRegressionExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/knn/KNNRegressionExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/knn/KNNRegressionExample.java index 76a07cd..757c8e6 100644 --- 
a/examples/src/main/java/org/apache/ignite/examples/ml/knn/KNNRegressionExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/ml/knn/KNNRegressionExample.java @@ -27,7 +27,6 @@ import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.ml.dataset.impl.cache.CacheBasedDatasetBuilder; import org.apache.ignite.ml.knn.classification.KNNClassificationTrainer; import org.apache.ignite.ml.knn.classification.KNNStrategy; import org.apache.ignite.ml.knn.regression.KNNRegressionModel; @@ -57,7 +56,8 @@ public class KNNRegressionExample { KNNRegressionTrainer trainer = new KNNRegressionTrainer(); KNNRegressionModel knnMdl = (KNNRegressionModel) trainer.fit( - new CacheBasedDatasetBuilder<>(ignite, dataCache), + ignite, + dataCache, (k, v) -> Arrays.copyOfRange(v, 1, v.length), (k, v) -> v[0] ).withK(5) http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/BinarizationExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/BinarizationExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/BinarizationExample.java index aa2aa98..edf4fd7 100644 --- a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/BinarizationExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/BinarizationExample.java @@ -28,7 +28,6 @@ import org.apache.ignite.ml.dataset.DatasetFactory; import org.apache.ignite.ml.dataset.primitive.SimpleDataset; import org.apache.ignite.ml.math.functions.IgniteBiFunction; import org.apache.ignite.ml.preprocessing.binarization.BinarizationTrainer; -import 
org.apache.ignite.ml.preprocessing.normalization.NormalizationTrainer; /** * Example that shows how to use binarization preprocessor to binarize data. http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/ImputingExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/ImputingExample.java b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/ImputingExample.java index 9565c85..e0c0d86 100644 --- a/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/ImputingExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/ml/preprocessing/ImputingExample.java @@ -27,7 +27,6 @@ import org.apache.ignite.examples.ml.dataset.model.Person; import org.apache.ignite.ml.dataset.DatasetFactory; import org.apache.ignite.ml.dataset.primitive.SimpleDataset; import org.apache.ignite.ml.math.functions.IgniteBiFunction; -import org.apache.ignite.ml.preprocessing.binarization.BinarizationTrainer; import org.apache.ignite.ml.preprocessing.imputer.ImputerTrainer; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDataset.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDataset.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDataset.java index 7428faf..1b492a7 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDataset.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDataset.java @@ -24,6 +24,7 @@ import java.util.UUID; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.Ignition; +import 
org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.ml.dataset.Dataset; import org.apache.ignite.ml.dataset.PartitionDataBuilder; import org.apache.ignite.ml.dataset.impl.cache.util.ComputeUtils; @@ -55,6 +56,9 @@ public class CacheBasedDataset<K, V, C extends Serializable, D extends AutoClose /** Ignite Cache with {@code upstream} data. */ private final IgniteCache<K, V> upstreamCache; + /** Filter for {@code upstream} data. */ + private final IgniteBiPredicate<K, V> filter; + /** Ignite Cache with partition {@code context}. */ private final IgniteCache<Integer, C> datasetCache; @@ -70,15 +74,17 @@ public class CacheBasedDataset<K, V, C extends Serializable, D extends AutoClose * * @param ignite Ignite instance. * @param upstreamCache Ignite Cache with {@code upstream} data. + * @param filter Filter for {@code upstream} data. * @param datasetCache Ignite Cache with partition {@code context}. * @param partDataBuilder Partition {@code data} builder. * @param datasetId Dataset ID. 
*/ - public CacheBasedDataset(Ignite ignite, IgniteCache<K, V> upstreamCache, + public CacheBasedDataset(Ignite ignite, IgniteCache<K, V> upstreamCache, IgniteBiPredicate<K, V> filter, IgniteCache<Integer, C> datasetCache, PartitionDataBuilder<K, V, C, D> partDataBuilder, UUID datasetId) { this.ignite = ignite; this.upstreamCache = upstreamCache; + this.filter = filter; this.datasetCache = datasetCache; this.partDataBuilder = partDataBuilder; this.datasetId = datasetId; @@ -95,6 +101,7 @@ public class CacheBasedDataset<K, V, C extends Serializable, D extends AutoClose D data = ComputeUtils.getData( Ignition.localIgnite(), upstreamCacheName, + filter, datasetCacheName, datasetId, part, @@ -123,6 +130,7 @@ public class CacheBasedDataset<K, V, C extends Serializable, D extends AutoClose D data = ComputeUtils.getData( Ignition.localIgnite(), upstreamCacheName, + filter, datasetCacheName, datasetId, part, http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilder.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilder.java index 5c0d583..b66c8aa 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilder.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilder.java @@ -23,6 +23,7 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.ml.dataset.DatasetBuilder; import org.apache.ignite.ml.dataset.PartitionContextBuilder; import 
org.apache.ignite.ml.dataset.PartitionDataBuilder; @@ -52,15 +53,31 @@ public class CacheBasedDatasetBuilder<K, V> implements DatasetBuilder<K, V> { /** Ignite Cache with {@code upstream} data. */ private final IgniteCache<K, V> upstreamCache; + /** Filter for {@code upstream} data. */ + private final IgniteBiPredicate<K, V> filter; + /** - * Constructs a new instance of cache based dataset builder that makes {@link CacheBasedDataset}. + * Constructs a new instance of cache based dataset builder that makes {@link CacheBasedDataset} with default + * predicate that passes all upstream entries to dataset. * * @param ignite Ignite instance. * @param upstreamCache Ignite Cache with {@code upstream} data. */ public CacheBasedDatasetBuilder(Ignite ignite, IgniteCache<K, V> upstreamCache) { + this(ignite, upstreamCache, (a, b) -> true); + } + + /** + * Constructs a new instance of cache based dataset builder that makes {@link CacheBasedDataset}. + * + * @param ignite Ignite instance. + * @param upstreamCache Ignite Cache with {@code upstream} data. + * @param filter Filter for {@code upstream} data. 
+ */ + public CacheBasedDatasetBuilder(Ignite ignite, IgniteCache<K, V> upstreamCache, IgniteBiPredicate<K, V> filter) { this.ignite = ignite; this.upstreamCache = upstreamCache; + this.filter = filter; } /** {@inheritDoc} */ @@ -84,12 +101,13 @@ public class CacheBasedDatasetBuilder<K, V> implements DatasetBuilder<K, V> { ComputeUtils.initContext( ignite, upstreamCache.getName(), + filter, datasetCache.getName(), partCtxBuilder, RETRIES, RETRY_INTERVAL ); - return new CacheBasedDataset<>(ignite, upstreamCache, datasetCache, partDataBuilder, datasetId); + return new CacheBasedDataset<>(ignite, upstreamCache, filter, datasetCache, partDataBuilder, datasetId); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtils.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtils.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtils.java index ce2fcfd..b235900 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtils.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtils.java @@ -23,10 +23,10 @@ import java.util.Arrays; import java.util.BitSet; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.UUID; import java.util.concurrent.locks.LockSupport; -import javax.cache.Cache; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; @@ -35,9 +35,11 @@ import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteFuture; import 
org.apache.ignite.ml.dataset.PartitionContextBuilder; import org.apache.ignite.ml.dataset.PartitionDataBuilder; +import org.apache.ignite.ml.dataset.UpstreamEntry; import org.apache.ignite.ml.math.functions.IgniteFunction; /** @@ -133,6 +135,7 @@ public class ComputeUtils { * * @param ignite Ignite instance. * @param upstreamCacheName Name of an {@code upstream} cache. + * @param filter Filter for {@code upstream} data. * @param datasetCacheName Name of a partition {@code context} cache. * @param datasetId Dataset ID. * @param part Partition index. @@ -144,7 +147,7 @@ public class ComputeUtils { * @return Partition {@code data}. */ public static <K, V, C extends Serializable, D extends AutoCloseable> D getData(Ignite ignite, - String upstreamCacheName, String datasetCacheName, UUID datasetId, int part, + String upstreamCacheName, IgniteBiPredicate<K, V> filter, String datasetCacheName, UUID datasetId, int part, PartitionDataBuilder<K, V, C, D> partDataBuilder) { PartitionDataStorage dataStorage = (PartitionDataStorage)ignite @@ -161,12 +164,18 @@ public class ComputeUtils { ScanQuery<K, V> qry = new ScanQuery<>(); qry.setLocal(true); qry.setPartition(part); + qry.setFilter(filter); - long cnt = upstreamCache.localSizeLong(part); + long cnt = computeCount(upstreamCache, qry); if (cnt > 0) { - try (QueryCursor<Cache.Entry<K, V>> cursor = upstreamCache.query(qry)) { - return partDataBuilder.build(new UpstreamCursorAdapter<>(cursor.iterator(), cnt), cnt, ctx); + try (QueryCursor<UpstreamEntry<K, V>> cursor = upstreamCache.query(qry, + e -> new UpstreamEntry<>(e.getKey(), e.getValue()))) { + + Iterator<UpstreamEntry<K, V>> iter = new IteratorWithConcurrentModificationChecker<>(cursor.iterator(), cnt, + "Cache expected to be not modified during dataset data building"); + + return partDataBuilder.build(iter, cnt, ctx); } } @@ -179,6 +188,7 @@ public class ComputeUtils { * * @param ignite Ignite instance. * @param upstreamCacheName Name of an {@code upstream} cache. 
+ * @param filter Filter for {@code upstream} data. * @param datasetCacheName Name of a partition {@code context} cache. * @param ctxBuilder Partition {@code context} builder. * @param <K> Type of a key in {@code upstream} data. @@ -186,7 +196,8 @@ public class ComputeUtils { * @param <C> Type of a partition {@code context}. */ public static <K, V, C extends Serializable> void initContext(Ignite ignite, String upstreamCacheName, - String datasetCacheName, PartitionContextBuilder<K, V, C> ctxBuilder, int retries, int interval) { + IgniteBiPredicate<K, V> filter, String datasetCacheName, PartitionContextBuilder<K, V, C> ctxBuilder, int retries, + int interval) { affinityCallWithRetries(ignite, Arrays.asList(datasetCacheName, upstreamCacheName), part -> { Ignite locIgnite = Ignition.localIgnite(); @@ -195,11 +206,18 @@ public class ComputeUtils { ScanQuery<K, V> qry = new ScanQuery<>(); qry.setLocal(true); qry.setPartition(part); + qry.setFilter(filter); + + long cnt = computeCount(locUpstreamCache, qry); - long cnt = locUpstreamCache.localSizeLong(part); C ctx; - try (QueryCursor<Cache.Entry<K, V>> cursor = locUpstreamCache.query(qry)) { - ctx = ctxBuilder.build(new UpstreamCursorAdapter<>(cursor.iterator(), cnt), cnt); + try (QueryCursor<UpstreamEntry<K, V>> cursor = locUpstreamCache.query(qry, + e -> new UpstreamEntry<>(e.getKey(), e.getValue()))) { + + Iterator<UpstreamEntry<K, V>> iter = new IteratorWithConcurrentModificationChecker<>(cursor.iterator(), cnt, + "Cache expected to be not modified during dataset context building"); + + ctx = ctxBuilder.build(iter, cnt); } IgniteCache<Integer, C> datasetCache = locIgnite.cache(datasetCacheName); @@ -215,6 +233,7 @@ public class ComputeUtils { * * @param ignite Ignite instance. * @param upstreamCacheName Name of an {@code upstream} cache. + * @param filter Filter for {@code upstream} data. * @param datasetCacheName Name of a partition {@code context} cache. * @param ctxBuilder Partition {@code context} builder. 
* @param retries Number of retries for the case when one of partitions not found on the node. @@ -223,8 +242,9 @@ public class ComputeUtils { * @param <C> Type of a partition {@code context}. */ public static <K, V, C extends Serializable> void initContext(Ignite ignite, String upstreamCacheName, - String datasetCacheName, PartitionContextBuilder<K, V, C> ctxBuilder, int retries) { - initContext(ignite, upstreamCacheName, datasetCacheName, ctxBuilder, retries, 0); + IgniteBiPredicate<K, V> filter, String datasetCacheName, PartitionContextBuilder<K, V, C> ctxBuilder, + int retries) { + initContext(ignite, upstreamCacheName, filter, datasetCacheName, ctxBuilder, retries, 0); } /** @@ -253,4 +273,38 @@ public class ComputeUtils { IgniteCache<Integer, C> datasetCache = ignite.cache(datasetCacheName); datasetCache.put(part, ctx); } + + /** + * Computes number of entries selected from the cache by the query. + * + * @param cache Ignite cache with upstream data. + * @param qry Cache query. + * @param <K> Type of a key in {@code upstream} data. + * @param <V> Type of a value in {@code upstream} data. + * @return Number of entries supplied by the iterator. + */ + private static <K, V> long computeCount(IgniteCache<K, V> cache, ScanQuery<K, V> qry) { + try (QueryCursor<UpstreamEntry<K, V>> cursor = cache.query(qry, + e -> new UpstreamEntry<>(e.getKey(), e.getValue()))) { + return computeCount(cursor.iterator()); + } + } + + /** + * Computes number of entries supplied by the iterator. + * + * @param iter Iterator. + * @return Number of entries supplied by the iterator. 
+ */ + private static long computeCount(Iterator<?> iter) { + long res = 0; + + while (iter.hasNext()) { + iter.next(); + + res++; + } + + return res; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/IteratorWithConcurrentModificationChecker.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/IteratorWithConcurrentModificationChecker.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/IteratorWithConcurrentModificationChecker.java new file mode 100644 index 0000000..f757622 --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/IteratorWithConcurrentModificationChecker.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.dataset.impl.cache.util; + +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Iterator wrapper that checks if number of entries in iterator is equal to expected. + * + * @param <T> Type of entries. 
+ */ +public class IteratorWithConcurrentModificationChecker<T> implements Iterator<T> { + /** Delegate. */ + private final Iterator<T> delegate; + + /** Expected count of entries. */ + private long expCnt; + + /** Exception message. */ + private final String eMsg; + + /** + * Constructs a new instance of iterator checked wrapper. + * + * @param delegate Delegate. + * @param expCnt Expected count of entries. + */ + public IteratorWithConcurrentModificationChecker(Iterator<T> delegate, long expCnt, String eMsg) { + this.delegate = delegate; + this.expCnt = expCnt; + this.eMsg = eMsg; + } + + /** {@inheritDoc} */ + @Override public boolean hasNext() { + boolean hasNext = delegate.hasNext(); + + if (!hasNext ^ expCnt == 0) + throw new ConcurrentModificationException(eMsg); + + return hasNext; + } + + /** {@inheritDoc} */ + @Override public T next() { + try { + T next = delegate.next(); + + if (expCnt == 0) + throw new ConcurrentModificationException(eMsg); + + expCnt--; + + return next; + } + catch (NoSuchElementException e) { + if (expCnt == 0) + throw e; + else + throw new ConcurrentModificationException(eMsg); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/UpstreamCursorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/UpstreamCursorAdapter.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/UpstreamCursorAdapter.java deleted file mode 100644 index 4482af7..0000000 --- a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/cache/util/UpstreamCursorAdapter.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.ml.dataset.impl.cache.util; - -import java.util.Iterator; -import java.util.NoSuchElementException; -import javax.cache.Cache; -import org.apache.ignite.ml.dataset.UpstreamEntry; - -/** - * Cursor adapter used to transform {@code Cache.Entry} received from Ignite Cache query cursor into DLC-specific - * {@link UpstreamEntry}. - * - * @param <K> Type of an upstream value key. - * @param <V> Type of an upstream value. - */ -public class UpstreamCursorAdapter<K, V> implements Iterator<UpstreamEntry<K, V>> { - /** Cache entry iterator. */ - private final Iterator<Cache.Entry<K, V>> delegate; - - /** Size. */ - private long cnt; - - /** - * Constructs a new instance of iterator. - * - * @param delegate Cache entry iterator. 
- */ - UpstreamCursorAdapter(Iterator<Cache.Entry<K, V>> delegate, long cnt) { - this.delegate = delegate; - this.cnt = cnt; - } - - /** {@inheritDoc} */ - @Override public boolean hasNext() { - return delegate.hasNext() && cnt > 0; - } - - /** {@inheritDoc} */ - @Override public UpstreamEntry<K, V> next() { - if (cnt == 0) - throw new NoSuchElementException(); - - cnt--; - - Cache.Entry<K, V> next = delegate.next(); - - if (next == null) - return null; - - return new UpstreamEntry<>(next.getKey(), next.getValue()); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilder.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilder.java b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilder.java index cfc1801..a4f275d 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilder.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilder.java @@ -19,9 +19,11 @@ package org.apache.ignite.ml.dataset.impl.local; import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.ml.dataset.DatasetBuilder; import org.apache.ignite.ml.dataset.PartitionContextBuilder; import org.apache.ignite.ml.dataset.PartitionDataBuilder; @@ -42,14 +44,30 @@ public class LocalDatasetBuilder<K, V> implements DatasetBuilder<K, V> { /** Number of partitions. */ private final int partitions; + /** Filter for {@code upstream} data. */ + private final IgniteBiPredicate<K, V> filter; + /** - * Constructs a new instance of local dataset builder that makes {@link LocalDataset}. 
+ * Constructs a new instance of local dataset builder that makes {@link LocalDataset} with default predicate that + * passes all upstream entries to dataset. * * @param upstreamMap {@code Map} with upstream data. * @param partitions Number of partitions. */ public LocalDatasetBuilder(Map<K, V> upstreamMap, int partitions) { + this(upstreamMap, (a, b) -> true, partitions); + } + + /** + * Constructs a new instance of local dataset builder that makes {@link LocalDataset}. + * + * @param upstreamMap {@code Map} with upstream data. + * @param filter Filter for {@code upstream} data. + * @param partitions Number of partitions. + */ + public LocalDatasetBuilder(Map<K, V> upstreamMap, IgniteBiPredicate<K, V> filter, int partitions) { this.upstreamMap = upstreamMap; + this.filter = filter; this.partitions = partitions; } @@ -60,22 +78,28 @@ public class LocalDatasetBuilder<K, V> implements DatasetBuilder<K, V> { List<C> ctxList = new ArrayList<>(); List<D> dataList = new ArrayList<>(); - int partSize = Math.max(1, upstreamMap.size() / partitions); + Map<K, V> filteredMap = new HashMap<>(); + upstreamMap.forEach((key, val) -> { + if (filter.apply(key, val)) + filteredMap.put(key, val); + }); + + int partSize = Math.max(1, filteredMap.size() / partitions); - Iterator<K> firstKeysIter = upstreamMap.keySet().iterator(); - Iterator<K> secondKeysIter = upstreamMap.keySet().iterator(); + Iterator<K> firstKeysIter = filteredMap.keySet().iterator(); + Iterator<K> secondKeysIter = filteredMap.keySet().iterator(); int ptr = 0; for (int part = 0; part < partitions; part++) { - int cnt = part == partitions - 1 ? upstreamMap.size() - ptr : Math.min(partSize, upstreamMap.size() - ptr); + int cnt = part == partitions - 1 ? filteredMap.size() - ptr : Math.min(partSize, filteredMap.size() - ptr); C ctx = cnt > 0 ? 
partCtxBuilder.build( - new IteratorWindow<>(firstKeysIter, k -> new UpstreamEntry<>(k, upstreamMap.get(k)), cnt), + new IteratorWindow<>(firstKeysIter, k -> new UpstreamEntry<>(k, filteredMap.get(k)), cnt), cnt ) : null; D data = cnt > 0 ? partDataBuilder.build( - new IteratorWindow<>(secondKeysIter, k -> new UpstreamEntry<>(k, upstreamMap.get(k)), cnt), + new IteratorWindow<>(secondKeysIter, k -> new UpstreamEntry<>(k, filteredMap.get(k)), cnt), cnt, ctx ) : null; http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionTrainer.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionTrainer.java b/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionTrainer.java index 2d13cd5..7944149 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionTrainer.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/knn/regression/KNNRegressionTrainer.java @@ -20,11 +20,12 @@ package org.apache.ignite.ml.knn.regression; import org.apache.ignite.ml.dataset.DatasetBuilder; import org.apache.ignite.ml.knn.KNNUtils; import org.apache.ignite.ml.math.functions.IgniteBiFunction; +import org.apache.ignite.ml.trainers.SingleLabelDatasetTrainer; /** * kNN algorithm trainer to solve regression task. */ -public class KNNRegressionTrainer{ +public class KNNRegressionTrainer implements SingleLabelDatasetTrainer<KNNRegressionModel> { /** * Trains model based on the specified data. 
* http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilderTest.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilderTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilderTest.java index c35cdc3..1cf6dbf 100644 --- a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilderTest.java +++ b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/CacheBasedDatasetBuilderTest.java @@ -26,6 +26,7 @@ import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.ml.dataset.UpstreamEntry; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; /** @@ -86,6 +87,44 @@ public class CacheBasedDatasetBuilderTest extends GridCommonAbstractTest { } /** + * Tests that predicate works correctly. 
+ */ + public void testBuildWithPredicate() { + CacheConfiguration<Integer, Integer> upstreamCacheConfiguration = new CacheConfiguration<>(); + upstreamCacheConfiguration.setAffinity(new RendezvousAffinityFunction(false, 1)); + upstreamCacheConfiguration.setName(UUID.randomUUID().toString()); + + IgniteCache<Integer, Integer> upstreamCache = ignite.createCache(upstreamCacheConfiguration); + upstreamCache.put(1, 1); + upstreamCache.put(2, 2); + + CacheBasedDatasetBuilder<Integer, Integer> builder = new CacheBasedDatasetBuilder<>( + ignite, + upstreamCache, + (k, v) -> k % 2 == 0 + ); + + CacheBasedDataset<Integer, Integer, Long, AutoCloseable> dataset = builder.build( + (upstream, upstreamSize) -> { + UpstreamEntry<Integer, Integer> entry = upstream.next(); + assertEquals(Integer.valueOf(2), entry.getKey()); + assertEquals(Integer.valueOf(2), entry.getValue()); + assertFalse(upstream.hasNext()); + return 0L; + }, + (upstream, upstreamSize, ctx) -> { + UpstreamEntry<Integer, Integer> entry = upstream.next(); + assertEquals(Integer.valueOf(2), entry.getKey()); + assertEquals(Integer.valueOf(2), entry.getValue()); + assertFalse(upstream.hasNext()); + return null; + } + ); + + dataset.compute(data -> {}); + } + + /** * Generate an Ignite Cache with the specified size and number of partitions for testing purposes. * * @param size Size of an Ignite Cache. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtilsTest.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtilsTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtilsTest.java index 4926a90..952fc43 100644 --- a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtilsTest.java +++ b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/ComputeUtilsTest.java @@ -177,6 +177,7 @@ public class ComputeUtilsTest extends GridCommonAbstractTest { part -> ComputeUtils.<Integer, Integer, Serializable, TestPartitionData>getData( ignite, upstreamCacheName, + (k, v) -> true, datasetCacheName, datasetId, 0, @@ -225,6 +226,7 @@ public class ComputeUtilsTest extends GridCommonAbstractTest { ComputeUtils.<Integer, Integer, Integer>initContext( ignite, upstreamCacheName, + (k, v) -> true, datasetCacheName, (upstream, upstreamSize) -> { http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/IteratorWithConcurrentModificationCheckerTest.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/IteratorWithConcurrentModificationCheckerTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/IteratorWithConcurrentModificationCheckerTest.java new file mode 100644 index 0000000..232281e --- /dev/null +++ b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/cache/util/IteratorWithConcurrentModificationCheckerTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.dataset.impl.cache.util; + +import java.util.Arrays; +import java.util.ConcurrentModificationException; +import java.util.Iterator; +import java.util.List; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link IteratorWithConcurrentModificationChecker}. + */ +public class IteratorWithConcurrentModificationCheckerTest { + /** */ + @Test(expected = ConcurrentModificationException.class) + public void testNextWhenIteratorHasLessElementsThanExpected() { + List<Integer> list = Arrays.asList(1, 2, 3); + + Iterator<Integer> iter = new IteratorWithConcurrentModificationChecker<>(list.iterator(), 4, "Exception"); + + assertEquals(Integer.valueOf(1), iter.next()); + assertEquals(Integer.valueOf(2), iter.next()); + assertEquals(Integer.valueOf(3), iter.next()); + + iter.next(); // Should throw an exception. 
+ } + + /** */ + @Test(expected = ConcurrentModificationException.class) + public void testNextWhenIteratorHasMoreElementsThanExpected() { + List<Integer> list = Arrays.asList(1, 2, 3); + + Iterator<Integer> iter = new IteratorWithConcurrentModificationChecker<>(list.iterator(), 2, "Exception"); + + assertEquals(Integer.valueOf(1), iter.next()); + assertEquals(Integer.valueOf(2), iter.next()); + + iter.next(); // Should throw an exception. + } + + /** */ + @Test(expected = ConcurrentModificationException.class) + public void testHasNextWhenIteratorHasLessElementsThanExpected() { + List<Integer> list = Arrays.asList(1, 2, 3); + + Iterator<Integer> iter = new IteratorWithConcurrentModificationChecker<>(list.iterator(), 4, "Exception"); + + assertTrue(iter.hasNext()); + iter.next(); + assertTrue(iter.hasNext()); + iter.next(); + assertTrue(iter.hasNext()); + iter.next(); + + iter.hasNext(); // Should throw an exception. + } + + /** */ + @Test(expected = ConcurrentModificationException.class) + public void testHasNextWhenIteratorHasMoreElementsThanExpected() { + List<Integer> list = Arrays.asList(1, 2, 3); + + Iterator<Integer> iter = new IteratorWithConcurrentModificationChecker<>(list.iterator(), 2, "Exception"); + + assertTrue(iter.hasNext()); + iter.next(); + assertTrue(iter.hasNext()); + iter.next(); + + iter.hasNext(); // Should throw an exception. 
+ } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilderTest.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilderTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilderTest.java index 0628580..8a5eb3a 100644 --- a/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilderTest.java +++ b/modules/ml/src/test/java/org/apache/ignite/ml/dataset/impl/local/LocalDatasetBuilderTest.java @@ -67,6 +67,44 @@ public class LocalDatasetBuilderTest { assertEquals(10, cnt.intValue()); } + /** Tests {@code build()} method with predicate. */ + @Test + public void testBuildWithPredicate() { + Map<Integer, Integer> data = new HashMap<>(); + for (int i = 0; i < 100; i++) + data.put(i, i); + + LocalDatasetBuilder<Integer, Integer> builder = new LocalDatasetBuilder<>(data, (k, v) -> k % 2 == 0,10); + + LocalDataset<Serializable, TestPartitionData> dataset = builder.build( + (upstream, upstreamSize) -> null, + (upstream, upstreamSize, ctx) -> { + int[] arr = new int[Math.toIntExact(upstreamSize)]; + + int ptr = 0; + while (upstream.hasNext()) + arr[ptr++] = upstream.next().getValue(); + + return new TestPartitionData(arr); + } + ); + + AtomicLong cnt = new AtomicLong(); + + dataset.compute((partData, partIdx) -> { + cnt.incrementAndGet(); + + int[] arr = partData.data; + + assertEquals(5, arr.length); + + for (int i = 0; i < 5; i++) + assertEquals((partIdx * 5 + i) * 2, arr[i]); + }); + + assertEquals(10, cnt.intValue()); + } + /** * Test partition {@code data}. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/4b351b6b/modules/ml/src/test/java/org/apache/ignite/ml/preprocessing/imputing/ImputerTrainerTest.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/preprocessing/imputing/ImputerTrainerTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/preprocessing/imputing/ImputerTrainerTest.java index 86d10fb..06e52fa 100644 --- a/modules/ml/src/test/java/org/apache/ignite/ml/preprocessing/imputing/ImputerTrainerTest.java +++ b/modules/ml/src/test/java/org/apache/ignite/ml/preprocessing/imputing/ImputerTrainerTest.java @@ -22,12 +22,9 @@ import java.util.HashMap; import java.util.Map; import org.apache.ignite.ml.dataset.DatasetBuilder; import org.apache.ignite.ml.dataset.impl.local.LocalDatasetBuilder; -import org.apache.ignite.ml.preprocessing.binarization.BinarizationPreprocessor; -import org.apache.ignite.ml.preprocessing.binarization.BinarizationTrainer; import org.apache.ignite.ml.preprocessing.imputer.ImputerPreprocessor; import org.apache.ignite.ml.preprocessing.imputer.ImputerTrainer; import org.apache.ignite.ml.preprocessing.imputer.ImputingStrategy; -import org.apache.ignite.ml.preprocessing.normalization.NormalizationTrainer; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized;
