IGNITE-5109 Refactoring for SparseDistributedMatrix
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/156ec536 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/156ec536 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/156ec536 Branch: refs/heads/ignite-5009 Commit: 156ec5360e6ec918878d9d0c6f7a5d04fc8161a0 Parents: c64ad78 Author: Yury Babak <yba...@gridgain.com> Authored: Wed May 3 20:02:38 2017 +0300 Committer: Anton Vinogradov <a...@apache.org> Committed: Wed May 3 20:02:38 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/ml/math/impls/CacheUtils.java | 111 +++++++++++++------ .../ml/math/impls/matrix/AbstractMatrix.java | 5 + .../impls/matrix/DenseLocalOffHeapMatrix.java | 5 - .../impls/matrix/SparseDistributedMatrix.java | 16 ++- .../matrix/SparseDistributedMatrixStorage.java | 89 ++++++++++----- .../ignite/ml/math/MathImplMainTestSuite.java | 5 +- .../matrix/SparseDistributedMatrixTest.java | 67 +++++++++-- 7 files changed, 213 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/156ec536/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java index cfb01be..ace399b 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/CacheUtils.java @@ -28,13 +28,17 @@ import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.ml.math.KeyMapper; import org.apache.ignite.ml.math.ValueMapper; import org.apache.ignite.ml.math.functions.IgniteBiFunction; import org.apache.ignite.ml.math.functions.IgniteConsumer; import org.apache.ignite.ml.math.functions.IgniteFunction; +import org.apache.ignite.ml.math.impls.storage.matrix.SparseDistributedMatrixStorage; /** * Distribution-related misc. support. @@ -118,17 +122,21 @@ public class CacheUtils { } /** - * @param cacheName Cache name. + * @param matrixUuid Matrix UUID. * @return Sum obtained using sparse logic. */ - public static <K, V> double sparseSum(String cacheName) { - Collection<Double> subSums = fold(cacheName, (CacheEntry<Integer, Map<Integer, Double>> ce, Double acc) -> { - Map<Integer, Double> map = ce.entry().getValue(); + public static <K, V> double sparseSum(IgniteUuid matrixUuid) { + Collection<Double> subSums = fold(SparseDistributedMatrixStorage.ML_CACHE_NAME, (CacheEntry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> ce, Double acc) -> { + Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> entry = ce.entry(); + if (entry.getKey().get2().equals(matrixUuid)) { + Map<Integer, Double> map = entry.getValue(); - double sum = sum(map.values()); + double sum = sum(map.values()); - return acc == null ? sum : acc + sum; - }); + return acc == null ? sum : acc + sum; + } else + return acc; + }, key -> key.get2().equals(matrixUuid)); return sum(subSums); } @@ -172,39 +180,48 @@ public class CacheUtils { } /** - * @param cacheName Cache name. + * @param matrixUuid Matrix UUID. * @return Minimum value obtained using sparse logic. */ - public static <K, V> double sparseMin(String cacheName) { - Collection<Double> mins = fold(cacheName, (CacheEntry<Integer, Map<Integer, Double>> ce, Double acc) -> { - Map<Integer, Double> map = ce.entry().getValue(); + public static <K, V> double sparseMin(IgniteUuid matrixUuid) { + Collection<Double> mins = fold(SparseDistributedMatrixStorage.ML_CACHE_NAME, (CacheEntry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> ce, Double acc) -> { + Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> entry = ce.entry(); - double min = Collections.min(map.values()); + if (entry.getKey().get2().equals(matrixUuid)) { + Map<Integer, Double> map = entry.getValue(); - if (acc == null) - return min; - else - return Math.min(acc, min); - }); + double min = Collections.min(map.values()); + + if (acc == null) + return min; + else + return Math.min(acc, min); + } else + return acc; + }, key -> key.get2().equals(matrixUuid)); return Collections.min(mins); } /** - * @param cacheName Cache name. + * @param matrixUuid Matrix UUID. * @return Maximum value obtained using sparse logic. */ - public static <K, V> double sparseMax(String cacheName) { - Collection<Double> maxes = fold(cacheName, (CacheEntry<Integer, Map<Integer, Double>> ce, Double acc) -> { - Map<Integer, Double> map = ce.entry().getValue(); + public static <K, V> double sparseMax(IgniteUuid matrixUuid) { + Collection<Double> maxes = fold(SparseDistributedMatrixStorage.ML_CACHE_NAME, (CacheEntry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> ce, Double acc) -> { + Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> entry = ce.entry(); + if (entry.getKey().get2().equals(matrixUuid)) { + Map<Integer, Double> map = entry.getValue(); - double max = Collections.max(map.values()); + double max = Collections.max(map.values()); - if (acc == null) - return max; - else - return Math.max(acc, max); - }); + if (acc == null) + return max; + else + return Math.max(acc, max); + } else + return acc; + }, key -> key.get2().equals(matrixUuid)); return Collections.max(maxes); } @@ -254,19 +271,20 @@ public class CacheUtils { } /** - * @param cacheName Cache name. + * @param matrixUuid Matrix UUID. * @param mapper Mapping {@link IgniteFunction}. */ - public static <K, V> void sparseMap(String cacheName, IgniteFunction<Double, Double> mapper) { - foreach(cacheName, (CacheEntry<Integer, Map<Integer, Double>> ce) -> { - Integer k = ce.entry().getKey(); + public static <K, V> void sparseMap(IgniteUuid matrixUuid, IgniteFunction<Double, Double> mapper) { + foreach(SparseDistributedMatrixStorage.ML_CACHE_NAME, (CacheEntry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> ce) -> { + IgniteBiTuple k = ce.entry().getKey(); + Map<Integer, Double> v = ce.entry().getValue(); for (Map.Entry<Integer, Double> e : v.entrySet()) e.setValue(mapper.apply(e.getValue())); ce.cache().put(k, v); - }); + }, key -> key.get2().equals(matrixUuid)); } /** @@ -276,6 +294,17 @@ public class CacheUtils { * @param <V> Cache value object type. */ public static <K, V> void foreach(String cacheName, IgniteConsumer<CacheEntry<K, V>> fun) { + foreach(cacheName, fun, null); + } + + /** + * @param cacheName Cache name. + * @param fun An operation that accepts a cache entry and processes it. + * @param keyFilter Cache keys filter. + * @param <K> Cache key object type. + * @param <V> Cache value object type. + */ + public static <K, V> void foreach(String cacheName, IgniteConsumer<CacheEntry<K, V>> fun, IgnitePredicate<K> keyFilter) { bcast(cacheName, () -> { Ignite ignite = Ignition.localIgnite(); IgniteCache<K, V> cache = ignite.getOrCreateCache(cacheName); @@ -293,7 +322,7 @@ public class CacheUtils { // Iterate over given partition. // Query returns an empty cursor if this partition is not stored on this node. for (Cache.Entry<K, V> entry : cache.query(new ScanQuery<K, V>(part, - (k, v) -> affinity.mapPartitionToNode(p) == locNode))) + (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter == null || keyFilter.apply(k))))) fun.accept(new CacheEntry<>(entry, cache)); } }); @@ -310,6 +339,20 @@ public class CacheUtils { * @return Fold operation result. */ public static <K, V, A> Collection<A> fold(String cacheName, IgniteBiFunction<CacheEntry<K, V>, A, A> folder) { + return fold(cacheName, folder, null); + } + + /** + * <b>Currently fold supports only commutative operations.<b/> + * + * @param cacheName Cache name. + * @param folder Fold function operating over cache entries. + * @param <K> Cache key object type. + * @param <V> Cache value object type. + * @param <A> Fold result type. + * @return Fold operation result. + */ + public static <K, V, A> Collection<A> fold(String cacheName, IgniteBiFunction<CacheEntry<K, V>, A, A> folder, IgnitePredicate<K> keyFilter) { return bcast(cacheName, () -> { Ignite ignite = Ignition.localIgnite(); IgniteCache<K, V> cache = ignite.getOrCreateCache(cacheName); @@ -329,7 +372,7 @@ public class CacheUtils { // Iterate over given partition. // Query returns an empty cursor if this partition is not stored on this node. for (Cache.Entry<K, V> entry : cache.query(new ScanQuery<K, V>(part, - (k, v) -> affinity.mapPartitionToNode(p) == locNode))) + (k, v) -> affinity.mapPartitionToNode(p) == locNode && (keyFilter == null || keyFilter.apply(k))))) a = folder.apply(new CacheEntry<>(entry, cache), a); } http://git-wip-us.apache.org/repos/asf/ignite/blob/156ec536/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java index c5edeb1..d1d3904 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/AbstractMatrix.java @@ -840,6 +840,11 @@ public abstract class AbstractMatrix implements Matrix { } /** {@inheritDoc} */ + @Override public void destroy() { + getStorage().destroy(); + } + + /** {@inheritDoc} */ @Override public Matrix copy() { Matrix cp = like(rowSize(), columnSize()); http://git-wip-us.apache.org/repos/asf/ignite/blob/156ec536/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/DenseLocalOffHeapMatrix.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/DenseLocalOffHeapMatrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/DenseLocalOffHeapMatrix.java index 4161228..fad35fd 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/DenseLocalOffHeapMatrix.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/DenseLocalOffHeapMatrix.java @@ -71,11 +71,6 @@ public class DenseLocalOffHeapMatrix extends AbstractMatrix { } /** {@inheritDoc} */ - @Override public void destroy() { - getStorage().destroy(); - } - - /** {@inheritDoc} */ @Override protected Matrix likeIdentity() { int n = rowSize(); Matrix res = like(n, n); http://git-wip-us.apache.org/repos/asf/ignite/blob/156ec536/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java index 10ebdd0..3e508bd 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrix.java @@ -26,6 +26,7 @@ package org.apache.ignite.ml.math.impls.matrix; +import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.ml.math.Matrix; import org.apache.ignite.ml.math.StorageConstants; import org.apache.ignite.ml.math.Vector; @@ -119,24 +120,24 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo * @return Matrix with mapped values. */ private Matrix mapOverValues(IgniteFunction<Double, Double> mapper) { - CacheUtils.sparseMap(storage().cache().getName(), mapper); + CacheUtils.sparseMap(getUUID(), mapper); return this; } /** {@inheritDoc} */ @Override public double sum() { - return CacheUtils.sparseSum(storage().cache().getName()); + return CacheUtils.sparseSum(getUUID()); } /** {@inheritDoc} */ @Override public double maxValue() { - return CacheUtils.sparseMax(storage().cache().getName()); + return CacheUtils.sparseMax(getUUID()); } /** {@inheritDoc} */ @Override public double minValue() { - return CacheUtils.sparseMin(storage().cache().getName()); + return CacheUtils.sparseMin(getUUID()); } /** {@inheritDoc} */ @@ -146,11 +147,16 @@ public class SparseDistributedMatrix extends AbstractMatrix implements StorageCo /** {@inheritDoc} */ @Override public Matrix like(int rows, int cols) { - throw new UnsupportedOperationException(); + return new SparseDistributedMatrix(rows, cols, storage().storageMode(), storage().accessMode()); } /** {@inheritDoc} */ @Override public Vector likeVector(int crd) { throw new UnsupportedOperationException(); } + + /** */ + private IgniteUuid getUUID(){ + return ((SparseDistributedMatrixStorage) getStorage()).getUUID(); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/156ec536/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/SparseDistributedMatrixStorage.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/SparseDistributedMatrixStorage.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/SparseDistributedMatrixStorage.java index bfc0e9f..816bf44 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/SparseDistributedMatrixStorage.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/SparseDistributedMatrixStorage.java @@ -23,6 +23,9 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.ignite.IgniteCache; import org.apache.ignite.Ignition; import org.apache.ignite.cache.CacheAtomicityMode; @@ -30,6 +33,7 @@ import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.ml.math.MatrixStorage; import org.apache.ignite.ml.math.StorageConstants; @@ -40,19 +44,22 @@ import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix; * {@link MatrixStorage} implementation for {@link SparseDistributedMatrix}. */ public class SparseDistributedMatrixStorage extends CacheUtils implements MatrixStorage, StorageConstants { + /** Cache name used for all instances of {@link SparseDistributedMatrixStorage}.*/ + public static final String ML_CACHE_NAME = "ML_SPARSE_MATRICES_CONTAINER"; /** Amount of rows in the matrix. */ private int rows; /** Amount of columns in the matrix. */ private int cols; - /** Row or column based storage mode. */ private int stoMode; /** Random or sequential access mode. */ private int acsMode; + /** Matrix uuid. */ + private IgniteUuid uuid; /** Actual distributed storage. */ private IgniteCache< - Integer /* Row or column index. */, + IgniteBiTuple<Integer, IgniteUuid> /* Row or column index with matrix uuid. */, Map<Integer, Double> /* Map-based row or column. */ > cache = null; @@ -81,14 +88,15 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements Matrix this.acsMode = acsMode; cache = newCache(); + + uuid = IgniteUuid.randomUuid(); } /** - * - * + * Create new ML cache if needed. */ - private IgniteCache<Integer, Map<Integer, Double>> newCache() { - CacheConfiguration<Integer, Map<Integer, Double>> cfg = new CacheConfiguration<>(); + private IgniteCache<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> newCache() { + CacheConfiguration<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> cfg = new CacheConfiguration<>(); // Write to primary. cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC); @@ -106,16 +114,18 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements Matrix cfg.setCacheMode(CacheMode.PARTITIONED); // Random cache name. - cfg.setName(new IgniteUuid().shortString()); + cfg.setName(ML_CACHE_NAME); - return Ignition.localIgnite().getOrCreateCache(cfg); + IgniteCache<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> cache = Ignition.localIgnite().getOrCreateCache(cfg); + + return cache; } /** * * */ - public IgniteCache<Integer, Map<Integer, Double>> cache() { + public IgniteCache<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> cache() { return cache; } @@ -138,34 +148,36 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements Matrix /** {@inheritDoc} */ @Override public double get(int x, int y) { if (stoMode == ROW_STORAGE_MODE) - return matrixGet(cache.getName(), x, y); + return matrixGet(x, y); else - return matrixGet(cache.getName(), y, x); + return matrixGet(y, x); } /** {@inheritDoc} */ @Override public void set(int x, int y, double v) { if (stoMode == ROW_STORAGE_MODE) - matrixSet(cache.getName(), x, y, v); + matrixSet(x, y, v); else - matrixSet(cache.getName(), y, x, v); + matrixSet(y, x, v); } /** * Distributed matrix get. * - * @param cacheName Matrix's cache. * @param a Row or column index. * @param b Row or column index. * @return Matrix value at (a, b) index. */ - private double matrixGet(String cacheName, int a, int b) { + private double matrixGet(int a, int b) { // Remote get from the primary node (where given row or column is stored locally). - return ignite().compute(groupForKey(cacheName, a)).call(() -> { - IgniteCache<Integer, Map<Integer, Double>> cache = Ignition.localIgnite().getOrCreateCache(cacheName); + return ignite().compute(groupForKey(ML_CACHE_NAME, a)).call(() -> { + IgniteCache<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> cache = Ignition.localIgnite().getOrCreateCache(ML_CACHE_NAME); // Local get. - Map<Integer, Double> map = cache.localPeek(a, CachePeekMode.PRIMARY); + Map<Integer, Double> map = cache.localPeek(getCacheKey(a), CachePeekMode.PRIMARY); + + if (map == null) + map = cache.get(getCacheKey(a)); return (map == null || !map.containsKey(b)) ? 0.0 : map.get(b); }); @@ -174,21 +186,25 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements Matrix /** * Distributed matrix set. * - * @param cacheName Matrix's cache. * @param a Row or column index. * @param b Row or column index. * @param v New value to set. */ - private void matrixSet(String cacheName, int a, int b, double v) { + private void matrixSet(int a, int b, double v) { // Remote set on the primary node (where given row or column is stored locally). - ignite().compute(groupForKey(cacheName, a)).run(() -> { - IgniteCache<Integer, Map<Integer, Double>> cache = Ignition.localIgnite().getOrCreateCache(cacheName); + ignite().compute(groupForKey(ML_CACHE_NAME, a)).run(() -> { + IgniteCache<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>> cache = Ignition.localIgnite().getOrCreateCache(ML_CACHE_NAME); // Local get. - Map<Integer, Double> map = cache.localPeek(a, CachePeekMode.PRIMARY); + Map<Integer, Double> map = cache.localPeek(getCacheKey(a), CachePeekMode.PRIMARY); - if (map == null) - map = acsMode == SEQUENTIAL_ACCESS_MODE ? new Int2DoubleRBTreeMap() : new Int2DoubleOpenHashMap(); + + if (map == null) { + map = cache.get(getCacheKey(a)); //Remote entry get. + + if (map == null) + map = acsMode == SEQUENTIAL_ACCESS_MODE ? new Int2DoubleRBTreeMap() : new Int2DoubleOpenHashMap(); + } if (v != 0.0) map.put(b, v); @@ -196,10 +212,15 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements Matrix map.remove(b); // Local put. - cache.put(a, map); + cache.put(getCacheKey(a), map); }); } + /** Build cache key for row/column. */ + private IgniteBiTuple<Integer, IgniteUuid> getCacheKey(int idx){ + return new IgniteBiTuple<>(idx, uuid); + } + /** {@inheritDoc} */ @Override public int columnSize() { return cols; @@ -216,6 +237,7 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements Matrix out.writeInt(cols); out.writeInt(acsMode); out.writeInt(stoMode); + out.writeObject(uuid); out.writeUTF(cache.getName()); } @@ -225,6 +247,7 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements Matrix cols = in.readInt(); acsMode = in.readInt(); stoMode = in.readInt(); + uuid = (IgniteUuid)in.readObject(); cache = ignite().getOrCreateCache(in.readUTF()); } @@ -253,9 +276,11 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements Matrix return false; } - /** Destroy underlying cache. */ + /** Delete all data from cache. */ @Override public void destroy() { - cache.destroy(); + Set<IgniteBiTuple<Integer, IgniteUuid>> keyset = IntStream.range(0, rows).mapToObj(this::getCacheKey).collect(Collectors.toSet()); + + cache.clearAll(keyset); } /** {@inheritDoc} */ @@ -266,6 +291,7 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements Matrix res = res * 37 + rows; res = res * 37 + acsMode; res = res * 37 + stoMode; + res = res * 37 + uuid.hashCode(); res = res * 37 + cache.hashCode(); return res; @@ -282,6 +308,11 @@ public class SparseDistributedMatrixStorage extends CacheUtils implements Matrix SparseDistributedMatrixStorage that = (SparseDistributedMatrixStorage)obj; return rows == that.rows && cols == that.cols && acsMode == that.acsMode && stoMode == that.stoMode - && (cache != null ? cache.equals(that.cache) : that.cache == null); + && uuid.equals(that.uuid) && (cache != null ? cache.equals(that.cache) : that.cache == null); + } + + /** */ + public IgniteUuid getUUID() { + return uuid; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/156ec536/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplMainTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplMainTestSuite.java b/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplMainTestSuite.java index 5f41583..8d6d2af 100644 --- a/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplMainTestSuite.java +++ b/modules/ml/src/test/java/org/apache/ignite/ml/math/MathImplMainTestSuite.java @@ -21,12 +21,13 @@ import org.junit.runner.RunWith; import org.junit.runners.Suite; /** - * Test suite for local and distributed tests + * Test suite for local and distributed tests. */ @RunWith(Suite.class) @Suite.SuiteClasses({ MathImplLocalTestSuite.class, - MathImplDistributedTestSuite.class + MathImplDistributedTestSuite.class, + TracerTest.class }) public class MathImplMainTestSuite { // No-op. http://git-wip-us.apache.org/repos/asf/ignite/blob/156ec536/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java index 8985806..5ee2e7d 100644 --- a/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java +++ b/modules/ml/src/test/java/org/apache/ignite/ml/math/impls/matrix/SparseDistributedMatrixTest.java @@ -31,12 +31,19 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.ml.math.Matrix; import org.apache.ignite.ml.math.StorageConstants; import org.apache.ignite.ml.math.exceptions.UnsupportedOperationException; import org.apache.ignite.ml.math.impls.MathTestConstants; +import org.apache.ignite.ml.math.impls.storage.matrix.SparseDistributedMatrixStorage; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.testframework.junits.common.GridCommonTest; @@ -110,7 +117,7 @@ public class SparseDistributedMatrixTest extends GridCommonAbstractTest { double v = Math.random(); cacheMatrix.set(i, j, v); - assert Double.compare(v, cacheMatrix.get(i, j)) == 0; + assertEquals("Unexpected value for matrix element["+ i +" " + j + "]", v, cacheMatrix.get(i, j), PRECISION); } } } @@ -134,7 +141,7 @@ public class SparseDistributedMatrixTest extends GridCommonAbstractTest { SparseDistributedMatrix objRestored = (SparseDistributedMatrix)objInputStream.readObject(); assertTrue(MathTestConstants.VAL_NOT_EQUALS, cacheMatrix.equals(objRestored)); - assertEquals(MathTestConstants.VAL_NOT_EQUALS, objRestored.get(1, 1), 1.0, 0.0); + assertEquals(MathTestConstants.VAL_NOT_EQUALS, objRestored.get(1, 1), 1.0, PRECISION); } /** Test simple math. */ @@ -225,19 +232,44 @@ public class SparseDistributedMatrixTest extends GridCommonAbstractTest { } /** */ + public void testCacheBehaviour(){ + IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName()); + + SparseDistributedMatrix cacheMatrix1 = new SparseDistributedMatrix(rows, cols, StorageConstants.ROW_STORAGE_MODE, StorageConstants.RANDOM_ACCESS_MODE); + SparseDistributedMatrix cacheMatrix2 = new SparseDistributedMatrix(rows, cols, StorageConstants.ROW_STORAGE_MODE, StorageConstants.RANDOM_ACCESS_MODE); + + initMtx(cacheMatrix1); + initMtx(cacheMatrix2); + + Collection<String> cacheNames = ignite.cacheNames(); + + assert cacheNames.contains(SparseDistributedMatrixStorage.ML_CACHE_NAME); + + IgniteCache<IgniteBiTuple<Integer, IgniteUuid>, Object> cache = ignite.getOrCreateCache(SparseDistributedMatrixStorage.ML_CACHE_NAME); + + Set<IgniteBiTuple<Integer, IgniteUuid>> keySet1 = buildKeySet(cacheMatrix1); + Set<IgniteBiTuple<Integer, IgniteUuid>> keySet2 = buildKeySet(cacheMatrix2); + + assert cache.containsKeys(keySet1); + assert cache.containsKeys(keySet2); + + cacheMatrix2.destroy(); + + assert cache.containsKeys(keySet1); + assert !cache.containsKeys(keySet2); + + cacheMatrix1.destroy(); + + assert !cache.containsKeys(keySet1); + } + + /** */ public void testLike() { IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName()); cacheMatrix = new SparseDistributedMatrix(rows, cols, StorageConstants.ROW_STORAGE_MODE, StorageConstants.RANDOM_ACCESS_MODE); - try { - cacheMatrix.like(1, 1); - fail("UnsupportedOperationException expected."); - } - catch (UnsupportedOperationException e) { - return; - } - fail("UnsupportedOperationException expected."); + assertNotNull(cacheMatrix.like(1, 1)); } /** */ @@ -262,4 +294,19 @@ public class SparseDistributedMatrixTest extends GridCommonAbstractTest { for (int j = 0; j < m.columnSize(); j++) m.set(i, j, 1.0); } + + /** Build key set for SparseDistributedMatrix. */ + private Set<IgniteBiTuple<Integer, IgniteUuid>> buildKeySet(SparseDistributedMatrix m){ + Set<IgniteBiTuple<Integer, IgniteUuid>> set = new HashSet<>(); + + SparseDistributedMatrixStorage storage = (SparseDistributedMatrixStorage)m.getStorage(); + + IgniteUuid uuid = storage.getUUID(); + int size = storage.storageMode() == StorageConstants.ROW_STORAGE_MODE ? storage.rowSize() : storage.columnSize(); + + for (int i = 0; i < size; i++) + set.add(new IgniteBiTuple<>(i, uuid)); + + return set; + } }