Repository: systemml
Updated Branches:
  refs/heads/master 667aeb2b7 -> fcfbd3d24


[SYSTEMML-1838] Performance sparse/ultra-sparse binary block read

This patch makes a number of performance improvements for CP read
(single- and multi-threaded) of sparse and ultra-sparse matrices in
binary block format:

(1) Allocate reuse block in CSR format: This avoids unnecessary
reallocations in the presence of a mix of sparse and ultra-sparse blocks
because deserializing an ultra-sparse block discards any reuse block that
is not in CSR format.

(2) Special CSR init for sparse blocks from input streams (in addition
to the existing init for ultra-sparse blocks)

(3) Exploit the estimated number of non-zeros on sparse block append
(also used during read): rows now grow by a factor of 2x instead of 1.1x
until the estimated number of non-zeros per row is reached, which avoids
unnecessary reallocations.

(4) Improved load balance via more tasks on the final sort of sparse
rows.

Together, these changes improved the CP read time of a 1M x 1M matrix
with sparsity 0.001 (a mix of sparse and ultra-sparse blocks; roughly
12GB in size, 1M blocks) from 41s to 32.4s.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/9a275acb
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/9a275acb
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/9a275acb

Branch: refs/heads/master
Commit: 9a275acb0dd3f991fad7950d1824a89e109a9b43
Parents: 667aeb2
Author: Matthias Boehm <[email protected]>
Authored: Mon Aug 14 19:11:52 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Tue Aug 15 12:48:25 2017 -0700

----------------------------------------------------------------------
 .../controlprogram/caching/CacheDataInput.java  | 10 +++--
 .../apache/sysml/runtime/io/MatrixReader.java   |  7 ++--
 .../sysml/runtime/io/ReaderBinaryBlock.java     | 32 ++++++++++-----
 .../runtime/io/ReaderBinaryBlockParallel.java   | 15 +++----
 .../sysml/runtime/matrix/data/MatrixBlock.java  | 26 ++++++-------
 .../matrix/data/MatrixBlockDataInput.java       |  5 ++-
 .../runtime/matrix/data/SparseBlockCSR.java     | 41 ++++++++++++++++----
 .../runtime/matrix/data/SparseRowVector.java    |  7 ++--
 .../util/FastBufferedDataInputStream.java       | 29 ++++++++------
 9 files changed, 110 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataInput.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataInput.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataInput.java
index 24df6d4..8f16ae1 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataInput.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataInput.java
@@ -163,11 +163,11 @@ public class CacheDataInput implements DataInput, 
MatrixBlockDataInput
        }
 
        @Override
-       public long readSparseRows(int rlen, SparseBlock rows) 
+       public long readSparseRows(int rlen, long nnz, SparseBlock rows) 
                throws IOException 
        {
                //counter for non-zero elements
-               long nnz = 0;
+               long gnnz = 0;
                
                //read all individual sparse rows from input
                for( int i=0; i<rlen; i++ )
@@ -189,10 +189,14 @@ public class CacheDataInput implements DataInput, 
MatrixBlockDataInput
                                        _count+=12;
                                }
                                
-                               nnz += lnnz;    
+                               gnnz += lnnz;   
                        }
                }
                
+               //sanity check valid number of read nnz
+               if( gnnz != nnz )
+                       throw new IOException("Invalid number of read nnz: 
"+gnnz+" vs "+nnz);
+               
                return nnz;
        }
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java 
b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
index befccfe..9c59d4e 100644
--- a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
@@ -118,10 +118,11 @@ public abstract class MatrixReader
        protected static void sortSparseRowsParallel(MatrixBlock dest, long 
rlen, int k, ExecutorService pool) 
                throws InterruptedException, ExecutionException
        {
-               //create sort tasks
+               //create sort tasks (increase number of tasks for better load 
balance)
                ArrayList<SortRowsTask> tasks = new ArrayList<SortRowsTask>();
-               int blklen = (int)(Math.ceil((double)rlen/k));
-               for( int i=0; i<k & i*blklen<rlen; i++ )
+               int k2 = (int) Math.min(8*k, rlen); 
+               int blklen = (int)(Math.ceil((double)rlen/k2));
+               for( int i=0; i<k2 & i*blklen<rlen; i++ )
                        tasks.add(new SortRowsTask(dest, i*blklen, 
Math.min((i+1)*blklen, (int)rlen)));
                
                //execute parallel sort and check for errors

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
index 0bca17d..e0c217a 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
@@ -33,6 +33,7 @@ import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.SparseBlock;
 import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
 
@@ -89,19 +90,30 @@ public class ReaderBinaryBlock extends MatrixReader
                ArrayList<IndexedMatrixValue> ret = new 
ArrayList<IndexedMatrixValue>();
                
                //prepare file access
-               JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
-               Path path = new Path( (_localFS ? "file:///" : "") + fname); 
+               JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
+               Path path = new Path( (_localFS ? "file:///" : "") + fname);
                FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                
                //check existence and non-empty file
-               checkValidInputFile(fs, path); 
+               checkValidInputFile(fs, path);
        
                //core read 
                readBinaryBlockMatrixBlocksFromHDFS(path, job, fs, ret, rlen, 
clen, brlen, bclen);
                
                return ret;
        }
-
+       
+       protected static MatrixBlock getReuseBlock(int brlen, int bclen, 
boolean sparse) {
+               //note: we allocate the reuse block in CSR because this avoids 
unnecessary
+               //reallocations in the presence of a mix of sparse and 
ultra-sparse blocks,
+               //where ultra-sparse deserialization only reuses CSR blocks
+               MatrixBlock value = new MatrixBlock(brlen, bclen, sparse);
+               if( sparse ) {
+                       value.allocateAndResetSparseRowsBlock(true, 
SparseBlock.Type.CSR);
+                       value.getSparseBlock().allocate(0, 1024);
+               }
+               return value;
+       }
 
        
        /**
@@ -125,13 +137,12 @@ public class ReaderBinaryBlock extends MatrixReader
         * @throws IOException if IOException occurs
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       @SuppressWarnings("deprecation")
        private static void readBinaryBlockMatrixFromHDFS( Path path, JobConf 
job, FileSystem fs, MatrixBlock dest, long rlen, long clen, int brlen, int 
bclen )
                throws IOException, DMLRuntimeException
        {
                boolean sparse = dest.isInSparseFormat();
                MatrixIndexes key = new MatrixIndexes(); 
-               MatrixBlock value = new MatrixBlock();
+               MatrixBlock value = getReuseBlock(brlen, bclen, sparse);
                long lnnz = 0; //aggregate block nnz
                
                //set up preferred custom serialization framework for binary 
block format
@@ -141,7 +152,8 @@ public class ReaderBinaryBlock extends MatrixReader
                for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, 
path) ) //1..N files 
                {
                        //directly read from sequence files (individual 
partfiles)
-                       SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,job);
+                       SequenceFile.Reader reader = new SequenceFile
+                               .Reader(job, SequenceFile.Reader.file(lpath));
                        
                        try
                        {
@@ -195,8 +207,7 @@ public class ReaderBinaryBlock extends MatrixReader
                        dest.sortSparseRows();
                }
        }
-
-       @SuppressWarnings("deprecation")
+       
        private void readBinaryBlockMatrixBlocksFromHDFS( Path path, JobConf 
job, FileSystem fs, Collection<IndexedMatrixValue> dest, long rlen, long clen, 
int brlen, int bclen )
                throws IOException
        {
@@ -210,7 +221,8 @@ public class ReaderBinaryBlock extends MatrixReader
                for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, 
path) ) //1..N files 
                {
                        //directly read from sequence files (individual 
partfiles)
-                       SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,job);
+                       SequenceFile.Reader reader = new SequenceFile
+                               .Reader(job, SequenceFile.Reader.file(lpath));
                        
                        try
                        {

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
index e7114d8..16260a8 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
@@ -90,7 +90,7 @@ public class ReaderBinaryBlockParallel extends 
ReaderBinaryBlock
                        ExecutorService pool = 
Executors.newFixedThreadPool(_numThreads);
                        ArrayList<ReadFileTask> tasks = new 
ArrayList<ReadFileTask>();
                        for( Path lpath : 
IOUtilFunctions.getSequenceFilePaths(fs, path) ){
-                               ReadFileTask t = new ReadFileTask(lpath, job, 
fs, dest, rlen, clen, brlen, bclen);
+                               ReadFileTask t = new ReadFileTask(lpath, job, 
dest, rlen, clen, brlen, bclen);
                                tasks.add(t);
                        }
 
@@ -118,17 +118,15 @@ public class ReaderBinaryBlockParallel extends 
ReaderBinaryBlock
        {
                private Path _path = null;
                private JobConf _job = null;
-               private FileSystem _fs = null;
                private MatrixBlock _dest = null;
                private long _rlen = -1;
                private long _clen = -1;
                private int _brlen = -1;
                private int _bclen = -1;
                
-               public ReadFileTask(Path path, JobConf job, FileSystem fs, 
MatrixBlock dest, long rlen, long clen, int brlen, int bclen)
+               public ReadFileTask(Path path, JobConf job, MatrixBlock dest, 
long rlen, long clen, int brlen, int bclen)
                {
                        _path = path;
-                       _fs = fs;
                        _job = job;
                        _dest = dest;
                        _rlen = rlen;
@@ -138,16 +136,16 @@ public class ReaderBinaryBlockParallel extends 
ReaderBinaryBlock
                }
 
                @Override
-               @SuppressWarnings({ "deprecation" })
                public Object call() throws Exception 
                {
                        boolean sparse = _dest.isInSparseFormat();
                        MatrixIndexes key = new MatrixIndexes(); 
-                       MatrixBlock value = new MatrixBlock();
+                       MatrixBlock value = getReuseBlock(_brlen, _bclen, 
sparse);
                        long lnnz = 0; //aggregate block nnz
                        
                        //directly read from sequence files (individual 
partfiles)
-                       SequenceFile.Reader reader = new 
SequenceFile.Reader(_fs,_path,_job);
+                       SequenceFile.Reader reader = new SequenceFile
+                               .Reader(_job, SequenceFile.Reader.file(_path));
                        
                        try
                        {
@@ -205,8 +203,7 @@ public class ReaderBinaryBlockParallel extends 
ReaderBinaryBlock
                                        lnnz += value.getNonZeros();
                                }
                        }
-                       finally
-                       {
+                       finally {
                                IOUtilFunctions.closeSilently(reader);
                        }
                        

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index e9039ba..145eb97 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -736,7 +736,10 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                                        int len = b.size(i);
                                        int[] ix = b.indexes(i);
                                        double[] val = b.values(i);
-                                       sparseBlock.allocate(aix, 
sparseBlock.size(aix)+len);
+                                       if( estimatedNNzsPerRow > 0 )
+                                               sparseBlock.allocate(aix, 
Math.max(estimatedNNzsPerRow, sparseBlock.size(aix)+len), clen);
+                                       else
+                                               sparseBlock.allocate(aix, 
sparseBlock.size(aix)+len);
                                        for( int j=pos; j<pos+len; j++ )
                                                sparseBlock.append(aix, 
coloffset+ix[j], val[j]);       
                                }
@@ -1894,39 +1897,36 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                        }
                }
        }
-
+       
        private void readSparseBlock(DataInput in) 
                throws IOException 
        {                       
-               allocateSparseRowsBlock(false); 
-               resetSparse(); //reset all sparse rows
+               allocateSparseRowsBlock(false);
+               resetSparse();
                
-               if( in instanceof MatrixBlockDataInput ) //fast deserialize
-               {
+               if( in instanceof MatrixBlockDataInput ) { //fast deserialize
                        MatrixBlockDataInput mbin = (MatrixBlockDataInput)in;
-                       nonZeros = mbin.readSparseRows(rlen, sparseBlock);
+                       nonZeros = mbin.readSparseRows(rlen, nonZeros, 
sparseBlock);
                }
-               else if( in instanceof DataInputBuffer  && 
MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION ) 
-               {
+               else if( in instanceof DataInputBuffer && 
MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION ) {
                        //workaround because sequencefile.reader.next(key, 
value) does not yet support serialization framework
                        DataInputBuffer din = (DataInputBuffer)in;
                        FastBufferedDataInputStream mbin = null;
                        try {
                                mbin = new FastBufferedDataInputStream(din);
-                               nonZeros = mbin.readSparseRows(rlen, 
sparseBlock);      
+                               nonZeros = mbin.readSparseRows(rlen, nonZeros, 
sparseBlock);
                        }
                        finally {
                                IOUtilFunctions.closeSilently(mbin);
                        }
                }
-               else //default deserialize
-               {
+               else { //default deserialize
                        for(int r=0; r<rlen; r++) {
                                int rnnz = in.readInt(); //row nnz
                                if( rnnz > 0 ) {
                                        sparseBlock.reset(r, rnnz, clen);
                                        for(int j=0; j<rnnz; j++) //col 
index/value pairs
-                                               sparseBlock.append(r, 
in.readInt(), in.readDouble());           
+                                               sparseBlock.append(r, 
in.readInt(), in.readDouble());
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlockDataInput.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlockDataInput.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlockDataInput.java
index c673623..6fc36fb 100644
--- 
a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlockDataInput.java
+++ 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlockDataInput.java
@@ -50,11 +50,12 @@ public interface MatrixBlockDataInput
         * Reads the sparse rows array from the data input into a sparse block
         * and returns the number of non-zeros.
         * 
-        * @param rlen ?
+        * @param rlen number of rows
+        * @param nnz number of non-zeros
         * @param rows sparse block
         * @return number of non-zeros
         * @throws IOException if IOExcepton occurs
         */
-       public long readSparseRows(int rlen, SparseBlock rows) 
+       public long readSparseRows(int rlen, long nnz, SparseBlock rows) 
                throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
index a4e680d..bc817c7 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
@@ -176,13 +176,9 @@ public class SparseBlockCSR extends SparseBlock
        public void initUltraSparse(int nnz, DataInput in) 
                throws IOException 
        {
-               //ensure empty block before init
-               if( _size > 0 )
-                       reset();
-                       
                //allocate space if necessary
                if( _values.length < nnz )
-                       resize(nnz);
+                       resize(newCapacity(nnz));
                
                //read ijv triples, append and update pointers
                int rlast = 0;
@@ -201,6 +197,37 @@ public class SparseBlockCSR extends SparseBlock
        }
        
        /**
+        * Initializes the CSR sparse block from an ordered input
+        * stream of sparse rows (rownnz, jv-pairs*). 
+        * 
+        * @param rlen number of rows
+        * @param nnz number of non-zeros to read
+        * @param in data input stream of sparse rows, ordered by i
+        * @throws IOException if deserialization error occurs
+        */
+       public void initSparse(int rlen, int nnz, DataInput in) 
+               throws IOException
+       {
+               //allocate space if necessary
+               if( _values.length < nnz )
+                       resize(newCapacity(nnz));
+               
+               //read sparse rows, append and update pointers
+               _ptr[0] = 0;
+               for( int r=0, pos=0; r<rlen; r++ ) {
+                       int lnnz = in.readInt();
+                       for( int j=0; j<lnnz; j++, pos++ ) {
+                               _indexes[pos] = in.readInt();
+                               _values[pos] = in.readDouble();
+                       }
+                       _ptr[r+1] = pos;
+               }
+               
+               //update meta data
+               _size = nnz;
+       }
+       
+       /**
         * Get the estimated in-memory size of the sparse block in CSR 
         * with the given dimensions w/o accounting for overallocation. 
         * 
@@ -402,14 +429,14 @@ public class SparseBlockCSR extends SparseBlock
                        if( _size==_values.length )
                                resize();
                        insert(_size, c, v);            
-               }               
+               }
                else {
                        //resize, shift and insert
                        if( _size==_values.length )
                                resizeAndInsert(pos+len, c, v);
                        else
                                shiftRightAndInsert(pos+len, c, v);
-               }                       
+               }
                incrPtr(r+1);
        }
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
index 1c160b2..4927906 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
@@ -147,10 +147,9 @@ public final class SparseRowVector extends SparseRow 
implements Serializable
         * @return new capacity for resizing
         */
        private int newCapacity() {
-               if( values.length < estimatedNzs )
-                       return Math.min(estimatedNzs, values.length*2);
-               else
-                       return (int) Math.min(maxNzs, 
Math.ceil((double)(values.length)*1.1));
+               return (int) ((values.length < estimatedNzs) ?
+                       Math.min(estimatedNzs, 
values.length*SparseBlock.RESIZE_FACTOR1) :
+                       Math.min(maxNzs, 
Math.ceil(values.length*SparseBlock.RESIZE_FACTOR2)));
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataInputStream.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataInputStream.java 
b/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataInputStream.java
index 932ceb1..197467d 100644
--- 
a/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataInputStream.java
+++ 
b/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataInputStream.java
@@ -27,6 +27,7 @@ import java.io.InputStream;
 
 import org.apache.sysml.runtime.matrix.data.MatrixBlockDataInput;
 import org.apache.sysml.runtime.matrix.data.SparseBlock;
+import org.apache.sysml.runtime.matrix.data.SparseBlockCSR;
 
 public class FastBufferedDataInputStream extends FilterInputStream implements 
DataInput, MatrixBlockDataInput
 {
@@ -34,23 +35,19 @@ public class FastBufferedDataInputStream extends 
FilterInputStream implements Da
        protected byte[] _buff;
        protected int _bufflen;
        
-       public FastBufferedDataInputStream( InputStream in )
-       {
+       public FastBufferedDataInputStream( InputStream in ) {
                this(in, 8192);
        }
        
-       public FastBufferedDataInputStream( InputStream in, int size )
-       {
+       public FastBufferedDataInputStream( InputStream in, int size ) {
                super(in);
                
                if (size <= 0) 
                throw new IllegalArgumentException("Buffer size <= 0");
-               
                _buff = new byte[ size ];
                _bufflen = size;
        }
 
-
        /////////////////////////////
        // DataInput Implementation
        /////////////////////////////
@@ -169,8 +166,8 @@ public class FastBufferedDataInputStream extends 
FilterInputStream implements Da
        public long readDoubleArray(int len, double[] varr) 
                throws IOException 
        {
-               //if( len<=0 || len != varr.length )
-               //      throw new IndexOutOfBoundsException("len="+len+", 
varr.length="+varr.length);
+               if( len<=0 || len > varr.length )
+                       throw new IndexOutOfBoundsException("len="+len+", 
varr.length="+varr.length);
                
                //counter for non-zero elements
                long nnz = 0;
@@ -198,11 +195,17 @@ public class FastBufferedDataInputStream extends 
FilterInputStream implements Da
        }
 
        @Override
-       public long readSparseRows(int rlen, SparseBlock rows) 
+       public long readSparseRows(int rlen, long nnz, SparseBlock rows) 
                throws IOException 
        {
+               //check for CSR quick-path
+               if( rows instanceof SparseBlockCSR ) {
+                       ((SparseBlockCSR) rows).initSparse(rlen, (int)nnz, 
this);
+                       return nnz;
+               }
+               
                //counter for non-zero elements
-               long nnz = 0;
+               long gnnz = 0;
                
                //read all individual sparse rows from input
                for( int i=0; i<rlen; i++ )
@@ -241,10 +244,14 @@ public class FastBufferedDataInputStream extends 
FilterInputStream implements Da
                                        }
                                }
                                
-                               nnz += lnnz;    
+                               gnnz += lnnz;   
                        }
                }
                
+               //sanity check valid number of read nnz
+               if( gnnz != nnz )
+                       throw new IOException("Invalid number of read nnz: 
"+gnnz+" vs "+nnz);
+               
                return nnz;
        }
 

Reply via email to