Repository: incubator-systemml Updated Branches: refs/heads/master 80ab57bda -> 066a8213e
[SYSTEMML-1273] Performance spark right indexing (aligned, w/o agg) This patch adds a special case for spark right indexing without aggregation, where a single block is either directly passed through to the output or sliced to a single output block. The main application is aligned slicing of entire row batches. The change also includes specific tests for this special case. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/f893f6a2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/f893f6a2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/f893f6a2 Branch: refs/heads/master Commit: f893f6a2deaa13e5f4e4a7e601e71abde3aea55c Parents: 80ab57b Author: Matthias Boehm <[email protected]> Authored: Wed Feb 15 22:57:50 2017 -0800 Committer: Matthias Boehm <[email protected]> Committed: Thu Feb 16 12:13:45 2017 -0800 ---------------------------------------------------------------------- .../spark/MatrixIndexingSPInstruction.java | 74 ++++++++-- .../indexing/IndexRangeBlockAlignmentTest.java | 86 ++++++++++++ .../indexing/RowBatchRightIndexingTest.java | 136 +++++++++++++++++++ .../functions/indexing/RowBatchIndexingTest.R | 35 +++++ .../functions/indexing/RowBatchIndexingTest.dml | 32 +++++ .../functions/indexing/ZPackageSuite.java | 6 +- 6 files changed, 354 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f893f6a2/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java index 5b1012f..9d58718 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java @@ -152,21 +152,22 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction (int)mcOut.getCols(), mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1); sec.setMatrixOutput(output.getName(), mbout); } - else { - //rdd output for general case + else { //rdd output for general case JavaPairRDD<MatrixIndexes,MatrixBlock> out = null; + if( isPartitioningPreservingRightIndexing(mcIn, ixrange) ) { out = in1.mapPartitionsToPair( new SliceBlockPartitionFunction(ixrange, mcOut), true); } + else if( _aggType == SparkAggType.NONE + || OptimizerUtils.isIndexingRangeBlockAligned(ixrange, mcIn) ) { + out = in1.filter(new IsBlockInRange(rl, ru, cl, cu, mcOut)) + .mapToPair(new SliceSingleBlock(ixrange, mcOut)); + } else { out = in1.filter(new IsBlockInRange(rl, ru, cl, cu, mcOut)) - .flatMapToPair(new SliceBlock(ixrange, mcOut)); - - //aggregation if required - boolean aligned = OptimizerUtils.isIndexingRangeBlockAligned(ixrange, mcIn); - if( _aggType != SparkAggType.NONE && !aligned ) - out = RDDAggregateUtils.mergeByKey(out); + .flatMapToPair(new SliceMultipleBlocks(ixrange, mcOut)); + out = RDDAggregateUtils.mergeByKey(out); } //put output RDD handle into symbol table @@ -405,15 +406,62 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction } } - private static class SliceBlock implements PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes, MatrixBlock> + private static class SliceSingleBlock implements PairFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes, MatrixBlock> + { + private static final long serialVersionUID = -6724027136506200924L; + + private final IndexRange _ixrange; + private final int _brlen; + private final int _bclen; + + public SliceSingleBlock(IndexRange ixrange, MatrixCharacteristics mcOut) { + _ixrange = ixrange; + _brlen = mcOut.getRowsPerBlock(); + _bclen = mcOut.getColsPerBlock(); + } + + @Override + public Tuple2<MatrixIndexes, MatrixBlock> call(Tuple2<MatrixIndexes, MatrixBlock> kv) + throws Exception + { + //get inputs (guaranteed to fall into indexing range) + MatrixIndexes ix = kv._1(); + MatrixBlock block = kv._2(); + + //compute local index range + long grix = UtilFunctions.computeCellIndex(ix.getRowIndex(), _brlen, 0); + long gcix = UtilFunctions.computeCellIndex(ix.getColumnIndex(), _bclen, 0); + int lrl = (int)((_ixrange.rowStart<grix) ? 0 : _ixrange.rowStart - grix); + int lcl = (int)((_ixrange.colStart<gcix) ? 0 : _ixrange.colStart - gcix); + int lru = (int)Math.min(block.getNumRows()-1, _ixrange.rowEnd - grix); + int lcu = (int)Math.min(block.getNumColumns()-1, _ixrange.colEnd - gcix); + + //compute output index + MatrixIndexes ixOut = new MatrixIndexes( + ix.getRowIndex() - (_ixrange.rowStart-1)/_brlen, + ix.getColumnIndex() - (_ixrange.colStart-1)/_bclen); + + //create return matrix block (via shallow copy or slice) + if( lrl == 0 && lru == block.getNumRows()-1 + && lcl == 0 && lcu == block.getNumColumns()-1 ) { + return new Tuple2<MatrixIndexes, MatrixBlock>(ixOut, block); + } + else { + return new Tuple2<MatrixIndexes, MatrixBlock>(ixOut, + block.sliceOperations(lrl, lru, lcl, lcu, new MatrixBlock())); + } + } + } + + private static class SliceMultipleBlocks implements PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes, MatrixBlock> { private static final long serialVersionUID = 5733886476413136826L; - private IndexRange _ixrange; - private int _brlen; - private int _bclen; + private final IndexRange _ixrange; + private final int _brlen; + private final int _bclen; - public SliceBlock(IndexRange ixrange, MatrixCharacteristics mcOut) { + public SliceMultipleBlocks(IndexRange ixrange, MatrixCharacteristics mcOut) { _ixrange = ixrange; _brlen = mcOut.getRowsPerBlock(); _bclen = mcOut.getColsPerBlock(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f893f6a2/src/test/java/org/apache/sysml/test/integration/functions/indexing/IndexRangeBlockAlignmentTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/indexing/IndexRangeBlockAlignmentTest.java b/src/test/java/org/apache/sysml/test/integration/functions/indexing/IndexRangeBlockAlignmentTest.java new file mode 100644 index 0000000..b14ae99 --- /dev/null +++ b/src/test/java/org/apache/sysml/test/integration/functions/indexing/IndexRangeBlockAlignmentTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.test.integration.functions.indexing; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.sysml.hops.OptimizerUtils; +import org.apache.sysml.test.integration.AutomatedTestBase; + + +public class IndexRangeBlockAlignmentTest extends AutomatedTestBase +{ + private static final int BRLEN = 1000; + private static final int BCLEN = 1000; + + @Override + public void setUp() { + + } + + @Test + public void testRowBlockFirstColumn() { + Assert.assertEquals(new Boolean(true), + OptimizerUtils.isIndexingRangeBlockAligned(2001, 4000, 1, 1736, BRLEN, BCLEN)); + } + + @Test + public void testRowBlockColBlock() { + Assert.assertEquals(new Boolean(true), + OptimizerUtils.isIndexingRangeBlockAligned(2001, 4000, 7001, 9000, BRLEN, BCLEN)); + } + + @Test + public void testSingleRowBlockFirstColumn() { + Assert.assertEquals(new Boolean(true), + OptimizerUtils.isIndexingRangeBlockAligned(2500, 2600, 1, 1736, BRLEN, BCLEN)); + } + + @Test + public void testSingleRowBlockColBlock() { + Assert.assertEquals(new Boolean(true), + OptimizerUtils.isIndexingRangeBlockAligned(2500, 2600, 7001, 9000, BRLEN, BCLEN)); + } + + @Test + public void testRowBlockFirstColumnNeg() { + Assert.assertEquals(new Boolean(false), + OptimizerUtils.isIndexingRangeBlockAligned(2501, 4500, 1, 1736, BRLEN, BCLEN)); + } + + @Test + public void testRowBlockColBlockNeg() { + Assert.assertEquals(new Boolean(false), + OptimizerUtils.isIndexingRangeBlockAligned(2501, 4500, 7001, 9000, BRLEN, BCLEN)); + } + + @Test + public void testSingleRowBlockFirstColumnNeg() { + Assert.assertEquals(new Boolean(false), + OptimizerUtils.isIndexingRangeBlockAligned(2500, 3001, 1, 1736, BRLEN, BCLEN)); + } + + @Test + public void testSingleRowBlockColBlockNeg() { + Assert.assertEquals(new Boolean(false), + OptimizerUtils.isIndexingRangeBlockAligned(2500, 3001, 7001, 9000, BRLEN, BCLEN)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f893f6a2/src/test/java/org/apache/sysml/test/integration/functions/indexing/RowBatchRightIndexingTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/indexing/RowBatchRightIndexingTest.java b/src/test/java/org/apache/sysml/test/integration/functions/indexing/RowBatchRightIndexingTest.java new file mode 100644 index 0000000..a9a967e --- /dev/null +++ b/src/test/java/org/apache/sysml/test/integration/functions/indexing/RowBatchRightIndexingTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.test.integration.functions.indexing; + +import java.util.HashMap; +import org.junit.Test; + +import org.apache.sysml.api.DMLScript; +import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; +import org.apache.sysml.lops.LopProperties.ExecType; +import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex; +import org.apache.sysml.test.integration.AutomatedTestBase; +import org.apache.sysml.test.integration.TestConfiguration; +import org.apache.sysml.test.utils.TestUtils; + +public class RowBatchRightIndexingTest extends AutomatedTestBase +{ + private final static String TEST_NAME = "RowBatchIndexingTest"; + private final static String TEST_DIR = "functions/indexing/"; + private final static String TEST_CLASS_DIR = TEST_DIR + RowBatchRightIndexingTest.class.getSimpleName() + "/"; + + private final static double epsilon=0.0000000001; + private final static int rows = 1500; //multiple of 500 + private final static int cols = 1050; + + private final static double sparsity1 = 0.5; + private final static double sparsity2 = 0.01; + + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"})); + } + + @Test + public void testRightIndexingDenseCP() { + runRightIndexingTest(ExecType.CP, false); + } + + @Test + public void testRightIndexingDenseSP() { + runRightIndexingTest(ExecType.SPARK, false); + } + + @Test + public void testRightIndexingDenseMR() { + runRightIndexingTest(ExecType.MR, false); + } + + @Test + public void testRightIndexingSparseCP() { + runRightIndexingTest(ExecType.CP, true); + } + + @Test + public void testRightIndexingSparseSP() { + runRightIndexingTest(ExecType.SPARK, true); + } + + @Test + public void testRightIndexingSparseMR() { + runRightIndexingTest(ExecType.MR, true); + } + + /** + * + * @param et + * @param sparse + */ + public void runRightIndexingTest( ExecType et, boolean sparse ) + { + RUNTIME_PLATFORM platformOld = rtplatform; + switch( et ){ + case MR: rtplatform = RUNTIME_PLATFORM.HADOOP; break; + case SPARK: rtplatform = RUNTIME_PLATFORM.SPARK; break; + default: rtplatform = RUNTIME_PLATFORM.HYBRID; break; + } + + boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG; + if( rtplatform == RUNTIME_PLATFORM.SPARK ) + DMLScript.USE_LOCAL_SPARK_CONFIG = true; + + try + { + TestConfiguration config = getTestConfiguration(TEST_NAME); + loadTestConfiguration(config); + + double sparsity = sparse ? sparsity2 : sparsity1; + + config.addVariable("rows", rows); + config.addVariable("cols", cols); + + String RI_HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = RI_HOME + TEST_NAME + ".dml"; + programArgs = new String[]{"-args", input("A"), output("B") }; + + fullRScriptName = RI_HOME + TEST_NAME + ".R"; + rCmd = "Rscript" + " " + fullRScriptName + " " + + inputDir() + " " + expectedDir(); + + double[][] A = getRandomMatrix(rows, cols, 0, 1, sparsity, 23); + writeInputMatrixWithMTD("A", A, true); + + //run tests + runTest(true, false, null, -1); + runRScript(true); + + //compare output aggregate + HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("B"); + HashMap<CellIndex, Double> rfile = readRMatrixFromFS("B"); + TestUtils.compareMatrices(dmlfile, rfile, epsilon, "DML", "R"); + } + finally { + rtplatform = platformOld; + DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f893f6a2/src/test/scripts/functions/indexing/RowBatchIndexingTest.R ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/indexing/RowBatchIndexingTest.R b/src/test/scripts/functions/indexing/RowBatchIndexingTest.R new file mode 100644 index 0000000..99334cf --- /dev/null +++ b/src/test/scripts/functions/indexing/RowBatchIndexingTest.R @@ -0,0 +1,35 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + + +args <- commandArgs(TRUE) +options(digits=22) +library("Matrix") + +A = as.matrix(readMM(paste(args[1], "A.mtx", sep=""))) +s = 0; +for( i in 1:(nrow(A)/500) ) { + Xi = A[((i-1)*500+1):(i*500),] + s = s + sum(Xi); +} + +B = as.matrix(s); +writeMM(as(B,"CsparseMatrix"), paste(args[2], "B", sep="")) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f893f6a2/src/test/scripts/functions/indexing/RowBatchIndexingTest.dml ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/indexing/RowBatchIndexingTest.dml b/src/test/scripts/functions/indexing/RowBatchIndexingTest.dml new file mode 100644 index 0000000..164de84 --- /dev/null +++ b/src/test/scripts/functions/indexing/RowBatchIndexingTest.dml @@ -0,0 +1,32 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + + +A = read($1) +s = 0; +for( i in 1:nrow(A)/500 ) { + Xi = A[((i-1)*500+1):(i*500),] + if(1==1){} + s = s + sum(Xi); +} + +B = as.matrix(s); +write(B, $2) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f893f6a2/src/test_suites/java/org/apache/sysml/test/integration/functions/indexing/ZPackageSuite.java ---------------------------------------------------------------------- diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/indexing/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/indexing/ZPackageSuite.java index 76e4832..ea67d1a 100644 --- a/src/test_suites/java/org/apache/sysml/test/integration/functions/indexing/ZPackageSuite.java +++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/indexing/ZPackageSuite.java @@ -26,15 +26,17 @@ import org.junit.runners.Suite; * won't run two of them at once. */ @RunWith(Suite.class) @Suite.SuiteClasses({ + IndexRangeBlockAlignmentTest.class, + Jdk7IssueRightIndexingTest.class, LeftIndexingScalarTest.class, LeftIndexingSparseDenseTest.class, LeftIndexingSparseSparseTest.class, LeftIndexingTest.class, LeftIndexingUpdateInPlaceTest.class, + PyDMLImplicitSlicingBounds.class, RightIndexingMatrixTest.class, RightIndexingVectorTest.class, - - Jdk7IssueRightIndexingTest.class, + RowBatchRightIndexingTest.class, UnboundedScalarRightIndexingTest.class, })
