Repository: incubator-systemml
Updated Branches:
  refs/heads/master 066a8213e -> bbc77e71e


[MINOR] Refactor MatrixIndexingSPInstruction to enable parallel
improvements in both indexing and prefetching

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/bbc77e71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/bbc77e71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/bbc77e71

Branch: refs/heads/master
Commit: bbc77e71eb9b5aa464f0130380bc30d3f42107b6
Parents: 066a821
Author: Niketan Pansare <npan...@us.ibm.com>
Authored: Thu Feb 16 14:26:42 2017 -0800
Committer: Niketan Pansare <npan...@us.ibm.com>
Committed: Thu Feb 16 14:26:42 2017 -0800

----------------------------------------------------------------------
 .../spark/MatrixIndexingSPInstruction.java      | 139 +++++++++++--------
 1 file changed, 83 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bbc77e71/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
index 9d58718..71bb5ee 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
@@ -85,6 +85,83 @@ public class MatrixIndexingSPInstruction  extends 
IndexingSPInstruction
                super(op, lhsInput, rhsInput, rl, ru, cl, cu, out, opcode, 
istr);
        }
        
+       public static MatrixBlock 
inmemoryIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1, 
+                        MatrixCharacteristics mcIn, MatrixCharacteristics 
mcOut, IndexRange ixrange) throws DMLRuntimeException {
+               if( isSingleBlockLookup(mcIn, ixrange) ) {
+                       return singleBlockIndexing(in1, mcIn, mcOut, ixrange);
+               }
+               else if( isMultiBlockLookup(in1, mcIn, mcOut, ixrange) ) {
+                       return multiBlockIndexing(in1, mcIn, mcOut, ixrange);
+               }
+               else
+                       throw new DMLRuntimeException("Incorrect usage of 
inmemoryIndexing");
+       }
+       
+       private static MatrixBlock 
multiBlockIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1, 
+                        MatrixCharacteristics mcIn, MatrixCharacteristics 
mcOut, IndexRange ixrange) throws DMLRuntimeException {
+               //create list of all required matrix indexes
+               List<MatrixIndexes> filter = new ArrayList<MatrixIndexes>();
+               long rlix = UtilFunctions.computeBlockIndex(ixrange.rowStart, 
mcIn.getRowsPerBlock());
+               long ruix = UtilFunctions.computeBlockIndex(ixrange.rowEnd, 
mcIn.getRowsPerBlock());
+               long clix = UtilFunctions.computeBlockIndex(ixrange.colStart, 
mcIn.getColsPerBlock());
+               long cuix = UtilFunctions.computeBlockIndex(ixrange.colEnd, 
mcIn.getColsPerBlock());
+               for( long r=rlix; r<=ruix; r++ )
+                       for( long c=clix; c<=cuix; c++ )
+                               filter.add( new MatrixIndexes(r,c) );
+               
+               //wrap PartitionPruningRDD around input to exploit pruning for 
out-of-core datasets
+               JavaPairRDD<MatrixIndexes,MatrixBlock> out = 
createPartitionPruningRDD(in1, filter);
+               out = out.filter(new IsBlockInRange(ixrange.rowStart, 
ixrange.rowEnd, ixrange.colStart, ixrange.colEnd, mcOut)) //filter unnecessary 
blocks 
+                                .mapToPair(new SliceBlock2(ixrange, mcOut));   
    //slice relevant blocks
+               
+               //collect output without shuffle to avoid side-effects with 
custom PartitionPruningRDD
+               MatrixBlock mbout = SparkExecutionContext.toMatrixBlock(out, 
(int)mcOut.getRows(), 
+                               (int)mcOut.getCols(), mcOut.getRowsPerBlock(), 
mcOut.getColsPerBlock(), -1);
+               return mbout;
+       }
+       
+       private static MatrixBlock 
singleBlockIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1, 
+                        MatrixCharacteristics mcIn, MatrixCharacteristics 
mcOut, IndexRange ixrange) throws DMLRuntimeException {
+               //single block output via lookup (on partitioned inputs, this 
allows for single partition
+               //access to avoid a full scan of the input; note that this is 
especially important for 
+               //out-of-core datasets as entire partitions are read, not just 
keys as in the in-memory setting.
+               long rix = UtilFunctions.computeBlockIndex(ixrange.rowStart, 
mcIn.getRowsPerBlock());
+               long cix = UtilFunctions.computeBlockIndex(ixrange.colStart, 
mcIn.getColsPerBlock());
+               List<MatrixBlock> list = in1.lookup(new MatrixIndexes(rix, 
cix));
+               if( list.size() != 1 )
+                       throw new DMLRuntimeException("Block lookup returned 
"+list.size()+" blocks (expected 1).");
+               
+               MatrixBlock tmp = list.get(0);
+               MatrixBlock mbout = (tmp.getNumRows()==mcOut.getRows() && 
tmp.getNumColumns()==mcOut.getCols()) ? 
+                               tmp : tmp.sliceOperations( //reference full 
block or slice out sub-block
+                               
UtilFunctions.computeCellInBlock(ixrange.rowStart, mcIn.getRowsPerBlock()), 
+                               
UtilFunctions.computeCellInBlock(ixrange.rowEnd, mcIn.getRowsPerBlock()), 
+                               
UtilFunctions.computeCellInBlock(ixrange.colStart, mcIn.getColsPerBlock()), 
+                               
UtilFunctions.computeCellInBlock(ixrange.colEnd, mcIn.getColsPerBlock()), new 
MatrixBlock());
+               return mbout;
+       }
+       
+       public static JavaPairRDD<MatrixIndexes,MatrixBlock> 
generalCaseRightIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1, 
+                        MatrixCharacteristics mcIn, MatrixCharacteristics 
mcOut, IndexRange ixrange, SparkAggType aggType) {
+               JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
+               
+               if( isPartitioningPreservingRightIndexing(mcIn, ixrange) ) {
+                       out = in1.mapPartitionsToPair(
+                                       new 
SliceBlockPartitionFunction(ixrange, mcOut), true);
+               }
+               else if( aggType == SparkAggType.NONE
+                       || OptimizerUtils.isIndexingRangeBlockAligned(ixrange, 
mcIn) ) {
+                       out = in1.filter(new IsBlockInRange(ixrange.rowStart, 
ixrange.rowEnd, ixrange.colStart, ixrange.colEnd, mcOut))
+                            .mapToPair(new SliceSingleBlock(ixrange, mcOut));
+               }
+               else {
+                       out = in1.filter(new IsBlockInRange(ixrange.rowStart, 
ixrange.rowEnd, ixrange.colStart, ixrange.colEnd, mcOut))
+                            .flatMapToPair(new SliceMultipleBlocks(ixrange, 
mcOut));
+                       out = RDDAggregateUtils.mergeByKey(out);
+               }
+               return out;
+       }
+       
        @Override
        public void processInstruction(ExecutionContext ec)
                        throws DMLRuntimeException 
@@ -112,63 +189,13 @@ public class MatrixIndexingSPInstruction  extends 
IndexingSPInstruction
                        JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
                        
                        if( isSingleBlockLookup(mcIn, ixrange) ) {
-                               //single block output via lookup (on 
partitioned inputs, this allows for single partition
-                               //access to avoid a full scan of the input; 
note that this is especially important for 
-                               //out-of-core datasets as entire partitions are 
read, not just keys as in the in-memory setting.
-                               long rix = 
UtilFunctions.computeBlockIndex(ixrange.rowStart, mcIn.getRowsPerBlock());
-                               long cix = 
UtilFunctions.computeBlockIndex(ixrange.colStart, mcIn.getColsPerBlock());
-                               List<MatrixBlock> list = in1.lookup(new 
MatrixIndexes(rix, cix));
-                               if( list.size() != 1 )
-                                       throw new DMLRuntimeException("Block 
lookup returned "+list.size()+" blocks (expected 1).");
-                               
-                               MatrixBlock tmp = list.get(0);
-                               MatrixBlock mbout = 
(tmp.getNumRows()==mcOut.getRows() && tmp.getNumColumns()==mcOut.getCols()) ? 
-                                               tmp : tmp.sliceOperations( 
//reference full block or slice out sub-block
-                                               
UtilFunctions.computeCellInBlock(ixrange.rowStart, mcIn.getRowsPerBlock()), 
-                                               
UtilFunctions.computeCellInBlock(ixrange.rowEnd, mcIn.getRowsPerBlock()), 
-                                               
UtilFunctions.computeCellInBlock(ixrange.colStart, mcIn.getColsPerBlock()), 
-                                               
UtilFunctions.computeCellInBlock(ixrange.colEnd, mcIn.getColsPerBlock()), new 
MatrixBlock());
-                               
-                               sec.setMatrixOutput(output.getName(), mbout);
+                               sec.setMatrixOutput(output.getName(), 
singleBlockIndexing(in1, mcIn, mcOut, ixrange));
                        }
                        else if( isMultiBlockLookup(in1, mcIn, mcOut, ixrange) 
) {
-                               //create list of all required matrix indexes
-                               List<MatrixIndexes> filter = new 
ArrayList<MatrixIndexes>();
-                               long rlix = 
UtilFunctions.computeBlockIndex(ixrange.rowStart, mcIn.getRowsPerBlock());
-                               long ruix = 
UtilFunctions.computeBlockIndex(ixrange.rowEnd, mcIn.getRowsPerBlock());
-                               long clix = 
UtilFunctions.computeBlockIndex(ixrange.colStart, mcIn.getColsPerBlock());
-                               long cuix = 
UtilFunctions.computeBlockIndex(ixrange.colEnd, mcIn.getColsPerBlock());
-                               for( long r=rlix; r<=ruix; r++ )
-                                       for( long c=clix; c<=cuix; c++ )
-                                               filter.add( new 
MatrixIndexes(r,c) );
-                               
-                               //wrap PartitionPruningRDD around input to 
exploit pruning for out-of-core datasets
-                               JavaPairRDD<MatrixIndexes,MatrixBlock> out = 
createPartitionPruningRDD(in1, filter);
-                               out = out.filter(new IsBlockInRange(rl, ru, cl, 
cu, mcOut)) //filter unnecessary blocks 
-                                                .mapToPair(new 
SliceBlock2(ixrange, mcOut));       //slice relevant blocks
-                               
-                               //collect output without shuffle to avoid 
side-effects with custom PartitionPruningRDD
-                               MatrixBlock mbout = 
SparkExecutionContext.toMatrixBlock(out, (int)mcOut.getRows(), 
-                                               (int)mcOut.getCols(), 
mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1);
-                               sec.setMatrixOutput(output.getName(), mbout);
+                               sec.setMatrixOutput(output.getName(), 
multiBlockIndexing(in1, mcIn, mcOut, ixrange));
                        }
                        else { //rdd output for general case
-                               JavaPairRDD<MatrixIndexes,MatrixBlock> out = 
null;
-                               
-                               if( isPartitioningPreservingRightIndexing(mcIn, 
ixrange) ) {
-                                       out = in1.mapPartitionsToPair(
-                                                       new 
SliceBlockPartitionFunction(ixrange, mcOut), true);
-                               }
-                               else if( _aggType == SparkAggType.NONE
-                                       || 
OptimizerUtils.isIndexingRangeBlockAligned(ixrange, mcIn) ) {
-                                       out = in1.filter(new IsBlockInRange(rl, 
ru, cl, cu, mcOut))
-                                            .mapToPair(new 
SliceSingleBlock(ixrange, mcOut));
-                               }
-                               else {
-                                       out = in1.filter(new IsBlockInRange(rl, 
ru, cl, cu, mcOut))
-                                            .flatMapToPair(new 
SliceMultipleBlocks(ixrange, mcOut));
-                                       out = RDDAggregateUtils.mergeByKey(out);
-                               }
+                               JavaPairRDD<MatrixIndexes,MatrixBlock> out = 
generalCaseRightIndexing(in1, mcIn, mcOut, ixrange, _aggType);
                                        
                                //put output RDD handle into symbol table
                                sec.setRDDHandleForVariable(output.getName(), 
out);
@@ -252,7 +279,7 @@ public class MatrixIndexingSPInstruction  extends 
IndexingSPInstruction
         * @param ixrange index range
         * @return true if index range covers a single block of the input matrix
         */
-       private static boolean isSingleBlockLookup(MatrixCharacteristics mcIn, 
IndexRange ixrange) {
+       public static boolean isSingleBlockLookup(MatrixCharacteristics mcIn, 
IndexRange ixrange) {
                return UtilFunctions.computeBlockIndex(ixrange.rowStart, 
mcIn.getRowsPerBlock())
                        == UtilFunctions.computeBlockIndex(ixrange.rowEnd, 
mcIn.getRowsPerBlock())
                        && UtilFunctions.computeBlockIndex(ixrange.colStart, 
mcIn.getColsPerBlock())
@@ -271,7 +298,7 @@ public class MatrixIndexingSPInstruction  extends 
IndexingSPInstruction
         * @param ixrange index range
         * @return true if index range requires a multi-block lookup
         */
-       private static boolean isMultiBlockLookup(JavaPairRDD<?,?> in, 
MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) {
+       public static boolean isMultiBlockLookup(JavaPairRDD<?,?> in, 
MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) {
                return SparkUtils.isHashPartitioned(in)                         
 //existing partitioner
                        && 
OptimizerUtils.estimatePartitionedSizeExactSparsity(mcIn) //out-of-core dataset
                           > SparkExecutionContext.getDataMemoryBudget(true, 
true)
@@ -557,7 +584,7 @@ public class MatrixIndexingSPInstruction  extends 
IndexingSPInstruction
         * @param filter partition filter
         * @return matrix as {@code JavaPairRDD<MatrixIndexes,MatrixBlock>}
         */
-       private JavaPairRDD<MatrixIndexes,MatrixBlock> 
createPartitionPruningRDD( 
+       private static JavaPairRDD<MatrixIndexes,MatrixBlock> 
createPartitionPruningRDD( 
                        JavaPairRDD<MatrixIndexes,MatrixBlock> in, 
List<MatrixIndexes> filter )
        {
                //build hashset of required partition ids

Reply via email to