[SYSTEMML-2052] Fix ultra-sparse dataset-matrix conversion

This patch fixes special cases of converting ultra-sparse datasets to
binary block matrices. Specifically, the pre-allocation of MCSR sparse
rows used incorrect sizes, which resulted in index-out-of-bounds
exceptions on pre-allocated rows of size zero. We now correctly
determine the nnz per block column range and make the MCSR format
robust to resizing rows with an initial size of zero.
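
As an illustration of the resizing fix, here is a minimal, self-contained
sketch of the adopted growth rule. The class name, main method, and the
concrete factor values are illustrative stand-ins, not the actual
SparseBlock constants:

// Sketch of the capacity-growth rule in SparseRowVector.newCapacity();
// the factor values below are assumed placeholders.
public class CapacityGrowthSketch {
    static final double RESIZE_FACTOR1 = 2.0; //assumed: aggressive growth below estimate
    static final double RESIZE_FACTOR2 = 1.1; //assumed: conservative growth above estimate

    static int newCapacity(int currLen, int estimatedNzs, int maxNzs) {
        //scale current length exponentially, depending on the nnz estimate
        int nextLen = (int) Math.ceil(currLen *
            ((currLen < estimatedNzs) ? RESIZE_FACTOR1 : RESIZE_FACTOR2));
        //lower bound of 2 makes rows pre-allocated with size zero resizable
        //(0 * factor would otherwise remain 0); upper bound caps at max nnz
        return Math.max(2, Math.min(maxNzs, nextLen));
    }

    public static void main(String[] args) {
        int cap = 0; //row pre-allocated with zero non-zeros
        for( int i = 0; i < 5; i++ )
            System.out.println(cap = newCapacity(cap, 16, 1000)); //2, 4, 8, 16, 18
    }
}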

Furthermore, this patch includes additional tests for ultra-sparse
datasets and a performance improvement for counting the nnz of index
ranges over sparse dataset vectors.
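
As an illustration of that improvement, a minimal sketch of the sparse
fast path: it scans only the active entries and stops early at the upper
column bound, instead of probing every logical column (each vec.apply(i)
on a sparse vector is a binary search). The class name and example values
are hypothetical; only Spark's public ml.linalg accessors are used:

import org.apache.spark.ml.linalg.SparseVector;
import org.apache.spark.ml.linalg.Vectors;

// Sketch of counting non-zeros for a column range [pos, cu) of a sparse vector.
public class RangeNnzSketch {
    static int countNnz(SparseVector vec, int pos, int cu) {
        int alen = vec.numActives();
        int[] aix = vec.indices();
        double[] avals = vec.values();
        int lnnz = 0;
        //note: as in the patch, pos is a physical position into the active arrays
        for( int i = pos; i < alen && aix[i] < cu; i++ )
            lnnz += (avals[i] != 0) ? 1 : 0;
        return lnnz;
    }

    public static void main(String[] args) {
        //illustrative: logical size 1000, three non-zero entries
        SparseVector v = (SparseVector) Vectors.sparse(1000,
            new int[]{3, 400, 700}, new double[]{1.0, 2.0, 3.0});
        System.out.println(countNnz(v, 0, 500)); //prints 2
    }
}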


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/d91d24a9
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/d91d24a9
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/d91d24a9

Branch: refs/heads/master
Commit: d91d24a9fa157a490b227632605249e7dff0ad67
Parents: 38332ac
Author: Matthias Boehm <[email protected]>
Authored: Mon Dec 25 14:10:04 2017 +0100
Committer: Matthias Boehm <[email protected]>
Committed: Mon Dec 25 14:26:00 2017 +0100

----------------------------------------------------------------------
 .../spark/utils/RDDConverterUtils.java          | 47 +++++++++++----
 .../runtime/matrix/data/SparseRowVector.java    |  9 ++-
 .../DataFrameMatrixConversionTest.java          | 63 ++++++++++++++++++++
 3 files changed, 103 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/d91d24a9/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index 53e42ac..e3ab541 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -36,6 +36,7 @@ import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.ml.feature.LabeledPoint;
+import org.apache.spark.ml.linalg.DenseVector;
 import org.apache.spark.ml.linalg.SparseVector;
 import org.apache.spark.ml.linalg.Vector;
 import org.apache.spark.ml.linalg.VectorUDT;
@@ -397,19 +398,38 @@ public class RDDConverterUtils
                if( isVector ) //note: numNonzeros scans entries but handles sparse/dense
                        return ((Vector) vect).numNonzeros();
                else 
-                       return countNnz(vect, isVector, off, ((Row)vect).length()-off);
+                       return countNnz(vect, isVector, off, ((Row)vect).length());
        }
 
-       private static int countNnz(Object vect, boolean isVector, int pos, int len ) {
+       /**
+        * Count the number of non-zeros for a subrange of the given row.
+        * 
+        * @param vect row object (row of basic types or row including a vector)
+        * @param isVector if the row includes a vector
+        * @param pos physical position 
+        * @param cu logical upper column index (exclusive) 
+        * @return number of non-zeros.
+        */
+       private static int countNnz(Object vect, boolean isVector, int pos, int cu ) {
                int lnnz = 0;
                if( isVector ) {
-                       Vector vec = (Vector) vect;
-                       for( int i=pos; i<pos+len; i++ )
-                               lnnz += (vec.apply(i) != 0) ? 1 : 0;
+                       if( vect instanceof DenseVector ) {
+                               DenseVector vec = (DenseVector) vect;
+                               for( int i=pos; i<cu; i++ )
+                                       lnnz += (vec.apply(i) != 0) ? 1 : 0;
+                       }
+                       else if( vect instanceof SparseVector ) {
+                               SparseVector vec = (SparseVector) vect;
+                               int alen = vec.numActives();
+                               int[] aix = vec.indices();
+                               double[] avals = vec.values();
+                               for( int i=pos; i<alen && aix[i]<cu; i++ )
+                                       lnnz += (avals[i] != 0) ? 1 : 0;
+                       }
                }
                else { //row
                        Row row = (Row) vect;
-                       for( int i=pos; i<pos+len; i++ )
+                       for( int i=pos; i<cu; i++ )
                                lnnz += UtilFunctions.isNonZero(row.get(i)) ? 1 : 0;
                }
                return lnnz;
@@ -1047,13 +1067,14 @@ public class RDDConverterUtils
                                }
                                
                                //process row data
-                               int off = _containsID ? 1: 0;
+                               int off = _containsID ? 1 : 0;
                                Object obj = _isVector ? tmp._1().get(off) : tmp._1();
                                for( int cix=1, pix=_isVector?0:off; cix<=ncblks; cix++ ) {
                                        int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);
+                                       int cu = (int) Math.min(_clen, cix*_bclen) + (_isVector?0:off);
                                        //allocate sparse row once (avoid re-allocations)
                                        if( mb[cix-1].isInSparseFormat() ) {
-                                               int lnnz = countNnz(obj, _isVector, pix, lclen);
+                                               int lnnz = countNnz(obj, _isVector, pix, cu);
                                                mb[cix-1].getSparseBlock().allocate(pos, lnnz);
                                        }
                                        //append data to matrix blocks
@@ -1062,14 +1083,14 @@ public class RDDConverterUtils
                                                if( vect instanceof SparseVector ) {
                                                        SparseVector svect = (SparseVector) vect;
                                                        int[] svectIx = svect.indices();
-                                                       while( pix<svectIx.length && svectIx[pix]<cix*_bclen ) {
+                                                       while( pix<svectIx.length && svectIx[pix]<cu ) {
                                                                int j = UtilFunctions.computeCellInBlock(svectIx[pix]+1, _bclen);
                                                                mb[cix-1].appendValue(pos, j, svect.values()[pix++]);
                                                        }
                                                }
                                                else { //dense
                                                        for( int j=0; j<lclen; j++ )
-                                                               mb[cix-1].appendValue(pos, j, vect.apply(pix++));       
+                                                               mb[cix-1].appendValue(pos, j, vect.apply(pix++));
                                                }
                                        }
                                        else { //row
@@ -1095,7 +1116,7 @@ public class RDDConverterUtils
                        
                        //create all column blocks (assume dense since csv is dense text format)
                        for( int cix=1; cix<=ncblks; cix++ ) {
-                               int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);                        
+                               int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);
                                ix[cix-1] = new MatrixIndexes(rix, cix);
                                mb[cix-1] = new MatrixBlock(lrlen, lclen, _sparse,(int)(lrlen*lclen*_sparsity));
                                mb[cix-1].allocateBlock();
@@ -1106,12 +1127,12 @@ public class RDDConverterUtils
                private static void flushBlocksToList( MatrixIndexes[] ix, MatrixBlock[] mb, ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret ) 
                        throws DMLRuntimeException
                {
-                       int len = ix.length;                    
+                       int len = ix.length;
                        for( int i=0; i<len; i++ )
                                if( mb[i] != null ) {
                                        ret.add(new Tuple2<>(ix[i],mb[i]));
                                        mb[i].examSparsity(); //ensure right representation
-                               }       
+                               }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/d91d24a9/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
index a73ed1a..1d71448 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
@@ -147,9 +147,12 @@ public final class SparseRowVector extends SparseRow implements Serializable
         * @return new capacity for resizing
         */
        private int newCapacity() {
-               return (int) ((values.length < estimatedNzs) ?
-                       Math.min(estimatedNzs, values.length*SparseBlock.RESIZE_FACTOR1) :
-                       Math.min(maxNzs, Math.ceil(values.length*SparseBlock.RESIZE_FACTOR2)));
+               final double currLen = values.length;
+               //scale length exponentially based on estimated number of non-zeros
+               final int nextLen = (int)Math.ceil(currLen * ((currLen < estimatedNzs) ? 
+                       SparseBlock.RESIZE_FACTOR1 : SparseBlock.RESIZE_FACTOR2));
+               //cap at max number of non-zeros with robustness of initial zero
+               return Math.max(2, Math.min(maxNzs, nextLen));
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/systemml/blob/d91d24a9/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
index a6b6811..ec946fb 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
@@ -30,6 +30,7 @@ import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.LibMatrixReorg;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.util.DataConverter;
@@ -169,7 +170,27 @@ public class DataFrameMatrixConversionTest extends AutomatedTestBase
        public void testVectorConversionWideSparseUnknown() {
                testDataFrameConversion(true, cols3, false, true);
        }
+       
+       @Test
+       public void testVectorConversionMultiUltraSparse() {
+               testDataFrameConversionUltraSparse(true, false);
+       }
+       
+       @Test
+       public void testVectorConversionMultiUltraSparseUnknown() {
+               testDataFrameConversionUltraSparse(true, true);
+       }
 
+       @Test
+       public void testRowConversionMultiUltraSparse() {
+               testDataFrameConversionUltraSparse(false, false);
+       }
+       
+       @Test
+       public void testRowConversionMultiUltraSparseUnknown() {
+               testDataFrameConversionUltraSparse(false, true);
+       }
+       
        private void testDataFrameConversion(boolean vector, int cols, boolean dense, boolean unknownDims) {
                boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG; 
                RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
@@ -212,6 +233,48 @@ public class DataFrameMatrixConversionTest extends AutomatedTestBase
                }
        }
 
+       private void testDataFrameConversionUltraSparse(boolean vector, boolean unknownDims) {
+               boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG; 
+               RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
+
+               try
+               {
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+                       DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
+                       
+                       //generate input data and setup metadata
+                       double[][] A = getRandomMatrix(rows1, 1, -10, 10, 0.7, 2373);
+                       MatrixBlock mbA0 = DataConverter.convertToMatrixBlock(A);
+                       MatrixBlock mbA = LibMatrixReorg.diag(mbA0, new MatrixBlock(rows1, rows1, true));
+                       
+                       int blksz = ConfigurationManager.getBlocksize();
+                       MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, rows1, blksz, blksz, mbA.getNonZeros());
+                       MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
+
+                       //get binary block input rdd
+                       JavaPairRDD<MatrixIndexes,MatrixBlock> in = SparkExecutionContext.toMatrixJavaPairRDD(sc, mbA, blksz, blksz);
+                       
+                       //matrix - dataframe - matrix conversion
+                       Dataset<Row> df = RDDConverterUtils.binaryBlockToDataFrame(spark, in, mc1, vector);
+                       JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true, vector);
+                       
+                       //get output matrix block
+                       MatrixBlock mbB0 = SparkExecutionContext.toMatrixBlock(out, rows1, rows1, blksz, blksz, -1);
+                       MatrixBlock mbB = LibMatrixReorg.diag(mbB0, new MatrixBlock(rows1, 1, false));
+                       
+                       //compare matrix blocks
+                       double[][] B = DataConverter.convertToDoubleMatrix(mbB);
+                       TestUtils.compareMatrices(A, B, rows1, 1, eps);
+               }
+               catch( Exception ex ) {
+                       throw new RuntimeException(ex);
+               }
+               finally {
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
+                       DMLScript.rtplatform = oldPlatform;
+               }
+       }
+       
        @AfterClass
        public static void tearDownClass() {
                // stop underlying spark context to allow single jvm tests (otherwise the
