Repository: incubator-systemml Updated Branches: refs/heads/master 2893e1aed -> 795a94e7b
[SYSTEMML-1423] Fix OOMs in ultra-sparse spark/mr rand (stream nnz/block) Our rand operations of all backends materialize the number of non-zeros per block. For ultra-sparse matrices, this can, even for distributed operations, quickly lead to out-of-memory situations at the driver. We now produce the nnz per block as a lazily consumed stream. This fixes OOMs when generating ultra-sparse matrices with spark or mr instructions. CP operations still materialize this stream in order to allow repeated access for multi-threaded computation, which is fine because this intermediate memory requirement is already reflected in the hops memory estimate. Furthermore, this patch fixes an issue with the generation of nnz per block for matrices with nnz < numBlocks. Previously, the number of blocks with non-zeros was drawn as a single random number, which could create heavily skewed data. We now simply select blocks with non-zeros using a probability of nnz/numBlocks, scaled by each block's actual size relative to a full block. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/977240dd Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/977240dd Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/977240dd Branch: refs/heads/master Commit: 977240dd9d905479ee777bb7c4946cfa16fa3137 Parents: 2893e1a Author: Matthias Boehm <mboe...@gmail.com> Authored: Sun Mar 19 18:58:51 2017 -0700 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Sun Mar 19 18:58:51 2017 -0700 ---------------------------------------------------------------------- .../runtime/compress/CompressedMatrixBlock.java | 5 +- .../instructions/spark/RandSPInstruction.java | 13 +- .../apache/sysml/runtime/matrix/DataGenMR.java | 8 +- .../runtime/matrix/data/LibMatrixDatagen.java | 134 +++++++------------ .../sysml/runtime/matrix/data/MatrixBlock.java | 7 +- .../runtime/matrix/mapred/DataGenMapper.java | 3 +- 6 files changed, 70 insertions(+), 
100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/977240dd/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java index e345fca..7faed54 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java @@ -35,6 +35,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.stream.LongStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -2076,14 +2077,14 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable @Override public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen, - long[] nnzInBlock, Well1024a bigrand, long bSeed) + LongStream nnzInBlock, Well1024a bigrand, long bSeed) throws DMLRuntimeException { throw new RuntimeException("CompressedMatrixBlock: randOperationsInPlace not supported."); } @Override public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen, - long[] nnzInBlock, Well1024a bigrand, long bSeed, int k) + LongStream nnzInBlock, Well1024a bigrand, long bSeed, int k) throws DMLRuntimeException { throw new RuntimeException("CompressedMatrixBlock: randOperationsInPlace not supported."); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/977240dd/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java index 1e70526..fe1813f 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java @@ -24,7 +24,9 @@ import java.io.PrintWriter; import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; +import java.util.PrimitiveIterator; import java.util.Random; +import java.util.stream.LongStream; import org.apache.commons.math3.distribution.PoissonDistribution; import org.apache.commons.math3.random.Well1024a; @@ -353,11 +355,12 @@ public class RandSPInstruction extends UnarySPInstruction //step 3: seed generation JavaPairRDD<MatrixIndexes, Tuple2<Long, Long>> seedsRDD = null; Well1024a bigrand = LibMatrixDatagen.setupSeedsForRand(lSeed); - long[] nnz = LibMatrixDatagen.computeNNZperBlock(rows, cols, rowsInBlock, colsInBlock, sparsity); + LongStream nnz = LibMatrixDatagen.computeNNZperBlock(rows, cols, rowsInBlock, colsInBlock, sparsity); + PrimitiveIterator.OfLong nnzIter = nnz.iterator(); double totalSize = OptimizerUtils.estimatePartitionedSizeExactSparsity( rows, cols, rowsInBlock, colsInBlock, rows*cols*sparsity); //overestimate for on disk, ensures hdfs block per partition double hdfsBlkSize = InfrastructureAnalyzer.getHDFSBlockSize(); - long numBlocks = nnz.length; + long numBlocks = new MatrixCharacteristics(rows, cols, rowsInBlock, colsInBlock).getNumBlocks(); long numColBlocks = (long)Math.ceil((double)cols/(double)colsInBlock); //a) in-memory seed rdd construction @@ -371,7 +374,7 @@ public class RandSPInstruction extends UnarySPInstruction MatrixIndexes indx = new MatrixIndexes(r, c); Long seedForBlock = bigrand.nextLong(); seeds.add(new Tuple2<MatrixIndexes, Tuple2<Long, Long>>(indx, - new Tuple2<Long, Long>(seedForBlock, nnz[(int)i]))); + new Tuple2<Long, 
Long>(seedForBlock, nnzIter.nextLong()))); } //for load balancing: degree of parallelism such that ~128MB per partition @@ -398,7 +401,7 @@ public class RandSPInstruction extends UnarySPInstruction sb.append(','); sb.append(bigrand.nextLong()); sb.append(','); - sb.append(nnz[(int)i]); + sb.append(nnzIter.nextLong()); pw.println(sb.toString()); sb.setLength(0); } @@ -795,7 +798,7 @@ public class RandSPInstruction extends UnarySPInstruction _pdf, lrlen, lclen, lrlen, lclen, _sparsity, _min, _max, _pdfParams ); - blk.randOperationsInPlace(rgen, new long[]{blockNNZ}, null, seed); + blk.randOperationsInPlace(rgen, LongStream.of(blockNNZ), null, seed); return new Tuple2<MatrixIndexes, MatrixBlock>(kv._1, blk); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/977240dd/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java b/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java index 7e3de55..00ae9d3 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java @@ -22,6 +22,8 @@ package org.apache.sysml.runtime.matrix; import java.io.PrintWriter; import java.util.HashSet; +import java.util.PrimitiveIterator; +import java.util.stream.LongStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -152,8 +154,8 @@ public class DataGenMR //seed generation Well1024a bigrand = LibMatrixDatagen.setupSeedsForRand(randInst.getSeed()); - long[] nnz = LibMatrixDatagen.computeNNZperBlock(rlens[i], clens[i], brlens[i], bclens[i], randInst.getSparsity()); - int nnzIx = 0; + LongStream nnz = LibMatrixDatagen.computeNNZperBlock(rlens[i], clens[i], brlens[i], bclens[i], randInst.getSparsity()); + PrimitiveIterator.OfLong nnzIter = nnz.iterator(); for(long r = 0; r < rlens[i]; r += brlens[i]) { long 
curBlockRowSize = Math.min(brlens[i], (rlens[i] - r)); for(long c = 0; c < clens[i]; c += bclens[i]) @@ -168,7 +170,7 @@ public class DataGenMR sb.append(','); sb.append(curBlockColSize); sb.append(','); - sb.append(nnz[nnzIx++]); + sb.append(nnzIter.nextLong()); sb.append(','); sb.append(bigrand.nextLong()); pw.println(sb.toString()); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/977240dd/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java index fd7a17d..fb62c41 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java @@ -21,13 +21,13 @@ package org.apache.sysml.runtime.matrix.data; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.stream.LongStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,6 +39,7 @@ import org.apache.sysml.runtime.util.NormalPRNGenerator; import org.apache.sysml.runtime.util.PRNGenerator; import org.apache.sysml.runtime.util.PoissonPRNGenerator; import org.apache.sysml.runtime.util.UniformPRNGenerator; +import org.apache.sysml.runtime.util.UtilFunctions; public class LibMatrixDatagen { @@ -97,7 +98,7 @@ public class LibMatrixDatagen return bigrand; } - public static long[] computeNNZperBlock(long nrow, long ncol, int brlen, int bclen, double sparsity) + public static LongStream computeNNZperBlock(long nrow, long ncol, int brlen, int bclen, double sparsity) throws DMLRuntimeException { long lnumBlocks 
= (long) (Math.ceil((double)nrow/brlen) * Math.ceil((double)ncol/bclen)); @@ -108,75 +109,32 @@ public class LibMatrixDatagen + "Number of blocks ("+lnumBlocks+") exceeds the maximum integer size. Try to increase the block size."); } - // NOTE: Total #of NNZ is set to the expected value (nrow*ncol*sparsity). - // TODO: Instead of using the expected value, NNZ should be random variable - int numBlocks = (int) lnumBlocks; + int numColBlocks = (int) Math.ceil((double)ncol/bclen); long nnz = (long) Math.ceil (nrow * (ncol*sparsity)); - // Compute block-level NNZ - long[] ret = new long[numBlocks]; - - if ( nnz < numBlocks ) { - // Ultra-sparse matrix - - // generate the number of blocks with at least one non-zero - // = a random number between [1,nnz] + if( nnz < numBlocks ) { + //#1: ultra-sparse random number generation + //nnz per block: 1 with probability P = nnz/numBlocks, 0 with probability 1-P + //(note: this is an unbiased generator that, however, will never generate more than + //one non-zero per block, but it uses weights to account for different block sizes) + double P = (double) nnz / numBlocks; Random runif = new Random(System.nanoTime()); - int numNZBlocks = 1; - if(nnz-1 > 0) - numNZBlocks += runif.nextInt((int)(nnz-1)); // To avoid exception from random.nextInt(0) - - // distribute non-zeros across numNZBlocks - - // compute proportions for each nzblock - // - divide (0,1] interval into numNZBlocks portions of random size - double[] blockNNZproportions = new double[numNZBlocks]; - - runif.setSeed(System.nanoTime()); - for(int i=0; i < numNZBlocks-1; i++) { - blockNNZproportions[i] = runif.nextDouble(); - } - blockNNZproportions[numNZBlocks-1] = 1; - // sort the values in ascending order - Arrays.sort(blockNNZproportions); - - // compute actual number of non zeros per block according to proportions - long actualnnz = 0; - int bid; - runif.setSeed(System.nanoTime()); - for(int i=0; i < numNZBlocks; i++) { - bid = -1; - do { - bid = 
runif.nextInt(numBlocks); - } while( ret[bid] != 0); - - double prop = (i==0 ? blockNNZproportions[i]: (blockNNZproportions[i] - blockNNZproportions[i-1])); - ret[bid] = (long)Math.floor(prop * nnz); - actualnnz += ret[bid]; - } - - // Code to make sure exact number of non-zeros are generated - while (actualnnz < nnz) { - bid = runif.nextInt(numBlocks); - ret[bid]++; - actualnnz++; - } + return LongStream.range(0, numBlocks).map( i -> { + double lP = P / (brlen*bclen) * + UtilFunctions.computeBlockSize(nrow, 1 + i / numColBlocks, brlen) * + UtilFunctions.computeBlockSize(ncol, 1 + i % numColBlocks, bclen); + return (runif.nextDouble() <= lP) ? 1 : 0; + }); } else { - int bid = 0; - - for(long r = 0; r < nrow; r += brlen) { - long curBlockRowSize = Math.min(brlen, (nrow - r)); - for(long c = 0; c < ncol; c += bclen) - { - long curBlockColSize = Math.min(bclen, (ncol - c)); - ret[bid] = (long) (curBlockRowSize * curBlockColSize * sparsity); - bid++; - } - } + //#2: dense/sparse random number generation + //nnz per block: lrlen * lclen * sparsity (note: this is a biased generator + //that might actually create fewer but never more non zeros than expected) + return LongStream.range(0, numBlocks).map( i -> (long)(sparsity * + UtilFunctions.computeBlockSize(nrow, 1 + i / numColBlocks, brlen) * + UtilFunctions.computeBlockSize(ncol, 1 + i % numColBlocks, bclen))); } - return ret; } public static RandomMatrixGenerator createRandomMatrixGenerator(String pdf, int r, int c, int rpb, int cpb, double sp, double min, double max, String distParams) @@ -224,7 +182,7 @@ public class LibMatrixDatagen * @param bSeed seed for random generator * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public static void generateRandomMatrix( MatrixBlock out, RandomMatrixGenerator rgen, long[] nnzInBlocks, + public static void generateRandomMatrix( MatrixBlock out, RandomMatrixGenerator rgen, LongStream nnzInBlocks, Well1024a bigrand, long bSeed ) throws DMLRuntimeException { @@ 
-247,28 +205,30 @@ public class LibMatrixDatagen double min = rgen._pdf.equalsIgnoreCase(RAND_PDF_UNIFORM) ? rgen._min : 0; double max = rgen._pdf.equalsIgnoreCase(RAND_PDF_UNIFORM) ? rgen._max : 1; - // Determine the sparsity of output matrix - // if invoked from CP: estimated NNZ is for entire matrix (nnz=0, if 0 initialized) - // if invoked from MR: estimated NNZ is for one block - final long estnnz = (invokedFromCP ? ((min==0.0 && max==0.0)? 0 : (long)(sparsity * rows * cols)) - : nnzInBlocks[0]); - boolean lsparse = MatrixBlock.evalSparseFormatInMemory( rows, cols, estnnz ); - out.reset(rows, cols, lsparse); - // Special case shortcuts for efficiency if ( rgen._pdf.equalsIgnoreCase(RAND_PDF_UNIFORM)) { if ( min == 0.0 && max == 0.0 ) { //all zeros - out.nonZeros = 0; + out.reset(rows, cols, true); return; } - else if( !out.sparse && sparsity==1.0d && (min == max //equal values, dense - || (Double.isNaN(min) && Double.isNaN(max))) ) //min == max == NaN - { - out.reset(out.rlen, out.clen, min); + else if( sparsity==1.0d && (min == max //equal values, dense + || (Double.isNaN(min) && Double.isNaN(max))) ) { //min == max == NaN + out.reset(rows, cols, min); return; } } + // collect nnz stream for multiple consumptions + long[] lnnzInBlocks = nnzInBlocks.toArray(); + + // Determine the sparsity of output matrix + // if invoked from CP: estimated NNZ is for entire matrix (nnz=0, if 0 initialized) + // if invoked from MR: estimated NNZ is for one block + final long estnnz = (invokedFromCP ? ((min==0.0 && max==0.0)? 0 : + (long)(sparsity * rows * cols)) : lnnzInBlocks[0]); + boolean lsparse = MatrixBlock.evalSparseFormatInMemory( rows, cols, estnnz ); + out.reset(rows, cols, lsparse); + // Allocate memory //note: individual sparse rows are allocated on demand, //for consistency with memory estimates and prevent OOMs. @@ -281,7 +241,7 @@ public class LibMatrixDatagen int ncb = (int) Math.ceil((double)cols/cpb); long[] seeds = invokedFromCP ? 
generateSeedsForCP(bigrand, nrb, ncb) : null; - genRandomNumbers(invokedFromCP, 0, nrb, 0, ncb, out, rgen, nnzInBlocks, bSeed, seeds); + genRandomNumbers(invokedFromCP, 0, nrb, 0, ncb, out, rgen, lnnzInBlocks, bSeed, seeds); out.recomputeNonZeros(); } @@ -309,7 +269,7 @@ public class LibMatrixDatagen * @param k ? * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public static void generateRandomMatrix( MatrixBlock out, RandomMatrixGenerator rgen, long[] nnzInBlocks, + public static void generateRandomMatrix( MatrixBlock out, RandomMatrixGenerator rgen, LongStream nnzInBlocks, Well1024a bigrand, long bSeed, int k ) throws DMLRuntimeException { @@ -343,16 +303,14 @@ public class LibMatrixDatagen return; } - out.reset(rows, cols, lsparse); - //special case shortcuts for efficiency if ( rgen._pdf.equalsIgnoreCase(RAND_PDF_UNIFORM)) { if ( min == 0.0 && max == 0.0 ) { //all zeros - out.nonZeros = 0; + out.reset(rows, cols, false); return; } - else if( !out.sparse && sparsity==1.0d && min == max ) { //equal values - out.reset(out.rlen, out.clen, min); + else if( sparsity==1.0d && min == max ) { //equal values + out.reset(rows, cols, min); return; } } @@ -360,6 +318,7 @@ public class LibMatrixDatagen //allocate memory //note: individual sparse rows are allocated on demand, //for consistency with memory estimates and prevent OOMs. + out.reset(rows, cols, lsparse); if( out.sparse ) out.allocateSparseRowsBlock(); else @@ -376,6 +335,9 @@ public class LibMatrixDatagen //generate seeds independent of parallelizations long[] seeds = generateSeedsForCP(bigrand, nrb, ncb); + // collect nnz stream for multiple consumptions + long[] lnnzInBlocks = nnzInBlocks.toArray(); + try { ExecutorService pool = Executors.newFixedThreadPool(k); @@ -388,7 +350,7 @@ public class LibMatrixDatagen int cu = parcol ? 
Math.min((i+1)*blklen, parnb) : ncb; long[] lseeds = sliceSeedsForCP(seeds, rl, ru, cl, cu, nrb, ncb); tasks.add(new RandTask(rl, ru, cl, cu, out, - rgen, nnzInBlocks, bSeed, lseeds) ); + rgen, lnnzInBlocks, bSeed, lseeds) ); } List<Future<Object>> ret = pool.invokeAll(tasks); pool.shutdown(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/977240dd/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index 5fc69ce..005f24e 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -30,6 +30,7 @@ import java.io.ObjectOutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; +import java.util.stream.LongStream; import org.apache.commons.math3.random.Well1024a; import org.apache.hadoop.io.DataInputBuffer; @@ -5575,7 +5576,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab { MatrixBlock out = new MatrixBlock(); Well1024a bigrand = null; - long[] nnzInBlock = null; + LongStream nnzInBlock = null; //setup seeds and nnz per block if( !LibMatrixDatagen.isShortcutRandOperation(rgen._min, rgen._max, rgen._sparsity, rgen._pdf) ){ @@ -5614,7 +5615,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab * @throws DMLRuntimeException if DMLRuntimeException occurs */ public MatrixBlock randOperationsInPlace( - RandomMatrixGenerator rgen, long[] nnzInBlock, + RandomMatrixGenerator rgen, LongStream nnzInBlock, Well1024a bigrand, long bSeed ) throws DMLRuntimeException { @@ -5645,7 +5646,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab * @throws DMLRuntimeException if DMLRuntimeException occurs */ 
public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen, - long[] nnzInBlock, Well1024a bigrand, long bSeed, int k) + LongStream nnzInBlock, Well1024a bigrand, long bSeed, int k) throws DMLRuntimeException { LibMatrixDatagen.generateRandomMatrix( this, rgen, nnzInBlock, http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/977240dd/src/main/java/org/apache/sysml/runtime/matrix/mapred/DataGenMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/DataGenMapper.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/DataGenMapper.java index 49b2804..30616a2 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/DataGenMapper.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/DataGenMapper.java @@ -21,6 +21,7 @@ package org.apache.sysml.runtime.matrix.mapred; import java.io.IOException; +import java.util.stream.LongStream; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; @@ -83,7 +84,7 @@ implements Mapper<Writable, Writable, Writable, Writable> pdf, blockRowSize, blockColSize, blockRowSize, blockColSize, sparsity, minValue, maxValue, randInst.getPdfParams() ); - block[i].randOperationsInPlace(rgen, new long[]{blockNNZ}, null, seed); + block[i].randOperationsInPlace(rgen, LongStream.of(blockNNZ), null, seed); } catch(DMLRuntimeException e) { throw new IOException(e);