[SYSTEMML-2052] Fix ultra-sparse dataset-matrix conversion

This patch fixes special cases of converting ultra-sparse datasets to binary block matrices. Specifically, there was an issue with pre-allocating the MCSR sparse rows, which resulted in index-out-of-bounds exceptions on pre-allocated rows of size zero. We now correctly determine the nnz per block range and have made the MCSR format more robust when resizing rows of initial size zero.
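For context, the affected code path is the DataFrame-to-binary-block converter. The following is a minimal sketch of the round trip that exercises it, mirroring the new test below; the class name and the spark, sc, in, n, blksz, and nnz inputs are assumptions for illustration, not part of this patch:

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
    import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
    import org.apache.sysml.runtime.matrix.data.MatrixBlock;
    import org.apache.sysml.runtime.matrix.data.MatrixIndexes;

    public class UltraSparseRoundTrip {
        // round-trips an ultra-sparse n x n matrix through a DataFrame;
        // dataFrameToBinaryBlock is the converter fixed by this patch
        static JavaPairRDD<MatrixIndexes,MatrixBlock> roundTrip(SparkSession spark,
            JavaSparkContext sc, JavaPairRDD<MatrixIndexes,MatrixBlock> in,
            long n, int blksz, long nnz)
        {
            MatrixCharacteristics mc = new MatrixCharacteristics(n, n, blksz, blksz, nnz);
            // binary blocks -> DataFrame with ID column, vector rows
            Dataset<Row> df = RDDConverterUtils.binaryBlockToDataFrame(spark, in, mc, true);
            // DataFrame -> binary blocks (containsID=true, isVector=true)
            return RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc, true, true);
        }
    }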
Furthermore, this patch includes additional tests for ultra-sparse datasets and a performance improvement for counting the nnz over index ranges of sparse dataset vectors.

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/d91d24a9
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/d91d24a9
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/d91d24a9

Branch: refs/heads/master
Commit: d91d24a9fa157a490b227632605249e7dff0ad67
Parents: 38332ac
Author: Matthias Boehm <[email protected]>
Authored: Mon Dec 25 14:10:04 2017 +0100
Committer: Matthias Boehm <[email protected]>
Committed: Mon Dec 25 14:26:00 2017 +0100

----------------------------------------------------------------------
 .../spark/utils/RDDConverterUtils.java        | 47 +++++++++++----
 .../runtime/matrix/data/SparseRowVector.java  |  9 ++-
 .../DataFrameMatrixConversionTest.java        | 63 ++++++++++++++++++++
 3 files changed, 103 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/d91d24a9/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index 53e42ac..e3ab541 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -36,6 +36,7 @@ import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.ml.feature.LabeledPoint;
+import org.apache.spark.ml.linalg.DenseVector;
 import org.apache.spark.ml.linalg.SparseVector;
 import org.apache.spark.ml.linalg.Vector;
 import org.apache.spark.ml.linalg.VectorUDT;
@@ -397,19 +398,38 @@ public class RDDConverterUtils
 		if( isVector ) //note: numNonzeros scans entries but handles sparse/dense
 			return ((Vector) vect).numNonzeros();
 		else
-			return countNnz(vect, isVector, off, ((Row)vect).length()-off);
+			return countNnz(vect, isVector, off, ((Row)vect).length());
 	}
 	
-	private static int countNnz(Object vect, boolean isVector, int pos, int len ) {
+	/**
+	 * Count the number of non-zeros for a subrange of the given row.
+	 * 
+	 * @param vect row object (row of basic types or row including a vector)
+	 * @param isVector if the row includes a vector
+	 * @param pos physical position
+	 * @param cu logical upper column index (exclusive)
+	 * @return number of non-zeros.
+	 */
+	private static int countNnz(Object vect, boolean isVector, int pos, int cu ) {
 		int lnnz = 0;
 		if( isVector ) {
-			Vector vec = (Vector) vect;
-			for( int i=pos; i<pos+len; i++ )
-				lnnz += (vec.apply(i) != 0) ? 1 : 0;
+			if( vect instanceof DenseVector ) {
+				DenseVector vec = (DenseVector) vect;
+				for( int i=pos; i<cu; i++ )
+					lnnz += (vec.apply(i) != 0) ? 1 : 0;
+			}
+			else if( vect instanceof SparseVector ) {
+				SparseVector vec = (SparseVector) vect;
+				int alen = vec.numActives();
+				int[] aix = vec.indices();
+				double[] avals = vec.values();
+				for( int i=pos; i<alen && aix[i]<cu; i++ )
+					lnnz += (avals[i] != 0) ? 1 : 0;
+			}
 		}
 		else { //row
 			Row row = (Row) vect;
-			for( int i=pos; i<pos+len; i++ )
+			for( int i=pos; i<cu; i++ )
 				lnnz += UtilFunctions.isNonZero(row.get(i)) ? 1 : 0;
 		}
 		return lnnz;
@@ -1047,13 +1067,14 @@ public class RDDConverterUtils
 			}
 			//process row data
-			int off = _containsID ? 1: 0;
+			int off = _containsID ? 1 : 0;
 			Object obj = _isVector ? tmp._1().get(off) : tmp._1();
 			for( int cix=1, pix=_isVector?0:off; cix<=ncblks; cix++ ) {
 				int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);
+				int cu = (int) Math.min(_clen, cix*_bclen) + (_isVector?0:off);
 				//allocate sparse row once (avoid re-allocations)
 				if( mb[cix-1].isInSparseFormat() ) {
-					int lnnz = countNnz(obj, _isVector, pix, lclen);
+					int lnnz = countNnz(obj, _isVector, pix, cu);
 					mb[cix-1].getSparseBlock().allocate(pos, lnnz);
 				}
 				//append data to matrix blocks
@@ -1062,14 +1083,14 @@ public class RDDConverterUtils
 					if( vect instanceof SparseVector ) {
 						SparseVector svect = (SparseVector) vect;
 						int[] svectIx = svect.indices();
-						while( pix<svectIx.length && svectIx[pix]<cix*_bclen ) {
+						while( pix<svectIx.length && svectIx[pix]<cu ) {
 							int j = UtilFunctions.computeCellInBlock(svectIx[pix]+1, _bclen);
 							mb[cix-1].appendValue(pos, j, svect.values()[pix++]);
 						}
 					}
 					else { //dense
 						for( int j=0; j<lclen; j++ )
-							mb[cix-1].appendValue(pos, j, vect.apply(pix++)); 
+							mb[cix-1].appendValue(pos, j, vect.apply(pix++));
 					}
 				}
 				else { //row
@@ -1095,7 +1116,7 @@ public class RDDConverterUtils
 			//create all column blocks (assume dense since csv is dense text format)
 			for( int cix=1; cix<=ncblks; cix++ ) {
-				int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen); 
+				int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);
 				ix[cix-1] = new MatrixIndexes(rix, cix);
 				mb[cix-1] = new MatrixBlock(lrlen, lclen, _sparse,(int)(lrlen*lclen*_sparsity));
 				mb[cix-1].allocateBlock();
@@ -1106,12 +1127,12 @@ public class RDDConverterUtils
 	private static void flushBlocksToList( MatrixIndexes[] ix, MatrixBlock[] mb, ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret ) 
 		throws DMLRuntimeException
 	{
-		int len = ix.length; 
+		int len = ix.length;
 		for( int i=0; i<len; i++ )
 			if( mb[i] != null ) {
 				ret.add(new Tuple2<>(ix[i],mb[i]));
 				mb[i].examSparsity(); //ensure right representation
-			} 
+			}
 	}
 }
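A note on the countNnz rewrite above: for a SparseVector, only the active entries are scanned via numActives(), indices(), and values(), instead of probing every logical position with apply(). Below is a standalone sketch of that idea, not the patched method itself, which also handles DenseVector and Row inputs; the class name is hypothetical:

    import org.apache.spark.ml.linalg.SparseVector;
    import org.apache.spark.ml.linalg.Vectors;

    public class CountNnzSketch {
        // count non-zeros in the logical column range [pos, cu), where pos is
        // the physical position in the active-entry arrays, as in the converter
        static int countNnz(SparseVector vec, int pos, int cu) {
            int alen = vec.numActives();
            int[] aix = vec.indices();
            double[] avals = vec.values();
            int lnnz = 0;
            for (int i = pos; i < alen && aix[i] < cu; i++)
                lnnz += (avals[i] != 0) ? 1 : 0;
            return lnnz;
        }

        public static void main(String[] args) {
            // 1M logical columns but only 3 active entries
            // -> the loop touches at most 3 entries, not 1M positions
            SparseVector vec = (SparseVector) Vectors.sparse(1000000,
                new int[]{7, 1024, 999999}, new double[]{1.0, 2.0, 3.0});
            System.out.println(countNnz(vec, 0, 1024));    // 1 (only index 7 < 1024)
            System.out.println(countNnz(vec, 0, 1000000)); // 3
        }
    }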
http://git-wip-us.apache.org/repos/asf/systemml/blob/d91d24a9/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
index a73ed1a..1d71448 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
@@ -147,9 +147,12 @@ public final class SparseRowVector extends SparseRow implements Serializable
 	 * @return new capacity for resizing
 	 */
 	private int newCapacity() {
-		return (int) ((values.length < estimatedNzs) ?
-			Math.min(estimatedNzs, values.length*SparseBlock.RESIZE_FACTOR1) :
-			Math.min(maxNzs, Math.ceil(values.length*SparseBlock.RESIZE_FACTOR2)));
+		final double currLen = values.length;
+		//scale length exponentially based on estimated number of non-zeros
+		final int nextLen = (int)Math.ceil(currLen * ((currLen < estimatedNzs) ?
+			SparseBlock.RESIZE_FACTOR1 : SparseBlock.RESIZE_FACTOR2));
+		//cap at max number of non-zeros with robustness of initial zero
+		return Math.max(2, Math.min(maxNzs, nextLen));
 	}
 	
 	@Override
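For intuition on the newCapacity change above: the old formula scaled values.length multiplicatively, so a row pre-allocated with capacity zero always computed a new capacity of zero and could never grow, which led to the out-of-bounds appends described in the commit message. The new rule floors the result at 2 so zero-capacity rows can grow. A simplified sketch; the factor values and class name are assumptions standing in for SparseBlock.RESIZE_FACTOR1/RESIZE_FACTOR2:

    public class CapacityGrowth {
        // assumed growth factors, standing in for SparseBlock.RESIZE_FACTOR1/2
        static final double RESIZE_FACTOR1 = 2.0; // fast growth below the nnz estimate
        static final double RESIZE_FACTOR2 = 1.1; // slow growth above the nnz estimate

        static int newCapacity(int currLen, int estimatedNzs, int maxNzs) {
            // exponential growth of the current capacity
            int nextLen = (int) Math.ceil(currLen
                * ((currLen < estimatedNzs) ? RESIZE_FACTOR1 : RESIZE_FACTOR2));
            // cap at the max nnz, floor at 2 so rows of initial size zero can grow
            return Math.max(2, Math.min(maxNzs, nextLen));
        }

        public static void main(String[] args) {
            System.out.println(newCapacity(0, 8, 100));  // 2 (old formula yielded 0)
            System.out.println(newCapacity(4, 8, 100));  // 8
            System.out.println(newCapacity(16, 8, 100)); // 18
        }
    }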
http://git-wip-us.apache.org/repos/asf/systemml/blob/d91d24a9/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
index a6b6811..ec946fb 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
@@ -30,6 +30,7 @@ import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.LibMatrixReorg;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.util.DataConverter;
@@ -169,7 +170,27 @@ public class DataFrameMatrixConversionTest extends AutomatedTestBase
 	public void testVectorConversionWideSparseUnknown() {
 		testDataFrameConversion(true, cols3, false, true);
 	}
+	
+	@Test
+	public void testVectorConversionMultiUltraSparse() {
+		testDataFrameConversionUltraSparse(true, false);
+	}
+	
+	@Test
+	public void testVectorConversionMultiUltraSparseUnknown() {
+		testDataFrameConversionUltraSparse(true, true);
+	}
+	
+	@Test
+	public void testRowConversionMultiUltraSparse() {
+		testDataFrameConversionUltraSparse(false, false);
+	}
+	
+	@Test
+	public void testRowConversionMultiUltraSparseUnknown() {
+		testDataFrameConversionUltraSparse(false, true);
+	}
+	
 	private void testDataFrameConversion(boolean vector, int cols, boolean dense, boolean unknownDims) {
 		boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG;
 		RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
@@ -212,6 +233,48 @@ public class DataFrameMatrixConversionTest extends AutomatedTestBase
 		}
 	}
 	
+	private void testDataFrameConversionUltraSparse(boolean vector, boolean unknownDims) {
+		boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
+		
+		try
+		{
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+			DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
+			
+			//generate input data and setup metadata
+			double[][] A = getRandomMatrix(rows1, 1, -10, 10, 0.7, 2373);
+			MatrixBlock mbA0 = DataConverter.convertToMatrixBlock(A);
+			MatrixBlock mbA = LibMatrixReorg.diag(mbA0, new MatrixBlock(rows1, rows1, true));
+			
+			int blksz = ConfigurationManager.getBlocksize();
+			MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, rows1, blksz, blksz, mbA.getNonZeros());
+			MatrixCharacteristics mc2 = unknownDims ?
+				new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
+			
+			//get binary block input rdd
+			JavaPairRDD<MatrixIndexes,MatrixBlock> in = SparkExecutionContext.toMatrixJavaPairRDD(sc, mbA, blksz, blksz);
+			
+			//matrix - dataframe - matrix conversion
+			Dataset<Row> df = RDDConverterUtils.binaryBlockToDataFrame(spark, in, mc1, vector);
+			JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true, vector);
+			
+			//get output matrix block
+			MatrixBlock mbB0 = SparkExecutionContext.toMatrixBlock(out, rows1, rows1, blksz, blksz, -1);
+			MatrixBlock mbB = LibMatrixReorg.diag(mbB0, new MatrixBlock(rows1, 1, false));
+			
+			//compare matrix blocks
+			double[][] B = DataConverter.convertToDoubleMatrix(mbB);
+			TestUtils.compareMatrices(A, B, rows1, 1, eps);
+		}
+		catch( Exception ex ) {
+			throw new RuntimeException(ex);
+		}
+		finally {
+			DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
+			DMLScript.rtplatform = oldPlatform;
+		}
+	}
+	
 	@AfterClass
 	public static void tearDownClass() {
 		// stop underlying spark context to allow single jvm tests (otherwise the
