Repository: systemml
Updated Branches:
  refs/heads/master 2f4bce92a -> 8def2040b
http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/test/java/org/apache/sysml/test/integration/functions/paramserv/DataPartitionerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/DataPartitionerTest.java b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/DataPartitionerTest.java
deleted file mode 100644
index 7e0784e..0000000
--- a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/DataPartitionerTest.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.test.integration.functions.paramserv;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.IntStream;
-
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitioner;
-import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionerDC;
-import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionerDR;
-import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionerDRR;
-import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionerOR;
-import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils;
-import org.apache.sysml.runtime.util.DataConverter;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class DataPartitionerTest {
-
-	@Test
-	public void testDataPartitionerDC() {
-		DataPartitioner dp = new DataPartitionerDC();
-		double[] df = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
-		MatrixObject features = ParamservUtils.newMatrixObject();
-		features.acquireModify(DataConverter.convertToMatrixBlock(df, true));
-		features.refreshMetaData();
-		features.release();
-		MatrixObject labels = ParamservUtils.newMatrixObject();
-		labels.acquireModify(DataConverter.convertToMatrixBlock(df, true));
-		labels.refreshMetaData();
-		labels.release();
-		DataPartitioner.Result result = dp.doPartitioning(3, features, labels);
-
-		Assert.assertEquals(3, result.pFeatures.size());
-		Assert.assertEquals(3, result.pLabels.size());
-
-		double[] expected1 = new double[] { 1, 2, 3, 4 };
-		assertResult(result, 0, expected1);
-
-		double[] expected2 = new double[] { 5, 6, 7, 8 };
-		assertResult(result, 1, expected2);
-
-		double[] expected3 = new double[] { 9, 10 };
-		assertResult(result, 2, expected3);
-	}
-
-	private void assertResult(DataPartitioner.Result result, int index, double[] expected) {
-		List<MatrixObject> pfs = result.pFeatures;
-		List<MatrixObject> pls = result.pLabels;
-		double[] realValue1 = pfs.get(index).acquireRead().getDenseBlockValues();
-		double[] realValue2 = pls.get(index).acquireRead().getDenseBlockValues();
-		Assert.assertArrayEquals(expected, realValue1, 0);
-		Assert.assertArrayEquals(expected, realValue2, 0);
-	}
-
-	@Test
-	public void testDataPartitionerDR() {
-		DataPartitioner dp = new DataPartitionerDR();
-		double[] df = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
-		MatrixObject features = ParamservUtils.newMatrixObject();
-		features.acquireModify(DataConverter.convertToMatrixBlock(df, true));
-		features.refreshMetaData();
-		features.release();
-		MatrixObject labels = ParamservUtils.newMatrixObject();
-		labels.acquireModify(DataConverter.convertToMatrixBlock(df, true));
-		labels.refreshMetaData();
-		labels.release();
-
-		DataPartitioner.Result result = dp.doPartitioning(4, features, labels);
-
-		Assert.assertEquals(4, result.pFeatures.size());
-		Assert.assertEquals(4, result.pLabels.size());
-
-		// Ensure that the index is accorded between features and labels
-		IntStream.range(0, result.pFeatures.size()).forEach(i -> {
-			double[] f = result.pFeatures.get(i).acquireRead().getDenseBlockValues();
-			double[] l = result.pLabels.get(i).acquireRead().getDenseBlockValues();
-			Assert.assertArrayEquals(f, l, 0);
-		});
-
-		assertPermutationDR(df, result.pFeatures);
-		assertPermutationDR(df, result.pLabels);
-	}
-
-	private void assertPermutationDR(double[] df, List<MatrixObject> list) {
-		Map<Double, Integer> dict = new HashMap<>();
-		for (double d : df) {
-			dict.put(d, 0);
-		}
-		IntStream.range(0, list.size()).forEach(i -> {
-			double[] f = list.get(i).acquireRead().getDenseBlockValues();
-			for (double d : f) {
-				dict.compute(d, (k, v) -> v + 1);
-			}
-		});
-
-		// check if all the occurence is equivalent to one
-		for (Map.Entry<Double, Integer> e : dict.entrySet()) {
-			Assert.assertEquals(1, (int) e.getValue());
-		}
-	}
-
-	@Test
-	public void testDataPartitionerDRR() {
-		DataPartitioner dp = new DataPartitionerDRR();
-		double[] df = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
-		MatrixObject features = ParamservUtils.newMatrixObject();
-		features.acquireModify(DataConverter.convertToMatrixBlock(df, true));
-		features.refreshMetaData();
-		features.release();
-		MatrixObject labels = ParamservUtils.newMatrixObject();
-		labels.acquireModify(DataConverter.convertToMatrixBlock(df, true));
-		labels.refreshMetaData();
-		labels.release();
-		DataPartitioner.Result result = dp.doPartitioning(4, features, labels);
-
-		Assert.assertEquals(4, result.pFeatures.size());
-		Assert.assertEquals(4, result.pLabels.size());
-
-		double[] expected1 = new double[] { 4, 8 };
-		assertResult(result, 0, expected1);
-
-		double[] expected2 = new double[] { 1, 5, 9 };
-		assertResult(result, 1, expected2);
-
-		double[] expected3 = new double[] { 2, 6, 10 };
-		assertResult(result, 2, expected3);
-
-		double[] expected4 = new double[] { 3, 7 };
-		assertResult(result, 3, expected4);
-	}
-
-	@Test
-	public void testDataPartitionerOR() {
-		DataPartitioner dp = new DataPartitionerOR();
-		double[] df = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
-		MatrixObject features = ParamservUtils.newMatrixObject();
-		features.acquireModify(DataConverter.convertToMatrixBlock(df, true));
-		features.refreshMetaData();
-		features.release();
-		MatrixObject labels = ParamservUtils.newMatrixObject();
-		labels.acquireModify(DataConverter.convertToMatrixBlock(df, true));
-		labels.refreshMetaData();
-		labels.release();
-
-		DataPartitioner.Result result = dp.doPartitioning(4, features, labels);
-
-		Assert.assertEquals(4, result.pFeatures.size());
-		Assert.assertEquals(4, result.pLabels.size());
-
-		assertPermutationOR(df, result.pFeatures);
-		assertPermutationOR(df, result.pLabels);
-	}
-
-	private void assertPermutationOR(double[] df, List<MatrixObject> list) {
-		for (MatrixObject mo : list) {
-			Map<Double, Integer> dict = new HashMap<>();
-			for (double d : df) {
-				dict.put(d, 0);
-			}
-			double[] f = mo.acquireRead().getDenseBlockValues();
-			for (double d : f) {
-				dict.compute(d, (k, v) -> v + 1);
-			}
-			Assert.assertEquals(10, dict.size());
-			// check if all the occurence is equivalent to one
-			for (Map.Entry<Double, Integer> e : dict.entrySet()) {
-				Assert.assertEquals(1, (int) e.getValue());
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/test/java/org/apache/sysml/test/integration/functions/paramserv/LocalDataPartitionerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/LocalDataPartitionerTest.java b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/LocalDataPartitionerTest.java
new file mode 100644
index 0000000..1e4538a
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/LocalDataPartitionerTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.paramserv;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionScheme;
+import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import scala.Tuple2;
+
+public class LocalDataPartitionerTest extends BaseDataPartitionerTest {
+
+	@Test
+	public void testLocalDataPartitionerDC() {
+		DataPartitionScheme.Result result = launchLocalDataPartitionerDC();
+
+		Assert.assertEquals(WORKER_NUM, result.pFeatures.size());
+		Assert.assertEquals(WORKER_NUM, result.pLabels.size());
+		for (int i = 0; i < WORKER_NUM; i++) {
+			assertDCResult(result, i);
+		}
+	}
+
+	private void assertDCResult(DataPartitionScheme.Result result, int workerID) {
+		Assert.assertArrayEquals(generateExpectedData(workerID * (ROW_SIZE / WORKER_NUM) * COL_SIZE, (workerID + 1) * (ROW_SIZE / WORKER_NUM) * COL_SIZE), result.pFeatures.get(workerID).acquireRead().getDenseBlockValues(), 0);
+		Assert.assertArrayEquals(generateExpectedData(workerID * (ROW_SIZE / WORKER_NUM), (workerID + 1) * (ROW_SIZE / WORKER_NUM)), result.pLabels.get(workerID).acquireRead().getDenseBlockValues(), 0);
+	}
+
+	@Test
+	public void testLocalDataPartitionerDR() {
+		MatrixBlock[] mbs = generateData();
+		DataPartitionScheme.Result result = launchLocalDataPartitionerDR(mbs);
+
+		Assert.assertEquals(WORKER_NUM, result.pFeatures.size());
+		Assert.assertEquals(WORKER_NUM, result.pLabels.size());
+
+		// Generate the expected data
+		MatrixBlock perm = ParamservUtils.generatePermutation(ROW_SIZE, ParamservUtils.SEED);
+		List<MatrixBlock> efs = generateExpectedDataDR(mbs[0], perm);
+		List<MatrixBlock> els = generateExpectedDataDR(mbs[1], perm);
+
+		for (int i = 0; i < WORKER_NUM; i++) {
+			Assert.assertArrayEquals(efs.get(i).getDenseBlockValues(), result.pFeatures.get(i).acquireRead().getDenseBlockValues(), 0);
+			Assert.assertArrayEquals(els.get(i).getDenseBlockValues(), result.pLabels.get(i).acquireRead().getDenseBlockValues(), 0);
+		}
+	}
+
+	private List<MatrixBlock> generateExpectedDataDR(MatrixBlock mb, MatrixBlock perm) {
+		int batchSize = (int) Math.ceil((double) ROW_SIZE / WORKER_NUM);
+		return IntStream.range(0, WORKER_NUM).mapToObj(i -> {
+			int begin = i * batchSize;
+			int end = Math.min((i + 1) * batchSize, mb.getNumRows());
+			MatrixBlock slicedPerm = perm.slice(begin, end - 1);
+			return slicedPerm.aggregateBinaryOperations(slicedPerm, mb, new MatrixBlock(),
+				InstructionUtils.getMatMultOperator(WORKER_NUM));
+		}).collect(Collectors.toList());
+	}
+
+	@Test
+	public void testLocalDataPartitionerDRR() {
+		DataPartitionScheme.Result result = launchLocalDataPartitionerDRR();
+
+		Assert.assertEquals(WORKER_NUM, result.pFeatures.size());
+		Assert.assertEquals(WORKER_NUM, result.pLabels.size());
+		for (int i = 0; i < WORKER_NUM; i++) {
+			assertDRRResult(result, i);
+		}
+	}
+
+	private void assertDRRResult(DataPartitionScheme.Result result, int workerID) {
+		Tuple2<double[], double[]> expected = generateExpectedData(workerID, WORKER_NUM, ROW_SIZE / WORKER_NUM);
+		Assert.assertArrayEquals(expected._1, result.pFeatures.get(workerID).acquireRead().getDenseBlockValues(), 0);
+		Assert.assertArrayEquals(expected._2, result.pLabels.get(workerID).acquireRead().getDenseBlockValues(), 0);
+	}
+
+	private Tuple2<double[], double[]> generateExpectedData(int start, int step, int rowSize) {
+		double[] features = new double[rowSize * COL_SIZE];
+		int fIndex = 0;
+		double[] labels = new double[rowSize];
+		for (int i = 0; i < rowSize; i++) {
+			int rowID = start + i * step;
+			labels[i] = rowID;
+			for (int j = rowID * COL_SIZE; j < (rowID + 1) * COL_SIZE; j++) {
+				features[fIndex++] = j;
+			}
+		}
+		return new Tuple2<>(features, labels);
+	}
+
+	@Test
+	public void testLocalDataPartitionerOR() {
+		ParamservUtils.SEED = System.nanoTime();
+		DataPartitionScheme.Result result = launchLocalDataPartitionerOR();
+
+		Assert.assertEquals(WORKER_NUM, result.pFeatures.size());
+		Assert.assertEquals(WORKER_NUM, result.pLabels.size());
+		for (int i = 0; i < WORKER_NUM; i++) {
+			Tuple2<MatrixBlock, MatrixBlock> expected = generateExpectedDataOR(i);
+			Assert.assertArrayEquals(expected._1.getDenseBlockValues(), result.pFeatures.get(i).acquireRead().getDenseBlockValues(), 0);
+			Assert.assertArrayEquals(expected._2.getDenseBlockValues(), result.pLabels.get(i).acquireRead().getDenseBlockValues(), 0);
+		}
+	}
+
+	private Tuple2<MatrixBlock, MatrixBlock> generateExpectedDataOR(int workerID) {
+		MatrixBlock[] mbs = generateData();
+		MatrixBlock perm = ParamservUtils.generatePermutation(ROW_SIZE, ParamservUtils.SEED+workerID);
+		return new Tuple2<>(perm.aggregateBinaryOperations(perm, mbs[0], new MatrixBlock(), InstructionUtils.getMatMultOperator(WORKER_NUM)),
+			perm.aggregateBinaryOperations(perm, mbs[1], new MatrixBlock(), InstructionUtils.getMatMultOperator(WORKER_NUM)));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/test/java/org/apache/sysml/test/integration/functions/paramserv/SparkDataPartitionerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/SparkDataPartitionerTest.java b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/SparkDataPartitionerTest.java
new file mode 100644
index 0000000..b0e4a27
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/SparkDataPartitionerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.paramserv;
+
+import java.util.Map;
+import java.util.stream.IntStream;
+
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.parser.Statement;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionScheme;
+import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import scala.Tuple2;
+
+public class SparkDataPartitionerTest extends BaseDataPartitionerTest {
+
+	private static SparkExecutionContext _sec;
+
+	static {
+		DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+		DMLScript.rtplatform = DMLScript.RUNTIME_PLATFORM.SPARK;
+		_sec = (SparkExecutionContext) ExecutionContextFactory.createContext(null);
+	}
+
+	private Map<Integer, Tuple2<MatrixBlock, MatrixBlock>> doPartitioning(Statement.PSScheme scheme) {
+		MatrixBlock[] mbs = generateData();
+		return ParamservUtils.doPartitionOnSpark(_sec, ParamservUtils.newMatrixObject(mbs[0]), ParamservUtils.newMatrixObject(mbs[1]), scheme, WORKER_NUM).collectAsMap();
+	}
+
+	@Test
+	public void testSparkDataPartitionerDC() {
+		DataPartitionScheme.Result localResult = launchLocalDataPartitionerDC();
+		Map<Integer, Tuple2<MatrixBlock, MatrixBlock>> sparkResult = doPartitioning(Statement.PSScheme.DISJOINT_CONTIGUOUS);
+
+		// Compare the both
+		assertResult(localResult, sparkResult);
+	}
+
+	private void assertResult(DataPartitionScheme.Result local, Map<Integer, Tuple2<MatrixBlock, MatrixBlock>> spark) {
+		IntStream.range(0, WORKER_NUM).forEach(w -> {
+			Assert.assertArrayEquals(local.pFeatures.get(w).acquireRead().getDenseBlockValues(), spark.get(w)._1.getDenseBlockValues(), 0);
+			Assert.assertArrayEquals(local.pLabels.get(w).acquireRead().getDenseBlockValues(), spark.get(w)._2.getDenseBlockValues(), 0);
+		});
+	}
+
+	@Test
+	public void testSparkDataPartitionerDR() {
+		ParamservUtils.SEED = System.nanoTime();
+		MatrixBlock[] mbs = generateData();
+		DataPartitionScheme.Result localResult = launchLocalDataPartitionerDR(mbs);
+		Map<Integer, Tuple2<MatrixBlock, MatrixBlock>> sparkResult = doPartitioning(Statement.PSScheme.DISJOINT_RANDOM);
+
+		// Compare the both
+		assertResult(localResult, sparkResult);
+	}
+
+	@Test
+	public void testSparkDataPartitionerDRR() {
+		DataPartitionScheme.Result localResult = launchLocalDataPartitionerDRR();
+		Map<Integer, Tuple2<MatrixBlock, MatrixBlock>> sparkResult = doPartitioning(Statement.PSScheme.DISJOINT_ROUND_ROBIN);
+
+		// Compare the both
+		assertResult(localResult, sparkResult);
+	}
+
+	@Test
+	public void testSparkDataPartitionerOR() {
+		ParamservUtils.SEED = System.nanoTime();
+		DataPartitionScheme.Result localResult = launchLocalDataPartitionerOR();
+		Map<Integer, Tuple2<MatrixBlock, MatrixBlock>> sparkResult = doPartitioning(Statement.PSScheme.OVERLAP_RESHUFFLE);
+
+		// Compare the both
+		assertResult(localResult, sparkResult);
+	}
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/test_suites/java/org/apache/sysml/test/integration/functions/paramserv/ZPackageSuite.java
----------------------------------------------------------------------
diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/paramserv/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/paramserv/ZPackageSuite.java
index 89350bc..26ea638 100644
--- a/src/test_suites/java/org/apache/sysml/test/integration/functions/paramserv/ZPackageSuite.java
+++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/paramserv/ZPackageSuite.java
@@ -26,7 +26,8 @@ import org.junit.runners.Suite;
  * won't run two of them at once. */
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
-	DataPartitionerTest.class,
+	LocalDataPartitionerTest.class,
+	SparkDataPartitionerTest.class,
 	ParamservSyntaxTest.class,
 	ParamservRecompilationTest.class,
 	ParamservRuntimeNegativeTest.class,
