[SYSTEMML-2282] Memory efficiency of Spark empty block injection. This patch improves the memory efficiency of Spark empty block injection to reduce GC overheads. When creating partitions that consist entirely of a matrix's empty blocks (to be unioned with the reblocked non-zero blocks), the number of blocks per partition is often very large (>3M). Hence, we now use more conservative partition sizes of 32MB instead of 128MB, as well as lazy iterators when creating the blocks for a single offset (i.e., for a single partition).
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/7cb43ddd Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/7cb43ddd Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/7cb43ddd Branch: refs/heads/master Commit: 7cb43dddda45e5e6ceae5371b9bc15b28e72ac63 Parents: 3b359c3 Author: Matthias Boehm <[email protected]> Authored: Thu Apr 26 21:05:31 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Fri Apr 27 00:03:18 2018 -0700 ---------------------------------------------------------------------- .../instructions/spark/utils/SparkUtils.java | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/7cb43ddd/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java index 49232da..952135e 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java @@ -20,7 +20,6 @@ package org.apache.sysml.runtime.instructions.spark.utils; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; @@ -188,7 +187,7 @@ public class SparkUtils //compute degree of parallelism and block ranges long size = mc.getNumBlocks() * OptimizerUtils.estimateSizeEmptyBlock(Math.min( Math.max(mc.getRows(),1), mc.getRowsPerBlock()), Math.min(Math.max(mc.getCols(),1), mc.getColsPerBlock())); - int par = (int) Math.min(Math.max(SparkExecutionContext.getDefaultParallelism(true), + int par = (int) 
Math.min(4*Math.max(SparkExecutionContext.getDefaultParallelism(true), Math.ceil(size/InfrastructureAnalyzer.getHDFSBlockSize())), mc.getNumBlocks()); long pNumBlocks = (long)Math.ceil((double)mc.getNumBlocks()/par); @@ -273,21 +272,19 @@ public class SparkUtils } @Override - public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Long arg0) - throws Exception - { - ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> list = new ArrayList<>(); + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Long arg0) throws Exception { + //NOTE: for cases of a very large number of empty blocks per partition + //(e.g., >3M for 128MB partitions), it is important for low GC overhead + //not to materialized these objects but return a lazy iterator instead. long ncblks = _mc.getNumColBlocks(); long nblocksU = Math.min(arg0+_pNumBlocks, _mc.getNumBlocks()); - for( long i=arg0; i<nblocksU; i++ ) { + return LongStream.range(arg0, nblocksU).mapToObj(i -> { long rix = 1 + i / ncblks; long cix = 1 + i % ncblks; int lrlen = UtilFunctions.computeBlockSize(_mc.getRows(), rix, _mc.getRowsPerBlock()); int lclen = UtilFunctions.computeBlockSize(_mc.getCols(), cix, _mc.getColsPerBlock()); - list.add(new Tuple2<>(new MatrixIndexes(rix,cix), - new MatrixBlock(lrlen, lclen, true))); - } - return list.iterator(); + return new Tuple2<>(new MatrixIndexes(rix,cix), new MatrixBlock(lrlen, lclen, true)); + }).iterator(); } } }
