Repository: systemml
Updated Branches:
  refs/heads/master 2f4bce92a -> 8def2040b


http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/test/java/org/apache/sysml/test/integration/functions/paramserv/DataPartitionerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/DataPartitionerTest.java b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/DataPartitionerTest.java
deleted file mode 100644
index 7e0784e..0000000
--- a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/DataPartitionerTest.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.test.integration.functions.paramserv;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.IntStream;
-
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitioner;
-import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionerDC;
-import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionerDR;
-import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionerDRR;
-import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionerOR;
-import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils;
-import org.apache.sysml.runtime.util.DataConverter;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class DataPartitionerTest {
-
-       @Test
-       public void testDataPartitionerDC() {
-               DataPartitioner dp = new DataPartitionerDC();
-               double[] df = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
-               MatrixObject features = ParamservUtils.newMatrixObject();
-               features.acquireModify(DataConverter.convertToMatrixBlock(df, true));
-               features.refreshMetaData();
-               features.release();
-               MatrixObject labels = ParamservUtils.newMatrixObject();
-               labels.acquireModify(DataConverter.convertToMatrixBlock(df, true));
-               labels.refreshMetaData();
-               labels.release();
-               DataPartitioner.Result result = dp.doPartitioning(3, features, labels);
-
-               Assert.assertEquals(3, result.pFeatures.size());
-               Assert.assertEquals(3, result.pLabels.size());
-
-               double[] expected1 = new double[] { 1, 2, 3, 4 };
-               assertResult(result, 0, expected1);
-
-               double[] expected2 = new double[] { 5, 6, 7, 8 };
-               assertResult(result, 1, expected2);
-
-               double[] expected3 = new double[] { 9, 10 };
-               assertResult(result, 2, expected3);
-       }
-
-       private void assertResult(DataPartitioner.Result result, int index, double[] expected) {
-               List<MatrixObject> pfs = result.pFeatures;
-               List<MatrixObject> pls = result.pLabels;
-               double[] realValue1 = pfs.get(index).acquireRead().getDenseBlockValues();
-               double[] realValue2 = pls.get(index).acquireRead().getDenseBlockValues();
-               Assert.assertArrayEquals(expected, realValue1, 0);
-               Assert.assertArrayEquals(expected, realValue2, 0);
-       }
-
-       @Test
-       public void testDataPartitionerDR() {
-               DataPartitioner dp = new DataPartitionerDR();
-               double[] df = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
-               MatrixObject features = ParamservUtils.newMatrixObject();
-               features.acquireModify(DataConverter.convertToMatrixBlock(df, true));
-               features.refreshMetaData();
-               features.release();
-               MatrixObject labels = ParamservUtils.newMatrixObject();
-               labels.acquireModify(DataConverter.convertToMatrixBlock(df, true));
-               labels.refreshMetaData();
-               labels.release();
-
-               DataPartitioner.Result result = dp.doPartitioning(4, features, labels);
-
-               Assert.assertEquals(4, result.pFeatures.size());
-               Assert.assertEquals(4, result.pLabels.size());
-
-               // Ensure that the row indexing is consistent between features and labels
-               IntStream.range(0, result.pFeatures.size()).forEach(i -> {
-                       double[] f = result.pFeatures.get(i).acquireRead().getDenseBlockValues();
-                       double[] l = result.pLabels.get(i).acquireRead().getDenseBlockValues();
-                       Assert.assertArrayEquals(f, l, 0);
-               });
-
-               assertPermutationDR(df, result.pFeatures);
-               assertPermutationDR(df, result.pLabels);
-       }
-
-       private void assertPermutationDR(double[] df, List<MatrixObject> list) {
-               Map<Double, Integer> dict = new HashMap<>();
-               for (double d : df) {
-                       dict.put(d, 0);
-               }
-               IntStream.range(0, list.size()).forEach(i -> {
-                       double[] f = list.get(i).acquireRead().getDenseBlockValues();
-                       for (double d : f) {
-                               dict.compute(d, (k, v) -> v + 1);
-                       }
-               });
-
-               // check that each value occurs exactly once
-               for (Map.Entry<Double, Integer> e : dict.entrySet()) {
-                       Assert.assertEquals(1, (int) e.getValue());
-               }
-       }
-
-       @Test
-       public void testDataPartitionerDRR() {
-               DataPartitioner dp = new DataPartitionerDRR();
-               double[] df = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
-               MatrixObject features = ParamservUtils.newMatrixObject();
-               features.acquireModify(DataConverter.convertToMatrixBlock(df, true));
-               features.refreshMetaData();
-               features.release();
-               MatrixObject labels = ParamservUtils.newMatrixObject();
-               labels.acquireModify(DataConverter.convertToMatrixBlock(df, true));
-               labels.refreshMetaData();
-               labels.release();
-               DataPartitioner.Result result = dp.doPartitioning(4, features, labels);
-
-               Assert.assertEquals(4, result.pFeatures.size());
-               Assert.assertEquals(4, result.pLabels.size());
-
-               double[] expected1 = new double[] { 4, 8 };
-               assertResult(result, 0, expected1);
-
-               double[] expected2 = new double[] { 1, 5, 9 };
-               assertResult(result, 1, expected2);
-
-               double[] expected3 = new double[] { 2, 6, 10 };
-               assertResult(result, 2, expected3);
-
-               double[] expected4 = new double[] { 3, 7 };
-               assertResult(result, 3, expected4);
-       }
-
-       @Test
-       public void testDataPartitionerOR() {
-               DataPartitioner dp = new DataPartitionerOR();
-               double[] df = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
-               MatrixObject features = ParamservUtils.newMatrixObject();
-               features.acquireModify(DataConverter.convertToMatrixBlock(df, true));
-               features.refreshMetaData();
-               features.release();
-               MatrixObject labels = ParamservUtils.newMatrixObject();
-               labels.acquireModify(DataConverter.convertToMatrixBlock(df, true));
-               labels.refreshMetaData();
-               labels.release();
-
-               DataPartitioner.Result result = dp.doPartitioning(4, features, labels);
-
-               Assert.assertEquals(4, result.pFeatures.size());
-               Assert.assertEquals(4, result.pLabels.size());
-
-               assertPermutationOR(df, result.pFeatures);
-               assertPermutationOR(df, result.pLabels);
-       }
-
-       private void assertPermutationOR(double[] df, List<MatrixObject> list) {
-               for (MatrixObject mo : list) {
-                       Map<Double, Integer> dict = new HashMap<>();
-                       for (double d : df) {
-                               dict.put(d, 0);
-                       }
-                       double[] f = mo.acquireRead().getDenseBlockValues();
-                       for (double d : f) {
-                               dict.compute(d, (k, v) -> v + 1);
-                       }
-                       Assert.assertEquals(10, dict.size());
-                       // check that each value occurs exactly once
-                       for (Map.Entry<Double, Integer> e : dict.entrySet()) {
-                               Assert.assertEquals(1, (int) e.getValue());
-                       }
-               }
-       }
-}
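
Note: the deleted test above hard-coded the disjoint-contiguous (DC) expectations (values {1..4}, {5..8}, {9,10} for 10 rows across 3 workers), which is consistent with ceiling-based contiguous blocks. A minimal standalone sketch of that boundary arithmetic, independent of the SystemML classes in the diff (class and method names below are illustrative only):

import java.util.ArrayList;
import java.util.List;

public class DcSliceSketch {
    // Ceiling-based contiguous blocks: ceil(10 / 3) = 4 rows per worker,
    // with the last worker taking whatever remains.
    static List<int[]> contiguousRanges(int numRows, int numWorkers) {
        int blockSize = (int) Math.ceil((double) numRows / numWorkers);
        List<int[]> ranges = new ArrayList<>();
        for (int w = 0; w < numWorkers; w++) {
            int begin = w * blockSize;                        // inclusive row index
            int end = Math.min((w + 1) * blockSize, numRows); // exclusive row index
            if (begin < end) {
                ranges.add(new int[] { begin, end });
            }
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Prints 0..4, 4..8, 8..10 -> the values {1..4}, {5..8}, {9,10} asserted above.
        for (int[] r : contiguousRanges(10, 3)) {
            System.out.println(r[0] + ".." + r[1]);
        }
    }
}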

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/test/java/org/apache/sysml/test/integration/functions/paramserv/LocalDataPartitionerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/LocalDataPartitionerTest.java b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/LocalDataPartitionerTest.java
new file mode 100644
index 0000000..1e4538a
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/LocalDataPartitionerTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.paramserv;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionScheme;
+import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import scala.Tuple2;
+
+public class LocalDataPartitionerTest extends BaseDataPartitionerTest {
+
+       @Test
+       public void testLocalDataPartitionerDC() {
+               DataPartitionScheme.Result result = launchLocalDataPartitionerDC();
+
+               Assert.assertEquals(WORKER_NUM, result.pFeatures.size());
+               Assert.assertEquals(WORKER_NUM, result.pLabels.size());
+               for (int i = 0; i < WORKER_NUM; i++) {
+                       assertDCResult(result, i);
+               }
+       }
+
+       private void assertDCResult(DataPartitionScheme.Result result, int workerID) {
+               Assert.assertArrayEquals(generateExpectedData(workerID * (ROW_SIZE / WORKER_NUM) * COL_SIZE, (workerID + 1) * (ROW_SIZE / WORKER_NUM) * COL_SIZE), result.pFeatures.get(workerID).acquireRead().getDenseBlockValues(), 0);
+               Assert.assertArrayEquals(generateExpectedData(workerID * (ROW_SIZE / WORKER_NUM), (workerID + 1) * (ROW_SIZE / WORKER_NUM)), result.pLabels.get(workerID).acquireRead().getDenseBlockValues(), 0);
+       }
+
+       @Test
+       public void testLocalDataPartitionerDR() {
+               MatrixBlock[] mbs = generateData();
+               DataPartitionScheme.Result result = launchLocalDataPartitionerDR(mbs);
+
+               Assert.assertEquals(WORKER_NUM, result.pFeatures.size());
+               Assert.assertEquals(WORKER_NUM, result.pLabels.size());
+
+               // Generate the expected data
+               MatrixBlock perm = ParamservUtils.generatePermutation(ROW_SIZE, ParamservUtils.SEED);
+               List<MatrixBlock> efs = generateExpectedDataDR(mbs[0], perm);
+               List<MatrixBlock> els = generateExpectedDataDR(mbs[1], perm);
+
+               for (int i = 0; i < WORKER_NUM; i++) {
+                       Assert.assertArrayEquals(efs.get(i).getDenseBlockValues(), result.pFeatures.get(i).acquireRead().getDenseBlockValues(), 0);
+                       Assert.assertArrayEquals(els.get(i).getDenseBlockValues(), result.pLabels.get(i).acquireRead().getDenseBlockValues(), 0);
+               }
+       }
+
+       private List<MatrixBlock> generateExpectedDataDR(MatrixBlock mb, MatrixBlock perm) {
+               int batchSize = (int) Math.ceil((double) ROW_SIZE / WORKER_NUM);
+               return IntStream.range(0, WORKER_NUM).mapToObj(i -> {
+                       int begin = i * batchSize;
+                       int end = Math.min((i + 1) * batchSize, mb.getNumRows());
+                       MatrixBlock slicedPerm = perm.slice(begin, end - 1);
+                       return slicedPerm.aggregateBinaryOperations(slicedPerm, mb, new MatrixBlock(),
+                                       InstructionUtils.getMatMultOperator(WORKER_NUM));
+               }).collect(Collectors.toList());
+       }
+
+       @Test
+       public void testLocalDataPartitionerDRR() {
+               DataPartitionScheme.Result result = launchLocalDataPartitionerDRR();
+
+               Assert.assertEquals(WORKER_NUM, result.pFeatures.size());
+               Assert.assertEquals(WORKER_NUM, result.pLabels.size());
+               for (int i = 0; i < WORKER_NUM; i++) {
+                       assertDRRResult(result, i);
+               }
+       }
+
+       private void assertDRRResult(DataPartitionScheme.Result result, int workerID) {
+               Tuple2<double[], double[]> expected = generateExpectedData(workerID, WORKER_NUM, ROW_SIZE / WORKER_NUM);
+               Assert.assertArrayEquals(expected._1, result.pFeatures.get(workerID).acquireRead().getDenseBlockValues(), 0);
+               Assert.assertArrayEquals(expected._2, result.pLabels.get(workerID).acquireRead().getDenseBlockValues(), 0);
+       }
+
+       private Tuple2<double[], double[]> generateExpectedData(int start, int step, int rowSize) {
+               double[] features = new double[rowSize * COL_SIZE];
+               int fIndex = 0;
+               double[] labels = new double[rowSize];
+               for (int i = 0; i < rowSize; i++) {
+                       int rowID = start + i * step;
+                       labels[i] = rowID;
+                       for (int j = rowID * COL_SIZE; j < (rowID + 1) * COL_SIZE; j++) {
+                               features[fIndex++] = j;
+                       }
+               }
+               return new Tuple2<>(features, labels);
+       }
+
+       @Test
+       public void testLocalDataPartitionerOR() {
+               ParamservUtils.SEED = System.nanoTime();
+               DataPartitionScheme.Result result = launchLocalDataPartitionerOR();
+
+               Assert.assertEquals(WORKER_NUM, result.pFeatures.size());
+               Assert.assertEquals(WORKER_NUM, result.pLabels.size());
+               for (int i = 0; i < WORKER_NUM; i++) {
+                       Tuple2<MatrixBlock, MatrixBlock> expected = generateExpectedDataOR(i);
+                       Assert.assertArrayEquals(expected._1.getDenseBlockValues(), result.pFeatures.get(i).acquireRead().getDenseBlockValues(), 0);
+                       Assert.assertArrayEquals(expected._2.getDenseBlockValues(), result.pLabels.get(i).acquireRead().getDenseBlockValues(), 0);
+               }
+       }
+
+       private Tuple2<MatrixBlock, MatrixBlock> generateExpectedDataOR(int workerID) {
+               MatrixBlock[] mbs = generateData();
+               MatrixBlock perm = ParamservUtils.generatePermutation(ROW_SIZE, ParamservUtils.SEED+workerID);
+               return new Tuple2<>(perm.aggregateBinaryOperations(perm, mbs[0], new MatrixBlock(), InstructionUtils.getMatMultOperator(WORKER_NUM)),
+                       perm.aggregateBinaryOperations(perm, mbs[1], new MatrixBlock(), InstructionUtils.getMatMultOperator(WORKER_NUM)));
+       }
+
+}
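
For reference, generateExpectedData(start, step, rowSize) above encodes the disjoint-round-robin (DRR) assignment: worker w is expected to own rows w, w + WORKER_NUM, w + 2*WORKER_NUM, and so on (the real ROW_SIZE, COL_SIZE and WORKER_NUM constants live in BaseDataPartitionerTest, which is not shown in this section). A small self-contained sketch of that mapping, with illustrative sizes only:

public class DrrAssignmentSketch {
    public static void main(String[] args) {
        int numWorkers = 4;  // illustrative, not the value from BaseDataPartitionerTest
        int numRows = 10;    // illustrative
        // Round-robin ownership: row r goes to worker (r % numWorkers), so worker w
        // collects rows w, w + numWorkers, w + 2*numWorkers, ...
        for (int w = 0; w < numWorkers; w++) {
            StringBuilder rows = new StringBuilder();
            for (int r = w; r < numRows; r += numWorkers) {
                rows.append(r).append(' ');
            }
            System.out.println("worker " + w + " -> rows " + rows.toString().trim());
        }
    }
}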

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/test/java/org/apache/sysml/test/integration/functions/paramserv/SparkDataPartitionerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/SparkDataPartitionerTest.java b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/SparkDataPartitionerTest.java
new file mode 100644
index 0000000..b0e4a27
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/SparkDataPartitionerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.paramserv;
+
+import java.util.Map;
+import java.util.stream.IntStream;
+
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.parser.Statement;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionScheme;
+import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import scala.Tuple2;
+
+public class SparkDataPartitionerTest extends BaseDataPartitionerTest {
+
+       private static SparkExecutionContext _sec;
+
+       static {
+               DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+               DMLScript.rtplatform = DMLScript.RUNTIME_PLATFORM.SPARK;
+               _sec = (SparkExecutionContext) ExecutionContextFactory.createContext(null);
+       }
+
+       private Map<Integer, Tuple2<MatrixBlock, MatrixBlock>> doPartitioning(Statement.PSScheme scheme) {
+               MatrixBlock[] mbs = generateData();
+               return ParamservUtils.doPartitionOnSpark(_sec, ParamservUtils.newMatrixObject(mbs[0]), ParamservUtils.newMatrixObject(mbs[1]), scheme, WORKER_NUM).collectAsMap();
+       }
+
+       @Test
+       public void testSparkDataPartitionerDC() {
+               DataPartitionScheme.Result localResult = launchLocalDataPartitionerDC();
+               Map<Integer, Tuple2<MatrixBlock, MatrixBlock>> sparkResult = doPartitioning(Statement.PSScheme.DISJOINT_CONTIGUOUS);
+
+               // Compare the local and Spark results
+               assertResult(localResult, sparkResult);
+       }
+
+       private void assertResult(DataPartitionScheme.Result local, Map<Integer, Tuple2<MatrixBlock, MatrixBlock>> spark) {
+               IntStream.range(0, WORKER_NUM).forEach(w -> {
+                       Assert.assertArrayEquals(local.pFeatures.get(w).acquireRead().getDenseBlockValues(), spark.get(w)._1.getDenseBlockValues(), 0);
+                       Assert.assertArrayEquals(local.pLabels.get(w).acquireRead().getDenseBlockValues(), spark.get(w)._2.getDenseBlockValues(), 0);
+               });
+       }
+
+       @Test
+       public void testSparkDataPartitionerDR() {
+               ParamservUtils.SEED = System.nanoTime();
+               MatrixBlock[] mbs = generateData();
+               DataPartitionScheme.Result localResult = launchLocalDataPartitionerDR(mbs);
+               Map<Integer, Tuple2<MatrixBlock, MatrixBlock>> sparkResult = doPartitioning(Statement.PSScheme.DISJOINT_RANDOM);
+
+               // Compare the local and Spark results
+               assertResult(localResult, sparkResult);
+       }
+
+       @Test
+       public void testSparkDataPartitionerDRR() {
+               DataPartitionScheme.Result localResult = launchLocalDataPartitionerDRR();
+               Map<Integer, Tuple2<MatrixBlock, MatrixBlock>> sparkResult = doPartitioning(Statement.PSScheme.DISJOINT_ROUND_ROBIN);
+
+               // Compare the local and Spark results
+               assertResult(localResult, sparkResult);
+       }
+
+       @Test
+       public void testSparkDataPartitionerOR() {
+               ParamservUtils.SEED = System.nanoTime();
+               DataPartitionScheme.Result localResult = launchLocalDataPartitionerOR();
+               Map<Integer, Tuple2<MatrixBlock, MatrixBlock>> sparkResult = doPartitioning(Statement.PSScheme.OVERLAP_RESHUFFLE);
+
+               // Compare the local and Spark results
+               assertResult(localResult, sparkResult);
+       }
+}
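
A note on why the DISJOINT_RANDOM and OVERLAP_RESHUFFLE tests can use exact array equality: both runs are driven by the same ParamservUtils.SEED (set to System.nanoTime() before partitioning), so the local and Spark paths see the same permutation. A tiny illustration of that shared-seed determinism, using java.util.Random rather than SystemML's permutation generator (the helper below is not a SystemML API):

import java.util.Arrays;
import java.util.Random;

public class SharedSeedSketch {
    // Fisher-Yates shuffle of 0..n-1 driven by an explicit seed.
    static int[] permutation(int n, long seed) {
        int[] p = new int[n];
        for (int i = 0; i < n; i++) {
            p[i] = i;
        }
        Random rnd = new Random(seed);
        for (int i = n - 1; i > 0; i--) {
            int j = rnd.nextInt(i + 1);
            int tmp = p[i];
            p[i] = p[j];
            p[j] = tmp;
        }
        return p;
    }

    public static void main(String[] args) {
        long seed = System.nanoTime(); // mirrors ParamservUtils.SEED = System.nanoTime() in the tests
        // Two independent runs with the same seed produce the same permutation.
        System.out.println(Arrays.equals(permutation(10, seed), permutation(10, seed))); // prints true
    }
}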

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/test_suites/java/org/apache/sysml/test/integration/functions/paramserv/ZPackageSuite.java
----------------------------------------------------------------------
diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/paramserv/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/paramserv/ZPackageSuite.java
index 89350bc..26ea638 100644
--- a/src/test_suites/java/org/apache/sysml/test/integration/functions/paramserv/ZPackageSuite.java
+++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/paramserv/ZPackageSuite.java
@@ -26,7 +26,8 @@ import org.junit.runners.Suite;
  *  won't run two of them at once. */
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
-       DataPartitionerTest.class,
+       LocalDataPartitionerTest.class,
+       SparkDataPartitionerTest.class,
        ParamservSyntaxTest.class,
        ParamservRecompilationTest.class,
        ParamservRuntimeNegativeTest.class,
