Repository: systemml
Updated Branches:
  refs/heads/master 6b8084b97 -> fc8404704


[SYSTEMML-2064] Performance sparse/ultra-sparse rand data generation

This patch makes a number of improvements to the performance and
memory efficiency of sparse and ultra-sparse rand data generation. These
include (1) avoiding materialization of the nnz (number of non-zeros) per
block for CP/SP/MR, (2) preallocating sparse rows once if accessed
row-wise, (3) avoiding repeated allocation of random number generators
per block, and (4) multi-threaded maintenance of the output nnz.

In scenarios that generate 10 random matrices (of the sizes described
below), this patch improved performance as follows:
a) 1M x 1M, sp=1e-4:   30.7s -> 11.3s
b) 10M x 10M, sp=1e-6: 379.5s -> 125.2s (still with 67s GC)


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/2af9c668
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/2af9c668
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/2af9c668

Branch: refs/heads/master
Commit: 2af9c668079bf1f339f9705001a4853f7ee445a3
Parents: 6b8084b
Author: Matthias Boehm <[email protected]>
Authored: Mon Jan 8 19:34:10 2018 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Mon Jan 8 19:34:10 2018 -0800

----------------------------------------------------------------------
 .../runtime/compress/CompressedMatrixBlock.java |   7 +-
 .../instructions/spark/RandSPInstruction.java   |  38 ++---
 .../apache/sysml/runtime/matrix/DataGenMR.java  |  10 +-
 .../runtime/matrix/data/LibMatrixDatagen.java   | 158 ++++++++-----------
 .../sysml/runtime/matrix/data/MatrixBlock.java  |  21 +--
 .../runtime/matrix/mapred/DataGenMapper.java    |  10 +-
 .../sysml/runtime/util/NormalPRNGenerator.java  |  14 +-
 .../apache/sysml/runtime/util/PRNGenerator.java |  12 +-
 .../sysml/runtime/util/PoissonPRNGenerator.java |   3 +-
 .../sysml/runtime/util/UniformPRNGenerator.java |  14 +-
 10 files changed, 111 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/2af9c668/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index 11d2602..52ed2b4 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -36,7 +36,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.stream.LongStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -2288,15 +2287,13 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
        }
 
        @Override
-       public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen,
-                       LongStream nnzInBlock, Well1024a bigrand, long bSeed)
+       public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen, 
Well1024a bigrand, long bSeed)
                        throws DMLRuntimeException {
                throw new RuntimeException("CompressedMatrixBlock: 
randOperationsInPlace not supported.");
        }
 
        @Override
-       public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen,
-                       LongStream nnzInBlock, Well1024a bigrand, long bSeed, 
int k)
+       public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen, 
Well1024a bigrand, long bSeed, int k)
                        throws DMLRuntimeException {
                throw new RuntimeException("CompressedMatrixBlock: 
randOperationsInPlace not supported.");
        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/2af9c668/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
index 71bc427..d6d177f 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
@@ -24,9 +24,7 @@ import java.io.PrintWriter;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.PrimitiveIterator;
 import java.util.Random;
-import java.util.stream.LongStream;
 
 import org.apache.commons.math3.distribution.PoissonDistribution;
 import org.apache.commons.math3.random.Well1024a;
@@ -275,10 +273,8 @@ public class RandSPInstruction extends UnarySPInstruction {
                }
                
                //step 3: seed generation 
-               JavaPairRDD<MatrixIndexes, Tuple2<Long, Long>> seedsRDD = null;
+               JavaPairRDD<MatrixIndexes, Long> seedsRDD = null;
                Well1024a bigrand = LibMatrixDatagen.setupSeedsForRand(lSeed);
-               LongStream nnz = LibMatrixDatagen.computeNNZperBlock(lrows, 
lcols, rowsInBlock, colsInBlock, sparsity);
-               PrimitiveIterator.OfLong nnzIter = nnz.iterator();
                double totalSize = 
OptimizerUtils.estimatePartitionedSizeExactSparsity( lrows, lcols, rowsInBlock, 
                        colsInBlock, sparsity); //overestimate for on disk, 
ensures hdfs block per partition
                double hdfsBlkSize = InfrastructureAnalyzer.getHDFSBlockSize();
@@ -288,14 +284,13 @@ public class RandSPInstruction extends UnarySPInstruction 
{
                //a) in-memory seed rdd construction 
                if( numBlocks < INMEMORY_NUMBLOCKS_THRESHOLD )
                {
-                       ArrayList<Tuple2<MatrixIndexes, Tuple2<Long, Long>>> 
seeds = 
-                                       new ArrayList<>();
+                       ArrayList<Tuple2<MatrixIndexes, Long>> seeds = new 
ArrayList<>();
                        for( long i=0; i<numBlocks; i++ ) {
                                long r = 1 + i/numColBlocks;
                                long c = 1 + i%numColBlocks;
                                MatrixIndexes indx = new MatrixIndexes(r, c);
                                Long seedForBlock = bigrand.nextLong();
-                               seeds.add(new Tuple2<>(indx, new 
Tuple2<>(seedForBlock, nnzIter.nextLong())));
+                               seeds.add(new Tuple2<>(indx, seedForBlock));
                        }
                        
                        //for load balancing: degree of parallelism such that 
~128MB per partition
@@ -320,8 +315,6 @@ public class RandSPInstruction extends UnarySPInstruction {
                                        sb.append(1 + i%numColBlocks);
                                        sb.append(',');
                                        sb.append(bigrand.nextLong());
-                                       sb.append(',');
-                                       sb.append(nnzIter.nextLong());
                                        pw.println(sb.toString());
                                        sb.setLength(0);
                                }
@@ -643,19 +636,17 @@ public class RandSPInstruction extends UnarySPInstruction 
{
                }
        }
 
-       private static class ExtractSeedTuple implements PairFunction<String, 
MatrixIndexes, Tuple2<Long,Long>> {
+       private static class ExtractSeedTuple implements PairFunction<String, 
MatrixIndexes, Long> {
                private static final long serialVersionUID = 
3973794676854157101L;
 
                @Override
-               public Tuple2<MatrixIndexes, Tuple2<Long, Long>> call(String 
arg)
+               public Tuple2<MatrixIndexes, Long> call(String arg)
                                throws Exception 
                {
                        String[] parts = IOUtilFunctions.split(arg, ",");
                        MatrixIndexes ix = new MatrixIndexes(
-                                       Long.parseLong(parts[0]), 
Long.parseLong(parts[1]));
-                       Tuple2<Long,Long> seed = new Tuple2<>(
-                                       Long.parseLong(parts[2]), 
Long.parseLong(parts[3]));
-                       return new Tuple2<>(ix,seed);
+                               Long.parseLong(parts[0]), 
Long.parseLong(parts[1]));
+                       return new Tuple2<>(ix,Long.parseLong(parts[2]));
                }
        }
 
@@ -668,7 +659,7 @@ public class RandSPInstruction extends UnarySPInstruction {
                }
        }
 
-       private static class GenerateRandomBlock implements 
PairFunction<Tuple2<MatrixIndexes, Tuple2<Long, Long> >, MatrixIndexes, 
MatrixBlock> 
+       private static class GenerateRandomBlock implements 
PairFunction<Tuple2<MatrixIndexes, Long>, MatrixIndexes, MatrixBlock> 
        {
                private static final long serialVersionUID = 
1616346120426470173L;
                
@@ -695,7 +686,7 @@ public class RandSPInstruction extends UnarySPInstruction {
                }
 
                @Override
-               public Tuple2<MatrixIndexes, MatrixBlock> 
call(Tuple2<MatrixIndexes, Tuple2<Long, Long>> kv) 
+               public Tuple2<MatrixIndexes, MatrixBlock> 
call(Tuple2<MatrixIndexes, Long> kv) 
                        throws Exception 
                {
                        //compute local block size: 
@@ -704,14 +695,13 @@ public class RandSPInstruction extends UnarySPInstruction 
{
                        long blockColIndex = ix.getColumnIndex();
                        int lrlen = UtilFunctions.computeBlockSize(_rlen, 
blockRowIndex, _brlen);
                        int lclen = UtilFunctions.computeBlockSize(_clen, 
blockColIndex, _bclen);
-                       long seed = kv._2._1;
-                       long blockNNZ = kv._2._2;
+                       long seed = kv._2;
                        
                        MatrixBlock blk = new MatrixBlock();
-                       RandomMatrixGenerator rgen = 
LibMatrixDatagen.createRandomMatrixGenerator(
-                                       _pdf, lrlen, lclen, lrlen, lclen,   
-                                       _sparsity, _min, _max, _pdfParams );
-                       blk.randOperationsInPlace(rgen, 
LongStream.of(blockNNZ), null, seed);
+                       RandomMatrixGenerator rgen = LibMatrixDatagen
+                               .createRandomMatrixGenerator(_pdf, lrlen, lclen,
+                                       lrlen, lclen,_sparsity, _min, _max, 
_pdfParams);
+                       blk.randOperationsInPlace(rgen, null, seed);
                        return new Tuple2<>(kv._1, blk);
                }
        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/2af9c668/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java 
b/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
index 3040dae..f95dbe7 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
@@ -22,8 +22,6 @@ package org.apache.sysml.runtime.matrix;
 
 import java.io.PrintWriter;
 import java.util.HashSet;
-import java.util.PrimitiveIterator;
-import java.util.stream.LongStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -156,14 +154,10 @@ public class DataGenMR
                                        
                                        //seed generation
                                        Well1024a bigrand = 
LibMatrixDatagen.setupSeedsForRand(randInst.getSeed());
-                                       LongStream nnz = 
LibMatrixDatagen.computeNNZperBlock(rlens[i], clens[i], brlens[i], bclens[i], 
randInst.getSparsity());
-                                       PrimitiveIterator.OfLong nnzIter = 
nnz.iterator();
                                        for(long r = 0; r < rlens[i]; r += 
brlens[i]) {
                                                long curBlockRowSize = 
Math.min(brlens[i], (rlens[i] - r));
-                                               for(long c = 0; c < clens[i]; c 
+= bclens[i])
-                                               {
+                                               for(long c = 0; c < clens[i]; c 
+= bclens[i]) {
                                                        long curBlockColSize = 
Math.min(bclens[i], (clens[i] - c));
-                                                       
                                                        sb.append((r / 
brlens[i]) + 1);
                                                        sb.append(',');
                                                        sb.append((c / 
bclens[i]) + 1);
@@ -172,8 +166,6 @@ public class DataGenMR
                                                        sb.append(',');
                                                        
sb.append(curBlockColSize);
                                                        sb.append(',');
-                                                       
sb.append(nnzIter.nextLong());
-                                                       sb.append(',');
                                                        
sb.append(bigrand.nextLong());
                                                        
pw.println(sb.toString());
                                                        sb.setLength(0);

http://git-wip-us.apache.org/repos/asf/systemml/blob/2af9c668/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java
index 9dabf2b..21a6b6e 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java
@@ -95,6 +95,7 @@ public class LibMatrixDatagen
                return bigrand;
        }
 
+       @Deprecated
        public static LongStream computeNNZperBlock(long nrow, long ncol, int 
brlen, int bclen, double sparsity) 
                throws DMLRuntimeException 
        {
@@ -178,16 +179,15 @@ public class LibMatrixDatagen
         * 
      * @param out output matrix block
      * @param rgen random matrix generator
-     * @param nnzInBlocks number of non-zeros in blocks
      * @param bigrand Well1024a pseudo-random number generator
      * @param bSeed seed for random generator
      * @throws DMLRuntimeException if DMLRuntimeException occurs
      */
-       public static void generateRandomMatrix( MatrixBlock out, 
RandomMatrixGenerator rgen, LongStream nnzInBlocks, 
+       public static void generateRandomMatrix( MatrixBlock out, 
RandomMatrixGenerator rgen,
                                                        Well1024a bigrand, long 
bSeed ) 
                throws DMLRuntimeException
        {
-               boolean invokedFromCP = (bigrand!=null || nnzInBlocks==null);
+               boolean invokedFromCP = (bigrand != null);
                int rows = rgen._rows;
                int cols = rgen._cols;
                int rpb = rgen._rowsPerBlock;
@@ -219,30 +219,22 @@ public class LibMatrixDatagen
                        }
                }
                
-               // collect nnz stream for multiple consumptions
-               long[] lnnzInBlocks = nnzInBlocks.toArray();
-               
                // Determine the sparsity of output matrix
                // if invoked from CP: estimated NNZ is for entire matrix 
(nnz=0, if 0 initialized)
                // if invoked from MR: estimated NNZ is for one block
-               final long estnnz = (invokedFromCP ? ((min==0.0 && max==0.0)? 0 
: 
-                               (long)(sparsity * rows * cols)) : 
lnnzInBlocks[0]);
+               final long estnnz = (long) Math.ceil((min==0.0 && max==0.0) ? 0 
: sparsity*rows*cols);
                boolean lsparse = MatrixBlock.evalSparseFormatInMemory( rows, 
cols, estnnz );
-               out.reset(rows, cols, lsparse);
+               out.reset(rows, cols, lsparse, estnnz);
                
-               // Allocate memory
-               //note: individual sparse rows are allocated on demand,
-               //for consistency with memory estimates and prevent OOMs.
-               if( out.sparse )
-                       out.allocateSparseRowsBlock();
-               else
-                       out.allocateDenseBlock();
+               // allocate memory, incl sparse row allocation if safe
+               out.allocateBlock();
                
+               //prepare rand internal parameters
                int nrb = (int) Math.ceil((double)rows/rpb);
                int ncb = (int) Math.ceil((double)cols/cpb);
                long[] seeds = invokedFromCP ? generateSeedsForCP(bigrand, nrb, 
ncb) : null;
                
-               genRandomNumbers(invokedFromCP, 0, nrb, 0, ncb, out, rgen, 
lnnzInBlocks, bSeed, seeds);
+               genRandomNumbers(invokedFromCP, 0, nrb, 0, ncb, out, rgen, 
bSeed, seeds);
                
                out.recomputeNonZeros();
        }
@@ -270,9 +262,8 @@ public class LibMatrixDatagen
      * @param k ?
      * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public static void generateRandomMatrix( MatrixBlock out, 
RandomMatrixGenerator rgen, LongStream nnzInBlocks, 
-                       Well1024a bigrand, long bSeed, int k ) 
-               throws DMLRuntimeException      
+       public static void generateRandomMatrix( MatrixBlock out, 
RandomMatrixGenerator rgen, Well1024a bigrand, long bSeed, int k ) 
+               throws DMLRuntimeException
        {       
                int rows = rgen._rows;
                int cols = rgen._cols;
@@ -282,7 +273,7 @@ public class LibMatrixDatagen
                
                //sanity check valid dimensions and sparsity
                checkMatrixDimensionsAndSparsity(rows, cols, sparsity);
-                               
+               
                /*
                 * Setup min and max for distributions other than "uniform". 
Min and Max
                 * are set up in such a way that the usual logic of
@@ -300,7 +291,7 @@ public class LibMatrixDatagen
                //fallback to sequential if single rowblock or too few cells or 
if MatrixBlock is not thread safe
                if( k<=1 || (rows <= rpb && lsparse) || (long)rows*cols < 
PAR_NUMCELL_THRESHOLD 
                        || !MatrixBlock.isThreadSafe(lsparse) ) {
-                       generateRandomMatrix(out, rgen, nnzInBlocks, bigrand, 
bSeed);
+                       generateRandomMatrix(out, rgen, bigrand, bSeed);
                        return;
                }
 
@@ -316,15 +307,10 @@ public class LibMatrixDatagen
                        }
                }
                
-               //allocate memory
-               //note: individual sparse rows are allocated on demand,
-               //for consistency with memory estimates and prevent OOMs.
-               out.reset(rows, cols, lsparse);
-               if( out.sparse )
-                       out.allocateSparseRowsBlock();
-               else
-                       out.allocateDenseBlock();
-       
+               // allocate memory, incl sparse row allocation if safe
+               out.reset(rows, cols, lsparse, estnnz);
+               out.allocateBlock();
+               
                int nrb = (int) Math.ceil((double)rows/rpb);
                int ncb = (int) Math.ceil((double)cols/cpb);
                
@@ -335,12 +321,8 @@ public class LibMatrixDatagen
                
                //generate seeds independent of parallelizations
                long[] seeds = generateSeedsForCP(bigrand, nrb, ncb);
-               
-               // collect nnz stream for multiple consumptions
-               long[] lnnzInBlocks = nnzInBlocks.toArray();
-               
-               try 
-               {
+               long nnz = 0;
+               try {
                        ExecutorService pool = Executors.newFixedThreadPool(k);
                        ArrayList<RandTask> tasks = new ArrayList<>();
                        int blklen = ((int)(Math.ceil((double)parnb/k)));
@@ -350,21 +332,19 @@ public class LibMatrixDatagen
                                int cl = parcol ? i*blklen : 0; 
                                int cu = parcol ? Math.min((i+1)*blklen, parnb) 
: ncb;
                                long[] lseeds = sliceSeedsForCP(seeds, rl, ru, 
cl, cu, nrb, ncb);
-                               tasks.add(new RandTask(rl, ru, cl, cu, out, 
-                                               rgen, lnnzInBlocks, bSeed, 
lseeds) );
+                               tasks.add(new RandTask(rl, ru, cl, cu, out, 
rgen, bSeed, lseeds) );
                        }
-                       List<Future<Object>> ret = pool.invokeAll(tasks);
+                       //execute, handle errors, and aggregate nnz
+                       List<Future<Long>> ret = pool.invokeAll(tasks);
                        pool.shutdown();
-                       
-                       //exception propagation in case not all tasks successful
-                       for(Future<Object> rc : ret) 
-                               rc.get();
+                       for(Future<Long> rc : ret) 
+                               nnz += rc.get();
                } 
                catch (Exception e) {
                        throw new DMLRuntimeException(e);
-               }       
+               }
                
-               out.recomputeNonZeros();
+               out.setNonZeros(nnz);
        }
        
        /**
@@ -409,8 +389,6 @@ public class LibMatrixDatagen
                out.recomputeNonZeros();
        }
 
-       
-               
        /**
      * Generates a sample of size <code>size</code> from a range of values 
[1,range].
      * <code>replace</code> defines if sampling is done with or without 
replacement.
@@ -485,7 +463,7 @@ public class LibMatrixDatagen
                return lseeds;
        }
 
-       private static void genRandomNumbers(boolean invokedFromCP, int rl, int 
ru, int cl, int cu, MatrixBlock out, RandomMatrixGenerator rgen, long[] 
nnzInBlocks, long bSeed, long[] seeds) 
+       private static void genRandomNumbers(boolean invokedFromCP, int rl, int 
ru, int cl, int cu, MatrixBlock out, RandomMatrixGenerator rgen, long bSeed, 
long[] seeds) 
                throws DMLRuntimeException 
        {
                int rows = rgen._rows;
@@ -493,42 +471,43 @@ public class LibMatrixDatagen
                int rpb = rgen._rowsPerBlock;
                int cpb = rgen._colsPerBlock;
                double sparsity = rgen._sparsity;
-               PRNGenerator valuePRNG = rgen._valuePRNG;
                double min = rgen._pdf == RandomMatrixGenerator.PDF.UNIFORM ? 
rgen._min : 0;
                double max = rgen._pdf == RandomMatrixGenerator.PDF.UNIFORM ? 
rgen._max : 1;
                double range = max - min;
                int clen = out.clen;
-               int estimatedNNzsPerRow = out.estimatedNNzsPerRow;
+               int estnnzRow = (int)(sparsity * cols);
                
                int nrb = (int) Math.ceil((double)rows/rpb);
                int ncb = (int) Math.ceil((double)cols/cpb);
-               int blockID = rl*ncb; //used for sparse
                int counter = 0;
 
                // Setup Pseudo Random Number Generator for cell values based 
on 'pdf'.
+               PRNGenerator valuePRNG = rgen._valuePRNG;
                if (valuePRNG == null) {
                        switch (rgen._pdf) {
-                       case UNIFORM:
-                               valuePRNG = new UniformPRNGenerator();
-                               break;
-                       case NORMAL:
-                               valuePRNG = new NormalPRNGenerator();
-                               break;
-                       case POISSON:
-                               valuePRNG = new PoissonPRNGenerator();
-                               break;
-                       default:
-                               throw new DMLRuntimeException("Unsupported 
distribution function for Rand: " + rgen._pdf);
+                               case UNIFORM: valuePRNG = new 
UniformPRNGenerator(); break;
+                               case NORMAL:  valuePRNG = new 
NormalPRNGenerator(); break;
+                               case POISSON: valuePRNG = new 
PoissonPRNGenerator(); break;
+                               default:
+                                       throw new 
DMLRuntimeException("Unsupported distribution function for Rand: " + rgen._pdf);
                        }
                }
                
+               //preallocate prng for non-zero entries
+               UniformPRNGenerator nnzPRNG = new UniformPRNGenerator(0);
+               
+               //preallocate sparse rows if safe
+               if( out.sparse && estnnzRow > 0 && cl==0 && cu==ncb )
+                       for( int i=rl*rpb; i<Math.min(ru*rpb, rows); i++ )
+                               out.sparseBlock.allocate(i, estnnzRow);
+               
                // loop through row-block indices
                for(int rbi = rl; rbi < ru; rbi++) {
                        int blockrows = (rbi == nrb-1 ? (rows-rbi*rpb) : rpb);
                        int rowoffset = rbi*rpb;
 
                        // loop through column-block indices
-                       for(int cbj = cl; cbj < cu; cbj++, blockID++) {
+                       for(int cbj = cl; cbj < cu; cbj++) {
                                int blockcols = (cbj == ncb-1 ? (cols-cbj*cpb) 
: cpb);
                                int coloffset = cbj*cpb;
                                
@@ -539,34 +518,29 @@ public class LibMatrixDatagen
                                // Initialize the PRNGenerator for determining 
cells that contain a non-zero value
                                // Note that, "pdf" parameter applies only to 
cell values and the individual cells 
                                // are always selected uniformly at random.
-                               UniformPRNGenerator nnzPRNG = new 
UniformPRNGenerator(seed);
+                               nnzPRNG.setSeed(seed);
                                
                                // block-level sparsity, which may differ from 
overall sparsity in the matrix.
                                // (e.g., border blocks may fall under skinny 
matrix turn point, in CP this would be 
                                // irrelevant but we need to ensure consistency 
with MR)
-                               boolean localSparse = 
MatrixBlock.evalSparseFormatInMemory(blockrows, blockcols, nnzInBlocks[blockID] 
); //(long)(sparsity*blockrows*blockcols));  
+                               boolean localSparse = 
MatrixBlock.evalSparseFormatInMemory(
+                                       blockrows, blockcols, 
(long)(sparsity*blockrows*blockcols));
                                if ( localSparse ) {
                                        SparseBlock c = out.sparseBlock;
-                                       
-                                       int idx = 0;  // takes values in range 
[1, brlen*bclen] (both ends including)
-                                       int ridx=0, cidx=0; // idx translates 
into (ridx, cidx) entry within the block
-                                       int skip = -1;
-                                       double p = sparsity;
-                                       
                                        // Prob [k-1 zeros before a nonzero] = 
Prob [k-1 < log(uniform)/log(1-p) < k] = p*(1-p)^(k-1), where p=sparsity
-                                       double log1mp = Math.log(1-p);
+                                       double log1mp = Math.log(1-sparsity);
+                                       int idx = 0;  // takes values in range 
[1, brlen*bclen] (both ends including)
                                        long blocksize = blockrows*blockcols;
                                        while(idx < blocksize) {
-                                               skip = (int) Math.ceil( 
Math.log(nnzPRNG.nextDouble())/log1mp )-1;
-                                               idx = idx+skip+1;
-                                               if ( idx > blocksize)
-                                                       break;
+                                               //compute skip to next index
+                                               idx = idx + (int) 
Math.ceil(Math.log(nnzPRNG.nextDouble())/log1mp);
+                                               if ( idx > blocksize) break;
                                                // translate idx into (r,c) 
within the block
-                                               ridx = (idx-1)/blockcols;
-                                               cidx = (idx-1)%blockcols;
+                                               int rix = (idx-1)/blockcols;
+                                               int cix = (idx-1)%blockcols;
                                                double val = min + (range * 
valuePRNG.nextDouble());
-                                               c.allocate(rowoffset+ridx, 
estimatedNNzsPerRow, clen);
-                                               c.append(rowoffset+ridx, 
coloffset+cidx, val);
+                                               c.allocate(rowoffset+rix, 
estnnzRow, clen);
+                                               c.append(rowoffset+rix, 
coloffset+cix, val);
                                        }
                                }
                                else {
@@ -594,7 +568,7 @@ public class LibMatrixDatagen
                                                                for(int jj=0; 
jj < blockcols; jj++) {
                                                                        
if(nnzPRNG.nextDouble() <= sparsity) {
                                                                                
double val = min + (range * valuePRNG.nextDouble());
-                                                                               
c.allocate(ii+rowoffset, estimatedNNzsPerRow, clen);
+                                                                               
c.allocate(ii+rowoffset, estnnzRow, clen);
                                                                                
c.append(ii+rowoffset, jj+coloffset, val);
                                                                        }
                                                                }
@@ -639,7 +613,7 @@ public class LibMatrixDatagen
         return val;
     }
 
-       private static class RandTask implements Callable<Object> 
+       private static class RandTask implements Callable<Long> 
        {
                private int _rl = -1;
                private int _ru = -1;
@@ -647,11 +621,10 @@ public class LibMatrixDatagen
                private int _cu = -1;
                private MatrixBlock _out = null;
                private RandomMatrixGenerator _rgen = new 
RandomMatrixGenerator();
-               private long[] _nnzInBlocks = null;
                private long _bSeed = 0;
                private long[] _seeds = null;
                
-               public RandTask(int rl, int ru, int cl, int cu, MatrixBlock 
out, RandomMatrixGenerator rgen, long[] nnzInBlocks, long bSeed, long[] seeds) 
+               public RandTask(int rl, int ru, int cl, int cu, MatrixBlock 
out, RandomMatrixGenerator rgen, long bSeed, long[] seeds) 
                        throws DMLRuntimeException 
                {
                        _rl = rl;
@@ -660,16 +633,19 @@ public class LibMatrixDatagen
                        _cu = cu;
                        _out = out;
                        _rgen.init(rgen._pdf, rgen._rows, rgen._cols, 
rgen._rowsPerBlock, rgen._colsPerBlock, rgen._sparsity, rgen._min, rgen._max, 
rgen._mean);
-                       _nnzInBlocks = nnzInBlocks;
                        _bSeed = bSeed;
                        _seeds = seeds;
                }
 
-               @Override               
-               public Object call() throws Exception
-               {
-                       genRandomNumbers(true, _rl, _ru, _cl, _cu, _out, _rgen, 
_nnzInBlocks, _bSeed, _seeds);
-                       return null;
+               @Override
+               public Long call() throws Exception {
+                       //execute rand operations (with block indexes)
+                       genRandomNumbers(true, _rl, _ru, _cl, _cu, _out, _rgen, 
_bSeed, _seeds);
+                       
+                       //thread-local maintenance of non-zero values
+                       int rpb =_rgen._rowsPerBlock, cpb = _rgen._colsPerBlock;
+                       return _out.recomputeNonZeros(_rl*rpb, 
Math.min(_ru*rpb,_rgen._rows)-1,
+                               _cl*cpb, Math.min(_cu*cpb, _rgen._cols)-1);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/2af9c668/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index c3575ed..96a2041 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -31,7 +31,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.stream.LongStream;
 
 import org.apache.commons.math3.random.Well1024a;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -5315,19 +5314,16 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
        {
                MatrixBlock out = new MatrixBlock();
                Well1024a bigrand = null;
-               LongStream nnzInBlock = null;
 
                //setup seeds and nnz per block
-               if( !LibMatrixDatagen.isShortcutRandOperation(rgen._min, 
rgen._max, rgen._sparsity, rgen._pdf) ){
+               if( !LibMatrixDatagen.isShortcutRandOperation(rgen._min, 
rgen._max, rgen._sparsity, rgen._pdf) )
                        bigrand = LibMatrixDatagen.setupSeedsForRand(seed);
-                       nnzInBlock = 
LibMatrixDatagen.computeNNZperBlock(rgen._rows, rgen._cols, rgen._rowsPerBlock, 
rgen._colsPerBlock, rgen._sparsity);
-               }
                
                //generate rand data
                if (k > 1)
-                       out.randOperationsInPlace(rgen, nnzInBlock, bigrand, 
-1, k);
+                       out.randOperationsInPlace(rgen, bigrand, -1, k);
                else
-                       out.randOperationsInPlace(rgen, nnzInBlock, bigrand, 
-1);
+                       out.randOperationsInPlace(rgen, bigrand, -1);
                
                return out;
        }
@@ -5353,12 +5349,10 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
         * @return matrix block
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public MatrixBlock randOperationsInPlace(
-                                                               
RandomMatrixGenerator rgen, LongStream nnzInBlock, 
-                                                               Well1024a 
bigrand, long bSeed ) 
+       public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen, 
Well1024a bigrand, long bSeed ) 
                throws DMLRuntimeException
        {
-               LibMatrixDatagen.generateRandomMatrix( this, rgen, nnzInBlock, 
bigrand, bSeed );
+               LibMatrixDatagen.generateRandomMatrix(this, rgen, bigrand, 
bSeed);
                return this;
        }
        
@@ -5385,11 +5379,10 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
        public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen, 
-                       LongStream nnzInBlock, Well1024a bigrand, long bSeed, 
int k) 
+                       Well1024a bigrand, long bSeed, int k) 
                throws DMLRuntimeException
        {
-               LibMatrixDatagen.generateRandomMatrix( this, rgen, nnzInBlock, 
-                               bigrand, bSeed, k );
+               LibMatrixDatagen.generateRandomMatrix(this, rgen, bigrand, 
bSeed, k);
                return this;
        }
        

http://git-wip-us.apache.org/repos/asf/systemml/blob/2af9c668/src/main/java/org/apache/sysml/runtime/matrix/mapred/DataGenMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/DataGenMapper.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/DataGenMapper.java
index 30616a2..48ebc53 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/DataGenMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/DataGenMapper.java
@@ -21,7 +21,6 @@
 package org.apache.sysml.runtime.matrix.mapred;
 
 import java.io.IOException;
-import java.util.stream.LongStream;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
@@ -69,8 +68,7 @@ implements Mapper<Writable, Writable, Writable, Writable>
                                long blockColNumber = Long.parseLong(params[1]);
                                int blockRowSize = Integer.parseInt(params[2]);
                                int blockColSize = Integer.parseInt(params[3]);
-                               long blockNNZ = Integer.parseInt(params[4]);
-                               long seed=Long.parseLong(params[5]);
+                               long seed=Long.parseLong(params[4]);
                                double minValue = randInst.getMinValue();
                                double maxValue = randInst.getMaxValue();
                                double sparsity = randInst.getSparsity();
@@ -81,10 +79,10 @@ implements Mapper<Writable, Writable, Writable, Writable>
                                        indexes[i].setIndexes(blockRowNumber, 
blockColNumber);
                                        
                                        RandomMatrixGenerator rgen = 
LibMatrixDatagen.createRandomMatrixGenerator(
-                                                                               
                                                                pdf, 
blockRowSize, blockColSize, blockRowSize, blockColSize,   
-                                                                               
                                                                sparsity, 
minValue, maxValue, randInst.getPdfParams() );
+                                               pdf, blockRowSize, 
blockColSize, blockRowSize, blockColSize,
+                                               sparsity, minValue, maxValue, 
randInst.getPdfParams() );
 
-                                       block[i].randOperationsInPlace(rgen, 
LongStream.of(blockNNZ), null, seed); 
+                                       block[i].randOperationsInPlace(rgen, 
null, seed); 
                                } 
                                catch(DMLRuntimeException e) {
                                        throw new IOException(e);

http://git-wip-us.apache.org/repos/asf/systemml/blob/2af9c668/src/main/java/org/apache/sysml/runtime/util/NormalPRNGenerator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/NormalPRNGenerator.java b/src/main/java/org/apache/sysml/runtime/util/NormalPRNGenerator.java
index 69af6a6..50ba726 100644
--- a/src/main/java/org/apache/sysml/runtime/util/NormalPRNGenerator.java
+++ b/src/main/java/org/apache/sysml/runtime/util/NormalPRNGenerator.java
@@ -30,22 +30,20 @@ import java.util.Random;
 
 public class NormalPRNGenerator extends PRNGenerator
 {
-       private Random r;
+       private final Random rnorm;
        private RandNPair pair;
        private boolean flag = false; // we use pair.N1 if flag=false, and 
pair.N2 otherwise
        
        public NormalPRNGenerator() {
-               super();
+               rnorm = new Random();
        }
        
        @Override
-       public void setSeed(long sd) {
-               //seed = s;
-               seed = sd;
-               r = new Random(seed);
+       public void setSeed(long seed) {
+               rnorm.setSeed(seed);
                pair = new RandNPair();
                flag = false;
-               pair.compute(r);
+               pair.compute(rnorm);
        }
        
        @Override
@@ -56,7 +54,7 @@ public class NormalPRNGenerator extends PRNGenerator
                }
                else {
                        d = pair.getSecond();
-                       pair.compute(r);
+                       pair.compute(rnorm);
                }
                flag = !flag;
                return d;

http://git-wip-us.apache.org/repos/asf/systemml/blob/2af9c668/src/main/java/org/apache/sysml/runtime/util/PRNGenerator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/PRNGenerator.java b/src/main/java/org/apache/sysml/runtime/util/PRNGenerator.java
index 5413dab..bd8e71e 100644
--- a/src/main/java/org/apache/sysml/runtime/util/PRNGenerator.java
+++ b/src/main/java/org/apache/sysml/runtime/util/PRNGenerator.java
@@ -20,17 +20,9 @@
 
 package org.apache.sysml.runtime.util;
 
-public abstract class PRNGenerator 
-{
+public abstract class PRNGenerator {
 
-       
-       long seed = -1;
-       
        public abstract void setSeed(long sd);
-       
-       public PRNGenerator() {
-               seed = -1;
-       }
 
-       public abstract double nextDouble() ;
+       public abstract double nextDouble();
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/2af9c668/src/main/java/org/apache/sysml/runtime/util/PoissonPRNGenerator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/PoissonPRNGenerator.java b/src/main/java/org/apache/sysml/runtime/util/PoissonPRNGenerator.java
index cf24d07..a10e1fa 100644
--- a/src/main/java/org/apache/sysml/runtime/util/PoissonPRNGenerator.java
+++ b/src/main/java/org/apache/sysml/runtime/util/PoissonPRNGenerator.java
@@ -34,7 +34,8 @@ public class PoissonPRNGenerator extends PRNGenerator
 {
        private PoissonDistribution _pdist = null;
        private double _mean = Double.NaN;
-
+       private long seed;
+       
        public PoissonPRNGenerator() {
                // default mean and default seed
                super();

http://git-wip-us.apache.org/repos/asf/systemml/blob/2af9c668/src/main/java/org/apache/sysml/runtime/util/UniformPRNGenerator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UniformPRNGenerator.java b/src/main/java/org/apache/sysml/runtime/util/UniformPRNGenerator.java
index 9e819be..a51a6a9 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UniformPRNGenerator.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UniformPRNGenerator.java
@@ -24,21 +24,19 @@ import java.util.Random;
 
 public class UniformPRNGenerator extends PRNGenerator {
 
-       private Random runif = null;
+       private final Random runif;
        
        public UniformPRNGenerator() {
-               super();
+               runif = new Random();
        }
        
-       public UniformPRNGenerator(long sd) {
-               super();
-               setSeed(sd);
+       public UniformPRNGenerator(long seed) {
+               runif = new Random(seed);
        }
 
        @Override
-       public void setSeed(long sd) {
-               seed = sd;
-               runif = new Random(seed);
+       public void setSeed(long seed) {
+               runif.setSeed(seed);
        }
 
        @Override

Reply via email to