ignite-5280 SparseDistributedMatrix refactoring Signed-off-by: Andrey Gura <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/46ec148c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/46ec148c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/46ec148c Branch: refs/heads/master Commit: 46ec148ca9ae7ecc668c2c0bb9547140d05d68e2 Parents: 4fe8f76 Author: Yury Babak <[email protected]> Authored: Wed Aug 23 20:53:08 2017 +0300 Committer: Andrey Gura <[email protected]> Committed: Wed Aug 23 20:53:41 2017 +0300 ---------------------------------------------------------------------- .../ml/math/matrix/CacheMatrixExample.java | 4 +- .../ml/math/vector/CacheVectorExample.java | 4 +- modules/ml/pom.xml | 1 - .../clustering/KMeansDistributedClusterer.java | 55 +- .../ignite/ml/math/IdentityValueMapper.java | 2 + .../apache/ignite/ml/math/MatrixKeyMapper.java | 30 - .../apache/ignite/ml/math/MatrixStorage.java | 7 + .../org/apache/ignite/ml/math/ValueMapper.java | 37 -- .../apache/ignite/ml/math/VectorKeyMapper.java | 29 - .../ignite/ml/math/distributed/CacheUtils.java | 546 ++++++++++++++++++ .../ml/math/distributed/DistributedStorage.java | 35 ++ .../ml/math/distributed/MatrixKeyMapper.java | 33 ++ .../ignite/ml/math/distributed/ValueMapper.java | 37 ++ .../ml/math/distributed/VectorKeyMapper.java | 32 ++ .../math/distributed/keys/BlockMatrixKey.java | 30 + .../math/distributed/keys/MatrixCacheKey.java | 35 ++ .../math/distributed/keys/RowColMatrixKey.java | 30 + .../distributed/keys/impl/BlockMatrixKey.java | 144 +++++ .../distributed/keys/impl/SparseMatrixKey.java | 142 +++++ .../distributed/keys/impl/package-info.java | 22 + .../ml/math/distributed/keys/package-info.java | 22 + .../ml/math/distributed/package-info.java | 22 + .../apache/ignite/ml/math/impls/CacheUtils.java | 559 ------------------- .../ml/math/impls/matrix/CacheMatrix.java | 6 +- .../impls/matrix/DenseLocalOnHeapMatrix.java | 4 +- .../matrix/SparseBlockDistributedMatrix.java | 16 +- 
.../impls/matrix/SparseDistributedMatrix.java | 83 ++- .../storage/matrix/BaseBlockMatrixKey.java | 41 -- .../impls/storage/matrix/BlockMatrixKey.java | 144 ----- .../storage/matrix/BlockMatrixStorage.java | 38 +- .../storage/matrix/CacheMatrixStorage.java | 9 +- .../matrix/DenseOffHeapMatrixStorage.java | 5 + .../storage/matrix/DiagonalMatrixStorage.java | 5 + .../storage/matrix/FunctionMatrixStorage.java | 5 + .../storage/matrix/MatrixDelegateStorage.java | 5 + .../storage/matrix/PivotedMatrixStorage.java | 5 + .../storage/matrix/RandomMatrixStorage.java | 5 + .../matrix/SparseDistributedMatrixStorage.java | 54 +- .../matrix/SparseLocalOnHeapMatrixStorage.java | 6 +- .../storage/vector/CacheVectorStorage.java | 4 +- .../ml/math/impls/vector/CacheVector.java | 6 +- .../ml/math/impls/matrix/CacheMatrixTest.java | 2 +- .../impls/matrix/MatrixKeyMapperForTests.java | 2 +- .../SparseDistributedBlockMatrixTest.java | 7 +- .../matrix/SparseDistributedMatrixTest.java | 40 +- .../ml/math/impls/vector/CacheVectorTest.java | 2 +- 46 files changed, 1368 insertions(+), 984 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/46ec148c/examples/src/main/ml/org/apache/ignite/examples/ml/math/matrix/CacheMatrixExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/ml/org/apache/ignite/examples/ml/math/matrix/CacheMatrixExample.java b/examples/src/main/ml/org/apache/ignite/examples/ml/math/matrix/CacheMatrixExample.java index d7bb8ae..a7cbaab 100644 --- a/examples/src/main/ml/org/apache/ignite/examples/ml/math/matrix/CacheMatrixExample.java +++ b/examples/src/main/ml/org/apache/ignite/examples/ml/math/matrix/CacheMatrixExample.java @@ -23,9 +23,9 @@ import org.apache.ignite.Ignition; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.examples.ml.math.vector.CacheVectorExample; import 
org.apache.ignite.ml.math.IdentityValueMapper; -import org.apache.ignite.ml.math.MatrixKeyMapper; import org.apache.ignite.ml.math.Tracer; -import org.apache.ignite.ml.math.ValueMapper; +import org.apache.ignite.ml.math.distributed.MatrixKeyMapper; +import org.apache.ignite.ml.math.distributed.ValueMapper; import org.apache.ignite.ml.math.functions.Functions; import org.apache.ignite.ml.math.impls.matrix.CacheMatrix; http://git-wip-us.apache.org/repos/asf/ignite/blob/46ec148c/examples/src/main/ml/org/apache/ignite/examples/ml/math/vector/CacheVectorExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/ml/org/apache/ignite/examples/ml/math/vector/CacheVectorExample.java b/examples/src/main/ml/org/apache/ignite/examples/ml/math/vector/CacheVectorExample.java index 14ec43b..4253ac1 100644 --- a/examples/src/main/ml/org/apache/ignite/examples/ml/math/vector/CacheVectorExample.java +++ b/examples/src/main/ml/org/apache/ignite/examples/ml/math/vector/CacheVectorExample.java @@ -22,8 +22,8 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.Ignition; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.ml.math.IdentityValueMapper; -import org.apache.ignite.ml.math.ValueMapper; -import org.apache.ignite.ml.math.VectorKeyMapper; +import org.apache.ignite.ml.math.distributed.ValueMapper; +import org.apache.ignite.ml.math.distributed.VectorKeyMapper; import org.apache.ignite.ml.math.impls.vector.CacheVector; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/46ec148c/modules/ml/pom.xml ---------------------------------------------------------------------- diff --git a/modules/ml/pom.xml b/modules/ml/pom.xml index 8774157..7d5d64f 100644 --- a/modules/ml/pom.xml +++ b/modules/ml/pom.xml @@ -91,7 +91,6 @@ <groupId>com.github.fommil.netlib</groupId> <artifactId>core</artifactId> <version>${netlibjava.version}</version> - <type>pom</type> </dependency> <dependency> 
http://git-wip-us.apache.org/repos/asf/ignite/blob/46ec148c/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java index 2ef61ad..d6a3fc3 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/clustering/KMeansDistributedClusterer.java @@ -29,23 +29,26 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.ml.math.DistanceMeasure; import org.apache.ignite.ml.math.Vector; import org.apache.ignite.ml.math.VectorUtils; +import org.apache.ignite.ml.math.distributed.CacheUtils; +import org.apache.ignite.ml.math.distributed.keys.impl.SparseMatrixKey; import org.apache.ignite.ml.math.exceptions.ConvergenceException; import org.apache.ignite.ml.math.exceptions.MathIllegalArgumentException; import org.apache.ignite.ml.math.functions.Functions; import org.apache.ignite.ml.math.functions.IgniteBiFunction; -import org.apache.ignite.ml.math.impls.CacheUtils; import org.apache.ignite.ml.math.impls.matrix.DenseLocalOnHeapMatrix; import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix; import org.apache.ignite.ml.math.impls.storage.matrix.SparseDistributedMatrixStorage; import org.apache.ignite.ml.math.util.MapUtil; import org.apache.ignite.ml.math.util.MatrixUtil; -import static org.apache.ignite.ml.math.impls.CacheUtils.distributedFold; +import static org.apache.ignite.ml.math.distributed.CacheUtils.distributedFold; import static org.apache.ignite.ml.math.util.MatrixUtil.localCopyOf; /** * Clustering algorithm based on Bahmani et al. paper and Apache Spark class with corresponding functionality. * + * TODO: IGNITE-6059, add block matrix support. 
+ * * @see <a href="http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf">Scalable K-Means++(wikipedia)</a> */ public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistributedMatrix> { @@ -80,6 +83,8 @@ public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistri MathIllegalArgumentException, ConvergenceException { SparseDistributedMatrix pointsCp = (SparseDistributedMatrix)points.like(points.rowSize(), points.columnSize()); + String cacheName = ((SparseDistributedMatrixStorage)points.getStorage()).cacheName(); + // TODO: IGNITE-5825, this copy is very ineffective, just for POC. Immutability of data should be guaranteed by other methods // such as logical locks for example. pointsCp.assign(points); @@ -93,7 +98,7 @@ public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistri // Execute iterations of Lloyd's algorithm until converged while (iteration < maxIterations && !converged) { - SumsAndCounts stats = getSumsAndCounts(centers, dim, uid); + SumsAndCounts stats = getSumsAndCounts(centers, dim, uid, cacheName); converged = true; @@ -119,6 +124,8 @@ public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistri // Initialize empty centers and point costs. int ptsCnt = points.rowSize(); + String cacheName = ((SparseDistributedMatrixStorage)points.getStorage()).cacheName(); + // Initialize the first center to a random point. Vector sample = localCopyOf(points.viewRow(rnd.nextInt(ptsCnt))); @@ -137,7 +144,7 @@ public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistri while (step < initSteps) { // We assume here that costs can fit into memory of one node. - ConcurrentHashMap<Integer, Double> newCosts = getNewCosts(points, newCenters); + ConcurrentHashMap<Integer, Double> newCosts = getNewCosts(points, newCenters, cacheName); // Merge costs with new costs. 
for (Integer ind : newCosts.keySet()) @@ -145,7 +152,7 @@ public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistri double sumCosts = costs.values().stream().mapToDouble(Double::valueOf).sum(); - newCenters = getNewCenters(k, costs, uid, sumCosts); + newCenters = getNewCenters(k, costs, uid, sumCosts, cacheName); centers.addAll(newCenters); step++; @@ -159,7 +166,7 @@ public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistri // Finally, we might have a set of more than k distinct candidate centers; weight each // candidate by the number of points in the dataset mapping to it and run a local k-means++ // on the weighted centers to pick k of them - ConcurrentHashMap<Integer, Integer> centerInd2Weight = weightCenters(uid, distinctCenters); + ConcurrentHashMap<Integer, Integer> centerInd2Weight = weightCenters(uid, distinctCenters, cacheName); List<Double> weights = new ArrayList<>(centerInd2Weight.size()); @@ -174,12 +181,12 @@ public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistri /** */ private List<Vector> getNewCenters(int k, ConcurrentHashMap<Integer, Double> costs, IgniteUuid uid, - double sumCosts) { - return distributedFold(SparseDistributedMatrixStorage.ML_CACHE_NAME, - (IgniteBiFunction<Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>>, + double sumCosts, String cacheName) { + return distributedFold(cacheName, + (IgniteBiFunction<Cache.Entry<SparseMatrixKey, Map<Integer, Double>>, List<Vector>, List<Vector>>)(vectorWithIndex, list) -> { - Integer ind = vectorWithIndex.getKey().get1(); + Integer ind = vectorWithIndex.getKey().index(); double prob = costs.get(ind) * 2.0 * k / sumCosts; @@ -188,7 +195,7 @@ public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistri return list; }, - key -> key.get2().equals(uid), + key -> key.matrixId().equals(uid), (list1, list2) -> { list1.addAll(list2); return list1; @@ -198,17 +205,17 @@ public class 
KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistri } /** */ - private ConcurrentHashMap<Integer, Double> getNewCosts(SparseDistributedMatrix points, List<Vector> newCenters) { - return distributedFold(SparseDistributedMatrixStorage.ML_CACHE_NAME, - (IgniteBiFunction<Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, ConcurrentHashMap<Integer, Double>>, + private ConcurrentHashMap<Integer, Double> getNewCosts(SparseDistributedMatrix points, List<Vector> newCenters, String cacheName) { + return distributedFold(cacheName, + (IgniteBiFunction<Cache.Entry<SparseMatrixKey, ConcurrentHashMap<Integer, Double>>, ConcurrentHashMap<Integer, Double>, ConcurrentHashMap<Integer, Double>>)(vectorWithIndex, map) -> { for (Vector center : newCenters) - map.merge(vectorWithIndex.getKey().get1(), distance(vectorWithIndex.getValue(), center), Functions.MIN); + map.merge(vectorWithIndex.getKey().index(), distance(vectorWithIndex.getValue(), center), Functions.MIN); return map; }, - key -> key.get2().equals(points.getUUID()), + key -> key.matrixId().equals(points.getUUID()), (map1, map2) -> { map1.putAll(map2); return map1; @@ -216,9 +223,9 @@ public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistri } /** */ - private ConcurrentHashMap<Integer, Integer> weightCenters(IgniteUuid uid, List<Vector> distinctCenters) { - return distributedFold(SparseDistributedMatrixStorage.ML_CACHE_NAME, - (IgniteBiFunction<Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>>, + private ConcurrentHashMap<Integer, Integer> weightCenters(IgniteUuid uid, List<Vector> distinctCenters, String cacheName) { + return distributedFold(cacheName, + (IgniteBiFunction<Cache.Entry<SparseMatrixKey, Map<Integer, Double>>, ConcurrentHashMap<Integer, Integer>, ConcurrentHashMap<Integer, Integer>>)(vectorWithIndex, countMap) -> { Integer resInd = -1; @@ -239,7 +246,7 @@ public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistri 
countMap.compute(resInd, (ind, v) -> v != null ? v + 1 : 1); return countMap; }, - key -> key.get2().equals(uid), + key -> key.matrixId().equals(uid), (map1, map2) -> MapUtil.mergeMaps(map1, map2, (integer, integer2) -> integer2 + integer, ConcurrentHashMap::new), new ConcurrentHashMap<>()); @@ -251,9 +258,9 @@ public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistri } /** */ - private SumsAndCounts getSumsAndCounts(Vector[] centers, int dim, IgniteUuid uid) { - return CacheUtils.distributedFold(SparseDistributedMatrixStorage.ML_CACHE_NAME, - (IgniteBiFunction<Cache.Entry<IgniteBiTuple<Integer, IgniteUuid>, Map<Integer, Double>>, SumsAndCounts, SumsAndCounts>)(entry, counts) -> { + private SumsAndCounts getSumsAndCounts(Vector[] centers, int dim, IgniteUuid uid, String cacheName) { + return CacheUtils.distributedFold(cacheName, + (IgniteBiFunction<Cache.Entry<SparseMatrixKey, Map<Integer, Double>>, SumsAndCounts, SumsAndCounts>)(entry, counts) -> { Map<Integer, Double> vec = entry.getValue(); IgniteBiTuple<Integer, Double> closest = findClosest(centers, VectorUtils.fromMap(vec, false)); @@ -270,7 +277,7 @@ public class KMeansDistributedClusterer extends BaseKMeansClusterer<SparseDistri return counts; }, - key -> key.get2().equals(uid), + key -> key.matrixId().equals(uid), SumsAndCounts::merge, new SumsAndCounts() ); } http://git-wip-us.apache.org/repos/asf/ignite/blob/46ec148c/modules/ml/src/main/java/org/apache/ignite/ml/math/IdentityValueMapper.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/IdentityValueMapper.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/IdentityValueMapper.java index 3c94edd..615006e 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/IdentityValueMapper.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/IdentityValueMapper.java @@ -17,6 +17,8 @@ package org.apache.ignite.ml.math; +import 
org.apache.ignite.ml.math.distributed.ValueMapper; + /** * Identity value mapper. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/46ec148c/modules/ml/src/main/java/org/apache/ignite/ml/math/MatrixKeyMapper.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/MatrixKeyMapper.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/MatrixKeyMapper.java deleted file mode 100644 index 54d2088..0000000 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/MatrixKeyMapper.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.ml.math; - -/** - * Maps {@link Matrix} row and column index to cache key. - */ -public interface MatrixKeyMapper<K> extends KeyMapper<K> { - /** - * @param x Matrix row index. - * @param y Matrix column index. - * @return Cache key for given row and column. 
- */ - public K apply(int x, int y); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/46ec148c/modules/ml/src/main/java/org/apache/ignite/ml/math/MatrixStorage.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/MatrixStorage.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/MatrixStorage.java index a80e066..e4f9e40 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/MatrixStorage.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/MatrixStorage.java @@ -55,6 +55,13 @@ public interface MatrixStorage extends Externalizable, StorageOpsMetrics, Destro public int storageMode(); /** + * @return Matrix access mode. + * + * @see StorageConstants + */ + public int accessMode(); + + /** * Gets underlying data, if {@link StorageOpsMetrics#isArrayBased()} returns {@code false} this method return * copy of data. The data must be adapted for {@link Blas}. * http://git-wip-us.apache.org/repos/asf/ignite/blob/46ec148c/modules/ml/src/main/java/org/apache/ignite/ml/math/ValueMapper.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/ValueMapper.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/ValueMapper.java deleted file mode 100644 index f0776a3..0000000 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/ValueMapper.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.ml.math; - -import java.io.Serializable; - -/** - * Utility mapper that can be used to map arbitrary values types to and from double. - */ -public interface ValueMapper<V> extends Serializable { - /** - * @param v Value to map from double. - * @return Mapped value. - */ - public V fromDouble(double v); - - /** - * @param v Value to map to double. - * @return Mapped value. - */ - public double toDouble(V v); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/46ec148c/modules/ml/src/main/java/org/apache/ignite/ml/math/VectorKeyMapper.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/VectorKeyMapper.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/VectorKeyMapper.java deleted file mode 100644 index 4b8fadb..0000000 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/VectorKeyMapper.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.ml.math; - -/** - * Maps {@link Vector} element index to cache key. - */ -public interface VectorKeyMapper<K> extends KeyMapper<K> { - /** - * @param i Vector element index. - * @return Cache key for given element index. - */ - public K apply(int i); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/46ec148c/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/CacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/CacheUtils.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/CacheUtils.java new file mode 100644 index 0000000..9a73c5a --- /dev/null +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/CacheUtils.java @@ -0,0 +1,546 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.math.distributed; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.function.BinaryOperator; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.cache.CacheEntryImpl; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.ml.math.KeyMapper; +import org.apache.ignite.ml.math.distributed.keys.RowColMatrixKey; +import org.apache.ignite.ml.math.distributed.keys.impl.BlockMatrixKey; +import org.apache.ignite.ml.math.functions.IgniteBiFunction; +import org.apache.ignite.ml.math.functions.IgniteConsumer; +import org.apache.ignite.ml.math.functions.IgniteDoubleFunction; +import org.apache.ignite.ml.math.functions.IgniteFunction; +import org.apache.ignite.ml.math.impls.matrix.BlockEntry; + +/** + * Distribution-related misc. support. + * + * TODO: IGNITE-5102, fix sparse key filters. + */ +public class CacheUtils { + /** + * Cache entry support. + * + * @param <K> + * @param <V> + */ + public static class CacheEntry<K, V> { + /** */ + private Cache.Entry<K, V> entry; + /** */ + private IgniteCache<K, V> cache; + + /** + * @param entry Original cache entry. + * @param cache Cache instance. 
+ */ + CacheEntry(Cache.Entry<K, V> entry, IgniteCache<K, V> cache) { + this.entry = entry; + this.cache = cache; + } + + /** + * + * + */ + public Cache.Entry<K, V> entry() { + return entry; + } + + /** + * + * + */ + public IgniteCache<K, V> cache() { + return cache; + } + } + + /** + * Gets local Ignite instance. + */ + public static Ignite ignite() { + return Ignition.localIgnite(); + } + + /** + * @param cacheName Cache name. + * @param k Key into the cache. + * @param <K> Key type. + * @return Cluster group for given key. + */ + public static <K> ClusterGroup groupForKey(String cacheName, K k) { + return ignite().cluster().forNode(ignite().affinity(cacheName).mapKeyToNode(k)); + } + + /** + * @param cacheName Cache name. + * @param keyMapper {@link KeyMapper} to validate cache key. + * @param valMapper {@link ValueMapper} to obtain double value for given cache key. + * @param <K> Cache key object type. + * @param <V> Cache value object type. + * @return Sum of the values obtained for valid keys. + */ + public static <K, V> double sum(String cacheName, KeyMapper<K> keyMapper, ValueMapper<V> valMapper) { + Collection<Double> subSums = fold(cacheName, (CacheEntry<K, V> ce, Double acc) -> { + if (keyMapper.isValid(ce.entry().getKey())) { + double v = valMapper.toDouble(ce.entry().getValue()); + + return acc == null ? v : acc + v; + } + else + return acc; + }); + + return sum(subSums); + } + + /** + * @param matrixUuid Matrix UUID. + * @return Sum obtained using sparse logic. 
+ */ + @SuppressWarnings("unchecked") + public static <K, V> double sparseSum(IgniteUuid matrixUuid, String cacheName) { + A.notNull(matrixUuid, "matrixUuid"); + A.notNull(cacheName, "cacheName"); + + Collection<Double> subSums = fold(cacheName, (CacheEntry<K, V> ce, Double acc) -> { + V v = ce.entry().getValue(); + + double sum = 0.0; + + if (v instanceof Map) { + Map<Integer, Double> map = (Map<Integer, Double>)v; + + sum = sum(map.values()); + } + else if (v instanceof BlockEntry) { + BlockEntry be = (BlockEntry)v; + + sum = be.sum(); + } + else + throw new UnsupportedOperationException(); + + return acc == null ? sum : acc + sum; + }, sparseKeyFilter(matrixUuid)); + + return sum(subSums); + } + + /** + * @param c {@link Collection} of double values to sum. + * @return Sum of the values. + */ + private static double sum(Collection<Double> c) { + double sum = 0.0; + + for (double d : c) + sum += d; + + return sum; + } + + /** + * @param cacheName Cache name. + * @param keyMapper {@link KeyMapper} to validate cache key. + * @param valMapper {@link ValueMapper} to obtain double value for given cache key. + * @param <K> Cache key object type. + * @param <V> Cache value object type. + * @return Minimum value for valid keys. + */ + public static <K, V> double min(String cacheName, KeyMapper<K> keyMapper, ValueMapper<V> valMapper) { + Collection<Double> mins = fold(cacheName, (CacheEntry<K, V> ce, Double acc) -> { + if (keyMapper.isValid(ce.entry().getKey())) { + double v = valMapper.toDouble(ce.entry().getValue()); + + if (acc == null) + return v; + else + return Math.min(acc, v); + } + else + return acc; + }); + + return Collections.min(mins); + } + + /** + * @param matrixUuid Matrix UUID. + * @return Minimum value obtained using sparse logic. 
+ */ + @SuppressWarnings("unchecked") + public static <K, V> double sparseMin(IgniteUuid matrixUuid, String cacheName) { + A.notNull(matrixUuid, "matrixUuid"); + A.notNull(cacheName, "cacheName"); + + Collection<Double> mins = fold(cacheName, (CacheEntry<K, V> ce, Double acc) -> { + V v = ce.entry().getValue(); + + double min; + + if (v instanceof Map) { + Map<Integer, Double> map = (Map<Integer, Double>)v; + + min = Collections.min(map.values()); + } + else if (v instanceof BlockEntry) { + BlockEntry be = (BlockEntry)v; + + min = be.minValue(); + } + else + throw new UnsupportedOperationException(); + + if (acc == null) + return min; + else + return Math.min(acc, min); + + }, sparseKeyFilter(matrixUuid)); + + return Collections.min(mins); + } + + /** + * @param matrixUuid Matrix UUID. + * @return Maximum value obtained using sparse logic. + */ + @SuppressWarnings("unchecked") + public static <K, V> double sparseMax(IgniteUuid matrixUuid, String cacheName) { + A.notNull(matrixUuid, "matrixUuid"); + A.notNull(cacheName, "cacheName"); + + Collection<Double> maxes = fold(cacheName, (CacheEntry<K, V> ce, Double acc) -> { + V v = ce.entry().getValue(); + + double max; + + if (v instanceof Map) { + Map<Integer, Double> map = (Map<Integer, Double>)v; + + max = Collections.max(map.values()); + } + else if (v instanceof BlockEntry) { + BlockEntry be = (BlockEntry)v; + + max = be.maxValue(); + } + else + throw new UnsupportedOperationException(); + + if (acc == null) + return max; + else + return Math.max(acc, max); + + }, sparseKeyFilter(matrixUuid)); + + return Collections.max(maxes); + } + + /** + * @param cacheName Cache name. + * @param keyMapper {@link KeyMapper} to validate cache key. + * @param valMapper {@link ValueMapper} to obtain double value for given cache key. + * @param <K> Cache key object type. + * @param <V> Cache value object type. + * @return Maximum value for valid keys. 
+     */
+    public static <K, V> double max(String cacheName, KeyMapper<K> keyMapper, ValueMapper<V> valMapper) {
+        Collection<Double> maxes = fold(cacheName, (CacheEntry<K, V> ce, Double acc) -> {
+            if (keyMapper.isValid(ce.entry().getKey())) {
+                double v = valMapper.toDouble(ce.entry().getValue());
+
+                if (acc == null)
+                    return v;
+                else
+                    return Math.max(acc, v);
+            }
+            else
+                return acc;
+        });
+        // Nodes without valid keys return null; skip them (NoSuchElementException if no node has a value).
+        return maxes.stream().filter(m -> m != null).mapToDouble(Double::doubleValue).max().getAsDouble();
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param keyMapper {@link KeyMapper} to validate cache key.
+     * @param valMapper {@link ValueMapper} to obtain double value for given cache key.
+     * @param mapper Mapping {@link IgniteFunction}.
+     * @param <K> Cache key object type.
+     * @param <V> Cache value object type.
+     */
+    public static <K, V> void map(String cacheName, KeyMapper<K> keyMapper, ValueMapper<V> valMapper,
+        IgniteFunction<Double, Double> mapper) {
+        foreach(cacheName, (CacheEntry<K, V> ce) -> {
+            K k = ce.entry().getKey();
+
+            if (keyMapper.isValid(k))
+                // Actual assignment.
+                ce.cache().put(k, valMapper.fromDouble(mapper.apply(valMapper.toDouble(ce.entry().getValue()))));
+        });
+    }
+
+    /**
+     * @param matrixUuid Matrix UUID.
+     * @param mapper Mapping {@link IgniteDoubleFunction} applied to every stored value.
+     * @param cacheName Cache name.
+     */
+    @SuppressWarnings("unchecked")
+    public static <K, V> void sparseMap(IgniteUuid matrixUuid, IgniteDoubleFunction<Double> mapper, String cacheName) {
+        A.notNull(matrixUuid, "matrixUuid");
+        A.notNull(cacheName, "cacheName");
+        A.notNull(mapper, "mapper");
+
+        foreach(cacheName, (CacheEntry<K, V> ce) -> {
+            K k = ce.entry().getKey();
+
+            V v = ce.entry().getValue();
+
+            if (v instanceof Map) {
+                Map<Integer, Double> map = (Map<Integer, Double>)v;
+
+                for (Map.Entry<Integer, Double> e : map.entrySet())
+                    e.setValue(mapper.apply(e.getValue()));
+            }
+            else if (v instanceof BlockEntry) {
+                BlockEntry be = (BlockEntry)v;
+
+                be.map(mapper);
+            }
+            else
+                throw new UnsupportedOperationException();
+
+            ce.cache().put(k, v);
+        }, sparseKeyFilter(matrixUuid));
+    }
+
+    /**
+     * Filter for distributed matrix keys.
+     *
+     * @param matrixUuid Matrix uuid.
+     */
+    private static <K> IgnitePredicate<K> sparseKeyFilter(IgniteUuid matrixUuid) {
+        return key -> {
+            if (key instanceof BlockMatrixKey)
+                return ((BlockMatrixKey)key).matrixId().equals(matrixUuid);
+            else if (key instanceof RowColMatrixKey)
+                return ((RowColMatrixKey)key).matrixId().equals(matrixUuid);
+            else
+                throw new UnsupportedOperationException();
+        };
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param fun An operation that accepts a cache entry and processes it.
+     * @param <K> Cache key object type.
+     * @param <V> Cache value object type.
+     */
+    public static <K, V> void foreach(String cacheName, IgniteConsumer<CacheEntry<K, V>> fun) {
+        foreach(cacheName, fun, null);
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param fun An operation that accepts a cache entry and processes it.
+     * @param keyFilter Cache keys filter, or {@code null} to accept all keys.
+     * @param <K> Cache key object type.
+     * @param <V> Cache value object type.
+     */
+    public static <K, V> void foreach(String cacheName, IgniteConsumer<CacheEntry<K, V>> fun,
+        IgnitePredicate<K> keyFilter) {
+        bcast(cacheName, () -> {
+            Ignite ignite = Ignition.localIgnite();
+            IgniteCache<K, V> cache = ignite.getOrCreateCache(cacheName);
+
+            int partsCnt = ignite.affinity(cacheName).partitions();
+
+            // Use affinity in filter for scan query. Otherwise we accept consumer in each node which is wrong.
+            Affinity affinity = ignite.affinity(cacheName);
+            ClusterNode locNode = ignite.cluster().localNode();
+
+            // Iterate over all partitions. Some of them will be stored on that local node.
+            for (int part = 0; part < partsCnt; part++) {
+                int p = part;
+
+                // Iterate over given partition.
+                // Accept only partitions affinity maps to the local node; nodes are compared with equals(), not ==.
+                for (Cache.Entry<K, V> entry : cache.query(new ScanQuery<K, V>(part, (k, v) -> locNode.equals(affinity.mapPartitionToNode(p)) && (keyFilter == null || keyFilter.apply(k)))))
+                    fun.accept(new CacheEntry<>(entry, cache));
+            }
+        });
+    }
+
+    /**
+     * <b>Currently fold supports only commutative operations.</b>
+     *
+     * @param cacheName Cache name.
+     * @param folder Fold function operating over cache entries.
+     * @param <K> Cache key object type.
+     * @param <V> Cache value object type.
+     * @param <A> Fold result type.
+     * @return Fold operation result.
+     */
+    public static <K, V, A> Collection<A> fold(String cacheName, IgniteBiFunction<CacheEntry<K, V>, A, A> folder) {
+        return fold(cacheName, folder, null);
+    }
+
+    /**
+     * <b>Currently fold supports only commutative operations.</b>
+     *
+     * @param cacheName Cache name.
+     * @param folder Fold function operating over cache entries.
+     * @param keyFilter Cache keys filter, or {@code null} to accept all keys.
+     * @param <K> Cache key object type.
+     * @param <V> Cache value object type.
+     * @param <A> Fold result type.
+     * @return Fold operation result.
+     */
+    public static <K, V, A> Collection<A> fold(String cacheName, IgniteBiFunction<CacheEntry<K, V>, A, A> folder,
+        IgnitePredicate<K> keyFilter) {
+        return bcast(cacheName, () -> {
+            Ignite ignite = Ignition.localIgnite();
+            IgniteCache<K, V> cache = ignite.getOrCreateCache(cacheName);
+
+            int partsCnt = ignite.affinity(cacheName).partitions();
+
+            // Use affinity in filter for ScanQuery. Otherwise we accept consumer in each node which is wrong.
+            Affinity affinity = ignite.affinity(cacheName);
+            ClusterNode locNode = ignite.cluster().localNode();
+
+            A a = null;
+
+            // Iterate over all partitions. Some of them will be stored on that local node.
+            for (int part = 0; part < partsCnt; part++) {
+                int p = part;
+
+                // Iterate over given partition.
+                // Accept only partitions affinity maps to the local node; nodes are compared with equals(), not ==.
+                for (Cache.Entry<K, V> entry : cache.query(new ScanQuery<K, V>(part,
+                    (k, v) -> locNode.equals(affinity.mapPartitionToNode(p)) && (keyFilter == null || keyFilter.apply(k)))))
+                    a = folder.apply(new CacheEntry<>(entry, cache), a);
+            }
+
+            return a;
+        });
+    }
+
+    /**
+     * Distributed version of fold operation.
+     *
+     * @param cacheName Cache name.
+     * @param folder Function folding a single cache entry into the node-local result.
+     * @param keyFilter Cache keys filter, or {@code null} to accept all keys.
+     * @param accumulator Operator combining per-node results.
+     * @param zeroVal Initial value of every node-local fold and of the final reduction.
+     */
+    public static <K, V, A> A distributedFold(String cacheName, IgniteBiFunction<Cache.Entry<K, V>, A, A> folder,
+        IgnitePredicate<K> keyFilter, BinaryOperator<A> accumulator, A zeroVal) {
+        return sparseFold(cacheName, folder, keyFilter, accumulator, zeroVal, null, null, 0,
+            false);
+    }
+
+    /**
+     * Sparse version of fold. This method is also applicable to sparse zeroes.
+     *
+     * @param cacheName Cache name.
+     * @param folder Function folding a single cache entry into the node-local result.
+     * @param keyFilter Cache keys filter, or {@code null} to accept all keys.
+     * @param accumulator Operator combining per-node results.
+     * @param zeroVal Initial value of every node-local fold and of the final reduction.
+     * @param defVal Value of the implicit (not stored) entries.
+     * @param defKey Key passed to {@code folder} together with {@code defVal}.
+     * @param defValCnt Number of implicit entries folded in addition to the stored ones.
+     * @param isNilpotent If {@code true}, implicit entries are not folded at all.
+     */
+    private static <K, V, A> A sparseFold(String cacheName, IgniteBiFunction<Cache.Entry<K, V>, A, A> folder,
+        IgnitePredicate<K> keyFilter, BinaryOperator<A> accumulator, A zeroVal, V defVal, K defKey, long defValCnt,
+        boolean isNilpotent) {
+        A defRes = zeroVal;
+
+        if (!isNilpotent)
+            for (long i = 0; i < defValCnt; i++)
+                defRes = folder.apply(new CacheEntryImpl<>(defKey, defVal), defRes);
+
+        Collection<A> totalRes = bcast(cacheName, () -> {
+            Ignite ignite = Ignition.localIgnite();
+            IgniteCache<K, V> cache = ignite.getOrCreateCache(cacheName);
+
+            int partsCnt = ignite.affinity(cacheName).partitions();
+
+            // Use affinity in filter for ScanQuery. Otherwise we accept consumer in each node which is wrong.
+            Affinity affinity = ignite.affinity(cacheName);
+            ClusterNode locNode = ignite.cluster().localNode();
+
+            A a = zeroVal;
+
+            // Iterate over all partitions. Some of them will be stored on that local node.
+            for (int part = 0; part < partsCnt; part++) {
+                int p = part;
+
+                // Iterate over given partition.
+                // Accept only partitions affinity maps to the local node; nodes are compared with equals(), not ==.
+                for (Cache.Entry<K, V> entry : cache.query(new ScanQuery<K, V>(part,
+                    (k, v) -> locNode.equals(affinity.mapPartitionToNode(p)) && (keyFilter == null || keyFilter.apply(k)))))
+                    a = folder.apply(entry, a);
+            }
+
+            return a;
+        });
+        // Reduce per-node results, then fold in the implicit-entry result without mutating the broadcast result.
+        return accumulator.apply(totalRes.stream().reduce(zeroVal, accumulator), defRes);
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param run {@link Runnable} to broadcast to cache nodes for given cache name.
+     */
+    public static void bcast(String cacheName, IgniteRunnable run) {
+        ignite().compute(ignite().cluster().forCacheNodes(cacheName)).broadcast(run);
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param call {@link IgniteCallable} to broadcast to cache nodes for given cache name.
+     * @param <A> Type returned by the callable.
+     */
+    public static <A> Collection<A> bcast(String cacheName, IgniteCallable<A> call) {
+        return ignite().compute(ignite().cluster().forCacheNodes(cacheName)).broadcast(call);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46ec148c/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/DistributedStorage.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/DistributedStorage.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/DistributedStorage.java
new file mode 100644
index 0000000..7b58d1d
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/DistributedStorage.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.distributed;
+
+import java.util.Set;
+
+/**
+ * Extension for any distributed storage.
+ */
+public interface DistributedStorage<K> {
+    /**
+     * @return Set of all keys of this storage.
+     */
+    public Set<K> getAllKeys();
+
+    /**
+     * @return Name of the cache used by this storage.
+     */
+    public String cacheName();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46ec148c/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/MatrixKeyMapper.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/MatrixKeyMapper.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/MatrixKeyMapper.java
new file mode 100644
index 0000000..2a93328
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/MatrixKeyMapper.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.distributed;
+
+import org.apache.ignite.ml.math.KeyMapper;
+import org.apache.ignite.ml.math.Matrix;
+
+/**
+ * Maps {@link Matrix} row and column indices to a cache key.
+ */
+public interface MatrixKeyMapper<K> extends KeyMapper<K> {
+    /**
+     * @param x Matrix row index.
+     * @param y Matrix column index.
+     * @return Cache key for the given row and column.
+     */
+    public K apply(int x, int y);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46ec148c/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/ValueMapper.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/ValueMapper.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/ValueMapper.java
new file mode 100644
index 0000000..c94cfb6
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/ValueMapper.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.distributed;
+
+import java.io.Serializable;
+
+/**
+ * Utility mapper that maps values of an arbitrary type {@code V} to and from {@code double}.
+ */
+public interface ValueMapper<V> extends Serializable {
+    /**
+     * @param v Double value to map.
+     * @return Value of type {@code V} corresponding to {@code v}.
+     */
+    public V fromDouble(double v);
+
+    /**
+     * @param v Value to map to double.
+     * @return Double representation of {@code v}.
+     */
+    public double toDouble(V v);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46ec148c/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/VectorKeyMapper.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/VectorKeyMapper.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/VectorKeyMapper.java
new file mode 100644
index 0000000..de08d7b
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/VectorKeyMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.distributed;
+
+import org.apache.ignite.ml.math.KeyMapper;
+import org.apache.ignite.ml.math.Vector;
+
+/**
+ * Maps {@link Vector} element index to a cache key.
+ */
+public interface VectorKeyMapper<K> extends KeyMapper<K> {
+    /**
+     * @param i Vector element index.
+     * @return Cache key for the given element index.
+     */
+    public K apply(int i);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46ec148c/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/BlockMatrixKey.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/BlockMatrixKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/BlockMatrixKey.java
new file mode 100644
index 0000000..c55b950
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/BlockMatrixKey.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.distributed.keys;
+
+import org.apache.ignite.ml.math.impls.matrix.SparseBlockDistributedMatrix;
+
+/**
+ * Cache key for blocks in {@link SparseBlockDistributedMatrix}.
+ */
+public interface BlockMatrixKey extends MatrixCacheKey {
+    /**
+     * @return Block id.
+     */
+    public long blockId();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46ec148c/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/MatrixCacheKey.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/MatrixCacheKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/MatrixCacheKey.java
new file mode 100644
index 0000000..669e9a4
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/MatrixCacheKey.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.distributed.keys;
+
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * Base matrix cache key.
+ */
+public interface MatrixCacheKey {
+    /**
+     * @return Id of the matrix this key belongs to.
+     */
+    public IgniteUuid matrixId();
+
+    /**
+     * @return Affinity key.
+     */
+    public IgniteUuid affinityKey();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46ec148c/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/RowColMatrixKey.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/RowColMatrixKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/RowColMatrixKey.java
new file mode 100644
index 0000000..168f49f
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/RowColMatrixKey.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.distributed.keys;
+
+import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix;
+
+/**
+ * Cache key for {@link SparseDistributedMatrix}.
+ */
+public interface RowColMatrixKey extends MatrixCacheKey {
+    /**
+     * @return Index value (block id, row/column index, etc.).
+     */
+    public int index();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46ec148c/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/BlockMatrixKey.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/BlockMatrixKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/BlockMatrixKey.java
new file mode 100644
index 0000000..5fd1a16
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/BlockMatrixKey.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.distributed.keys.impl;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.ml.math.impls.matrix.BlockEntry;
+import org.apache.ignite.ml.math.impls.matrix.SparseBlockDistributedMatrix;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Key implementation for {@link BlockEntry} used by {@link SparseBlockDistributedMatrix}.
+ */
+public class BlockMatrixKey implements org.apache.ignite.ml.math.distributed.keys.BlockMatrixKey, Externalizable, Binarylizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+    /** Block ID. */
+    private long blockId;
+    /** Matrix ID. */
+    private IgniteUuid matrixUuid;
+    /** Block affinity key (may be {@code null}). */
+    private IgniteUuid affinityKey;
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public BlockMatrixKey() {
+        // No-op.
+    }
+
+    /**
+     * Constructs a matrix block key.
+     *
+     * @param blockId Block id, must be non-negative.
+     * @param matrixUuid Matrix uuid, must not be {@code null}.
+     * @param affinityKey Affinity key, may be {@code null}.
+     */
+    public BlockMatrixKey(long blockId, IgniteUuid matrixUuid, @Nullable IgniteUuid affinityKey) {
+        assert blockId >= 0;
+        assert matrixUuid != null;
+
+        this.blockId = blockId;
+        this.matrixUuid = matrixUuid;
+        this.affinityKey = affinityKey;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long blockId() {
+        return blockId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid matrixId() {
+        return matrixUuid;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid affinityKey() {
+        return affinityKey;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeGridUuid(out, matrixUuid); // Write order must match readExternal().
+        U.writeGridUuid(out, affinityKey);
+        out.writeLong(blockId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        matrixUuid = U.readGridUuid(in);
+        affinityKey = U.readGridUuid(in);
+        blockId = in.readLong();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+        BinaryRawWriter out = writer.rawWriter();
+
+        BinaryUtils.writeIgniteUuid(out, matrixUuid); // Raw write order must match readBinary().
+        BinaryUtils.writeIgniteUuid(out, affinityKey);
+        out.writeLong(blockId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+        BinaryRawReader in = reader.rawReader();
+
+        matrixUuid = BinaryUtils.readIgniteUuid(in);
+        affinityKey = BinaryUtils.readIgniteUuid(in);
+        blockId = in.readLong();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return matrixUuid.hashCode() + (int)(blockId ^ (blockId >>> 32));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        if (obj == this)
+            return true;
+
+        if (obj == null || obj.getClass() != getClass())
+            return false;
+
+        BlockMatrixKey that = (BlockMatrixKey)obj;
+
+        return blockId == that.blockId && matrixUuid.equals(that.matrixUuid) && F.eq(affinityKey, that.affinityKey);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(BlockMatrixKey.class, this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46ec148c/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/SparseMatrixKey.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/SparseMatrixKey.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/SparseMatrixKey.java
new file mode 100644
index 0000000..0c34c8b
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/SparseMatrixKey.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.math.distributed.keys.impl;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.ml.math.distributed.keys.RowColMatrixKey;
+import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix;
+
+/**
+ * Key implementation for {@link SparseDistributedMatrix}.
+ */
+public class SparseMatrixKey implements RowColMatrixKey, Externalizable, Binarylizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+    /** Index returned by {@link #index()}. */
+    private int idx;
+    /** Id of the matrix this key belongs to. */
+    private IgniteUuid matrixId;
+    /** Affinity key. */
+    private IgniteUuid affinityKey;
+
+    /**
+     * Default constructor (required by Externalizable).
+     */
+    public SparseMatrixKey() {
+        // No-op.
+    }
+
+    /**
+     * Builds a key for index {@code idx} (non-negative) of matrix {@code matrixId} (non-null) with the given affinity key.
+     */
+    public SparseMatrixKey(int idx, IgniteUuid matrixId, IgniteUuid affinityKey) {
+        assert idx >= 0 : "Index must be non-negative.";
+        assert matrixId != null : "Matrix id can't be null.";
+
+        this.idx = idx;
+        this.matrixId = matrixId;
+        this.affinityKey = affinityKey;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int index() {
+        return idx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid matrixId() {
+        return matrixId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid affinityKey() {
+        return affinityKey;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeGridUuid(out, matrixId);
+        U.writeGridUuid(out, affinityKey);
+        out.writeInt(idx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        matrixId = U.readGridUuid(in);
+        affinityKey = U.readGridUuid(in);
+        idx = in.readInt();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+        BinaryRawWriter out = writer.rawWriter();
+
+        BinaryUtils.writeIgniteUuid(out, matrixId);
+        BinaryUtils.writeIgniteUuid(out, affinityKey);
+        out.writeInt(idx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+        BinaryRawReader in = reader.rawReader();
+
+        matrixId = BinaryUtils.readIgniteUuid(in);
+        affinityKey = BinaryUtils.readIgniteUuid(in);
+        idx = in.readInt();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = 1;
+        res += res * 37 + matrixId.hashCode();
+        res += res * 37 + idx;
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        if (obj == this)
+            return true;
+
+        if (obj == null || obj.getClass() != getClass())
+            return false;
+
+        SparseMatrixKey that = (SparseMatrixKey)obj;
+
+        return idx == that.idx && matrixId.equals(that.matrixId) && F.eq(affinityKey, that.affinityKey);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SparseMatrixKey.class, this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46ec148c/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/package-info.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/package-info.java
new file mode 100644
index 0000000..3a68ee2
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/impl/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains matrix cache key implementations.
+ */
+package org.apache.ignite.ml.math.distributed.keys.impl;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/46ec148c/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/package-info.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/package-info.java
new file mode 100644
index 0000000..8954c6e
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/keys/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains matrix cache key interfaces.
+ */
+package org.apache.ignite.ml.math.distributed.keys;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/46ec148c/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/package-info.java
----------------------------------------------------------------------
diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/package-info.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/package-info.java
new file mode 100644
index 0000000..ad7399b
--- /dev/null
+++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/distributed/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains classes for distributed storage support.
+ */
+package org.apache.ignite.ml.math.distributed;
\ No newline at end of file
