[SYSTEMML-2196] Multi-threaded matrix blocking on RDD parallelize

This patch improves the performance of CP-to-Spark data exchange by
parallelizing the matrix blocking performed on RDD parallelize. For a
scenario of 10 x 800MB matrix parallelization (incl. blocking), this
patch improved performance from 8.1s to 3.8s.
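For illustration, the core idea of the change is to replace the sequential
nested loop over block rows and columns with a parallel stream over
linearized block indices (blockRow = ix / numColBlocks, blockCol = ix %
numColBlocks), so that individual blocks are created concurrently on the
fork-join common pool. The following minimal sketch shows the same pattern
outside of SystemML; the plain double[][] input and the Block helper class
are illustrative stand-ins for MatrixBlock/MatrixIndexes and are not part
of the SystemML API.

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Minimal sketch of multi-threaded matrix blocking over linearized block
// indices, analogous to the LongStream.parallel() approach in this patch.
// double[][] and Block are illustrative stand-ins, not SystemML classes.
public class ParallelBlockingSketch {
	// illustrative (blockRow, blockCol, data) triple
	static final class Block {
		final long blockRow, blockCol; // 1-based block indexes
		final double[][] data;
		Block(long br, long bc, double[][] d) { blockRow = br; blockCol = bc; data = d; }
	}

	static List<Block> toBlocks(double[][] src, int brlen, int bclen) {
		int rows = src.length, cols = src[0].length;
		long nrb = (long) Math.ceil(rows / (double) brlen); // number of row blocks
		long ncb = (long) Math.ceil(cols / (double) bclen); // number of column blocks
		// one parallel task per block, addressed by a single linearized index
		return LongStream.range(0, nrb * ncb).parallel().mapToObj(ix -> {
			long blockRow = ix / ncb, blockCol = ix % ncb;
			int rowOffset = (int) blockRow * brlen, colOffset = (int) blockCol * bclen;
			int maxRow = Math.min(brlen, rows - rowOffset); // last block may be smaller
			int maxCol = Math.min(bclen, cols - colOffset);
			double[][] block = new double[maxRow][maxCol];
			for (int i = 0; i < maxRow; i++)
				System.arraycopy(src[rowOffset + i], colOffset, block[i], 0, maxCol);
			return new Block(blockRow + 1, blockCol + 1, block); // 1-based, as in SystemML
		}).collect(Collectors.toList());
	}

	public static void main(String[] args) {
		double[][] m = new double[2500][2000];
		List<Block> blocks = toBlocks(m, 1000, 1000); // 3 x 2 = 6 blocks
		System.out.println("created " + blocks.size() + " blocks");
	}
}

Linearizing the 2D block index into a single LongStream keeps the work
trivially load-balanced and avoids nested parallel loops; in the actual
patch, the per-block work is factored into the new createIndexedBlock
helper shown in the diff below.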

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/c6a8715e
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/c6a8715e
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/c6a8715e

Branch: refs/heads/master
Commit: c6a8715eebf2f55b83d14e4bdaaa02dbe2f2d5d7
Parents: 81e1715
Author: Matthias Boehm <[email protected]>
Authored: Thu Mar 22 15:27:50 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Thu Mar 22 15:27:50 2018 -0700

----------------------------------------------------------------------
 .../context/SparkExecutionContext.java          | 63 +++++++++++---------
 1 file changed, 34 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/c6a8715e/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 179c24a..fb268d4 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -20,9 +20,12 @@
 package org.apache.sysml.runtime.controlprogram.context;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -684,37 +687,17 @@ public class SparkExecutionContext extends ExecutionContext
 		throws DMLRuntimeException
 	{
 		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-		LinkedList<Tuple2<MatrixIndexes,MatrixBlock>> list = new LinkedList<>();
+		List<Tuple2<MatrixIndexes,MatrixBlock>> list = null;
 
-		if(    src.getNumRows() <= brlen
-		    && src.getNumColumns() <= bclen )
-		{
-			list.addLast(new Tuple2<>(new MatrixIndexes(1,1), src));
+		if( src.getNumRows() <= brlen && src.getNumColumns() <= bclen ) {
+			list = Arrays.asList(new Tuple2<>(new MatrixIndexes(1,1), src));
 		}
-		else
-		{
-			boolean sparse = src.isInSparseFormat();
-
-			//create and write subblocks of matrix
-			for(int blockRow = 0; blockRow < (int)Math.ceil(src.getNumRows()/(double)brlen); blockRow++)
-				for(int blockCol = 0; blockCol < (int)Math.ceil(src.getNumColumns()/(double)bclen); blockCol++)
-				{
-					int maxRow = (blockRow*brlen + brlen < src.getNumRows()) ? brlen : src.getNumRows() - blockRow*brlen;
-					int maxCol = (blockCol*bclen + bclen < src.getNumColumns()) ? bclen : src.getNumColumns() - blockCol*bclen;
-
-					MatrixBlock block = new MatrixBlock(maxRow, maxCol, sparse);
-
-					int row_offset = blockRow*brlen;
-					int col_offset = blockCol*bclen;
-
-					//copy submatrix to block
-					src.slice( row_offset, row_offset+maxRow-1,
-						col_offset, col_offset+maxCol-1, block );
-
-					//append block to sequence file
-					MatrixIndexes indexes = new MatrixIndexes(blockRow+1, blockCol+1);
-					list.addLast(new Tuple2<>(indexes, block));
-				}
+		else {
+			MatrixCharacteristics mc = new MatrixCharacteristics(
+				src.getNumRows(), src.getNumColumns(), brlen, bclen, src.getNonZeros());
+			list = LongStream.range(0, mc.getNumBlocks()).parallel()
+				.mapToObj(i -> createIndexedBlock(src, mc, i))
+				.collect(Collectors.toList());
 		}
 
 		JavaPairRDD<MatrixIndexes,MatrixBlock> result = sc.parallelizePairs(list);
@@ -725,6 +708,28 @@ public class SparkExecutionContext extends ExecutionContext
 
 		return result;
 	}
+	
+	private static Tuple2<MatrixIndexes,MatrixBlock> createIndexedBlock(MatrixBlock mb, MatrixCharacteristics mc, long ix) {
+		try {
+			//compute block indexes
+			long blockRow = ix / mc.getNumColBlocks();
+			long blockCol = ix % mc.getNumColBlocks();
+			//compute block sizes
+			int maxRow = UtilFunctions.computeBlockSize(mc.getRows(), blockRow+1, mc.getRowsPerBlock());
+			int maxCol = UtilFunctions.computeBlockSize(mc.getCols(), blockCol+1, mc.getColsPerBlock());
+			//copy sub-matrix to block
+			MatrixBlock block = new MatrixBlock(maxRow, maxCol, mb.isInSparseFormat());
+			int row_offset = (int)blockRow*mc.getRowsPerBlock();
+			int col_offset = (int)blockCol*mc.getColsPerBlock();
+			block = mb.slice( row_offset, row_offset+maxRow-1,
+				col_offset, col_offset+maxCol-1, block );
+			//create key-value pair
+			return new Tuple2<>(new MatrixIndexes(blockRow+1, blockCol+1), block);
+		}
+		catch(DMLRuntimeException ex) {
+			throw new RuntimeException(ex);
+		}
+	}
 
 	public static JavaPairRDD<Long,FrameBlock> toFrameJavaPairRDD(JavaSparkContext sc, FrameBlock src)
 		throws DMLRuntimeException
