Repository: incubator-systemml Updated Branches: refs/heads/master 066a8213e -> bbc77e71e
[MINOR] Code refactoring of MatrixIndexingSPInstruction to enable parallel improvements in both indexing and prefetching Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/bbc77e71 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/bbc77e71 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/bbc77e71 Branch: refs/heads/master Commit: bbc77e71eb9b5aa464f0130380bc30d3f42107b6 Parents: 066a821 Author: Niketan Pansare <npan...@us.ibm.com> Authored: Thu Feb 16 14:26:42 2017 -0800 Committer: Niketan Pansare <npan...@us.ibm.com> Committed: Thu Feb 16 14:26:42 2017 -0800 ---------------------------------------------------------------------- .../spark/MatrixIndexingSPInstruction.java | 139 +++++++++++-------- 1 file changed, 83 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bbc77e71/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java index 9d58718..71bb5ee 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java @@ -85,6 +85,83 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction super(op, lhsInput, rhsInput, rl, ru, cl, cu, out, opcode, istr); } + public static MatrixBlock inmemoryIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1, + MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) throws DMLRuntimeException { + if( 
isSingleBlockLookup(mcIn, ixrange) ) { + return singleBlockIndexing(in1, mcIn, mcOut, ixrange); + } + else if( isMultiBlockLookup(in1, mcIn, mcOut, ixrange) ) { + return multiBlockIndexing(in1, mcIn, mcOut, ixrange); + } + else + throw new DMLRuntimeException("Incorrect usage of inmemoryIndexing"); + } + + private static MatrixBlock multiBlockIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1, + MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) throws DMLRuntimeException { + //create list of all required matrix indexes + List<MatrixIndexes> filter = new ArrayList<MatrixIndexes>(); + long rlix = UtilFunctions.computeBlockIndex(ixrange.rowStart, mcIn.getRowsPerBlock()); + long ruix = UtilFunctions.computeBlockIndex(ixrange.rowEnd, mcIn.getRowsPerBlock()); + long clix = UtilFunctions.computeBlockIndex(ixrange.colStart, mcIn.getColsPerBlock()); + long cuix = UtilFunctions.computeBlockIndex(ixrange.colEnd, mcIn.getColsPerBlock()); + for( long r=rlix; r<=ruix; r++ ) + for( long c=clix; c<=cuix; c++ ) + filter.add( new MatrixIndexes(r,c) ); + + //wrap PartitionPruningRDD around input to exploit pruning for out-of-core datasets + JavaPairRDD<MatrixIndexes,MatrixBlock> out = createPartitionPruningRDD(in1, filter); + out = out.filter(new IsBlockInRange(ixrange.rowStart, ixrange.rowEnd, ixrange.colStart, ixrange.colEnd, mcOut)) //filter unnecessary blocks + .mapToPair(new SliceBlock2(ixrange, mcOut)); //slice relevant blocks + + //collect output without shuffle to avoid side-effects with custom PartitionPruningRDD + MatrixBlock mbout = SparkExecutionContext.toMatrixBlock(out, (int)mcOut.getRows(), + (int)mcOut.getCols(), mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1); + return mbout; + } + + private static MatrixBlock singleBlockIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1, + MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) throws DMLRuntimeException { + //single block output via lookup (on partitioned 
inputs, this allows for single partition + //access to avoid a full scan of the input; note that this is especially important for + //out-of-core datasets as entire partitions are read, not just keys as in the in-memory setting. + long rix = UtilFunctions.computeBlockIndex(ixrange.rowStart, mcIn.getRowsPerBlock()); + long cix = UtilFunctions.computeBlockIndex(ixrange.colStart, mcIn.getColsPerBlock()); + List<MatrixBlock> list = in1.lookup(new MatrixIndexes(rix, cix)); + if( list.size() != 1 ) + throw new DMLRuntimeException("Block lookup returned "+list.size()+" blocks (expected 1)."); + + MatrixBlock tmp = list.get(0); + MatrixBlock mbout = (tmp.getNumRows()==mcOut.getRows() && tmp.getNumColumns()==mcOut.getCols()) ? + tmp : tmp.sliceOperations( //reference full block or slice out sub-block + UtilFunctions.computeCellInBlock(ixrange.rowStart, mcIn.getRowsPerBlock()), + UtilFunctions.computeCellInBlock(ixrange.rowEnd, mcIn.getRowsPerBlock()), + UtilFunctions.computeCellInBlock(ixrange.colStart, mcIn.getColsPerBlock()), + UtilFunctions.computeCellInBlock(ixrange.colEnd, mcIn.getColsPerBlock()), new MatrixBlock()); + return mbout; + } + + public static JavaPairRDD<MatrixIndexes,MatrixBlock> generalCaseRightIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1, + MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange, SparkAggType aggType) { + JavaPairRDD<MatrixIndexes,MatrixBlock> out = null; + + if( isPartitioningPreservingRightIndexing(mcIn, ixrange) ) { + out = in1.mapPartitionsToPair( + new SliceBlockPartitionFunction(ixrange, mcOut), true); + } + else if( aggType == SparkAggType.NONE + || OptimizerUtils.isIndexingRangeBlockAligned(ixrange, mcIn) ) { + out = in1.filter(new IsBlockInRange(ixrange.rowStart, ixrange.rowEnd, ixrange.colStart, ixrange.colEnd, mcOut)) + .mapToPair(new SliceSingleBlock(ixrange, mcOut)); + } + else { + out = in1.filter(new IsBlockInRange(ixrange.rowStart, ixrange.rowEnd, ixrange.colStart, ixrange.colEnd, mcOut)) + 
.flatMapToPair(new SliceMultipleBlocks(ixrange, mcOut)); + out = RDDAggregateUtils.mergeByKey(out); + } + return out; + } + @Override public void processInstruction(ExecutionContext ec) throws DMLRuntimeException @@ -112,63 +189,13 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() ); if( isSingleBlockLookup(mcIn, ixrange) ) { - //single block output via lookup (on partitioned inputs, this allows for single partition - //access to avoid a full scan of the input; note that this is especially important for - //out-of-core datasets as entire partitions are read, not just keys as in the in-memory setting. - long rix = UtilFunctions.computeBlockIndex(ixrange.rowStart, mcIn.getRowsPerBlock()); - long cix = UtilFunctions.computeBlockIndex(ixrange.colStart, mcIn.getColsPerBlock()); - List<MatrixBlock> list = in1.lookup(new MatrixIndexes(rix, cix)); - if( list.size() != 1 ) - throw new DMLRuntimeException("Block lookup returned "+list.size()+" blocks (expected 1)."); - - MatrixBlock tmp = list.get(0); - MatrixBlock mbout = (tmp.getNumRows()==mcOut.getRows() && tmp.getNumColumns()==mcOut.getCols()) ? 
- tmp : tmp.sliceOperations( //reference full block or slice out sub-block - UtilFunctions.computeCellInBlock(ixrange.rowStart, mcIn.getRowsPerBlock()), - UtilFunctions.computeCellInBlock(ixrange.rowEnd, mcIn.getRowsPerBlock()), - UtilFunctions.computeCellInBlock(ixrange.colStart, mcIn.getColsPerBlock()), - UtilFunctions.computeCellInBlock(ixrange.colEnd, mcIn.getColsPerBlock()), new MatrixBlock()); - - sec.setMatrixOutput(output.getName(), mbout); + sec.setMatrixOutput(output.getName(), singleBlockIndexing(in1, mcIn, mcOut, ixrange)); } else if( isMultiBlockLookup(in1, mcIn, mcOut, ixrange) ) { - //create list of all required matrix indexes - List<MatrixIndexes> filter = new ArrayList<MatrixIndexes>(); - long rlix = UtilFunctions.computeBlockIndex(ixrange.rowStart, mcIn.getRowsPerBlock()); - long ruix = UtilFunctions.computeBlockIndex(ixrange.rowEnd, mcIn.getRowsPerBlock()); - long clix = UtilFunctions.computeBlockIndex(ixrange.colStart, mcIn.getColsPerBlock()); - long cuix = UtilFunctions.computeBlockIndex(ixrange.colEnd, mcIn.getColsPerBlock()); - for( long r=rlix; r<=ruix; r++ ) - for( long c=clix; c<=cuix; c++ ) - filter.add( new MatrixIndexes(r,c) ); - - //wrap PartitionPruningRDD around input to exploit pruning for out-of-core datasets - JavaPairRDD<MatrixIndexes,MatrixBlock> out = createPartitionPruningRDD(in1, filter); - out = out.filter(new IsBlockInRange(rl, ru, cl, cu, mcOut)) //filter unnecessary blocks - .mapToPair(new SliceBlock2(ixrange, mcOut)); //slice relevant blocks - - //collect output without shuffle to avoid side-effects with custom PartitionPruningRDD - MatrixBlock mbout = SparkExecutionContext.toMatrixBlock(out, (int)mcOut.getRows(), - (int)mcOut.getCols(), mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1); - sec.setMatrixOutput(output.getName(), mbout); + sec.setMatrixOutput(output.getName(), multiBlockIndexing(in1, mcIn, mcOut, ixrange)); } else { //rdd output for general case - JavaPairRDD<MatrixIndexes,MatrixBlock> out = null; - - 
if( isPartitioningPreservingRightIndexing(mcIn, ixrange) ) { - out = in1.mapPartitionsToPair( - new SliceBlockPartitionFunction(ixrange, mcOut), true); - } - else if( _aggType == SparkAggType.NONE - || OptimizerUtils.isIndexingRangeBlockAligned(ixrange, mcIn) ) { - out = in1.filter(new IsBlockInRange(rl, ru, cl, cu, mcOut)) - .mapToPair(new SliceSingleBlock(ixrange, mcOut)); - } - else { - out = in1.filter(new IsBlockInRange(rl, ru, cl, cu, mcOut)) - .flatMapToPair(new SliceMultipleBlocks(ixrange, mcOut)); - out = RDDAggregateUtils.mergeByKey(out); - } + JavaPairRDD<MatrixIndexes,MatrixBlock> out = generalCaseRightIndexing(in1, mcIn, mcOut, ixrange, _aggType); //put output RDD handle into symbol table sec.setRDDHandleForVariable(output.getName(), out); @@ -252,7 +279,7 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction * @param ixrange index range * @return true if index range covers a single block of the input matrix */ - private static boolean isSingleBlockLookup(MatrixCharacteristics mcIn, IndexRange ixrange) { + public static boolean isSingleBlockLookup(MatrixCharacteristics mcIn, IndexRange ixrange) { return UtilFunctions.computeBlockIndex(ixrange.rowStart, mcIn.getRowsPerBlock()) == UtilFunctions.computeBlockIndex(ixrange.rowEnd, mcIn.getRowsPerBlock()) && UtilFunctions.computeBlockIndex(ixrange.colStart, mcIn.getColsPerBlock()) @@ -271,7 +298,7 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction * @param ixrange index range * @return true if index range requires a multi-block lookup */ - private static boolean isMultiBlockLookup(JavaPairRDD<?,?> in, MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) { + public static boolean isMultiBlockLookup(JavaPairRDD<?,?> in, MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) { return SparkUtils.isHashPartitioned(in) //existing partitioner && OptimizerUtils.estimatePartitionedSizeExactSparsity(mcIn) //out-of-core dataset > 
SparkExecutionContext.getDataMemoryBudget(true, true) @@ -557,7 +584,7 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction * @param filter partition filter * @return matrix as {@code JavaPairRDD<MatrixIndexes,MatrixBlock>} */ - private JavaPairRDD<MatrixIndexes,MatrixBlock> createPartitionPruningRDD( + private static JavaPairRDD<MatrixIndexes,MatrixBlock> createPartitionPruningRDD( JavaPairRDD<MatrixIndexes,MatrixBlock> in, List<MatrixIndexes> filter ) { //build hashset of required partition ids