IGNITE-6849: Fix of failing tests of K-Means distributed clustering. This closes #3009
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8195ba51 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8195ba51 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8195ba51 Branch: refs/heads/ignite-zk Commit: 8195ba512a7bff7b0e882ce01017724be8bbd8d7 Parents: a1b6a33 Author: Artem Malykh <amal...@gridgain.com> Authored: Thu Nov 9 16:43:51 2017 +0300 Committer: Igor Sapego <isap...@gridgain.com> Committed: Thu Nov 9 16:43:51 2017 +0300 ---------------------------------------------------------------------- .../impls/storage/matrix/MapWrapperStorage.java | 18 +- .../ml/math/impls/vector/MapWrapperVector.java | 8 + .../ml/clustering/ClusteringTestSuite.java | 3 +- .../KMeansDistributedClustererTest.java | 197 ------------------- ...KMeansDistributedClustererTestMultiNode.java | 146 ++++++++++++++ ...MeansDistributedClustererTestSingleNode.java | 197 +++++++++++++++++++ 6 files changed, 367 insertions(+), 202 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8195ba51/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/MapWrapperStorage.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/MapWrapperStorage.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/MapWrapperStorage.java index 381ad75..4648421 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/MapWrapperStorage.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/MapWrapperStorage.java @@ -30,10 +30,10 @@ import org.apache.ignite.ml.math.VectorStorage; */ public class MapWrapperStorage implements VectorStorage { /** Underlying map. */ - Map<Integer, Double> data; + private Map<Integer, Double> data; /** Vector size. 
*/ - int size; + private int size; /** * Construct a wrapper around given map. @@ -41,6 +41,8 @@ public class MapWrapperStorage implements VectorStorage { * @param map Map to wrap. */ public MapWrapperStorage(Map<Integer, Double> map) { + data = map; + Set<Integer> keys = map.keySet(); GridArgumentCheck.notEmpty(keys, "map"); @@ -50,8 +52,14 @@ public class MapWrapperStorage implements VectorStorage { assert min >= 0; - data = map; - size = (max - min) + 1; + size = (max - min) + 1; + } + + /** + * No-op constructor for serialization. + */ + public MapWrapperStorage() { + // No-op. } /** {@inheritDoc} */ @@ -75,12 +83,14 @@ public class MapWrapperStorage implements VectorStorage { /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(data); + out.writeInt(size); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { data = (Map<Integer, Double>)in.readObject(); + size = in.readInt(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8195ba51/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/MapWrapperVector.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/MapWrapperVector.java b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/MapWrapperVector.java index 83b40c1..58309f6 100644 --- a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/MapWrapperVector.java +++ b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/MapWrapperVector.java @@ -20,6 +20,7 @@ package org.apache.ignite.ml.math.impls.vector; import java.util.Map; import org.apache.ignite.ml.math.Matrix; import org.apache.ignite.ml.math.Vector; +import org.apache.ignite.ml.math.VectorStorage; import org.apache.ignite.ml.math.impls.storage.matrix.MapWrapperStorage; /** @@ -35,6 
+36,13 @@ public class MapWrapperVector extends AbstractVector { setStorage(new MapWrapperStorage(map)); } + /** + * No-op constructor for serialization. + */ + public MapWrapperVector() { + // No-op. + } + /** {@inheritDoc} */ @Override public Vector like(int crd) { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/ignite/blob/8195ba51/modules/ml/src/test/java/org/apache/ignite/ml/clustering/ClusteringTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/clustering/ClusteringTestSuite.java b/modules/ml/src/test/java/org/apache/ignite/ml/clustering/ClusteringTestSuite.java index c39eeef..b4cce5e 100644 --- a/modules/ml/src/test/java/org/apache/ignite/ml/clustering/ClusteringTestSuite.java +++ b/modules/ml/src/test/java/org/apache/ignite/ml/clustering/ClusteringTestSuite.java @@ -25,7 +25,8 @@ import org.junit.runners.Suite; */ @RunWith(Suite.class) @Suite.SuiteClasses({ - KMeansDistributedClustererTest.class, + KMeansDistributedClustererTestSingleNode.class, + KMeansDistributedClustererTestMultiNode.class, KMeansLocalClustererTest.class }) public class ClusteringTestSuite { http://git-wip-us.apache.org/repos/asf/ignite/blob/8195ba51/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTest.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTest.java b/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTest.java deleted file mode 100644 index a59b7f9..0000000 --- a/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTest.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.ml.clustering; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import org.apache.ignite.Ignite; -import org.apache.ignite.internal.util.IgniteUtils; -import org.apache.ignite.ml.math.DistanceMeasure; -import org.apache.ignite.ml.math.EuclideanDistance; -import org.apache.ignite.ml.math.StorageConstants; -import org.apache.ignite.ml.math.Vector; -import org.apache.ignite.ml.math.VectorUtils; -import org.apache.ignite.ml.math.functions.Functions; -import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix; -import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.junit.Assert; -import org.junit.Test; - -import static org.apache.ignite.ml.clustering.KMeansUtil.checkIsInEpsilonNeighbourhood; - -/** */ -public class KMeansDistributedClustererTest extends GridCommonAbstractTest { - /** - * Number of nodes in grid. 
We should use 1 in this test because otherwise algorithm will be unstable - * (We cannot guarantee the order in which results are returned from each node). - */ - private static final int NODE_COUNT = 1; - - /** Grid instance. */ - private Ignite ignite; - - /** - * Default constructor. - */ - public KMeansDistributedClustererTest() { - super(false); - } - - /** - * {@inheritDoc} - */ - @Override protected void beforeTest() throws Exception { - ignite = grid(NODE_COUNT); - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - for (int i = 1; i <= NODE_COUNT; i++) - startGrid(i); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - } - - /** */ - @Test - public void testPerformClusterAnalysisDegenerate() { - IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName()); - - KMeansDistributedClusterer clusterer = new KMeansDistributedClusterer(new EuclideanDistance(), 1, 1, 1L); - - double[] v1 = new double[] {1959, 325100}; - double[] v2 = new double[] {1960, 373200}; - - SparseDistributedMatrix points = new SparseDistributedMatrix(2, 2, StorageConstants.ROW_STORAGE_MODE, - StorageConstants.RANDOM_ACCESS_MODE); - - points.setRow(0, v1); - points.setRow(1, v2); - - KMeansModel mdl = clusterer.cluster(points, 1); - - Assert.assertEquals(1, mdl.centers().length); - Assert.assertEquals(2, mdl.centers()[0].size()); - } - - /** */ - @Test - public void testClusterizationOnDatasetWithObviousStructure() throws IOException { - IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName()); - - int ptsCnt = 10000; - int squareSideLen = 10000; - - Random rnd = new Random(123456L); - - // Let centers be in the vertices of square. 
- Map<Integer, Vector> centers = new HashMap<>(); - centers.put(100, new DenseLocalOnHeapVector(new double[] {0.0, 0.0})); - centers.put(900, new DenseLocalOnHeapVector(new double[] {squareSideLen, 0.0})); - centers.put(3000, new DenseLocalOnHeapVector(new double[] {0.0, squareSideLen})); - centers.put(6000, new DenseLocalOnHeapVector(new double[] {squareSideLen, squareSideLen})); - - int centersCnt = centers.size(); - - SparseDistributedMatrix points = new SparseDistributedMatrix(ptsCnt, 2, StorageConstants.ROW_STORAGE_MODE, - StorageConstants.RANDOM_ACCESS_MODE); - - List<Integer> permutation = IntStream.range(0, ptsCnt).boxed().collect(Collectors.toList()); - Collections.shuffle(permutation, rnd); - - Vector[] mc = new Vector[centersCnt]; - Arrays.fill(mc, VectorUtils.zeroes(2)); - - int centIdx = 0; - int totalCnt = 0; - - List<Vector> massCenters = new ArrayList<>(); - - for (Integer count : centers.keySet()) { - for (int i = 0; i < count; i++) { - DenseLocalOnHeapVector pnt = (DenseLocalOnHeapVector)new DenseLocalOnHeapVector(2).assign(centers.get(count)); - // pertrubate point on random value. 
- pnt.map(val -> val + rnd.nextDouble() * squareSideLen / 100); - mc[centIdx] = mc[centIdx].plus(pnt); - points.assignRow(permutation.get(totalCnt), pnt); - totalCnt++; - } - massCenters.add(mc[centIdx].times(1 / (double)count)); - centIdx++; - } - - EuclideanDistance dist = new EuclideanDistance(); - OrderedNodesComparator comp = new OrderedNodesComparator(centers.values().toArray(new Vector[] {}), dist); - - massCenters.sort(comp); - KMeansDistributedClusterer clusterer = new KMeansDistributedClusterer(dist, 3, 100, 1L); - - KMeansModel mdl = clusterer.cluster(points, 4); - Vector[] resCenters = mdl.centers(); - Arrays.sort(resCenters, comp); - - checkIsInEpsilonNeighbourhood(resCenters, massCenters.toArray(new Vector[] {}), 30.0); - } - - /** */ - private static class OrderedNodesComparator implements Comparator<Vector> { - /** */ - private final DistanceMeasure measure; - - /** */ - List<Vector> orderedNodes; - - /** */ - public OrderedNodesComparator(Vector[] orderedNodes, DistanceMeasure measure) { - this.orderedNodes = Arrays.asList(orderedNodes); - this.measure = measure; - } - - /** */ - private int findClosestNodeIndex(Vector v) { - return Functions.argmin(orderedNodes, v1 -> measure.compute(v1, v)).get1(); - } - - /** */ - @Override public int compare(Vector v1, Vector v2) { - int ind1 = findClosestNodeIndex(v1); - int ind2 = findClosestNodeIndex(v2); - - int signum = (int)Math.signum(ind1 - ind2); - - if (signum != 0) - return signum; - - return (int)Math.signum(orderedNodes.get(ind1).minus(v1).kNorm(2) - - orderedNodes.get(ind2).minus(v2).kNorm(2)); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8195ba51/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTestMultiNode.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTestMultiNode.java 
b/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTestMultiNode.java new file mode 100644 index 0000000..06066c2 --- /dev/null +++ b/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTestMultiNode.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.ml.clustering; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.ignite.Ignite; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.ml.math.DistanceMeasure; +import org.apache.ignite.ml.math.EuclideanDistance; +import org.apache.ignite.ml.math.StorageConstants; +import org.apache.ignite.ml.math.Vector; +import org.apache.ignite.ml.math.VectorUtils; +import org.apache.ignite.ml.math.functions.Functions; +import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix; +import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.ignite.ml.clustering.KMeansUtil.checkIsInEpsilonNeighbourhood; + +/** + * This test makes sure that K-Means distributed clustering does not crash in a distributed (multi-node) environment. + * In {@link KMeansDistributedClustererTestSingleNode} we check the logic of clustering (the resulting cluster structure). + * In this class we just check that the clusterer does not crash. There are two separate tests because we cannot + * guarantee the order in which nodes return results of intermediate computations, and therefore the algorithm + * can return different results. + */ +public class KMeansDistributedClustererTestMultiNode extends GridCommonAbstractTest { + /** Number of nodes in grid. */ + private static final int NODE_COUNT = 3; + + /** Grid instance. */ + private Ignite ignite; + + /** + * Default constructor.
+ */ + public KMeansDistributedClustererTestMultiNode() { + super(false); + } + + /** + * {@inheritDoc} + */ + @Override protected void beforeTest() throws Exception { + ignite = grid(NODE_COUNT); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + for (int i = 1; i <= NODE_COUNT; i++) + startGrid(i); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** */ + @Test + public void testPerformClusterAnalysisDegenerate() { + IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName()); + + KMeansDistributedClusterer clusterer = new KMeansDistributedClusterer(new EuclideanDistance(), 1, 1, 1L); + + double[] v1 = new double[] {1959, 325100}; + double[] v2 = new double[] {1960, 373200}; + + SparseDistributedMatrix points = new SparseDistributedMatrix(2, 2, StorageConstants.ROW_STORAGE_MODE, + StorageConstants.RANDOM_ACCESS_MODE); + + points.setRow(0, v1); + points.setRow(1, v2); + + clusterer.cluster(points, 1); + } + + /** */ + @Test + public void testClusterizationOnDatasetWithObviousStructure() throws IOException { + IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName()); + + int ptsCnt = 10000; + int squareSideLen = 10000; + + Random rnd = new Random(123456L); + + // Let centers be in the vertices of square. 
+ Map<Integer, Vector> centers = new HashMap<>(); + centers.put(100, new DenseLocalOnHeapVector(new double[] {0.0, 0.0})); + centers.put(900, new DenseLocalOnHeapVector(new double[] {squareSideLen, 0.0})); + centers.put(3000, new DenseLocalOnHeapVector(new double[] {0.0, squareSideLen})); + centers.put(6000, new DenseLocalOnHeapVector(new double[] {squareSideLen, squareSideLen})); + + SparseDistributedMatrix points = new SparseDistributedMatrix(ptsCnt, 2, StorageConstants.ROW_STORAGE_MODE, + StorageConstants.RANDOM_ACCESS_MODE); + + List<Integer> permutation = IntStream.range(0, ptsCnt).boxed().collect(Collectors.toList()); + Collections.shuffle(permutation, rnd); + + int totalCnt = 0; + + for (Integer count : centers.keySet()) { + for (int i = 0; i < count; i++) { + DenseLocalOnHeapVector pnt = (DenseLocalOnHeapVector)new DenseLocalOnHeapVector(2).assign(centers.get(count)); + // Perturb each coordinate by a random offset in [0, squareSideLen / 100). + pnt.map(val -> val + rnd.nextDouble() * squareSideLen / 100); + points.assignRow(permutation.get(totalCnt), pnt); + totalCnt++; + } + } + + EuclideanDistance dist = new EuclideanDistance(); + + KMeansDistributedClusterer clusterer = new KMeansDistributedClusterer(dist, 3, 100, 1L); + + clusterer.cluster(points, 4); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8195ba51/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTestSingleNode.java ---------------------------------------------------------------------- diff --git a/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTestSingleNode.java b/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTestSingleNode.java new file mode 100644 index 0000000..27aaa0c --- /dev/null +++ b/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTestSingleNode.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more +
* contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ml.clustering; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.ignite.Ignite; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.ml.math.DistanceMeasure; +import org.apache.ignite.ml.math.EuclideanDistance; +import org.apache.ignite.ml.math.StorageConstants; +import org.apache.ignite.ml.math.Vector; +import org.apache.ignite.ml.math.VectorUtils; +import org.apache.ignite.ml.math.functions.Functions; +import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix; +import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.ignite.ml.clustering.KMeansUtil.checkIsInEpsilonNeighbourhood; + +/** */ +public class KMeansDistributedClustererTestSingleNode extends GridCommonAbstractTest { + /** + * Number of nodes in grid. 
We should use 1 in this test because otherwise algorithm will be unstable + * (We cannot guarantee the order in which results are returned from each node). + */ + private static final int NODE_COUNT = 1; + + /** Grid instance. */ + private Ignite ignite; + + /** + * Default constructor. + */ + public KMeansDistributedClustererTestSingleNode() { + super(false); + } + + /** + * {@inheritDoc} + */ + @Override protected void beforeTest() throws Exception { + ignite = grid(NODE_COUNT); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + for (int i = 1; i <= NODE_COUNT; i++) + startGrid(i); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** */ + @Test + public void testPerformClusterAnalysisDegenerate() { + IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName()); + + KMeansDistributedClusterer clusterer = new KMeansDistributedClusterer(new EuclideanDistance(), 1, 1, 1L); + + double[] v1 = new double[] {1959, 325100}; + double[] v2 = new double[] {1960, 373200}; + + SparseDistributedMatrix points = new SparseDistributedMatrix(2, 2, StorageConstants.ROW_STORAGE_MODE, + StorageConstants.RANDOM_ACCESS_MODE); + + points.setRow(0, v1); + points.setRow(1, v2); + + KMeansModel mdl = clusterer.cluster(points, 1); + + Assert.assertEquals(1, mdl.centers().length); + Assert.assertEquals(2, mdl.centers()[0].size()); + } + + /** */ + @Test + public void testClusterizationOnDatasetWithObviousStructure() throws IOException { + IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName()); + + int ptsCnt = 10000; + int squareSideLen = 10000; + + Random rnd = new Random(123456L); + + // Let centers be in the vertices of square. 
+ Map<Integer, Vector> centers = new HashMap<>(); + centers.put(100, new DenseLocalOnHeapVector(new double[] {0.0, 0.0})); + centers.put(900, new DenseLocalOnHeapVector(new double[] {squareSideLen, 0.0})); + centers.put(3000, new DenseLocalOnHeapVector(new double[] {0.0, squareSideLen})); + centers.put(6000, new DenseLocalOnHeapVector(new double[] {squareSideLen, squareSideLen})); + + int centersCnt = centers.size(); + + SparseDistributedMatrix points = new SparseDistributedMatrix(ptsCnt, 2, StorageConstants.ROW_STORAGE_MODE, + StorageConstants.RANDOM_ACCESS_MODE); + + List<Integer> permutation = IntStream.range(0, ptsCnt).boxed().collect(Collectors.toList()); + Collections.shuffle(permutation, rnd); + + Vector[] mc = new Vector[centersCnt]; + Arrays.fill(mc, VectorUtils.zeroes(2)); + + int centIdx = 0; + int totalCnt = 0; + + List<Vector> massCenters = new ArrayList<>(); + + for (Integer count : centers.keySet()) { + for (int i = 0; i < count; i++) { + DenseLocalOnHeapVector pnt = (DenseLocalOnHeapVector)new DenseLocalOnHeapVector(2).assign(centers.get(count)); + // Perturb each coordinate by a random offset in [0, squareSideLen / 100).
+ pnt.map(val -> val + rnd.nextDouble() * squareSideLen / 100); + mc[centIdx] = mc[centIdx].plus(pnt); + points.assignRow(permutation.get(totalCnt), pnt); + totalCnt++; + } + massCenters.add(mc[centIdx].times(1 / (double)count)); + centIdx++; + } + + EuclideanDistance dist = new EuclideanDistance(); + OrderedNodesComparator comp = new OrderedNodesComparator(centers.values().toArray(new Vector[] {}), dist); + + massCenters.sort(comp); + KMeansDistributedClusterer clusterer = new KMeansDistributedClusterer(dist, 3, 100, 1L); + + KMeansModel mdl = clusterer.cluster(points, 4); + Vector[] resCenters = mdl.centers(); + Arrays.sort(resCenters, comp); + + checkIsInEpsilonNeighbourhood(resCenters, massCenters.toArray(new Vector[] {}), 30.0); + } + + /** */ + private static class OrderedNodesComparator implements Comparator<Vector> { + /** */ + private final DistanceMeasure measure; + + /** */ + List<Vector> orderedNodes; + + /** */ + public OrderedNodesComparator(Vector[] orderedNodes, DistanceMeasure measure) { + this.orderedNodes = Arrays.asList(orderedNodes); + this.measure = measure; + } + + /** */ + private int findClosestNodeIndex(Vector v) { + return Functions.argmin(orderedNodes, v1 -> measure.compute(v1, v)).get1(); + } + + /** */ + @Override public int compare(Vector v1, Vector v2) { + int ind1 = findClosestNodeIndex(v1); + int ind2 = findClosestNodeIndex(v2); + + int signum = (int)Math.signum(ind1 - ind2); + + if (signum != 0) + return signum; + + return (int)Math.signum(orderedNodes.get(ind1).minus(v1).kNorm(2) - + orderedNodes.get(ind2).minus(v2).kNorm(2)); + } + } +} \ No newline at end of file