Repository: incubator-systemml
Updated Branches:
  refs/heads/master 2893e1aed -> 795a94e7b


[SYSTEMML-1423] Fix OOMs ultra-sparse spark/mr rand (stream nnz/block)

So far, the rand operations of all backends materialized the number of
non-zeros (nnz) per block as an array. For ultra-sparse matrices, this
array can - even for distributed operations - quickly lead to
out-of-memory situations at the driver. We now create these nnz per block
as a stream that can be lazily consumed.
This fixes OOMs when generating ultra-sparse matrices with spark or mr
instructions. CP operations still materialize this stream in order to
allow repeated access for multi-threaded computation, which is fine
because this intermediate memory requirement is already reflected in the
hops memory estimate.

Furthermore, this patch also fixes an issue with the generation of nnz
per block for matrices with nnz < numBlocks. So far, the number of
blocks with non-zeros was drawn as a single random number, which could
create heavily skewed data. We now simply select blocks with non-zeros
using a probability of nnz/numBlocks (weighted by the actual block size
to account for smaller boundary blocks); each selected block receives
exactly one non-zero.
 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/977240dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/977240dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/977240dd

Branch: refs/heads/master
Commit: 977240dd9d905479ee777bb7c4946cfa16fa3137
Parents: 2893e1a
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Sun Mar 19 18:58:51 2017 -0700
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Sun Mar 19 18:58:51 2017 -0700

----------------------------------------------------------------------
 .../runtime/compress/CompressedMatrixBlock.java |   5 +-
 .../instructions/spark/RandSPInstruction.java   |  13 +-
 .../apache/sysml/runtime/matrix/DataGenMR.java  |   8 +-
 .../runtime/matrix/data/LibMatrixDatagen.java   | 134 +++++++------------
 .../sysml/runtime/matrix/data/MatrixBlock.java  |   7 +-
 .../runtime/matrix/mapred/DataGenMapper.java    |   3 +-
 6 files changed, 70 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/977240dd/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index e345fca..7faed54 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.stream.LongStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -2076,14 +2077,14 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
 
        @Override
        public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen,
-                       long[] nnzInBlock, Well1024a bigrand, long bSeed)
+                       LongStream nnzInBlock, Well1024a bigrand, long bSeed)
                        throws DMLRuntimeException {
                throw new RuntimeException("CompressedMatrixBlock: 
randOperationsInPlace not supported.");
        }
 
        @Override
        public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen,
-                       long[] nnzInBlock, Well1024a bigrand, long bSeed, int k)
+                       LongStream nnzInBlock, Well1024a bigrand, long bSeed, 
int k)
                        throws DMLRuntimeException {
                throw new RuntimeException("CompressedMatrixBlock: 
randOperationsInPlace not supported.");
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/977240dd/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
index 1e70526..fe1813f 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
@@ -24,7 +24,9 @@ import java.io.PrintWriter;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.PrimitiveIterator;
 import java.util.Random;
+import java.util.stream.LongStream;
 
 import org.apache.commons.math3.distribution.PoissonDistribution;
 import org.apache.commons.math3.random.Well1024a;
@@ -353,11 +355,12 @@ public class RandSPInstruction extends UnarySPInstruction
                //step 3: seed generation 
                JavaPairRDD<MatrixIndexes, Tuple2<Long, Long>> seedsRDD = null;
                Well1024a bigrand = LibMatrixDatagen.setupSeedsForRand(lSeed);
-               long[] nnz = LibMatrixDatagen.computeNNZperBlock(rows, cols, 
rowsInBlock, colsInBlock, sparsity);
+               LongStream nnz = LibMatrixDatagen.computeNNZperBlock(rows, 
cols, rowsInBlock, colsInBlock, sparsity);
+               PrimitiveIterator.OfLong nnzIter = nnz.iterator();
                double totalSize = 
OptimizerUtils.estimatePartitionedSizeExactSparsity( rows, cols, rowsInBlock, 
                        colsInBlock, rows*cols*sparsity); //overestimate for on 
disk, ensures hdfs block per partition
                double hdfsBlkSize = InfrastructureAnalyzer.getHDFSBlockSize();
-               long numBlocks = nnz.length;
+               long numBlocks = new MatrixCharacteristics(rows, cols, 
rowsInBlock, colsInBlock).getNumBlocks();
                long numColBlocks = 
(long)Math.ceil((double)cols/(double)colsInBlock);
                                
                //a) in-memory seed rdd construction 
@@ -371,7 +374,7 @@ public class RandSPInstruction extends UnarySPInstruction
                                MatrixIndexes indx = new MatrixIndexes(r, c);
                                Long seedForBlock = bigrand.nextLong();
                                seeds.add(new Tuple2<MatrixIndexes, 
Tuple2<Long, Long>>(indx, 
-                                               new Tuple2<Long, 
Long>(seedForBlock, nnz[(int)i])));
+                                               new Tuple2<Long, 
Long>(seedForBlock, nnzIter.nextLong())));
                        }
                        
                        //for load balancing: degree of parallelism such that 
~128MB per partition
@@ -398,7 +401,7 @@ public class RandSPInstruction extends UnarySPInstruction
                                        sb.append(',');
                                        sb.append(bigrand.nextLong());
                                        sb.append(',');
-                                       sb.append(nnz[(int)i]);
+                                       sb.append(nnzIter.nextLong());
                                        pw.println(sb.toString());
                                        sb.setLength(0);
                                }
@@ -795,7 +798,7 @@ public class RandSPInstruction extends UnarySPInstruction
                                        _pdf, lrlen, lclen, lrlen, lclen,   
                                        _sparsity, _min, _max, _pdfParams );
                        
-                       blk.randOperationsInPlace(rgen, new long[]{blockNNZ}, 
null, seed);
+                       blk.randOperationsInPlace(rgen, 
LongStream.of(blockNNZ), null, seed);
 
                        return new Tuple2<MatrixIndexes, MatrixBlock>(kv._1, 
blk);
                }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/977240dd/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java 
b/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
index 7e3de55..00ae9d3 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
@@ -22,6 +22,8 @@ package org.apache.sysml.runtime.matrix;
 
 import java.io.PrintWriter;
 import java.util.HashSet;
+import java.util.PrimitiveIterator;
+import java.util.stream.LongStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -152,8 +154,8 @@ public class DataGenMR
                                
                                //seed generation
                                Well1024a bigrand = 
LibMatrixDatagen.setupSeedsForRand(randInst.getSeed());
-                               long[] nnz = 
LibMatrixDatagen.computeNNZperBlock(rlens[i], clens[i], brlens[i], bclens[i], 
randInst.getSparsity());
-                               int nnzIx = 0;
+                               LongStream nnz = 
LibMatrixDatagen.computeNNZperBlock(rlens[i], clens[i], brlens[i], bclens[i], 
randInst.getSparsity());
+                               PrimitiveIterator.OfLong nnzIter = 
nnz.iterator();
                                for(long r = 0; r < rlens[i]; r += brlens[i]) {
                                        long curBlockRowSize = 
Math.min(brlens[i], (rlens[i] - r));
                                        for(long c = 0; c < clens[i]; c += 
bclens[i])
@@ -168,7 +170,7 @@ public class DataGenMR
                                                sb.append(',');
                                                sb.append(curBlockColSize);
                                                sb.append(',');
-                                               sb.append(nnz[nnzIx++]);
+                                               sb.append(nnzIter.nextLong());
                                                sb.append(',');
                                                sb.append(bigrand.nextLong());
                                                pw.println(sb.toString());

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/977240dd/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java
index fd7a17d..fb62c41 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java
@@ -21,13 +21,13 @@
 package org.apache.sysml.runtime.matrix.data;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.stream.LongStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,6 +39,7 @@ import org.apache.sysml.runtime.util.NormalPRNGenerator;
 import org.apache.sysml.runtime.util.PRNGenerator;
 import org.apache.sysml.runtime.util.PoissonPRNGenerator;
 import org.apache.sysml.runtime.util.UniformPRNGenerator;
+import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class LibMatrixDatagen 
 {
@@ -97,7 +98,7 @@ public class LibMatrixDatagen
                return bigrand;
        }
 
-       public static long[] computeNNZperBlock(long nrow, long ncol, int 
brlen, int bclen, double sparsity) 
+       public static LongStream computeNNZperBlock(long nrow, long ncol, int 
brlen, int bclen, double sparsity) 
                throws DMLRuntimeException 
        {
                long lnumBlocks = (long) (Math.ceil((double)nrow/brlen) * 
Math.ceil((double)ncol/bclen));
@@ -108,75 +109,32 @@ public class LibMatrixDatagen
                                        + "Number of blocks ("+lnumBlocks+") 
exceeds the maximum integer size. Try to increase the block size.");
                }
 
-               // NOTE: Total #of NNZ is set to the expected value 
(nrow*ncol*sparsity).
-               // TODO: Instead of using the expected value, NNZ should be 
random variable 
-               
                int numBlocks = (int) lnumBlocks;
+               int numColBlocks = (int) Math.ceil((double)ncol/bclen);
                long nnz = (long) Math.ceil (nrow * (ncol*sparsity));
                
-               // Compute block-level NNZ
-               long[] ret  = new long[numBlocks];
-               
-               if ( nnz < numBlocks ) {
-                       // Ultra-sparse matrix
-                       
-                       // generate the number of blocks with at least one 
non-zero
-                       // = a random number between [1,nnz]
+               if( nnz < numBlocks ) {
+                       //#1: ultra-sparse random number generation
+                       //nnz per block: 1 with probability P = nnz/numBlocks, 
0 with probability 1-P
+                       //(note: this is an unbiased generator that, however, 
will never generate more than 
+                       //one non-zero per block, but it uses weights to 
account for different block sizes)
+                       double P = (double) nnz / numBlocks;
                        Random runif = new Random(System.nanoTime());
-                       int numNZBlocks = 1;
-                       if(nnz-1 > 0)
-                               numNZBlocks += runif.nextInt((int)(nnz-1)); // 
To avoid exception from random.nextInt(0) 
-                       
-                       // distribute non-zeros across numNZBlocks
-
-                       // compute proportions for each nzblock 
-                       // - divide (0,1] interval into numNZBlocks portions of 
random size
-                       double[] blockNNZproportions = new double[numNZBlocks];
-                       
-                       runif.setSeed(System.nanoTime());
-                       for(int i=0; i < numNZBlocks-1; i++) {
-                               blockNNZproportions[i] = runif.nextDouble();
-                       }
-                       blockNNZproportions[numNZBlocks-1] = 1;
-                       // sort the values in ascending order
-                       Arrays.sort(blockNNZproportions);
-                       
-                       // compute actual number of non zeros per block 
according to proportions
-                       long actualnnz = 0;
-                       int bid;
-                       runif.setSeed(System.nanoTime());
-                       for(int i=0; i < numNZBlocks; i++) {
-                               bid = -1;
-                               do {
-                                       bid = runif.nextInt(numBlocks);
-                               } while( ret[bid] != 0);
-                               
-                               double prop = (i==0 ? blockNNZproportions[i]: 
(blockNNZproportions[i] - blockNNZproportions[i-1]));
-                               ret[bid] = (long)Math.floor(prop * nnz);
-                               actualnnz += ret[bid];
-                       }
-                       
-                       // Code to make sure exact number of non-zeros are 
generated
-                       while (actualnnz < nnz) {
-                               bid = runif.nextInt(numBlocks);
-                               ret[bid]++;
-                               actualnnz++;
-                       }
+                       return LongStream.range(0, numBlocks).map( i -> {
+                               double lP = P / (brlen*bclen) *
+                                       UtilFunctions.computeBlockSize(nrow, 1 
+ i / numColBlocks, brlen) *
+                                       UtilFunctions.computeBlockSize(ncol, 1 
+ i % numColBlocks, bclen);
+                               return (runif.nextDouble() <= lP) ? 1 : 0;
+                       });
                }
                else {
-                       int bid = 0;
-                       
-                       for(long r = 0; r < nrow; r += brlen) {
-                               long curBlockRowSize = Math.min(brlen, (nrow - 
r));
-                               for(long c = 0; c < ncol; c += bclen)
-                               {
-                                       long curBlockColSize = Math.min(bclen, 
(ncol - c));
-                                       ret[bid] = (long) (curBlockRowSize * 
curBlockColSize * sparsity);
-                                       bid++;
-                               }
-                       }
+                       //#2: dense/sparse random number generation
+                       //nnz per block: lrlen * lclen * sparsity (note: this 
is a biased generator 
+                       //that might actually create fewer but never more non 
zeros than expected)
+                       return LongStream.range(0, numBlocks).map( i -> 
(long)(sparsity * 
+                               UtilFunctions.computeBlockSize(nrow, 1 + i / 
numColBlocks, brlen) *
+                               UtilFunctions.computeBlockSize(ncol, 1 + i % 
numColBlocks, bclen)));
                }
-               return ret;
        }
 
     public static RandomMatrixGenerator createRandomMatrixGenerator(String 
pdf, int r, int c, int rpb, int cpb, double sp, double min, double max, String 
distParams) 
@@ -224,7 +182,7 @@ public class LibMatrixDatagen
      * @param bSeed seed for random generator
      * @throws DMLRuntimeException if DMLRuntimeException occurs
      */
-       public static void generateRandomMatrix( MatrixBlock out, 
RandomMatrixGenerator rgen, long[] nnzInBlocks, 
+       public static void generateRandomMatrix( MatrixBlock out, 
RandomMatrixGenerator rgen, LongStream nnzInBlocks, 
                                                        Well1024a bigrand, long 
bSeed ) 
                throws DMLRuntimeException
        {
@@ -247,28 +205,30 @@ public class LibMatrixDatagen
                double min = rgen._pdf.equalsIgnoreCase(RAND_PDF_UNIFORM) ? 
rgen._min : 0;
                double max = rgen._pdf.equalsIgnoreCase(RAND_PDF_UNIFORM) ? 
rgen._max : 1;
                
-               // Determine the sparsity of output matrix
-               // if invoked from CP: estimated NNZ is for entire matrix 
(nnz=0, if 0 initialized)
-               // if invoked from MR: estimated NNZ is for one block
-               final long estnnz = (invokedFromCP ? ((min==0.0 && max==0.0)? 0 
: (long)(sparsity * rows * cols)) 
-                                                          : nnzInBlocks[0]);
-               boolean lsparse = MatrixBlock.evalSparseFormatInMemory( rows, 
cols, estnnz );
-               out.reset(rows, cols, lsparse);
-               
                // Special case shortcuts for efficiency
                if ( rgen._pdf.equalsIgnoreCase(RAND_PDF_UNIFORM)) {
                        if ( min == 0.0 && max == 0.0 ) { //all zeros
-                               out.nonZeros = 0;
+                               out.reset(rows, cols, true);
                                return;
                        } 
-                       else if( !out.sparse && sparsity==1.0d && (min == max  
//equal values, dense
-                                       || (Double.isNaN(min) && 
Double.isNaN(max))) ) //min == max == NaN
-                       {
-                               out.reset(out.rlen, out.clen, min); 
+                       else if( sparsity==1.0d && (min == max  //equal values, 
dense
+                                       || (Double.isNaN(min) && 
Double.isNaN(max))) ) { //min == max == NaN
+                               out.reset(rows, cols, min); 
                                return;
                        }
                }
                
+               // collect nnz stream for multiple consumptions
+               long[] lnnzInBlocks = nnzInBlocks.toArray();
+               
+               // Determine the sparsity of output matrix
+               // if invoked from CP: estimated NNZ is for entire matrix 
(nnz=0, if 0 initialized)
+               // if invoked from MR: estimated NNZ is for one block
+               final long estnnz = (invokedFromCP ? ((min==0.0 && max==0.0)? 0 
: 
+                               (long)(sparsity * rows * cols)) : 
lnnzInBlocks[0]);
+               boolean lsparse = MatrixBlock.evalSparseFormatInMemory( rows, 
cols, estnnz );
+               out.reset(rows, cols, lsparse);
+               
                // Allocate memory
                //note: individual sparse rows are allocated on demand,
                //for consistency with memory estimates and prevent OOMs.
@@ -281,7 +241,7 @@ public class LibMatrixDatagen
                int ncb = (int) Math.ceil((double)cols/cpb);
                long[] seeds = invokedFromCP ? generateSeedsForCP(bigrand, nrb, 
ncb) : null;
                
-               genRandomNumbers(invokedFromCP, 0, nrb, 0, ncb, out, rgen, 
nnzInBlocks, bSeed, seeds);
+               genRandomNumbers(invokedFromCP, 0, nrb, 0, ncb, out, rgen, 
lnnzInBlocks, bSeed, seeds);
                
                out.recomputeNonZeros();
        }
@@ -309,7 +269,7 @@ public class LibMatrixDatagen
      * @param k ?
      * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public static void generateRandomMatrix( MatrixBlock out, 
RandomMatrixGenerator rgen, long[] nnzInBlocks, 
+       public static void generateRandomMatrix( MatrixBlock out, 
RandomMatrixGenerator rgen, LongStream nnzInBlocks, 
                        Well1024a bigrand, long bSeed, int k ) 
                throws DMLRuntimeException      
        {       
@@ -343,16 +303,14 @@ public class LibMatrixDatagen
                        return;
                }
 
-               out.reset(rows, cols, lsparse);
-               
                //special case shortcuts for efficiency
                if ( rgen._pdf.equalsIgnoreCase(RAND_PDF_UNIFORM)) {
                        if ( min == 0.0 && max == 0.0 ) { //all zeros
-                               out.nonZeros = 0;
+                               out.reset(rows, cols, false);
                                return;
                        } 
-                       else if( !out.sparse && sparsity==1.0d && min == max ) 
{ //equal values
-                               out.reset(out.rlen, out.clen, min); 
+                       else if( sparsity==1.0d && min == max ) { //equal values
+                               out.reset(rows, cols, min); 
                                return;
                        }
                }
@@ -360,6 +318,7 @@ public class LibMatrixDatagen
                //allocate memory
                //note: individual sparse rows are allocated on demand,
                //for consistency with memory estimates and prevent OOMs.
+               out.reset(rows, cols, lsparse);
                if( out.sparse )
                        out.allocateSparseRowsBlock();
                else
@@ -376,6 +335,9 @@ public class LibMatrixDatagen
                //generate seeds independent of parallelizations
                long[] seeds = generateSeedsForCP(bigrand, nrb, ncb);
                
+               // collect nnz stream for multiple consumptions
+               long[] lnnzInBlocks = nnzInBlocks.toArray();
+               
                try 
                {
                        ExecutorService pool = Executors.newFixedThreadPool(k);
@@ -388,7 +350,7 @@ public class LibMatrixDatagen
                                int cu = parcol ? Math.min((i+1)*blklen, parnb) 
: ncb;
                                long[] lseeds = sliceSeedsForCP(seeds, rl, ru, 
cl, cu, nrb, ncb);
                                tasks.add(new RandTask(rl, ru, cl, cu, out, 
-                                               rgen, nnzInBlocks, bSeed, 
lseeds) );    
+                                               rgen, lnnzInBlocks, bSeed, 
lseeds) );   
                        }
                        List<Future<Object>> ret = pool.invokeAll(tasks);
                        pool.shutdown();

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/977240dd/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 5fc69ce..005f24e 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -30,6 +30,7 @@ import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.stream.LongStream;
 
 import org.apache.commons.math3.random.Well1024a;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -5575,7 +5576,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
        {
                MatrixBlock out = new MatrixBlock();
                Well1024a bigrand = null;
-               long[] nnzInBlock = null;
+               LongStream nnzInBlock = null;
 
                //setup seeds and nnz per block
                if( !LibMatrixDatagen.isShortcutRandOperation(rgen._min, 
rgen._max, rgen._sparsity, rgen._pdf) ){
@@ -5614,7 +5615,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
        public MatrixBlock randOperationsInPlace(
-                                                               
RandomMatrixGenerator rgen, long[] nnzInBlock, 
+                                                               
RandomMatrixGenerator rgen, LongStream nnzInBlock, 
                                                                Well1024a 
bigrand, long bSeed ) 
                throws DMLRuntimeException
        {
@@ -5645,7 +5646,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
        public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen, 
-                       long[] nnzInBlock, Well1024a bigrand, long bSeed, int 
k) 
+                       LongStream nnzInBlock, Well1024a bigrand, long bSeed, 
int k) 
                throws DMLRuntimeException
        {
                LibMatrixDatagen.generateRandomMatrix( this, rgen, nnzInBlock, 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/977240dd/src/main/java/org/apache/sysml/runtime/matrix/mapred/DataGenMapper.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/mapred/DataGenMapper.java 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/DataGenMapper.java
index 49b2804..30616a2 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/DataGenMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/DataGenMapper.java
@@ -21,6 +21,7 @@
 package org.apache.sysml.runtime.matrix.mapred;
 
 import java.io.IOException;
+import java.util.stream.LongStream;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
@@ -83,7 +84,7 @@ implements Mapper<Writable, Writable, Writable, Writable>
                                                                                
                                                                pdf, 
blockRowSize, blockColSize, blockRowSize, blockColSize,   
                                                                                
                                                                sparsity, 
minValue, maxValue, randInst.getPdfParams() );
 
-                                       block[i].randOperationsInPlace(rgen, 
new long[]{blockNNZ}, null, seed); 
+                                       block[i].randOperationsInPlace(rgen, 
LongStream.of(blockNNZ), null, seed); 
                                } 
                                catch(DMLRuntimeException e) {
                                        throw new IOException(e);

Reply via email to