Repository: incubator-systemml
Updated Branches:
  refs/heads/master 80ab57bda -> 066a8213e


[SYSTEMML-1273] Performance spark right indexing (aligned, w/o agg)

This patch adds a special case for Spark right indexing without
aggregation, where each input block is either passed through directly to
the output or sliced into exactly one output block. The main application
is block-aligned slicing of entire row batches. The change also includes
dedicated tests for this special case.
 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/f893f6a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/f893f6a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/f893f6a2

Branch: refs/heads/master
Commit: f893f6a2deaa13e5f4e4a7e601e71abde3aea55c
Parents: 80ab57b
Author: Matthias Boehm <[email protected]>
Authored: Wed Feb 15 22:57:50 2017 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Thu Feb 16 12:13:45 2017 -0800

----------------------------------------------------------------------
 .../spark/MatrixIndexingSPInstruction.java      |  74 ++++++++--
 .../indexing/IndexRangeBlockAlignmentTest.java  |  86 ++++++++++++
 .../indexing/RowBatchRightIndexingTest.java     | 136 +++++++++++++++++++
 .../functions/indexing/RowBatchIndexingTest.R   |  35 +++++
 .../functions/indexing/RowBatchIndexingTest.dml |  32 +++++
 .../functions/indexing/ZPackageSuite.java       |   6 +-
 6 files changed, 354 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f893f6a2/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
index 5b1012f..9d58718 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
@@ -152,21 +152,22 @@ public class MatrixIndexingSPInstruction  extends 
IndexingSPInstruction
                                                (int)mcOut.getCols(), 
mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1);
                                sec.setMatrixOutput(output.getName(), mbout);
                        }
-                       else {
-                               //rdd output for general case
+                       else { //rdd output for general case
                                JavaPairRDD<MatrixIndexes,MatrixBlock> out = 
null;
+                               
                                if( isPartitioningPreservingRightIndexing(mcIn, 
ixrange) ) {
                                        out = in1.mapPartitionsToPair(
                                                        new 
SliceBlockPartitionFunction(ixrange, mcOut), true);
                                }
+                               else if( _aggType == SparkAggType.NONE
+                                       || 
OptimizerUtils.isIndexingRangeBlockAligned(ixrange, mcIn) ) {
+                                       out = in1.filter(new IsBlockInRange(rl, 
ru, cl, cu, mcOut))
+                                            .mapToPair(new 
SliceSingleBlock(ixrange, mcOut));
+                               }
                                else {
                                        out = in1.filter(new IsBlockInRange(rl, 
ru, cl, cu, mcOut))
-                                            .flatMapToPair(new 
SliceBlock(ixrange, mcOut));
-                                       
-                                       //aggregation if required 
-                                       boolean aligned = 
OptimizerUtils.isIndexingRangeBlockAligned(ixrange, mcIn);
-                                       if( _aggType != SparkAggType.NONE && 
!aligned ) 
-                                               out = 
RDDAggregateUtils.mergeByKey(out);
+                                            .flatMapToPair(new 
SliceMultipleBlocks(ixrange, mcOut));
+                                       out = RDDAggregateUtils.mergeByKey(out);
                                }
                                        
                                //put output RDD handle into symbol table
@@ -405,15 +406,62 @@ public class MatrixIndexingSPInstruction  extends 
IndexingSPInstruction
                }
        }
 
-       private static class SliceBlock implements 
PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes, 
MatrixBlock> 
+       /**
+        * Maps one input block that is known to overlap the indexing range
+        * (callers filter with IsBlockInRange first) to exactly one output block.
+        * A block lying completely inside the range is forwarded as the same
+        * object; otherwise only the part covered by the range is sliced out.
+        */
+       private static class SliceSingleBlock implements PairFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes, MatrixBlock> 
+       {
+               private static final long serialVersionUID = -6724027136506200924L;
+               
+               private final IndexRange _ixrange;
+               private final int _brlen; 
+               private final int _bclen;
+               
+               /**
+                * @param ixrange global indexing range being extracted
+                * @param mcOut   output characteristics; only its block sizes are used
+                */
+               public SliceSingleBlock(IndexRange ixrange, MatrixCharacteristics mcOut) {
+                       _ixrange = ixrange;
+                       _brlen = mcOut.getRowsPerBlock();
+                       _bclen = mcOut.getColsPerBlock();
+               }
+
+               @Override
+               public Tuple2<MatrixIndexes, MatrixBlock> call(Tuple2<MatrixIndexes, MatrixBlock> kv) 
+                       throws Exception 
+               {
+                       MatrixIndexes inIx = kv._1();
+                       MatrixBlock inBlock = kv._2();
+                       long blkRow = inIx.getRowIndex();
+                       long blkCol = inIx.getColumnIndex();
+                       
+                       //global cell index of this block's first row / first column
+                       long rowOffset = UtilFunctions.computeCellIndex(blkRow, _brlen, 0);
+                       long colOffset = UtilFunctions.computeCellIndex(blkCol, _bclen, 0);
+                       
+                       //clip the indexing range to this block (local, 0-based, inclusive)
+                       int rowLo = (int) Math.max(0, _ixrange.rowStart - rowOffset);
+                       int colLo = (int) Math.max(0, _ixrange.colStart - colOffset);
+                       int rowHi = (int) Math.min(inBlock.getNumRows() - 1, _ixrange.rowEnd - rowOffset);
+                       int colHi = (int) Math.min(inBlock.getNumColumns() - 1, _ixrange.colEnd - colOffset);
+                       
+                       //shift block indexes by the number of whole blocks before the range start
+                       MatrixIndexes outIx = new MatrixIndexes(
+                               blkRow - (_ixrange.rowStart - 1) / _brlen,
+                               blkCol - (_ixrange.colStart - 1) / _bclen);
+                       
+                       boolean coversBlock = rowLo == 0 && colLo == 0
+                               && rowHi == inBlock.getNumRows() - 1
+                               && colHi == inBlock.getNumColumns() - 1;
+                       
+                       //fully covered blocks are forwarded without copying; others are sliced
+                       MatrixBlock outBlock = coversBlock ? inBlock
+                               : inBlock.sliceOperations(rowLo, rowHi, colLo, colHi, new MatrixBlock());
+                       
+                       return new Tuple2<MatrixIndexes, MatrixBlock>(outIx, outBlock);
+               }
+       }
+       
+       private static class SliceMultipleBlocks implements 
PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes, 
MatrixBlock> 
        {
                private static final long serialVersionUID = 
5733886476413136826L;
                
-               private IndexRange _ixrange;
-               private int _brlen; 
-               private int _bclen;
+               private final IndexRange _ixrange;
+               private final int _brlen; 
+               private final int _bclen;
                
-               public SliceBlock(IndexRange ixrange, MatrixCharacteristics 
mcOut) {
+               public SliceMultipleBlocks(IndexRange ixrange, 
MatrixCharacteristics mcOut) {
                        _ixrange = ixrange;
                        _brlen = mcOut.getRowsPerBlock();
                        _bclen = mcOut.getColsPerBlock();

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f893f6a2/src/test/java/org/apache/sysml/test/integration/functions/indexing/IndexRangeBlockAlignmentTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/indexing/IndexRangeBlockAlignmentTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/indexing/IndexRangeBlockAlignmentTest.java
new file mode 100644
index 0000000..b14ae99
--- /dev/null
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/indexing/IndexRangeBlockAlignmentTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.indexing;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+
+
+/**
+ * Tests for {@link OptimizerUtils#isIndexingRangeBlockAligned} with 1000x1000
+ * blocks. The positive cases use row ranges that either start on a block
+ * boundary (2001-4000) or fall inside a single row block (2500-2600), combined
+ * with column ranges starting on a block boundary (1-1736, 7001-9000). The
+ * negative cases use row ranges that start mid-block and span several blocks
+ * (2501-4500, 2500-3001).
+ */
+public class IndexRangeBlockAlignmentTest extends AutomatedTestBase
+{
+       private static final int BRLEN = 1000;
+       private static final int BCLEN = 1000;
+       
+       @Override
+       public void setUp() {
+               //nothing to set up: these tests only call OptimizerUtils directly
+       }
+       
+       @Test
+       public void testRowBlockFirstColumn() {
+               Assert.assertTrue(
+                       OptimizerUtils.isIndexingRangeBlockAligned(2001, 4000, 1, 1736, BRLEN, BCLEN));
+       }
+       
+       @Test
+       public void testRowBlockColBlock() {
+               Assert.assertTrue(
+                       OptimizerUtils.isIndexingRangeBlockAligned(2001, 4000, 7001, 9000, BRLEN, BCLEN));
+       }
+
+       @Test
+       public void testSingleRowBlockFirstColumn() {
+               Assert.assertTrue(
+                       OptimizerUtils.isIndexingRangeBlockAligned(2500, 2600, 1, 1736, BRLEN, BCLEN));
+       }
+       
+       @Test
+       public void testSingleRowBlockColBlock() {
+               Assert.assertTrue(
+                       OptimizerUtils.isIndexingRangeBlockAligned(2500, 2600, 7001, 9000, BRLEN, BCLEN));
+       }
+       
+       @Test
+       public void testRowBlockFirstColumnNeg() {
+               Assert.assertFalse(
+                       OptimizerUtils.isIndexingRangeBlockAligned(2501, 4500, 1, 1736, BRLEN, BCLEN));
+       }
+       
+       @Test
+       public void testRowBlockColBlockNeg() {
+               Assert.assertFalse(
+                       OptimizerUtils.isIndexingRangeBlockAligned(2501, 4500, 7001, 9000, BRLEN, BCLEN));
+       }
+
+       @Test
+       public void testSingleRowBlockFirstColumnNeg() {
+               Assert.assertFalse(
+                       OptimizerUtils.isIndexingRangeBlockAligned(2500, 3001, 1, 1736, BRLEN, BCLEN));
+       }
+       
+       @Test
+       public void testSingleRowBlockColBlockNeg() {
+               Assert.assertFalse(
+                       OptimizerUtils.isIndexingRangeBlockAligned(2500, 3001, 7001, 9000, BRLEN, BCLEN));
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f893f6a2/src/test/java/org/apache/sysml/test/integration/functions/indexing/RowBatchRightIndexingTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/indexing/RowBatchRightIndexingTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/indexing/RowBatchRightIndexingTest.java
new file mode 100644
index 0000000..a9a967e
--- /dev/null
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/indexing/RowBatchRightIndexingTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.indexing;
+
+import java.util.HashMap;
+import org.junit.Test;
+
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+/**
+ * Integration test that sums a random 1500x1050 matrix in consecutive row
+ * batches of 500 rows (RowBatchIndexingTest.dml) and compares the result
+ * against the R reference script (RowBatchIndexingTest.R). It runs under CP,
+ * Spark and MR execution, with dense and sparse inputs.
+ */
+public class RowBatchRightIndexingTest extends AutomatedTestBase
+{
+       private final static String TEST_NAME = "RowBatchIndexingTest";
+       private final static String TEST_DIR = "functions/indexing/";
+       private final static String TEST_CLASS_DIR = TEST_DIR + RowBatchRightIndexingTest.class.getSimpleName() + "/";
+       
+       private final static double epsilon=0.0000000001;
+       private final static int rows = 1500; //multiple of 500 (the scripts slice 500-row batches)
+       private final static int cols = 1050;
+       
+       private final static double sparsity1 = 0.5;  //dense input
+       private final static double sparsity2 = 0.01; //sparse input
+       
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"}));
+       }
+       
+       @Test
+       public void testRightIndexingDenseCP() {
+               runRightIndexingTest(ExecType.CP, false);
+       }
+       
+       @Test
+       public void testRightIndexingDenseSP() {
+               runRightIndexingTest(ExecType.SPARK, false);
+       }
+       
+       @Test
+       public void testRightIndexingDenseMR() {
+               runRightIndexingTest(ExecType.MR, false);
+       }
+       
+       @Test
+       public void testRightIndexingSparseCP() {
+               runRightIndexingTest(ExecType.CP, true);
+       }
+       
+       @Test
+       public void testRightIndexingSparseSP() {
+               runRightIndexingTest(ExecType.SPARK, true);
+       }
+       
+       @Test
+       public void testRightIndexingSparseMR() {
+               runRightIndexingTest(ExecType.MR, true);
+       }
+       
+       /**
+        * Runs the DML and R row-batch scripts on the same random input and
+        * compares their single-cell outputs within {@code epsilon}. The global
+        * runtime platform and local-Spark flag are restored afterwards.
+        * 
+        * @param et execution type; MR maps to HADOOP, SPARK to SPARK, and
+        *           anything else to HYBRID
+        * @param sparse if true, generate the input with sparsity 0.01, otherwise 0.5
+        */
+       public void runRightIndexingTest( ExecType et, boolean sparse ) 
+       {
+               RUNTIME_PLATFORM platformOld = rtplatform;
+               switch( et ){
+                       case MR: rtplatform = RUNTIME_PLATFORM.HADOOP; break;
+                       case SPARK: rtplatform = RUNTIME_PLATFORM.SPARK; break;
+                       default: rtplatform = RUNTIME_PLATFORM.HYBRID; break;
+               }
+               
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               if( rtplatform == RUNTIME_PLATFORM.SPARK )
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+               
+               try
+               {
+                       TestConfiguration config = getTestConfiguration(TEST_NAME);
+                       loadTestConfiguration(config);
+
+                       double sparsity = sparse ? sparsity2 : sparsity1;
+                       
+                       config.addVariable("rows", rows);
+                       config.addVariable("cols", cols);
+                       
+                       String RI_HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = RI_HOME + TEST_NAME + ".dml";
+                       programArgs = new String[]{"-args",  input("A"), output("B") };
+                       
+                       fullRScriptName = RI_HOME + TEST_NAME + ".R";
+                       rCmd = "Rscript" + " " + fullRScriptName + " " + 
+                               inputDir() + " " + expectedDir();
+                       
+                       double[][] A = getRandomMatrix(rows, cols, 0, 1, sparsity, 23);
+                       writeInputMatrixWithMTD("A", A, true);
+                       
+                       //run tests
+                       runTest(true, false, null, -1);
+                       runRScript(true);
+                       
+                       //compare output aggregate
+                       HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("B");
+                       HashMap<CellIndex, Double> rfile = readRMatrixFromFS("B");
+                       TestUtils.compareMatrices(dmlfile, rfile, epsilon, "DML", "R");
+               }
+               finally {
+                       //restore global test state for subsequent tests
+                       rtplatform = platformOld;
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f893f6a2/src/test/scripts/functions/indexing/RowBatchIndexingTest.R
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/indexing/RowBatchIndexingTest.R 
b/src/test/scripts/functions/indexing/RowBatchIndexingTest.R
new file mode 100644
index 0000000..99334cf
--- /dev/null
+++ b/src/test/scripts/functions/indexing/RowBatchIndexingTest.R
@@ -0,0 +1,35 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+args <- commandArgs(TRUE)
+options(digits=22)
+library("Matrix")
+
+# Reference result: total sum of A, accumulated over consecutive 500-row batches.
+A = as.matrix(readMM(paste(args[1], "A.mtx", sep="")))
+batchSize = 500
+total = 0
+for( b in 1:(nrow(A)/batchSize) ) {
+  lo = (b-1)*batchSize + 1
+  hi = b*batchSize
+  total = total + sum(A[lo:hi,])
+}
+
+B = as.matrix(total)
+writeMM(as(B,"CsparseMatrix"), paste(args[2], "B", sep=""))
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f893f6a2/src/test/scripts/functions/indexing/RowBatchIndexingTest.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/indexing/RowBatchIndexingTest.dml 
b/src/test/scripts/functions/indexing/RowBatchIndexingTest.dml
new file mode 100644
index 0000000..164de84
--- /dev/null
+++ b/src/test/scripts/functions/indexing/RowBatchIndexingTest.dml
@@ -0,0 +1,32 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+# Sums A over consecutive batches of 500 rows (mirrors RowBatchIndexingTest.R).
+A = read($1)
+s = 0;
+# upper bound parenthesized so the loop reads the same as the R reference,
+# where 1:nrow(A)/500 would instead mean (1:nrow(A))/500
+for( i in 1:(nrow(A)/500) ) {
+  Xi = A[((i-1)*500+1):(i*500),]
+  # NOTE(review): presumably a no-op conditional that splits the loop body into
+  # separate statement blocks so the slice Xi is compiled on its own - confirm
+  if(1==1){}
+  s = s + sum(Xi);
+}
+
+B = as.matrix(s);
+write(B, $2)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f893f6a2/src/test_suites/java/org/apache/sysml/test/integration/functions/indexing/ZPackageSuite.java
----------------------------------------------------------------------
diff --git 
a/src/test_suites/java/org/apache/sysml/test/integration/functions/indexing/ZPackageSuite.java
 
b/src/test_suites/java/org/apache/sysml/test/integration/functions/indexing/ZPackageSuite.java
index 76e4832..ea67d1a 100644
--- 
a/src/test_suites/java/org/apache/sysml/test/integration/functions/indexing/ZPackageSuite.java
+++ 
b/src/test_suites/java/org/apache/sysml/test/integration/functions/indexing/ZPackageSuite.java
@@ -26,15 +26,17 @@ import org.junit.runners.Suite;
  *  won't run two of them at once. */
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
+       IndexRangeBlockAlignmentTest.class,
+       Jdk7IssueRightIndexingTest.class,
        LeftIndexingScalarTest.class,
        LeftIndexingSparseDenseTest.class,
        LeftIndexingSparseSparseTest.class,
        LeftIndexingTest.class,
        LeftIndexingUpdateInPlaceTest.class,
+       PyDMLImplicitSlicingBounds.class,
        RightIndexingMatrixTest.class,
        RightIndexingVectorTest.class,
-       
-       Jdk7IssueRightIndexingTest.class,
+       RowBatchRightIndexingTest.class,
        UnboundedScalarRightIndexingTest.class,
 })
 

Reply via email to