Repository: systemml Updated Branches: refs/heads/master 6b8084b97 -> fc8404704
[SYSTEMML-2064] Performance sparse/ultra-sparse rand data generation This patch makes a number of improvements to the performance and memory efficiency of sparse and ultra-sparse rand data generation. This includes (1) avoiding materialization of the nnz per block for CP/SP/MR, (2) preallocating sparse rows once if accessed row-wise, (3) avoiding repeated allocation of random number generators per block, and (4) multi-threaded maintenance of the output nnz. In scenarios that generate 10 random matrices (of the sizes described below), this patch improved performance as follows: a) 1M x 1M, sp=1e-4: 30.7s -> 11.3s; b) 10M x 10M, sp=1e-6: 379.5s -> 125.2s (still including 67s of GC time). Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/2af9c668 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/2af9c668 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/2af9c668 Branch: refs/heads/master Commit: 2af9c668079bf1f339f9705001a4853f7ee445a3 Parents: 6b8084b Author: Matthias Boehm <[email protected]> Authored: Mon Jan 8 19:34:10 2018 -0800 Committer: Matthias Boehm <[email protected]> Committed: Mon Jan 8 19:34:10 2018 -0800 ---------------------------------------------------------------------- .../runtime/compress/CompressedMatrixBlock.java | 7 +- .../instructions/spark/RandSPInstruction.java | 38 ++--- .../apache/sysml/runtime/matrix/DataGenMR.java | 10 +- .../runtime/matrix/data/LibMatrixDatagen.java | 158 ++++++++----------- .../sysml/runtime/matrix/data/MatrixBlock.java | 21 +-- .../runtime/matrix/mapred/DataGenMapper.java | 10 +- .../sysml/runtime/util/NormalPRNGenerator.java | 14 +- .../apache/sysml/runtime/util/PRNGenerator.java | 12 +- .../sysml/runtime/util/PoissonPRNGenerator.java | 3 +- .../sysml/runtime/util/UniformPRNGenerator.java | 14 +- 10 files changed, 111 insertions(+), 176 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/systemml/blob/2af9c668/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java index 11d2602..52ed2b4 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java @@ -36,7 +36,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.stream.LongStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -2288,15 +2287,13 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable } @Override - public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen, - LongStream nnzInBlock, Well1024a bigrand, long bSeed) + public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen, Well1024a bigrand, long bSeed) throws DMLRuntimeException { throw new RuntimeException("CompressedMatrixBlock: randOperationsInPlace not supported."); } @Override - public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen, - LongStream nnzInBlock, Well1024a bigrand, long bSeed, int k) + public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen, Well1024a bigrand, long bSeed, int k) throws DMLRuntimeException { throw new RuntimeException("CompressedMatrixBlock: randOperationsInPlace not supported."); } http://git-wip-us.apache.org/repos/asf/systemml/blob/2af9c668/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java index 71bc427..d6d177f 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java @@ -24,9 +24,7 @@ import java.io.PrintWriter; import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; -import java.util.PrimitiveIterator; import java.util.Random; -import java.util.stream.LongStream; import org.apache.commons.math3.distribution.PoissonDistribution; import org.apache.commons.math3.random.Well1024a; @@ -275,10 +273,8 @@ public class RandSPInstruction extends UnarySPInstruction { } //step 3: seed generation - JavaPairRDD<MatrixIndexes, Tuple2<Long, Long>> seedsRDD = null; + JavaPairRDD<MatrixIndexes, Long> seedsRDD = null; Well1024a bigrand = LibMatrixDatagen.setupSeedsForRand(lSeed); - LongStream nnz = LibMatrixDatagen.computeNNZperBlock(lrows, lcols, rowsInBlock, colsInBlock, sparsity); - PrimitiveIterator.OfLong nnzIter = nnz.iterator(); double totalSize = OptimizerUtils.estimatePartitionedSizeExactSparsity( lrows, lcols, rowsInBlock, colsInBlock, sparsity); //overestimate for on disk, ensures hdfs block per partition double hdfsBlkSize = InfrastructureAnalyzer.getHDFSBlockSize(); @@ -288,14 +284,13 @@ public class RandSPInstruction extends UnarySPInstruction { //a) in-memory seed rdd construction if( numBlocks < INMEMORY_NUMBLOCKS_THRESHOLD ) { - ArrayList<Tuple2<MatrixIndexes, Tuple2<Long, Long>>> seeds = - new ArrayList<>(); + ArrayList<Tuple2<MatrixIndexes, Long>> seeds = new ArrayList<>(); for( long i=0; i<numBlocks; i++ ) { long r = 1 + i/numColBlocks; long c = 1 + i%numColBlocks; MatrixIndexes indx = new MatrixIndexes(r, c); Long seedForBlock = bigrand.nextLong(); - seeds.add(new Tuple2<>(indx, new Tuple2<>(seedForBlock, nnzIter.nextLong()))); + 
seeds.add(new Tuple2<>(indx, seedForBlock)); } //for load balancing: degree of parallelism such that ~128MB per partition @@ -320,8 +315,6 @@ public class RandSPInstruction extends UnarySPInstruction { sb.append(1 + i%numColBlocks); sb.append(','); sb.append(bigrand.nextLong()); - sb.append(','); - sb.append(nnzIter.nextLong()); pw.println(sb.toString()); sb.setLength(0); } @@ -643,19 +636,17 @@ public class RandSPInstruction extends UnarySPInstruction { } } - private static class ExtractSeedTuple implements PairFunction<String, MatrixIndexes, Tuple2<Long,Long>> { + private static class ExtractSeedTuple implements PairFunction<String, MatrixIndexes, Long> { private static final long serialVersionUID = 3973794676854157101L; @Override - public Tuple2<MatrixIndexes, Tuple2<Long, Long>> call(String arg) + public Tuple2<MatrixIndexes, Long> call(String arg) throws Exception { String[] parts = IOUtilFunctions.split(arg, ","); MatrixIndexes ix = new MatrixIndexes( - Long.parseLong(parts[0]), Long.parseLong(parts[1])); - Tuple2<Long,Long> seed = new Tuple2<>( - Long.parseLong(parts[2]), Long.parseLong(parts[3])); - return new Tuple2<>(ix,seed); + Long.parseLong(parts[0]), Long.parseLong(parts[1])); + return new Tuple2<>(ix,Long.parseLong(parts[2])); } } @@ -668,7 +659,7 @@ public class RandSPInstruction extends UnarySPInstruction { } } - private static class GenerateRandomBlock implements PairFunction<Tuple2<MatrixIndexes, Tuple2<Long, Long> >, MatrixIndexes, MatrixBlock> + private static class GenerateRandomBlock implements PairFunction<Tuple2<MatrixIndexes, Long>, MatrixIndexes, MatrixBlock> { private static final long serialVersionUID = 1616346120426470173L; @@ -695,7 +686,7 @@ public class RandSPInstruction extends UnarySPInstruction { } @Override - public Tuple2<MatrixIndexes, MatrixBlock> call(Tuple2<MatrixIndexes, Tuple2<Long, Long>> kv) + public Tuple2<MatrixIndexes, MatrixBlock> call(Tuple2<MatrixIndexes, Long> kv) throws Exception { //compute local block size: @@ 
-704,14 +695,13 @@ public class RandSPInstruction extends UnarySPInstruction { long blockColIndex = ix.getColumnIndex(); int lrlen = UtilFunctions.computeBlockSize(_rlen, blockRowIndex, _brlen); int lclen = UtilFunctions.computeBlockSize(_clen, blockColIndex, _bclen); - long seed = kv._2._1; - long blockNNZ = kv._2._2; + long seed = kv._2; MatrixBlock blk = new MatrixBlock(); - RandomMatrixGenerator rgen = LibMatrixDatagen.createRandomMatrixGenerator( - _pdf, lrlen, lclen, lrlen, lclen, - _sparsity, _min, _max, _pdfParams ); - blk.randOperationsInPlace(rgen, LongStream.of(blockNNZ), null, seed); + RandomMatrixGenerator rgen = LibMatrixDatagen + .createRandomMatrixGenerator(_pdf, lrlen, lclen, + lrlen, lclen,_sparsity, _min, _max, _pdfParams); + blk.randOperationsInPlace(rgen, null, seed); return new Tuple2<>(kv._1, blk); } } http://git-wip-us.apache.org/repos/asf/systemml/blob/2af9c668/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java b/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java index 3040dae..f95dbe7 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java @@ -22,8 +22,6 @@ package org.apache.sysml.runtime.matrix; import java.io.PrintWriter; import java.util.HashSet; -import java.util.PrimitiveIterator; -import java.util.stream.LongStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -156,14 +154,10 @@ public class DataGenMR //seed generation Well1024a bigrand = LibMatrixDatagen.setupSeedsForRand(randInst.getSeed()); - LongStream nnz = LibMatrixDatagen.computeNNZperBlock(rlens[i], clens[i], brlens[i], bclens[i], randInst.getSparsity()); - PrimitiveIterator.OfLong nnzIter = nnz.iterator(); for(long r = 0; r < rlens[i]; r += brlens[i]) { long curBlockRowSize = 
Math.min(brlens[i], (rlens[i] - r)); - for(long c = 0; c < clens[i]; c += bclens[i]) - { + for(long c = 0; c < clens[i]; c += bclens[i]) { long curBlockColSize = Math.min(bclens[i], (clens[i] - c)); - sb.append((r / brlens[i]) + 1); sb.append(','); sb.append((c / bclens[i]) + 1); @@ -172,8 +166,6 @@ public class DataGenMR sb.append(','); sb.append(curBlockColSize); sb.append(','); - sb.append(nnzIter.nextLong()); - sb.append(','); sb.append(bigrand.nextLong()); pw.println(sb.toString()); sb.setLength(0); http://git-wip-us.apache.org/repos/asf/systemml/blob/2af9c668/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java index 9dabf2b..21a6b6e 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java @@ -95,6 +95,7 @@ public class LibMatrixDatagen return bigrand; } + @Deprecated public static LongStream computeNNZperBlock(long nrow, long ncol, int brlen, int bclen, double sparsity) throws DMLRuntimeException { @@ -178,16 +179,15 @@ public class LibMatrixDatagen * * @param out output matrix block * @param rgen random matrix generator - * @param nnzInBlocks number of non-zeros in blocks * @param bigrand Well1024a pseudo-random number generator * @param bSeed seed for random generator * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public static void generateRandomMatrix( MatrixBlock out, RandomMatrixGenerator rgen, LongStream nnzInBlocks, + public static void generateRandomMatrix( MatrixBlock out, RandomMatrixGenerator rgen, Well1024a bigrand, long bSeed ) throws DMLRuntimeException { - boolean invokedFromCP = (bigrand!=null || nnzInBlocks==null); + boolean invokedFromCP = (bigrand != null); int rows = 
rgen._rows; int cols = rgen._cols; int rpb = rgen._rowsPerBlock; @@ -219,30 +219,22 @@ public class LibMatrixDatagen } } - // collect nnz stream for multiple consumptions - long[] lnnzInBlocks = nnzInBlocks.toArray(); - // Determine the sparsity of output matrix // if invoked from CP: estimated NNZ is for entire matrix (nnz=0, if 0 initialized) // if invoked from MR: estimated NNZ is for one block - final long estnnz = (invokedFromCP ? ((min==0.0 && max==0.0)? 0 : - (long)(sparsity * rows * cols)) : lnnzInBlocks[0]); + final long estnnz = (long) Math.ceil((min==0.0 && max==0.0) ? 0 : sparsity*rows*cols); boolean lsparse = MatrixBlock.evalSparseFormatInMemory( rows, cols, estnnz ); - out.reset(rows, cols, lsparse); + out.reset(rows, cols, lsparse, estnnz); - // Allocate memory - //note: individual sparse rows are allocated on demand, - //for consistency with memory estimates and prevent OOMs. - if( out.sparse ) - out.allocateSparseRowsBlock(); - else - out.allocateDenseBlock(); + // allocate memory, incl sparse row allocation if safe + out.allocateBlock(); + //prepare rand internal parameters int nrb = (int) Math.ceil((double)rows/rpb); int ncb = (int) Math.ceil((double)cols/cpb); long[] seeds = invokedFromCP ? generateSeedsForCP(bigrand, nrb, ncb) : null; - genRandomNumbers(invokedFromCP, 0, nrb, 0, ncb, out, rgen, lnnzInBlocks, bSeed, seeds); + genRandomNumbers(invokedFromCP, 0, nrb, 0, ncb, out, rgen, bSeed, seeds); out.recomputeNonZeros(); } @@ -270,9 +262,8 @@ public class LibMatrixDatagen * @param k ? 
* @throws DMLRuntimeException if DMLRuntimeException occurs */ - public static void generateRandomMatrix( MatrixBlock out, RandomMatrixGenerator rgen, LongStream nnzInBlocks, - Well1024a bigrand, long bSeed, int k ) - throws DMLRuntimeException + public static void generateRandomMatrix( MatrixBlock out, RandomMatrixGenerator rgen, Well1024a bigrand, long bSeed, int k ) + throws DMLRuntimeException { int rows = rgen._rows; int cols = rgen._cols; @@ -282,7 +273,7 @@ public class LibMatrixDatagen //sanity check valid dimensions and sparsity checkMatrixDimensionsAndSparsity(rows, cols, sparsity); - + /* * Setup min and max for distributions other than "uniform". Min and Max * are set up in such a way that the usual logic of @@ -300,7 +291,7 @@ public class LibMatrixDatagen //fallback to sequential if single rowblock or too few cells or if MatrixBlock is not thread safe if( k<=1 || (rows <= rpb && lsparse) || (long)rows*cols < PAR_NUMCELL_THRESHOLD || !MatrixBlock.isThreadSafe(lsparse) ) { - generateRandomMatrix(out, rgen, nnzInBlocks, bigrand, bSeed); + generateRandomMatrix(out, rgen, bigrand, bSeed); return; } @@ -316,15 +307,10 @@ public class LibMatrixDatagen } } - //allocate memory - //note: individual sparse rows are allocated on demand, - //for consistency with memory estimates and prevent OOMs. 
- out.reset(rows, cols, lsparse); - if( out.sparse ) - out.allocateSparseRowsBlock(); - else - out.allocateDenseBlock(); - + // allocate memory, incl sparse row allocation if safe + out.reset(rows, cols, lsparse, estnnz); + out.allocateBlock(); + int nrb = (int) Math.ceil((double)rows/rpb); int ncb = (int) Math.ceil((double)cols/cpb); @@ -335,12 +321,8 @@ public class LibMatrixDatagen //generate seeds independent of parallelizations long[] seeds = generateSeedsForCP(bigrand, nrb, ncb); - - // collect nnz stream for multiple consumptions - long[] lnnzInBlocks = nnzInBlocks.toArray(); - - try - { + long nnz = 0; + try { ExecutorService pool = Executors.newFixedThreadPool(k); ArrayList<RandTask> tasks = new ArrayList<>(); int blklen = ((int)(Math.ceil((double)parnb/k))); @@ -350,21 +332,19 @@ public class LibMatrixDatagen int cl = parcol ? i*blklen : 0; int cu = parcol ? Math.min((i+1)*blklen, parnb) : ncb; long[] lseeds = sliceSeedsForCP(seeds, rl, ru, cl, cu, nrb, ncb); - tasks.add(new RandTask(rl, ru, cl, cu, out, - rgen, lnnzInBlocks, bSeed, lseeds) ); + tasks.add(new RandTask(rl, ru, cl, cu, out, rgen, bSeed, lseeds) ); } - List<Future<Object>> ret = pool.invokeAll(tasks); + //execute, handle errors, and aggregate nnz + List<Future<Long>> ret = pool.invokeAll(tasks); pool.shutdown(); - - //exception propagation in case not all tasks successful - for(Future<Object> rc : ret) - rc.get(); + for(Future<Long> rc : ret) + nnz += rc.get(); } catch (Exception e) { throw new DMLRuntimeException(e); - } + } - out.recomputeNonZeros(); + out.setNonZeros(nnz); } /** @@ -409,8 +389,6 @@ public class LibMatrixDatagen out.recomputeNonZeros(); } - - /** * Generates a sample of size <code>size</code> from a range of values [1,range]. * <code>replace</code> defines if sampling is done with or without replacement. 
@@ -485,7 +463,7 @@ public class LibMatrixDatagen return lseeds; } - private static void genRandomNumbers(boolean invokedFromCP, int rl, int ru, int cl, int cu, MatrixBlock out, RandomMatrixGenerator rgen, long[] nnzInBlocks, long bSeed, long[] seeds) + private static void genRandomNumbers(boolean invokedFromCP, int rl, int ru, int cl, int cu, MatrixBlock out, RandomMatrixGenerator rgen, long bSeed, long[] seeds) throws DMLRuntimeException { int rows = rgen._rows; @@ -493,42 +471,43 @@ public class LibMatrixDatagen int rpb = rgen._rowsPerBlock; int cpb = rgen._colsPerBlock; double sparsity = rgen._sparsity; - PRNGenerator valuePRNG = rgen._valuePRNG; double min = rgen._pdf == RandomMatrixGenerator.PDF.UNIFORM ? rgen._min : 0; double max = rgen._pdf == RandomMatrixGenerator.PDF.UNIFORM ? rgen._max : 1; double range = max - min; int clen = out.clen; - int estimatedNNzsPerRow = out.estimatedNNzsPerRow; + int estnnzRow = (int)(sparsity * cols); int nrb = (int) Math.ceil((double)rows/rpb); int ncb = (int) Math.ceil((double)cols/cpb); - int blockID = rl*ncb; //used for sparse int counter = 0; // Setup Pseudo Random Number Generator for cell values based on 'pdf'. 
+ PRNGenerator valuePRNG = rgen._valuePRNG; if (valuePRNG == null) { switch (rgen._pdf) { - case UNIFORM: - valuePRNG = new UniformPRNGenerator(); - break; - case NORMAL: - valuePRNG = new NormalPRNGenerator(); - break; - case POISSON: - valuePRNG = new PoissonPRNGenerator(); - break; - default: - throw new DMLRuntimeException("Unsupported distribution function for Rand: " + rgen._pdf); + case UNIFORM: valuePRNG = new UniformPRNGenerator(); break; + case NORMAL: valuePRNG = new NormalPRNGenerator(); break; + case POISSON: valuePRNG = new PoissonPRNGenerator(); break; + default: + throw new DMLRuntimeException("Unsupported distribution function for Rand: " + rgen._pdf); } } + //preallocate prng for non-zero entries + UniformPRNGenerator nnzPRNG = new UniformPRNGenerator(0); + + //preallocate sparse rows if safe + if( out.sparse && estnnzRow > 0 && cl==0 && cu==ncb ) + for( int i=rl*rpb; i<Math.min(ru*rpb, rows); i++ ) + out.sparseBlock.allocate(i, estnnzRow); + // loop through row-block indices for(int rbi = rl; rbi < ru; rbi++) { int blockrows = (rbi == nrb-1 ? (rows-rbi*rpb) : rpb); int rowoffset = rbi*rpb; // loop through column-block indices - for(int cbj = cl; cbj < cu; cbj++, blockID++) { + for(int cbj = cl; cbj < cu; cbj++) { int blockcols = (cbj == ncb-1 ? (cols-cbj*cpb) : cpb); int coloffset = cbj*cpb; @@ -539,34 +518,29 @@ public class LibMatrixDatagen // Initialize the PRNGenerator for determining cells that contain a non-zero value // Note that, "pdf" parameter applies only to cell values and the individual cells // are always selected uniformly at random. - UniformPRNGenerator nnzPRNG = new UniformPRNGenerator(seed); + nnzPRNG.setSeed(seed); // block-level sparsity, which may differ from overall sparsity in the matrix. 
// (e.g., border blocks may fall under skinny matrix turn point, in CP this would be // irrelevant but we need to ensure consistency with MR) - boolean localSparse = MatrixBlock.evalSparseFormatInMemory(blockrows, blockcols, nnzInBlocks[blockID] ); //(long)(sparsity*blockrows*blockcols)); + boolean localSparse = MatrixBlock.evalSparseFormatInMemory( + blockrows, blockcols, (long)(sparsity*blockrows*blockcols)); if ( localSparse ) { SparseBlock c = out.sparseBlock; - - int idx = 0; // takes values in range [1, brlen*bclen] (both ends including) - int ridx=0, cidx=0; // idx translates into (ridx, cidx) entry within the block - int skip = -1; - double p = sparsity; - // Prob [k-1 zeros before a nonzero] = Prob [k-1 < log(uniform)/log(1-p) < k] = p*(1-p)^(k-1), where p=sparsity - double log1mp = Math.log(1-p); + double log1mp = Math.log(1-sparsity); + int idx = 0; // takes values in range [1, brlen*bclen] (both ends including) long blocksize = blockrows*blockcols; while(idx < blocksize) { - skip = (int) Math.ceil( Math.log(nnzPRNG.nextDouble())/log1mp )-1; - idx = idx+skip+1; - if ( idx > blocksize) - break; + //compute skip to next index + idx = idx + (int) Math.ceil(Math.log(nnzPRNG.nextDouble())/log1mp); + if ( idx > blocksize) break; // translate idx into (r,c) within the block - ridx = (idx-1)/blockcols; - cidx = (idx-1)%blockcols; + int rix = (idx-1)/blockcols; + int cix = (idx-1)%blockcols; double val = min + (range * valuePRNG.nextDouble()); - c.allocate(rowoffset+ridx, estimatedNNzsPerRow, clen); - c.append(rowoffset+ridx, coloffset+cidx, val); + c.allocate(rowoffset+rix, estnnzRow, clen); + c.append(rowoffset+rix, coloffset+cix, val); } } else { @@ -594,7 +568,7 @@ public class LibMatrixDatagen for(int jj=0; jj < blockcols; jj++) { if(nnzPRNG.nextDouble() <= sparsity) { double val = min + (range * valuePRNG.nextDouble()); - c.allocate(ii+rowoffset, estimatedNNzsPerRow, clen); + c.allocate(ii+rowoffset, estnnzRow, clen); c.append(ii+rowoffset, jj+coloffset, 
val); } } @@ -639,7 +613,7 @@ public class LibMatrixDatagen return val; } - private static class RandTask implements Callable<Object> + private static class RandTask implements Callable<Long> { private int _rl = -1; private int _ru = -1; @@ -647,11 +621,10 @@ public class LibMatrixDatagen private int _cu = -1; private MatrixBlock _out = null; private RandomMatrixGenerator _rgen = new RandomMatrixGenerator(); - private long[] _nnzInBlocks = null; private long _bSeed = 0; private long[] _seeds = null; - public RandTask(int rl, int ru, int cl, int cu, MatrixBlock out, RandomMatrixGenerator rgen, long[] nnzInBlocks, long bSeed, long[] seeds) + public RandTask(int rl, int ru, int cl, int cu, MatrixBlock out, RandomMatrixGenerator rgen, long bSeed, long[] seeds) throws DMLRuntimeException { _rl = rl; @@ -660,16 +633,19 @@ public class LibMatrixDatagen _cu = cu; _out = out; _rgen.init(rgen._pdf, rgen._rows, rgen._cols, rgen._rowsPerBlock, rgen._colsPerBlock, rgen._sparsity, rgen._min, rgen._max, rgen._mean); - _nnzInBlocks = nnzInBlocks; _bSeed = bSeed; _seeds = seeds; } - @Override - public Object call() throws Exception - { - genRandomNumbers(true, _rl, _ru, _cl, _cu, _out, _rgen, _nnzInBlocks, _bSeed, _seeds); - return null; + @Override + public Long call() throws Exception { + //execute rand operations (with block indexes) + genRandomNumbers(true, _rl, _ru, _cl, _cu, _out, _rgen, _bSeed, _seeds); + + //thread-local maintenance of non-zero values + int rpb =_rgen._rowsPerBlock, cpb = _rgen._colsPerBlock; + return _out.recomputeNonZeros(_rl*rpb, Math.min(_ru*rpb,_rgen._rows)-1, + _cl*cpb, Math.min(_cu*cpb, _rgen._cols)-1); } } } http://git-wip-us.apache.org/repos/asf/systemml/blob/2af9c668/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index c3575ed..96a2041 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -31,7 +31,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; -import java.util.stream.LongStream; import org.apache.commons.math3.random.Well1024a; import org.apache.hadoop.io.DataInputBuffer; @@ -5315,19 +5314,16 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab { MatrixBlock out = new MatrixBlock(); Well1024a bigrand = null; - LongStream nnzInBlock = null; //setup seeds and nnz per block - if( !LibMatrixDatagen.isShortcutRandOperation(rgen._min, rgen._max, rgen._sparsity, rgen._pdf) ){ + if( !LibMatrixDatagen.isShortcutRandOperation(rgen._min, rgen._max, rgen._sparsity, rgen._pdf) ) bigrand = LibMatrixDatagen.setupSeedsForRand(seed); - nnzInBlock = LibMatrixDatagen.computeNNZperBlock(rgen._rows, rgen._cols, rgen._rowsPerBlock, rgen._colsPerBlock, rgen._sparsity); - } //generate rand data if (k > 1) - out.randOperationsInPlace(rgen, nnzInBlock, bigrand, -1, k); + out.randOperationsInPlace(rgen, bigrand, -1, k); else - out.randOperationsInPlace(rgen, nnzInBlock, bigrand, -1); + out.randOperationsInPlace(rgen, bigrand, -1); return out; } @@ -5353,12 +5349,10 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab * @return matrix block * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public MatrixBlock randOperationsInPlace( - RandomMatrixGenerator rgen, LongStream nnzInBlock, - Well1024a bigrand, long bSeed ) + public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen, Well1024a bigrand, long bSeed ) throws DMLRuntimeException { - LibMatrixDatagen.generateRandomMatrix( this, rgen, nnzInBlock, bigrand, bSeed ); + LibMatrixDatagen.generateRandomMatrix(this, rgen, bigrand, 
bSeed); return this; } @@ -5385,11 +5379,10 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab * @throws DMLRuntimeException if DMLRuntimeException occurs */ public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen, - LongStream nnzInBlock, Well1024a bigrand, long bSeed, int k) + Well1024a bigrand, long bSeed, int k) throws DMLRuntimeException { - LibMatrixDatagen.generateRandomMatrix( this, rgen, nnzInBlock, - bigrand, bSeed, k ); + LibMatrixDatagen.generateRandomMatrix(this, rgen, bigrand, bSeed, k); return this; } http://git-wip-us.apache.org/repos/asf/systemml/blob/2af9c668/src/main/java/org/apache/sysml/runtime/matrix/mapred/DataGenMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/DataGenMapper.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/DataGenMapper.java index 30616a2..48ebc53 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/DataGenMapper.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/DataGenMapper.java @@ -21,7 +21,6 @@ package org.apache.sysml.runtime.matrix.mapred; import java.io.IOException; -import java.util.stream.LongStream; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; @@ -69,8 +68,7 @@ implements Mapper<Writable, Writable, Writable, Writable> long blockColNumber = Long.parseLong(params[1]); int blockRowSize = Integer.parseInt(params[2]); int blockColSize = Integer.parseInt(params[3]); - long blockNNZ = Integer.parseInt(params[4]); - long seed=Long.parseLong(params[5]); + long seed=Long.parseLong(params[4]); double minValue = randInst.getMinValue(); double maxValue = randInst.getMaxValue(); double sparsity = randInst.getSparsity(); @@ -81,10 +79,10 @@ implements Mapper<Writable, Writable, Writable, Writable> indexes[i].setIndexes(blockRowNumber, blockColNumber); RandomMatrixGenerator rgen = 
LibMatrixDatagen.createRandomMatrixGenerator( - pdf, blockRowSize, blockColSize, blockRowSize, blockColSize, - sparsity, minValue, maxValue, randInst.getPdfParams() ); + pdf, blockRowSize, blockColSize, blockRowSize, blockColSize, + sparsity, minValue, maxValue, randInst.getPdfParams() ); - block[i].randOperationsInPlace(rgen, LongStream.of(blockNNZ), null, seed); + block[i].randOperationsInPlace(rgen, null, seed); } catch(DMLRuntimeException e) { throw new IOException(e); http://git-wip-us.apache.org/repos/asf/systemml/blob/2af9c668/src/main/java/org/apache/sysml/runtime/util/NormalPRNGenerator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/NormalPRNGenerator.java b/src/main/java/org/apache/sysml/runtime/util/NormalPRNGenerator.java index 69af6a6..50ba726 100644 --- a/src/main/java/org/apache/sysml/runtime/util/NormalPRNGenerator.java +++ b/src/main/java/org/apache/sysml/runtime/util/NormalPRNGenerator.java @@ -30,22 +30,20 @@ import java.util.Random; public class NormalPRNGenerator extends PRNGenerator { - private Random r; + private final Random rnorm; private RandNPair pair; private boolean flag = false; // we use pair.N1 if flag=false, and pair.N2 otherwise public NormalPRNGenerator() { - super(); + rnorm = new Random(); } @Override - public void setSeed(long sd) { - //seed = s; - seed = sd; - r = new Random(seed); + public void setSeed(long seed) { + rnorm.setSeed(seed); pair = new RandNPair(); flag = false; - pair.compute(r); + pair.compute(rnorm); } @Override @@ -56,7 +54,7 @@ public class NormalPRNGenerator extends PRNGenerator } else { d = pair.getSecond(); - pair.compute(r); + pair.compute(rnorm); } flag = !flag; return d; http://git-wip-us.apache.org/repos/asf/systemml/blob/2af9c668/src/main/java/org/apache/sysml/runtime/util/PRNGenerator.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/runtime/util/PRNGenerator.java b/src/main/java/org/apache/sysml/runtime/util/PRNGenerator.java index 5413dab..bd8e71e 100644 --- a/src/main/java/org/apache/sysml/runtime/util/PRNGenerator.java +++ b/src/main/java/org/apache/sysml/runtime/util/PRNGenerator.java @@ -20,17 +20,9 @@ package org.apache.sysml.runtime.util; -public abstract class PRNGenerator -{ +public abstract class PRNGenerator { - - long seed = -1; - public abstract void setSeed(long sd); - - public PRNGenerator() { - seed = -1; - } - public abstract double nextDouble() ; + public abstract double nextDouble(); } http://git-wip-us.apache.org/repos/asf/systemml/blob/2af9c668/src/main/java/org/apache/sysml/runtime/util/PoissonPRNGenerator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/PoissonPRNGenerator.java b/src/main/java/org/apache/sysml/runtime/util/PoissonPRNGenerator.java index cf24d07..a10e1fa 100644 --- a/src/main/java/org/apache/sysml/runtime/util/PoissonPRNGenerator.java +++ b/src/main/java/org/apache/sysml/runtime/util/PoissonPRNGenerator.java @@ -34,7 +34,8 @@ public class PoissonPRNGenerator extends PRNGenerator { private PoissonDistribution _pdist = null; private double _mean = Double.NaN; - + private long seed; + public PoissonPRNGenerator() { // default mean and default seed super(); http://git-wip-us.apache.org/repos/asf/systemml/blob/2af9c668/src/main/java/org/apache/sysml/runtime/util/UniformPRNGenerator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/UniformPRNGenerator.java b/src/main/java/org/apache/sysml/runtime/util/UniformPRNGenerator.java index 9e819be..a51a6a9 100644 --- a/src/main/java/org/apache/sysml/runtime/util/UniformPRNGenerator.java +++ b/src/main/java/org/apache/sysml/runtime/util/UniformPRNGenerator.java @@ -24,21 +24,19 @@ import java.util.Random; public class 
UniformPRNGenerator extends PRNGenerator { - private Random runif = null; + private final Random runif; public UniformPRNGenerator() { - super(); + runif = new Random(); } - public UniformPRNGenerator(long sd) { - super(); - setSeed(sd); + public UniformPRNGenerator(long seed) { + runif = new Random(seed); } @Override - public void setSeed(long sd) { - seed = sd; - runif = new Random(seed); + public void setSeed(long seed) { + runif.setSeed(seed); } @Override
