[SYSTEMML-1404] Robustness ultra-sparse cell to binary block converters

The current implementations of the Spark RDD converters for 'textcell to binary block', 'binary cell to binary block', and 'coordinate to binary block' have severe robustness issues for ultra-sparse inputs, in the form of potential OOMs at the driver or the executors. All of these converters perform a union of the partial blocks with a set of empty blocks before the shuffle, but the collection of empty blocks is simply parallelized from the driver.
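To make the scale of this problem concrete, the following standalone sketch (not SystemML code; the 1000 x 1000 block size and the per-object size are illustrative assumptions) estimates the driver-side footprint of materializing all empty blocks for the worst-case scenario quantified below:

public class EmptyBlockFootprint {
    public static void main(String[] args) {
        long rows = 30_000_000L, cols = 30_000_000L; // ultra-sparse 30M x 30M matrix
        int blen = 1000;                             // assumed 1000 x 1000 blocking
        long nrblks = (rows + blen - 1) / blen;      // 30,000 row blocks
        long ncblks = (cols + blen - 1) / blen;      // 30,000 column blocks
        long nblocks = nrblks * ncblks;              // 900,000,000 blocks in total
        long bytesPerPair = 72;                      // assumed bytes per (MatrixIndexes, empty MatrixBlock) pair
        System.out.printf("blocks=%d, driver footprint ~%.1f GB%n",
            nblocks, nblocks * bytesPerPair / 1e9);  // roughly 64.8 GB held at the driver
    }
}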
Consider the scenario of an ultra-sparse 30M x 30M matrix with sparsity 10^-6, which is arguably the worst case, with a single non-zero per block on average. The number of blocks is 900M, and these roughly one billion objects (pairs of matrix indexes and empty matrix blocks) already amount to 64GB, leading to OOMs at the driver. Furthermore, the cell converters hold partial blocks in MCSR format, which is two orders of magnitude larger than a cell representation, leading to OOMs at the executors. This patch solves both problems by (1) generating empty blocks in a scalable manner (with awareness of the target size of output partitions), and (2) converting temporary ultra-sparse partial blocks to COO format (which does not have any overhead for row offsets and hence is close to the cell representation). Finally, this also includes a fix for the caching of binary cell RDDs, which would have returned null under specific conditions.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/f32bd8eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/f32bd8eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/f32bd8eb

Branch: refs/heads/master
Commit: f32bd8ebd2b47cbe8f535917c94ab7ebf539974b
Parents: 9919a6c
Author: Matthias Boehm <[email protected]>
Authored: Wed Mar 15 22:27:45 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Thu Mar 16 11:30:42 2017 -0700

----------------------------------------------------------------------
 .../instructions/spark/utils/SparkUtils.java    | 87 ++++++++++++++------
 .../runtime/matrix/MatrixCharacteristics.java   |  8 +-
 .../runtime/matrix/mapred/ReblockBuffer.java    |  6 ++
 3 files changed, 74 insertions(+), 27 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f32bd8eb/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
index b9f54f2..947c817 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
@@ -21,17 +21,22 @@
 package org.apache.sysml.runtime.instructions.spark.utils;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
 
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.lops.Checkpoint;
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.instructions.spark.functions.CopyBinaryCellFunction;
 import org.apache.sysml.runtime.instructions.spark.functions.CopyBlockFunction;
@@ -174,36 +179,36 @@ public class SparkUtils
         return retVal + "|" + twoSpaces;
     }
 
+    /**
+     * Creates an RDD of empty blocks according to the given matrix characteristics. This is
+     * done in a scalable manner by parallelizing block ranges and generating empty blocks
+     * in a distributed manner, under awareness of preferred output partition sizes.
+     *
+     * @param sc spark context
+     * @param mc matrix characteristics
+     * @return pair rdd of empty matrix blocks
+     */
     public static JavaPairRDD<MatrixIndexes, MatrixBlock> getEmptyBlockRDD( JavaSparkContext sc, MatrixCharacteristics mc )
     {
-        //create all empty blocks
-        ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> list = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
-        int nrblks = (int)Math.ceil((double)mc.getRows()/mc.getRowsPerBlock());
-        int ncblks = (int)Math.ceil((double)mc.getCols()/mc.getColsPerBlock());
-        for(long r=1; r<=nrblks; r++)
-            for(long c=1; c<=ncblks; c++)
-            {
-                int lrlen = UtilFunctions.computeBlockSize(mc.getRows(), r, mc.getRowsPerBlock());
-                int lclen = UtilFunctions.computeBlockSize(mc.getCols(), c, mc.getColsPerBlock());
-                MatrixIndexes ix = new MatrixIndexes(r, c);
-                MatrixBlock mb = new MatrixBlock(lrlen, lclen, true);
-                list.add(new Tuple2<MatrixIndexes,MatrixBlock>(ix,mb));
-            }
+        //compute degree of parallelism and block ranges
+        long size = mc.getNumBlocks() * OptimizerUtils.estimateSizeEmptyBlock(Math.max(
+                mc.getRows(), mc.getRowsPerBlock()), Math.max(mc.getCols(), mc.getColsPerBlock()));
+        int par = (int) Math.min(Math.max(SparkExecutionContext.getDefaultParallelism(true),
+                Math.ceil(size/InfrastructureAnalyzer.getHDFSBlockSize())), mc.getNumBlocks());
+        long pNumBlocks = (long)Math.ceil((double)mc.getNumBlocks()/par);
+
+        //generate block offsets per partition
+        List<Long> offsets = LongStream.iterate(0, n -> n+pNumBlocks)
+                .limit(par).boxed().collect(Collectors.toList());
 
-        //create rdd of in-memory list
-        return sc.parallelizePairs(list);
+        //parallelize offsets and generate all empty blocks
+        return (JavaPairRDD<MatrixIndexes,MatrixBlock>) sc.parallelize(offsets, par)
+                .flatMapToPair(new GenerateEmptyBlocks(mc, pNumBlocks));
     }
 
-    public static JavaPairRDD<MatrixIndexes, MatrixCell> cacheBinaryCellRDD(JavaPairRDD<MatrixIndexes, MatrixCell> input)
-    {
-        JavaPairRDD<MatrixIndexes, MatrixCell> ret = null;
-
-        if( !input.getStorageLevel().equals(DEFAULT_TMP) ) {
-            ret = input.mapToPair(new CopyBinaryCellFunction())
-                .persist(DEFAULT_TMP);
-        }
-
-        return ret;
+    public static JavaPairRDD<MatrixIndexes, MatrixCell> cacheBinaryCellRDD(JavaPairRDD<MatrixIndexes, MatrixCell> input) {
+        return !input.getStorageLevel().equals(DEFAULT_TMP) ?
+            input.mapToPair(new CopyBinaryCellFunction()).persist(DEFAULT_TMP) : input;
     }
 
     /**
@@ -253,4 +258,36 @@ public class SparkUtils
                 arg0.getNonZeros() + arg1.getNonZeros() ); //sum
         }
     }
+
+    private static class GenerateEmptyBlocks implements PairFlatMapFunction<Long, MatrixIndexes, MatrixBlock>
+    {
+        private static final long serialVersionUID = 630129586089106855L;
+
+        private final MatrixCharacteristics _mc;
+        private final long _pNumBlocks;
+
+        public GenerateEmptyBlocks(MatrixCharacteristics mc, long pNumBlocks) {
+            _mc = mc;
+            _pNumBlocks = pNumBlocks;
+        }
+
+        @Override
+        public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Long arg0)
+            throws Exception
+        {
+            ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> list = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
+            long ncblks = _mc.getNumColBlocks();
+            long nblocksU = Math.min(arg0+_pNumBlocks, _mc.getNumBlocks());
+            for( long i=arg0; i<nblocksU; i++ ) {
+                long rix = 1 + i / ncblks;
+                long cix = 1 + i % ncblks;
+                int lrlen = UtilFunctions.computeBlockSize(_mc.getRows(), rix, _mc.getRowsPerBlock());
+                int lclen = UtilFunctions.computeBlockSize(_mc.getCols(), cix, _mc.getColsPerBlock());
+                list.add(new Tuple2<MatrixIndexes,MatrixBlock>(
+                        new MatrixIndexes(rix,cix),
+                        new MatrixBlock(lrlen, lclen, true)));
+            }
+            return list.iterator();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f32bd8eb/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java b/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
index ae0fca7..7af295f 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
@@ -141,11 +141,15 @@ public class MatrixCharacteristics implements Serializable
         numColumnsPerBlock = bclen;
     }
 
-    public long getNumRowBlocks(){
+    public long getNumBlocks() {
+        return getNumRowBlocks() * getNumColBlocks();
+    }
+
+    public long getNumRowBlocks() {
         return (long) Math.ceil((double)getRows() / getRowsPerBlock());
     }
 
-    public long getNumColBlocks(){
+    public long getNumColBlocks() {
         return (long) Math.ceil((double)getCols() / getColsPerBlock());
     }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f32bd8eb/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
index 8e5b48e..17a8618 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
@@ -34,6 +34,7 @@ import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.PartialBlock;
+import org.apache.sysml.runtime.matrix.data.SparseBlock.Type;
 import org.apache.sysml.runtime.matrix.data.TaggedAdaptivePartialBlock;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
@@ -309,6 +310,11 @@ public class ReblockBuffer
 
             //ensure correct representation (for in-memory blocks)
             value.examSparsity();
+
+            //convert ultra-sparse blocks from MCSR to COO in order to
+            //significantly reduce temporary memory pressure until write
+            if( value.isUltraSparse() )
+                value = new MatrixBlock(value, Type.COO, false);
 
             //output block
             out.add(new IndexedMatrixValue(key,value));
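The core of fix (1) is the partitioning scheme in the new getEmptyBlockRDD: only a short list of block offsets is parallelized from the driver, and each task enumerates its own contiguous range of block indexes. The standalone sketch below mirrors that computation; the constants (empty-block size estimate, HDFS block size, default parallelism) are illustrative assumptions, not values taken from the codebase:

public class EmptyBlockPartitioning {
    public static void main(String[] args) {
        long numBlocks = 900_000_000L;      // 30K x 30K blocks from the worst-case example above
        long emptyBlockSize = 44;           // assumed size estimate per empty block in bytes
        long hdfsBlockSize = 128L << 20;    // assumed 128 MB HDFS block size
        int defaultPar = 200;               // assumed default degree of parallelism

        // choose enough partitions that each partition's empty blocks stay near one HDFS block
        long totalSize = numBlocks * emptyBlockSize;
        int par = (int) Math.min(Math.max(defaultPar,
            Math.ceil((double) totalSize / hdfsBlockSize)), numBlocks);
        long pNumBlocks = (long) Math.ceil((double) numBlocks / par);

        // ~300 tasks, each enumerating ~3M (row,col) block indexes locally,
        // instead of the driver materializing all 900M pairs up front
        System.out.printf("partitions=%d, blocks per partition=%d%n", par, pNumBlocks);
    }
}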

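For fix (2), the ReblockBuffer change converts ultra-sparse partial blocks from MCSR to COO before write. The rough sizing sketch below (illustrative assumed sizes, not SystemML's actual memory estimator) shows why: MCSR pays a per-row overhead regardless of the number of non-zeros, whereas COO pays only per non-zero and therefore stays close to a cell representation:

public class UltraSparseFormats {
    public static void main(String[] args) {
        int blen = 1000;   // assumed 1000 x 1000 block
        long nnz = 1;      // worst case above: a single non-zero per block on average

        // MCSR: one row-reference slot per row, regardless of how many rows are occupied
        long mcsrBytes = blen * 8L /*row pointers*/ + nnz * 16L /*column index + value*/;

        // COO: row index, column index, and value per non-zero, no per-row offsets
        long cooBytes = nnz * (4L + 4L + 8L);

        // ~8 KB vs ~16 bytes per block with one non-zero: consistent with the
        // "two orders of magnitude" gap relative to a cell representation noted above
        System.out.printf("MCSR ~%d bytes, COO ~%d bytes%n", mcsrBytes, cooBytes);
    }
}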