[SYSTEMML-1404] Robustness of ultra-sparse cell to binary block converters

The current implementations of the Spark RDD converters for 'textcell to
binary block', 'binary cell to binary block', and 'coordinate to binary
block' have severe robustness issues for ultra-sparse inputs, in terms
of potential OOMs at the driver or executors. All these converters
perform a union of partial blocks with a set of empty blocks before the
shuffle, but the collection of empty blocks is materialized as a list at
the driver and simply parallelized from there.

Consider a scenario of an ultra-sparse 30M x 30M matrix with sparsity
10^-6, which is arguably the worst case with a single non-zero per block
on average. With 1000 x 1000 blocks, the number of blocks is 30K x 30K =
900M, and these nearly one billion objects (pairs of matrix indexes and
empty matrix blocks) already require 64GB, leading to OOMs at the
driver. Furthermore, the cell converters hold partial blocks in MCSR
format, which is two orders of magnitude larger than a cell
representation, leading to OOMs at the executors.

This patch solves both problems by (1) generating empty blocks in a
scalable, distributed manner (with awareness of the target size of output
partitions), and (2) converting temporary ultra-sparse partial blocks to
COO format (which, unlike MCSR, has no overhead for row offsets and hence
is close in size to the cell representation).

Finally, this also includes a fix for caching binary cell RDDs, which
returned null if the input RDD was already persisted at the default
temporary storage level (DEFAULT_TMP).


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/f32bd8eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/f32bd8eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/f32bd8eb

Branch: refs/heads/master
Commit: f32bd8ebd2b47cbe8f535917c94ab7ebf539974b
Parents: 9919a6c
Author: Matthias Boehm <[email protected]>
Authored: Wed Mar 15 22:27:45 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Thu Mar 16 11:30:42 2017 -0700

----------------------------------------------------------------------
 .../instructions/spark/utils/SparkUtils.java    | 87 ++++++++++++++------
 .../runtime/matrix/MatrixCharacteristics.java   |  8 +-
 .../runtime/matrix/mapred/ReblockBuffer.java    |  6 ++
 3 files changed, 74 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f32bd8eb/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
index b9f54f2..947c817 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
@@ -21,17 +21,22 @@
 package org.apache.sysml.runtime.instructions.spark.utils;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
 
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.lops.Checkpoint;
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import 
org.apache.sysml.runtime.instructions.spark.functions.CopyBinaryCellFunction;
 import org.apache.sysml.runtime.instructions.spark.functions.CopyBlockFunction;
@@ -174,36 +179,36 @@ public class SparkUtils
                        return retVal + "|" + twoSpaces;
        }
 
+       /**
+        * Creates an RDD of empty blocks according to the given matrix characteristics. This is
+        * done in a scalable manner by parallelizing block ranges and generating empty blocks
+        * in a distributed manner, under awareness of preferred output partition sizes.
+        * Matrix characteristics without any blocks (e.g., zero rows or columns) yield an empty RDD.
+        * @param sc spark context
+        * @param mc matrix characteristics
+        * @return pair rdd of empty matrix blocks, keyed by 1-based block indexes
+        */
       public static JavaPairRDD<MatrixIndexes, MatrixBlock> getEmptyBlockRDD( 
JavaSparkContext sc, MatrixCharacteristics mc )
       {
-               //create all empty blocks
-               ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> list = new 
ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
-               int nrblks = 
(int)Math.ceil((double)mc.getRows()/mc.getRowsPerBlock());
-               int ncblks = 
(int)Math.ceil((double)mc.getCols()/mc.getColsPerBlock());
-               for(long r=1; r<=nrblks; r++)
-                       for(long c=1; c<=ncblks; c++)
-                       {
-                               int lrlen = 
UtilFunctions.computeBlockSize(mc.getRows(), r, mc.getRowsPerBlock());
-                               int lclen = 
UtilFunctions.computeBlockSize(mc.getCols(), c, mc.getColsPerBlock());
-                               MatrixIndexes ix = new MatrixIndexes(r, c);
-                               MatrixBlock mb = new MatrixBlock(lrlen, lclen, 
true);
-                               list.add(new 
Tuple2<MatrixIndexes,MatrixBlock>(ix,mb));
-                       }
+               //compute degree of parallelism and block ranges: a single block has at most
+               //brlen x bclen cells (hence min, not max), and spark requires >= 1 partition
+               long size = mc.getNumBlocks() * OptimizerUtils.estimateSizeEmptyBlock(
+                               Math.min(Math.max(mc.getRows(),1), mc.getRowsPerBlock()), Math.min(Math.max(mc.getCols(),1), mc.getColsPerBlock()));
+               int par = (int) Math.max(1, Math.min(Math.max(SparkExecutionContext.getDefaultParallelism(true),
+                               Math.ceil((double)size/InfrastructureAnalyzer.getHDFSBlockSize())), mc.getNumBlocks())); //divide as double; with an integral divisor, ceil would be a no-op
+               long pNumBlocks = (long)Math.ceil((double)mc.getNumBlocks()/par);
+               //generate block offsets per partition (for zero blocks: a single empty range)
+               List<Long> offsets = LongStream.iterate(0, n -> n+pNumBlocks)
+                               .limit(par).boxed().collect(Collectors.toList());
               
-               //create rdd of in-memory list
-               return sc.parallelizePairs(list);
+               //parallelize offsets and generate all empty blocks
+               return (JavaPairRDD<MatrixIndexes,MatrixBlock>) sc.parallelize(offsets, par)
+                               .flatMapToPair(new GenerateEmptyBlocks(mc, pNumBlocks));
        }
 
-       public static JavaPairRDD<MatrixIndexes, MatrixCell> 
cacheBinaryCellRDD(JavaPairRDD<MatrixIndexes, MatrixCell> input)
-       {
-               JavaPairRDD<MatrixIndexes, MatrixCell> ret = null;
-               
-               if( !input.getStorageLevel().equals(DEFAULT_TMP) ) {
-                       ret = input.mapToPair(new CopyBinaryCellFunction())
-                                          .persist(DEFAULT_TMP);
-               }
-               
-               return ret;
+       public static JavaPairRDD<MatrixIndexes, MatrixCell> 
cacheBinaryCellRDD(JavaPairRDD<MatrixIndexes, MatrixCell> input) {
+               return !input.getStorageLevel().equals(DEFAULT_TMP) ? 
+                       input.mapToPair(new 
CopyBinaryCellFunction()).persist(DEFAULT_TMP) : input;
        }
 
        /**
@@ -253,4 +258,36 @@ public class SparkUtils
                                        arg0.getNonZeros() + arg1.getNonZeros() 
); //sum
                }       
        }
+       
+       private static class GenerateEmptyBlocks implements PairFlatMapFunction<Long, MatrixIndexes, MatrixBlock> 
+       { //expands the starting block offset of a partition into its range of empty blocks
+               private static final long serialVersionUID = 630129586089106855L;
+               //characteristics of the target matrix and number of blocks per partition
+               private final MatrixCharacteristics _mc;
+               private final long _pNumBlocks;
+               
+               public GenerateEmptyBlocks(MatrixCharacteristics mc, long pNumBlocks) {
+                       _mc = mc;
+                       _pNumBlocks = pNumBlocks;
+               }
+               
+               @Override
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Long offset) throws Exception
+               {
+                       final long end = Math.min(offset + _pNumBlocks, _mc.getNumBlocks()); //exclusive upper bound
+                       final long numColBlocks = _mc.getNumColBlocks();
+                       ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
+                       long pos = offset;
+                       while( pos < end ) {
+                               long bi = pos / numColBlocks + 1; //row-major linearized id -> 1-based block row
+                               long bj = pos % numColBlocks + 1; //row-major linearized id -> 1-based block column
+                               MatrixBlock empty = new MatrixBlock(
+                                               UtilFunctions.computeBlockSize(_mc.getRows(), bi, _mc.getRowsPerBlock()),
+                                               UtilFunctions.computeBlockSize(_mc.getCols(), bj, _mc.getColsPerBlock()), true);
+                               ret.add(new Tuple2<MatrixIndexes,MatrixBlock>(new MatrixIndexes(bi, bj), empty));
+                               pos++;
+                       }
+                       return ret.iterator();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f32bd8eb/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java 
b/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
index ae0fca7..7af295f 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
@@ -141,11 +141,15 @@ public class MatrixCharacteristics implements Serializable
                numColumnsPerBlock = bclen;
        } 
        
-       public long getNumRowBlocks(){
+       public long getNumBlocks() { //total number of blocks: number of row blocks x number of column blocks
+               return getNumRowBlocks() * getNumColBlocks();
+       }
+       
+       public long getNumRowBlocks() { //ceil(rows / rowsPerBlock); NOTE(review): assumes rowsPerBlock > 0
                return (long) Math.ceil((double)getRows() / getRowsPerBlock());
        }
        
-       public long getNumColBlocks(){
+       public long getNumColBlocks() { //ceil(cols / colsPerBlock); NOTE(review): assumes colsPerBlock > 0
                return (long) Math.ceil((double)getCols() / getColsPerBlock());
        }
        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f32bd8eb/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
index 8e5b48e..17a8618 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
@@ -34,6 +34,7 @@ import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.PartialBlock;
+import org.apache.sysml.runtime.matrix.data.SparseBlock.Type;
 import org.apache.sysml.runtime.matrix.data.TaggedAdaptivePartialBlock;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
@@ -309,6 +310,11 @@ public class ReblockBuffer
                
                //ensure correct representation (for in-memory blocks)
                value.examSparsity();
+       
+               //convert ultra-sparse blocks from MCSR to COO in order to 
+               //significantly reduce temporary memory pressure until write 
+               if( value.isUltraSparse() )
+                       value = new MatrixBlock(value, Type.COO, false);
                
                //output block
                out.add(new IndexedMatrixValue(key,value));

Reply via email to