[SYSTEMML-2196] Multi-threaded matrix blocking on RDD parallelize This patch improves the performance of CP-to-Spark data exchange by parallelizing the matrix blocking on RDD parallelize. For a scenario of 10 x 800MB matrix parallelization (incl. blocking), this patch improved performance from 8.1s to 3.8s.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/c6a8715e Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/c6a8715e Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/c6a8715e Branch: refs/heads/master Commit: c6a8715eebf2f55b83d14e4bdaaa02dbe2f2d5d7 Parents: 81e1715 Author: Matthias Boehm <[email protected]> Authored: Thu Mar 22 15:27:50 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Thu Mar 22 15:27:50 2018 -0700 ---------------------------------------------------------------------- .../context/SparkExecutionContext.java | 63 +++++++++++--------- 1 file changed, 34 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/c6a8715e/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index 179c24a..fb268d4 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -20,9 +20,12 @@ package org.apache.sysml.runtime.controlprogram.context; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.LongStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -684,37 +687,17 @@ public class SparkExecutionContext extends ExecutionContext throws DMLRuntimeException { long t0 = DMLScript.STATISTICS ? 
System.nanoTime() : 0; - LinkedList<Tuple2<MatrixIndexes,MatrixBlock>> list = new LinkedList<>(); + List<Tuple2<MatrixIndexes,MatrixBlock>> list = null; - if( src.getNumRows() <= brlen - && src.getNumColumns() <= bclen ) - { - list.addLast(new Tuple2<>(new MatrixIndexes(1,1), src)); + if( src.getNumRows() <= brlen && src.getNumColumns() <= bclen ) { + list = Arrays.asList(new Tuple2<>(new MatrixIndexes(1,1), src)); } - else - { - boolean sparse = src.isInSparseFormat(); - - //create and write subblocks of matrix - for(int blockRow = 0; blockRow < (int)Math.ceil(src.getNumRows()/(double)brlen); blockRow++) - for(int blockCol = 0; blockCol < (int)Math.ceil(src.getNumColumns()/(double)bclen); blockCol++) - { - int maxRow = (blockRow*brlen + brlen < src.getNumRows()) ? brlen : src.getNumRows() - blockRow*brlen; - int maxCol = (blockCol*bclen + bclen < src.getNumColumns()) ? bclen : src.getNumColumns() - blockCol*bclen; - - MatrixBlock block = new MatrixBlock(maxRow, maxCol, sparse); - - int row_offset = blockRow*brlen; - int col_offset = blockCol*bclen; - - //copy submatrix to block - src.slice( row_offset, row_offset+maxRow-1, - col_offset, col_offset+maxCol-1, block ); - - //append block to sequence file - MatrixIndexes indexes = new MatrixIndexes(blockRow+1, blockCol+1); - list.addLast(new Tuple2<>(indexes, block)); - } + else { + MatrixCharacteristics mc = new MatrixCharacteristics( + src.getNumRows(), src.getNumColumns(), brlen, bclen, src.getNonZeros()); + list = LongStream.range(0, mc.getNumBlocks()).parallel() + .mapToObj(i -> createIndexedBlock(src, mc, i)) + .collect(Collectors.toList()); } JavaPairRDD<MatrixIndexes,MatrixBlock> result = sc.parallelizePairs(list); @@ -725,6 +708,28 @@ public class SparkExecutionContext extends ExecutionContext return result; } + + private static Tuple2<MatrixIndexes,MatrixBlock> createIndexedBlock(MatrixBlock mb, MatrixCharacteristics mc, long ix) { + try { + //compute block indexes + long blockRow = ix / 
mc.getNumColBlocks(); + long blockCol = ix % mc.getNumColBlocks(); + //compute block sizes + int maxRow = UtilFunctions.computeBlockSize(mc.getRows(), blockRow+1, mc.getRowsPerBlock()); + int maxCol = UtilFunctions.computeBlockSize(mc.getCols(), blockCol+1, mc.getColsPerBlock()); + //copy sub-matrix to block + MatrixBlock block = new MatrixBlock(maxRow, maxCol, mb.isInSparseFormat()); + int row_offset = (int)blockRow*mc.getRowsPerBlock(); + int col_offset = (int)blockCol*mc.getColsPerBlock(); + block = mb.slice( row_offset, row_offset+maxRow-1, + col_offset, col_offset+maxCol-1, block ); + //create key-value pair + return new Tuple2<>(new MatrixIndexes(blockRow+1, blockCol+1), block); + } + catch(DMLRuntimeException ex) { + throw new RuntimeException(ex); + } + } public static JavaPairRDD<Long,FrameBlock> toFrameJavaPairRDD(JavaSparkContext sc, FrameBlock src) throws DMLRuntimeException
