Repository: incubator-systemml
Updated Branches:
  refs/heads/master ecf5e1b4c -> 5decbe64b


[SYSTEMML-946] Fix out-of-memory errors (OOM) in Spark dataframe-to-matrix and csv-to-matrix converters

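Both converters now decide the format of their partial output blocks via requiresSparseAllocation: blocks are allocated sparse if the number of non-zeros is unknown, if the matrix is sparse, or if a dense matrix is so wide that each partition is expected to hold less than SPARSITY_TURN_POINT of a row block. See details at https://issues.apache.org/jira/browse/SYSTEMML-946.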

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/5decbe64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/5decbe64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/5decbe64

Branch: refs/heads/master
Commit: 5decbe64b362e29901f15231c672c7b7816ed55a
Parents: ecf5e1b
Author: Matthias Boehm <mbo...@us.ibm.com>
Authored: Tue Sep 20 17:44:36 2016 -0700
Committer: Matthias Boehm <mbo...@us.ibm.com>
Committed: Tue Sep 20 17:44:36 2016 -0700

----------------------------------------------------------------------
 .../spark/utils/RDDConverterUtils.java          | 47 +++++++++----
 .../DataFrameMatrixConversionTest.java          | 73 +++++++++++++-------
 2 files changed, 83 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5decbe64/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index 3ee1ef8..a619a4d 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -204,19 +204,19 @@ public class RDDConverterUtils
         * @throws DMLRuntimeException
         */
        public static JavaPairRDD<MatrixIndexes, MatrixBlock> 
csvToBinaryBlock(JavaSparkContext sc,
-                       JavaPairRDD<LongWritable, Text> input, 
MatrixCharacteristics mcOut, 
+                       JavaPairRDD<LongWritable, Text> input, 
MatrixCharacteristics mc, 
                        boolean hasHeader, String delim, boolean fill, double 
fillValue) 
                throws DMLRuntimeException 
        {
                //determine unknown dimensions and sparsity if required
-               if( !mcOut.dimsKnown(true) ) {
+               if( !mc.dimsKnown(true) ) {
                        Accumulator<Double> aNnz = sc.accumulator(0L);
                        JavaRDD<String> tmp = input.values()
                                        .map(new CSVAnalysisFunction(aNnz, 
delim));
                        long rlen = tmp.count() - (hasHeader ? 1 : 0);
                        long clen = tmp.first().split(delim).length;
                        long nnz = UtilFunctions.toLong(aNnz.value());
-                       mcOut.set(rlen, clen, mcOut.getRowsPerBlock(), 
mcOut.getColsPerBlock(), nnz);
+                       mc.set(rlen, clen, mc.getRowsPerBlock(), 
mc.getColsPerBlock(), nnz);
                }
                
                //prepare csv w/ row indexes (sorted by filenames)
@@ -224,9 +224,10 @@ public class RDDConverterUtils
                                .zipWithIndex(); //zip row index
                
                //convert csv rdd to binary block rdd (w/ partial blocks)
+               boolean sparse = requiresSparseAllocation(prepinput, mc);
                JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
-                               prepinput.mapPartitionsToPair(
-                                       new CSVToBinaryBlockFunction(mcOut, 
hasHeader, delim, fill, fillValue));
+                               prepinput.mapPartitionsToPair(new 
CSVToBinaryBlockFunction(
+                                               mc, sparse, hasHeader, delim, 
fill, fillValue));
                
                //aggregate partial matrix blocks
                out = RDDAggregateUtils.mergeByKey( out ); 
@@ -298,14 +299,16 @@ public class RDDConverterUtils
                        mc.setBlockSize(ConfigurationManager.getBlocksize());
                }
                
+               //construct or reuse row ids
                JavaPairRDD<Row, Long> prepinput = containsID ?
                                df.javaRDD().mapToPair(new 
DataFrameExtractIDFunction()) :
                                df.javaRDD().zipWithIndex(); //zip row index
                
                //convert csv rdd to binary block rdd (w/ partial blocks)
+               boolean sparse = requiresSparseAllocation(prepinput, mc);
                JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
                                prepinput.mapPartitionsToPair(
-                                       new DataFrameToBinaryBlockFunction(mc, 
containsID, isVector));
+                                       new DataFrameToBinaryBlockFunction(mc, 
sparse, containsID, isVector));
                
                //aggregate partial matrix blocks
                out = RDDAggregateUtils.mergeByKey( out ); 
@@ -357,6 +360,28 @@ public class RDDConverterUtils
                return in.mapToPair(new TextToSerTextFunction());
        }
 
+       /**
+        * Decides whether partial output blocks of a csv/dataframe to binary-block conversion are allocated in sparse format: sparse if nnz is unknown or the matrix is sparse, or if a dense matrix is so wide that a partition is expected to cover less than SPARSITY_TURN_POINT of a row block.
+        * @param in input rdd (only its number of partitions is used)
+        * @param mc characteristics of the output matrix (dimensions, block sizes, nnz)
+        * @return true if sparse allocation should be used, false for dense allocation
+        */
+       private static boolean requiresSparseAllocation(JavaPairRDD<?,?> in, 
MatrixCharacteristics mc) {
+               //if nnz unknown or sparse, pick the robust sparse 
representation
+               if( !mc.nnzKnown() || (mc.nnzKnown() && 
MatrixBlock.evalSparseFormatInMemory(
+                       mc.getRows(), mc.getCols(), mc.getNonZeros())) ) {
+                       return true;
+               }
+               
+               //if dense evaluate expected rows per partition to handle wide 
matrices
+               //(pick sparse representation if fraction of rows per block 
less than sparse threshold)
+               double datasize = 
OptimizerUtils.estimatePartitionedSizeExactSparsity(mc);
+               double rowsize = 
OptimizerUtils.estimatePartitionedSizeExactSparsity(1, mc.getCols(),
+                               mc.getNumRowBlocks(), mc.getColsPerBlock(), 
Math.ceil((double)mc.getNonZeros()/mc.getRows()));
+               double partsize = Math.ceil(datasize/in.partitions().size());
+               double blksz = Math.min(mc.getRows(), mc.getRowsPerBlock());
+               return partsize/rowsize/blksz < MatrixBlock.SPARSITY_TURN_POINT;
+       }
        
        /////////////////////////////////
        // BINARYBLOCK-SPECIFIC FUNCTIONS
@@ -633,15 +658,14 @@ public class RDDConverterUtils
                private boolean _fill = false;
                private double _fillValue = 0;
                
-               public CSVToBinaryBlockFunction(MatrixCharacteristics mc, 
boolean hasHeader, String delim, boolean fill, double fillValue)
+               public CSVToBinaryBlockFunction(MatrixCharacteristics mc, 
boolean sparse, boolean hasHeader, String delim, boolean fill, double fillValue)
                {
                        _rlen = mc.getRows();
                        _clen = mc.getCols();
                        _brlen = mc.getRowsPerBlock();
                        _bclen = mc.getColsPerBlock();
                        _sparsity = OptimizerUtils.getSparsity(mc);
-                       _sparse = mc.nnzKnown() && 
MatrixBlock.evalSparseFormatInMemory(mc.getRows(), 
-                                       mc.getCols(), mc.getNonZeros()) && 
(!fill || fillValue==0);
+                       _sparse = sparse && (!fill || fillValue==0);
                        _header = hasHeader;
                        _delim = delim;
                        _fill = fill;
@@ -879,13 +903,12 @@ public class RDDConverterUtils
                private boolean _containsID;
                private boolean _isVector;
                
-               public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc, 
boolean containsID, boolean isVector) {
+               public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc, 
boolean sparse, boolean containsID, boolean isVector) {
                        _rlen = mc.getRows();
                        _clen = mc.getCols();
                        _brlen = mc.getRowsPerBlock();
                        _bclen = mc.getColsPerBlock();
-                       _sparse = mc.nnzKnown() && 
MatrixBlock.evalSparseFormatInMemory(
-                                       mc.getRows(), mc.getCols(), 
mc.getNonZeros());
+                       _sparse = sparse;
                        _containsID = containsID;
                        _isVector = isVector;
                }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5decbe64/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
index e88a867..08511cc 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
@@ -45,9 +45,11 @@ public class DataFrameMatrixConversionTest extends AutomatedTestBase
        private final static String TEST_NAME = "DataFrameConversion";
        private final static String TEST_CLASS_DIR = TEST_DIR + 
DataFrameMatrixConversionTest.class.getSimpleName() + "/";
 
-       private final static int  rows1 = 2245;
-       private final static int  cols1 = 745;
-       private final static int  cols2 = 1264;
+       private final static int rows1 = 2245;
+       private final static int rows3 = 7;
+       private final static int cols1 = 745;
+       private final static int cols2 = 1264;
+       private final static int cols3 = 1003820;
        private final static double sparsity1 = 0.9;
        private final static double sparsity2 = 0.1;
        private final static double eps=0.0000000001;
@@ -60,82 +62,102 @@ public class DataFrameMatrixConversionTest extends AutomatedTestBase
        
        @Test
        public void testVectorConversionSingleDense() {
-               testDataFrameConversion(true, true, true, false);
+               testDataFrameConversion(true, cols1, true, false);
        }
        
        @Test
        public void testVectorConversionSingleDenseUnknown() {
-               testDataFrameConversion(true, true, true, true);
+               testDataFrameConversion(true, cols1, true, true);
        }
        
        @Test
        public void testVectorConversionSingleSparse() {
-               testDataFrameConversion(true, true, false, false);
+               testDataFrameConversion(true, cols1, false, false);
        }
        
        @Test
        public void testVectorConversionSingleSparseUnknown() {
-               testDataFrameConversion(true, true, false, true);
+               testDataFrameConversion(true, cols1, false, true);
        }
        
        @Test
        public void testVectorConversionMultiDense() {
-               testDataFrameConversion(true, false, true, false);
+               testDataFrameConversion(true, cols2, true, false);
        }
        
        @Test
        public void testVectorConversionMultiDenseUnknown() {
-               testDataFrameConversion(true, false, true, true);
+               testDataFrameConversion(true, cols2, true, true);
        }
        
        @Test
        public void testVectorConversionMultiSparse() {
-               testDataFrameConversion(true, false, false, false);
+               testDataFrameConversion(true, cols2, false, false);
        }
        
        @Test
        public void testVectorConversionMultiSparseUnknown() {
-               testDataFrameConversion(true, false, false, true);
+               testDataFrameConversion(true, cols2, false, true);
        }
 
        @Test
        public void testRowConversionSingleDense() {
-               testDataFrameConversion(false, true, true, false);
+               testDataFrameConversion(false, cols1, true, false);
        }
        
        @Test
        public void testRowConversionSingleDenseUnknown() {
-               testDataFrameConversion(false, true, true, true);
+               testDataFrameConversion(false, cols1, true, true);
        }
        
        @Test
        public void testRowConversionSingleSparse() {
-               testDataFrameConversion(false, true, false, false);
+               testDataFrameConversion(false, cols1, false, false);
        }
        
        @Test
        public void testRowConversionSingleSparseUnknown() {
-               testDataFrameConversion(false, true, false, true);
+               testDataFrameConversion(false, cols1, false, true);
        }
        
        @Test
        public void testRowConversionMultiDense() {
-               testDataFrameConversion(false, false, true, false);
+               testDataFrameConversion(false, cols2, true, false);
        }
        
        @Test
        public void testRowConversionMultiDenseUnknown() {
-               testDataFrameConversion(false, false, true, true);
+               testDataFrameConversion(false, cols2, true, true);
        }
        
        @Test
        public void testRowConversionMultiSparse() {
-               testDataFrameConversion(false, false, false, false);
+               testDataFrameConversion(false, cols2, false, false);
        }
        
        @Test
        public void testRowConversionMultiSparseUnknown() {
-               testDataFrameConversion(false, false, false, true);
+               testDataFrameConversion(false, cols2, false, true);
+       }
+       
+       @Test
+       public void testVectorConversionWideDense() {
+               testDataFrameConversion(true, cols3, true, false);
+       }
+       
+       @Test
+       public void testVectorConversionWideDenseUnknown() {
+               testDataFrameConversion(true, cols3, true, true);
+       }
+       
+       @Test
+       public void testVectorConversionWideSparse() {
+               testDataFrameConversion(true, cols3, false, false);
+       }
+       
+       @Test
+       public void testVectorConversionWideSparseUnknown() {
+               testDataFrameConversion(true, cols3, false, true);
        }
        
        /**
@@ -145,7 +167,7 @@ public class DataFrameMatrixConversionTest extends AutomatedTestBase
         * @param dense
         * @param unknownDims
         */
-       private void testDataFrameConversion(boolean vector, boolean 
singleColBlock, boolean dense, boolean unknownDims) {
+       private void testDataFrameConversion(boolean vector, int cols, boolean 
dense, boolean unknownDims) {
                boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG; 
                RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
 
@@ -157,12 +179,12 @@ public class DataFrameMatrixConversionTest extends AutomatedTestBase
                        DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
                        
                        //generate input data and setup metadata
-                       int cols = singleColBlock ? cols1 : cols2;
+                       int rows = (cols == cols3) ? rows3 : rows1;
                        double sparsity = dense ? sparsity1 : sparsity2; 
-                       double[][] A = getRandomMatrix(rows1, cols, -10, 10, 
sparsity, 2373); 
+                       double[][] A = getRandomMatrix(rows, cols, -10, 10, 
sparsity, 2373); 
                        MatrixBlock mbA = 
DataConverter.convertToMatrixBlock(A); 
                        int blksz = ConfigurationManager.getBlocksize();
-                       MatrixCharacteristics mc1 = new 
MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros());
+                       MatrixCharacteristics mc1 = new 
MatrixCharacteristics(rows, cols, blksz, blksz, mbA.getNonZeros());
                        MatrixCharacteristics mc2 = unknownDims ? new 
MatrixCharacteristics() : new MatrixCharacteristics(mc1);
                        
                        //setup spark context
@@ -175,14 +197,15 @@ public class DataFrameMatrixConversionTest extends AutomatedTestBase
                        
                        //matrix - dataframe - matrix conversion
                        DataFrame df = 
RDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, vector);
+                       df = ( rows==rows3 ) ? df.repartition(rows) : df;
                        JavaPairRDD<MatrixIndexes,MatrixBlock> out = 
RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true, vector);
                        
                        //get output matrix block
-                       MatrixBlock mbB = 
SparkExecutionContext.toMatrixBlock(out, rows1, cols, blksz, blksz, -1);
+                       MatrixBlock mbB = 
SparkExecutionContext.toMatrixBlock(out, rows, cols, blksz, blksz, -1);
                        
                        //compare matrix blocks
                        double[][] B = DataConverter.convertToDoubleMatrix(mbB);
-                       TestUtils.compareMatrices(A, B, rows1, cols, eps);
+                       TestUtils.compareMatrices(A, B, rows, cols, eps);
                }
                catch( Exception ex ) {
                        throw new RuntimeException(ex);

Reply via email to