IGNITE-6849: Fix of failing tests of K-Means distributed clustering.
This closes #3009


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8195ba51
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8195ba51
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8195ba51

Branch: refs/heads/ignite-zk
Commit: 8195ba512a7bff7b0e882ce01017724be8bbd8d7
Parents: a1b6a33
Author: Artem Malykh <amal...@gridgain.com>
Authored: Thu Nov 9 16:43:51 2017 +0300
Committer: Igor Sapego <isap...@gridgain.com>
Committed: Thu Nov 9 16:43:51 2017 +0300

----------------------------------------------------------------------
 .../impls/storage/matrix/MapWrapperStorage.java |  18 +-
 .../ml/math/impls/vector/MapWrapperVector.java  |   8 +
 .../ml/clustering/ClusteringTestSuite.java      |   3 +-
 .../KMeansDistributedClustererTest.java         | 197 -------------------
 ...KMeansDistributedClustererTestMultiNode.java | 146 ++++++++++++++
 ...MeansDistributedClustererTestSingleNode.java | 197 +++++++++++++++++++
 6 files changed, 367 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8195ba51/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/MapWrapperStorage.java
----------------------------------------------------------------------
diff --git 
a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/MapWrapperStorage.java
 
b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/MapWrapperStorage.java
index 381ad75..4648421 100644
--- 
a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/MapWrapperStorage.java
+++ 
b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/storage/matrix/MapWrapperStorage.java
@@ -30,10 +30,10 @@ import org.apache.ignite.ml.math.VectorStorage;
  */
 public class MapWrapperStorage implements VectorStorage {
     /** Underlying map. */
-    Map<Integer, Double> data;
+    private Map<Integer, Double> data;
 
     /** Vector size. */
-    int size;
+    private int size;
 
     /**
      * Construct a wrapper around given map.
@@ -41,6 +41,8 @@ public class MapWrapperStorage implements VectorStorage {
      * @param map Map to wrap.
      */
     public MapWrapperStorage(Map<Integer, Double> map) {
+        data = map;
+
         Set<Integer> keys = map.keySet();
 
         GridArgumentCheck.notEmpty(keys, "map");
@@ -50,8 +52,14 @@ public class MapWrapperStorage implements VectorStorage {
 
         assert min >= 0;
 
-        data = map;
-        size = (max - min) + 1;
+        size =  (max - min) + 1;
+    }
+
+    /**
+     * No-op constructor for serialization.
+     */
+    public MapWrapperStorage() {
+        // No-op.
     }
 
     /** {@inheritDoc} */
@@ -75,12 +83,14 @@ public class MapWrapperStorage implements VectorStorage {
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeObject(data);
+        out.writeInt(size);
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
         data = (Map<Integer, Double>)in.readObject();
+        size = in.readInt();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8195ba51/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/MapWrapperVector.java
----------------------------------------------------------------------
diff --git 
a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/MapWrapperVector.java
 
b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/MapWrapperVector.java
index 83b40c1..58309f6 100644
--- 
a/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/MapWrapperVector.java
+++ 
b/modules/ml/src/main/java/org/apache/ignite/ml/math/impls/vector/MapWrapperVector.java
@@ -20,6 +20,7 @@ package org.apache.ignite.ml.math.impls.vector;
 import java.util.Map;
 import org.apache.ignite.ml.math.Matrix;
 import org.apache.ignite.ml.math.Vector;
+import org.apache.ignite.ml.math.VectorStorage;
 import org.apache.ignite.ml.math.impls.storage.matrix.MapWrapperStorage;
 
 /**
@@ -35,6 +36,13 @@ public class MapWrapperVector extends AbstractVector {
         setStorage(new MapWrapperStorage(map));
     }
 
+    /**
+     * No-op constructor for serialization.
+     */
+    public MapWrapperVector() {
+        // No-op.
+    }
+
     /** {@inheritDoc} */
     @Override public Vector like(int crd) {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/8195ba51/modules/ml/src/test/java/org/apache/ignite/ml/clustering/ClusteringTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/ml/src/test/java/org/apache/ignite/ml/clustering/ClusteringTestSuite.java
 
b/modules/ml/src/test/java/org/apache/ignite/ml/clustering/ClusteringTestSuite.java
index c39eeef..b4cce5e 100644
--- 
a/modules/ml/src/test/java/org/apache/ignite/ml/clustering/ClusteringTestSuite.java
+++ 
b/modules/ml/src/test/java/org/apache/ignite/ml/clustering/ClusteringTestSuite.java
@@ -25,7 +25,8 @@ import org.junit.runners.Suite;
  */
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
-    KMeansDistributedClustererTest.class,
+    KMeansDistributedClustererTestSingleNode.class,
+    KMeansDistributedClustererTestMultiNode.class,
     KMeansLocalClustererTest.class
 })
 public class ClusteringTestSuite {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8195ba51/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTest.java
----------------------------------------------------------------------
diff --git 
a/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTest.java
 
b/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTest.java
deleted file mode 100644
index a59b7f9..0000000
--- 
a/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTest.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.ml.clustering;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.ml.math.DistanceMeasure;
-import org.apache.ignite.ml.math.EuclideanDistance;
-import org.apache.ignite.ml.math.StorageConstants;
-import org.apache.ignite.ml.math.Vector;
-import org.apache.ignite.ml.math.VectorUtils;
-import org.apache.ignite.ml.math.functions.Functions;
-import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix;
-import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Assert;
-import org.junit.Test;
-
-import static 
org.apache.ignite.ml.clustering.KMeansUtil.checkIsInEpsilonNeighbourhood;
-
-/** */
-public class KMeansDistributedClustererTest extends GridCommonAbstractTest {
-    /**
-     * Number of nodes in grid. We should use 1 in this test because otherwise 
algorithm will be unstable
-     * (We cannot guarantee the order in which results are returned from each 
node).
-     */
-    private static final int NODE_COUNT = 1;
-
-    /** Grid instance. */
-    private Ignite ignite;
-
-    /**
-     * Default constructor.
-     */
-    public KMeansDistributedClustererTest() {
-        super(false);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override protected void beforeTest() throws Exception {
-        ignite = grid(NODE_COUNT);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        for (int i = 1; i <= NODE_COUNT; i++)
-            startGrid(i);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
-    }
-
-    /** */
-    @Test
-    public void testPerformClusterAnalysisDegenerate() {
-        
IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
-
-        KMeansDistributedClusterer clusterer = new 
KMeansDistributedClusterer(new EuclideanDistance(), 1, 1, 1L);
-
-        double[] v1 = new double[] {1959, 325100};
-        double[] v2 = new double[] {1960, 373200};
-
-        SparseDistributedMatrix points = new SparseDistributedMatrix(2, 2, 
StorageConstants.ROW_STORAGE_MODE,
-            StorageConstants.RANDOM_ACCESS_MODE);
-
-        points.setRow(0, v1);
-        points.setRow(1, v2);
-
-        KMeansModel mdl = clusterer.cluster(points, 1);
-
-        Assert.assertEquals(1, mdl.centers().length);
-        Assert.assertEquals(2, mdl.centers()[0].size());
-    }
-
-    /** */
-    @Test
-    public void testClusterizationOnDatasetWithObviousStructure() throws 
IOException {
-        
IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
-
-        int ptsCnt = 10000;
-        int squareSideLen = 10000;
-
-        Random rnd = new Random(123456L);
-
-        // Let centers be in the vertices of square.
-        Map<Integer, Vector> centers = new HashMap<>();
-        centers.put(100, new DenseLocalOnHeapVector(new double[] {0.0, 0.0}));
-        centers.put(900, new DenseLocalOnHeapVector(new double[] 
{squareSideLen, 0.0}));
-        centers.put(3000, new DenseLocalOnHeapVector(new double[] {0.0, 
squareSideLen}));
-        centers.put(6000, new DenseLocalOnHeapVector(new double[] 
{squareSideLen, squareSideLen}));
-
-        int centersCnt = centers.size();
-
-        SparseDistributedMatrix points = new SparseDistributedMatrix(ptsCnt, 
2, StorageConstants.ROW_STORAGE_MODE,
-            StorageConstants.RANDOM_ACCESS_MODE);
-
-        List<Integer> permutation = IntStream.range(0, 
ptsCnt).boxed().collect(Collectors.toList());
-        Collections.shuffle(permutation, rnd);
-
-        Vector[] mc = new Vector[centersCnt];
-        Arrays.fill(mc, VectorUtils.zeroes(2));
-
-        int centIdx = 0;
-        int totalCnt = 0;
-
-        List<Vector> massCenters = new ArrayList<>();
-
-        for (Integer count : centers.keySet()) {
-            for (int i = 0; i < count; i++) {
-                DenseLocalOnHeapVector pnt = (DenseLocalOnHeapVector)new 
DenseLocalOnHeapVector(2).assign(centers.get(count));
-                // pertrubate point on random value.
-                pnt.map(val -> val + rnd.nextDouble() * squareSideLen / 100);
-                mc[centIdx] = mc[centIdx].plus(pnt);
-                points.assignRow(permutation.get(totalCnt), pnt);
-                totalCnt++;
-            }
-            massCenters.add(mc[centIdx].times(1 / (double)count));
-            centIdx++;
-        }
-
-        EuclideanDistance dist = new EuclideanDistance();
-        OrderedNodesComparator comp = new 
OrderedNodesComparator(centers.values().toArray(new Vector[] {}), dist);
-
-        massCenters.sort(comp);
-        KMeansDistributedClusterer clusterer = new 
KMeansDistributedClusterer(dist, 3, 100, 1L);
-
-        KMeansModel mdl = clusterer.cluster(points, 4);
-        Vector[] resCenters = mdl.centers();
-        Arrays.sort(resCenters, comp);
-
-        checkIsInEpsilonNeighbourhood(resCenters, massCenters.toArray(new 
Vector[] {}), 30.0);
-    }
-
-    /** */
-    private static class OrderedNodesComparator implements Comparator<Vector> {
-        /** */
-        private final DistanceMeasure measure;
-
-        /** */
-        List<Vector> orderedNodes;
-
-        /** */
-        public OrderedNodesComparator(Vector[] orderedNodes, DistanceMeasure 
measure) {
-            this.orderedNodes = Arrays.asList(orderedNodes);
-            this.measure = measure;
-        }
-
-        /** */
-        private int findClosestNodeIndex(Vector v) {
-            return Functions.argmin(orderedNodes, v1 -> measure.compute(v1, 
v)).get1();
-        }
-
-        /** */
-        @Override public int compare(Vector v1, Vector v2) {
-            int ind1 = findClosestNodeIndex(v1);
-            int ind2 = findClosestNodeIndex(v2);
-
-            int signum = (int)Math.signum(ind1 - ind2);
-
-            if (signum != 0)
-                return signum;
-
-            return (int)Math.signum(orderedNodes.get(ind1).minus(v1).kNorm(2) -
-                orderedNodes.get(ind2).minus(v2).kNorm(2));
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8195ba51/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTestMultiNode.java
----------------------------------------------------------------------
diff --git 
a/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTestMultiNode.java
 
b/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTestMultiNode.java
new file mode 100644
index 0000000..06066c2
--- /dev/null
+++ 
b/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTestMultiNode.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.clustering;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.ml.math.DistanceMeasure;
+import org.apache.ignite.ml.math.EuclideanDistance;
+import org.apache.ignite.ml.math.StorageConstants;
+import org.apache.ignite.ml.math.Vector;
+import org.apache.ignite.ml.math.VectorUtils;
+import org.apache.ignite.ml.math.functions.Functions;
+import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix;
+import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static 
org.apache.ignite.ml.clustering.KMeansUtil.checkIsInEpsilonNeighbourhood;
+
+/**
+ * This test is made to make sure that K-Means distributed clustering does not 
crush on distributed environment.
+ * In {@link KMeansDistributedClustererTestMultiNode} we check logic of 
clustering (checks for clusters structures).
+ * In this class we just check that clusterer does not crush. There are two 
separate tests because we cannot
+ * guarantee order in which nodes return results of intermediate computations 
and therefore algorithm can return
+ * different results.
+ */
+public class KMeansDistributedClustererTestMultiNode extends 
GridCommonAbstractTest {
+    /** Number of nodes in grid. */
+    private static final int NODE_COUNT = 3;
+
+    /** Grid instance. */
+    private Ignite ignite;
+
+    /**
+     * Default constructor.
+     */
+    public KMeansDistributedClustererTestMultiNode() {
+        super(false);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override protected void beforeTest() throws Exception {
+        ignite = grid(NODE_COUNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        for (int i = 1; i <= NODE_COUNT; i++)
+            startGrid(i);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** */
+    @Test
+    public void testPerformClusterAnalysisDegenerate() {
+        
IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        KMeansDistributedClusterer clusterer = new 
KMeansDistributedClusterer(new EuclideanDistance(), 1, 1, 1L);
+
+        double[] v1 = new double[] {1959, 325100};
+        double[] v2 = new double[] {1960, 373200};
+
+        SparseDistributedMatrix points = new SparseDistributedMatrix(2, 2, 
StorageConstants.ROW_STORAGE_MODE,
+            StorageConstants.RANDOM_ACCESS_MODE);
+
+        points.setRow(0, v1);
+        points.setRow(1, v2);
+
+        clusterer.cluster(points, 1);
+    }
+
+    /** */
+    @Test
+    public void testClusterizationOnDatasetWithObviousStructure() throws 
IOException {
+        
IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        int ptsCnt = 10000;
+        int squareSideLen = 10000;
+
+        Random rnd = new Random(123456L);
+
+        // Let centers be in the vertices of square.
+        Map<Integer, Vector> centers = new HashMap<>();
+        centers.put(100, new DenseLocalOnHeapVector(new double[] {0.0, 0.0}));
+        centers.put(900, new DenseLocalOnHeapVector(new double[] 
{squareSideLen, 0.0}));
+        centers.put(3000, new DenseLocalOnHeapVector(new double[] {0.0, 
squareSideLen}));
+        centers.put(6000, new DenseLocalOnHeapVector(new double[] 
{squareSideLen, squareSideLen}));
+
+        SparseDistributedMatrix points = new SparseDistributedMatrix(ptsCnt, 
2, StorageConstants.ROW_STORAGE_MODE,
+        StorageConstants.RANDOM_ACCESS_MODE);
+
+        List<Integer> permutation = IntStream.range(0, 
ptsCnt).boxed().collect(Collectors.toList());
+        Collections.shuffle(permutation, rnd);
+
+        int totalCnt = 0;
+
+        for (Integer count : centers.keySet()) {
+            for (int i = 0; i < count; i++) {
+                DenseLocalOnHeapVector pnt = (DenseLocalOnHeapVector)new 
DenseLocalOnHeapVector(2).assign(centers.get(count));
+                // Perturbate point on random value.
+                pnt.map(val -> val + rnd.nextDouble() * squareSideLen / 100);
+                points.assignRow(permutation.get(totalCnt), pnt);
+                totalCnt++;
+            }
+        }
+
+        EuclideanDistance dist = new EuclideanDistance();
+
+        KMeansDistributedClusterer clusterer = new 
KMeansDistributedClusterer(dist, 3, 100, 1L);
+
+        clusterer.cluster(points, 4);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8195ba51/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTestSingleNode.java
----------------------------------------------------------------------
diff --git 
a/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTestSingleNode.java
 
b/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTestSingleNode.java
new file mode 100644
index 0000000..27aaa0c
--- /dev/null
+++ 
b/modules/ml/src/test/java/org/apache/ignite/ml/clustering/KMeansDistributedClustererTestSingleNode.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ml.clustering;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.ml.math.DistanceMeasure;
+import org.apache.ignite.ml.math.EuclideanDistance;
+import org.apache.ignite.ml.math.StorageConstants;
+import org.apache.ignite.ml.math.Vector;
+import org.apache.ignite.ml.math.VectorUtils;
+import org.apache.ignite.ml.math.functions.Functions;
+import org.apache.ignite.ml.math.impls.matrix.SparseDistributedMatrix;
+import org.apache.ignite.ml.math.impls.vector.DenseLocalOnHeapVector;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static 
org.apache.ignite.ml.clustering.KMeansUtil.checkIsInEpsilonNeighbourhood;
+
+/** */
+public class KMeansDistributedClustererTestSingleNode extends 
GridCommonAbstractTest {
+    /**
+     * Number of nodes in grid. We should use 1 in this test because otherwise 
algorithm will be unstable
+     * (We cannot guarantee the order in which results are returned from each 
node).
+     */
+    private static final int NODE_COUNT = 1;
+
+    /** Grid instance. */
+    private Ignite ignite;
+
+    /**
+     * Default constructor.
+     */
+    public KMeansDistributedClustererTestSingleNode() {
+        super(false);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override protected void beforeTest() throws Exception {
+        ignite = grid(NODE_COUNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        for (int i = 1; i <= NODE_COUNT; i++)
+            startGrid(i);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** */
+    @Test
+    public void testPerformClusterAnalysisDegenerate() {
+        
IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        KMeansDistributedClusterer clusterer = new 
KMeansDistributedClusterer(new EuclideanDistance(), 1, 1, 1L);
+
+        double[] v1 = new double[] {1959, 325100};
+        double[] v2 = new double[] {1960, 373200};
+
+        SparseDistributedMatrix points = new SparseDistributedMatrix(2, 2, 
StorageConstants.ROW_STORAGE_MODE,
+            StorageConstants.RANDOM_ACCESS_MODE);
+
+        points.setRow(0, v1);
+        points.setRow(1, v2);
+
+        KMeansModel mdl = clusterer.cluster(points, 1);
+
+        Assert.assertEquals(1, mdl.centers().length);
+        Assert.assertEquals(2, mdl.centers()[0].size());
+    }
+
+    /** */
+    @Test
+    public void testClusterizationOnDatasetWithObviousStructure() throws 
IOException {
+        
IgniteUtils.setCurrentIgniteName(ignite.configuration().getIgniteInstanceName());
+
+        int ptsCnt = 10000;
+        int squareSideLen = 10000;
+
+        Random rnd = new Random(123456L);
+
+        // Let centers be in the vertices of square.
+        Map<Integer, Vector> centers = new HashMap<>();
+        centers.put(100, new DenseLocalOnHeapVector(new double[] {0.0, 0.0}));
+        centers.put(900, new DenseLocalOnHeapVector(new double[] 
{squareSideLen, 0.0}));
+        centers.put(3000, new DenseLocalOnHeapVector(new double[] {0.0, 
squareSideLen}));
+        centers.put(6000, new DenseLocalOnHeapVector(new double[] 
{squareSideLen, squareSideLen}));
+
+        int centersCnt = centers.size();
+
+        SparseDistributedMatrix points = new SparseDistributedMatrix(ptsCnt, 
2, StorageConstants.ROW_STORAGE_MODE,
+            StorageConstants.RANDOM_ACCESS_MODE);
+
+        List<Integer> permutation = IntStream.range(0, 
ptsCnt).boxed().collect(Collectors.toList());
+        Collections.shuffle(permutation, rnd);
+
+        Vector[] mc = new Vector[centersCnt];
+        Arrays.fill(mc, VectorUtils.zeroes(2));
+
+        int centIdx = 0;
+        int totalCnt = 0;
+
+        List<Vector> massCenters = new ArrayList<>();
+
+        for (Integer count : centers.keySet()) {
+            for (int i = 0; i < count; i++) {
+                DenseLocalOnHeapVector pnt = (DenseLocalOnHeapVector)new 
DenseLocalOnHeapVector(2).assign(centers.get(count));
+                // Perturbate point on random value.
+                pnt.map(val -> val + rnd.nextDouble() * squareSideLen / 100);
+                mc[centIdx] = mc[centIdx].plus(pnt);
+                points.assignRow(permutation.get(totalCnt), pnt);
+                totalCnt++;
+            }
+            massCenters.add(mc[centIdx].times(1 / (double)count));
+            centIdx++;
+        }
+
+        EuclideanDistance dist = new EuclideanDistance();
+        OrderedNodesComparator comp = new 
OrderedNodesComparator(centers.values().toArray(new Vector[] {}), dist);
+
+        massCenters.sort(comp);
+        KMeansDistributedClusterer clusterer = new 
KMeansDistributedClusterer(dist, 3, 100, 1L);
+
+        KMeansModel mdl = clusterer.cluster(points, 4);
+        Vector[] resCenters = mdl.centers();
+        Arrays.sort(resCenters, comp);
+
+        checkIsInEpsilonNeighbourhood(resCenters, massCenters.toArray(new 
Vector[] {}), 30.0);
+    }
+
+    /** */
+    private static class OrderedNodesComparator implements Comparator<Vector> {
+        /** */
+        private final DistanceMeasure measure;
+
+        /** */
+        List<Vector> orderedNodes;
+
+        /** */
+        public OrderedNodesComparator(Vector[] orderedNodes, DistanceMeasure 
measure) {
+            this.orderedNodes = Arrays.asList(orderedNodes);
+            this.measure = measure;
+        }
+
+        /** */
+        private int findClosestNodeIndex(Vector v) {
+            return Functions.argmin(orderedNodes, v1 -> measure.compute(v1, 
v)).get1();
+        }
+
+        /** */
+        @Override public int compare(Vector v1, Vector v2) {
+            int ind1 = findClosestNodeIndex(v1);
+            int ind2 = findClosestNodeIndex(v2);
+
+            int signum = (int)Math.signum(ind1 - ind2);
+
+            if (signum != 0)
+                return signum;
+
+            return (int)Math.signum(orderedNodes.get(ind1).minus(v1).kNorm(2) -
+                orderedNodes.get(ind2).minus(v2).kNorm(2));
+        }
+    }
+}
\ No newline at end of file

Reply via email to