[SYSTEMML-2282] Memory efficiency spark empty block injection 

This patch improves the memory efficiency of Spark empty block injection
to reduce GC overheads. In the scenario of creating partitions with all
empty blocks of a matrix (for union with the non-zero block reblock), the
number of blocks per partition is often very large (>3M). Hence, we now
use more conservative partition sizes of 32MB instead of 128MB, as well
as lazy iterators when creating the blocks for a single offset (i.e., for
a single partition).
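
The core of the change is to return a lazy, stream-backed iterator instead
of materializing one Tuple2 per block in an ArrayList. The following minimal
standalone sketch illustrates that pattern only; it uses plain java.util types
(Map.Entry as a stand-in for SystemML's MatrixIndexes/MatrixBlock pairs) and
hypothetical parameter names, not the actual SparkUtils code:

import java.util.AbstractMap.SimpleEntry;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.stream.LongStream;

public class LazyEmptyBlockIterator {

    // Lazily enumerates 1-based (rowBlockIndex, columnBlockIndex) pairs for the
    // block range [offset, offset+numBlocks), mirroring the pattern in the patch:
    // pairs are created only as the iterator is consumed, instead of being
    // materialized into a list up front.
    public static Iterator<Entry<Long, Long>> createBlockIndexes(
        long offset, long numBlocks, long numColBlocks)
    {
        return LongStream.range(offset, offset + numBlocks)
            .<Entry<Long, Long>>mapToObj(i -> new SimpleEntry<Long, Long>(
                1 + i / numColBlocks,   // 1-based row block index
                1 + i % numColBlocks))  // 1-based column block index
            .iterator();
    }

    public static void main(String[] args) {
        // Print the indexes of the first 5 blocks of a grid with 1000 column blocks.
        Iterator<Entry<Long, Long>> it = createBlockIndexes(0, 5, 1000);
        while (it.hasNext())
            System.out.println(it.next());
    }
}

Because LongStream.range is lazy, each pair exists only while the consumer
processes it, so millions of blocks per partition no longer need to be held
in memory at once.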


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/7cb43ddd
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/7cb43ddd
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/7cb43ddd

Branch: refs/heads/master
Commit: 7cb43dddda45e5e6ceae5371b9bc15b28e72ac63
Parents: 3b359c3
Author: Matthias Boehm <[email protected]>
Authored: Thu Apr 26 21:05:31 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Fri Apr 27 00:03:18 2018 -0700

----------------------------------------------------------------------
 .../instructions/spark/utils/SparkUtils.java     | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/7cb43ddd/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
index 49232da..952135e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
@@ -20,7 +20,6 @@
 
 package org.apache.sysml.runtime.instructions.spark.utils;
 
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -188,7 +187,7 @@ public class SparkUtils
 		//compute degree of parallelism and block ranges
 		long size = mc.getNumBlocks() * OptimizerUtils.estimateSizeEmptyBlock(Math.min(
 			Math.max(mc.getRows(),1), mc.getRowsPerBlock()), Math.min(Math.max(mc.getCols(),1), mc.getColsPerBlock()));
-		int par = (int) Math.min(Math.max(SparkExecutionContext.getDefaultParallelism(true),
+		int par = (int) Math.min(4*Math.max(SparkExecutionContext.getDefaultParallelism(true),
 			Math.ceil(size/InfrastructureAnalyzer.getHDFSBlockSize())), mc.getNumBlocks());
 		long pNumBlocks = (long)Math.ceil((double)mc.getNumBlocks()/par);
 		
@@ -273,21 +272,19 @@ public class SparkUtils
 		}
 		
 		@Override
-		public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Long arg0)
-			throws Exception
-		{
-			ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> list = new ArrayList<>();
+		public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Long arg0) throws Exception {
+			//NOTE: for cases of a very large number of empty blocks per partition
+			//(e.g., >3M for 128MB partitions), it is important for low GC overhead
+			//not to materialize these objects but return a lazy iterator instead.
 			long ncblks = _mc.getNumColBlocks();
 			long nblocksU = Math.min(arg0+_pNumBlocks, _mc.getNumBlocks());
-			for( long i=arg0; i<nblocksU; i++ ) {
+			return LongStream.range(arg0, nblocksU).mapToObj(i -> {
 				long rix = 1 + i / ncblks;
 				long cix = 1 + i % ncblks;
 				int lrlen = UtilFunctions.computeBlockSize(_mc.getRows(), rix, _mc.getRowsPerBlock());
 				int lclen = UtilFunctions.computeBlockSize(_mc.getCols(), cix, _mc.getColsPerBlock());
-				list.add(new Tuple2<>(new MatrixIndexes(rix,cix),
-					new MatrixBlock(lrlen, lclen, true)));
-			}
-			return list.iterator();
+				return new Tuple2<>(new MatrixIndexes(rix,cix), new MatrixBlock(lrlen, lclen, true));
+			}).iterator();
 		}
 	}
 }
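
For context on the first hunk above, the 4x factor effectively shrinks the
target partition size from one HDFS block (128MB) to roughly 32MB of estimated
empty-block data. The following minimal standalone sketch uses assumed
illustrative numbers (block count, per-block size, and default parallelism are
hypothetical, not measured values) to show how the old and new formulas
translate into blocks per partition:

public class EmptyBlockPartitionSizing {
    public static void main(String[] args) {
        // Illustrative assumptions (not values taken from the patch):
        long numBlocks = 1_000_000_000L;           // total number of empty blocks to inject
        long emptyBlockSize = 44;                  // assumed serialized size of one empty block (bytes)
        double hdfsBlockSize = 128d * 1024 * 1024; // 128MB HDFS block size
        int defaultPar = 100;                      // assumed Spark default parallelism

        double size = numBlocks * (double) emptyBlockSize;

        // Old formula: roughly one 128MB HDFS block worth of empty blocks per partition.
        int parOld = (int) Math.min(Math.max(defaultPar,
            Math.ceil(size / hdfsBlockSize)), numBlocks);
        // New formula (4x factor): roughly 32MB per partition, i.e., 4x fewer blocks each.
        int parNew = (int) Math.min(4 * Math.max(defaultPar,
            Math.ceil(size / hdfsBlockSize)), numBlocks);

        System.out.println("partitions old/new: " + parOld + " / " + parNew);
        System.out.println("blocks per partition old: " + numBlocks / parOld); // ~3.0M
        System.out.println("blocks per partition new: " + numBlocks / parNew); // ~0.76M
    }
}

With these assumptions, the old formula yields roughly 3M empty blocks per
partition (consistent with the commit message), while the 4x factor cuts this
to about 760K.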
