Repository: incubator-systemml
Updated Branches:
  refs/heads/master 3f0c12ece -> b2feaf759


[SYSTEMML-1289] Performance codegen cellwise over compressed matrices

This patch makes the following performance improvements to codegen
cellwise operations (and others) over compressed matrices:

1) Sparse output matrices for sparse-safe operations. We ensure
correctness by explicitly sorting the output sparse rows; each thread
sorts only the rows of its own row partition.

2) Pre-allocation of sparse rows in order to avoid repeated
re-allocations, by determining the number of non-zeros per row of each
row partition up front. Note that this requires segment-aligned row
partitions, which we only enforce if this does not limit the effective
degree of parallelism.

3) Skip-scan exploitation to find the lower row boundary (rl) of OLE
value iterators, which avoids scanning each iterator from its beginning
just to find the start of a row partition. This is also implicitly used
by all other codegen operations over compressed matrices.

For example, in a scenario with a pre-processed ImageNet dataset of 1.2M
images, 30x30 pixels per image, and a simple sparse-safe cellwise
generated operator for X*(Y+7), where X is the compressed input, this
patch improved single-node performance from 36s to 11.5s. This is
competitive, given that the uncompressed codegen operation takes 16.3s.

Finally, this patch also includes a robustness fix of the generic
ColGroupOffset iterator for cases where an iterator over a row
partition does not have any row indexes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b2feaf75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b2feaf75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b2feaf75

Branch: refs/heads/master
Commit: b2feaf759b701889a0a2649608df8833eaf4737a
Parents: 3f0c12e
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Tue May 30 17:21:14 2017 -0700
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Tue May 30 18:06:06 2017 -0700

----------------------------------------------------------------------
 .../sysml/runtime/codegen/SpoofCellwise.java    | 124 +++++++++++++------
 .../sysml/runtime/compress/BitmapEncoder.java   |   5 +
 .../sysml/runtime/compress/ColGroupOLE.java     |  24 ++--
 .../sysml/runtime/compress/ColGroupOffset.java  |   3 +-
 .../runtime/compress/CompressedMatrixBlock.java |  22 ++--
 .../sysml/runtime/matrix/data/MatrixBlock.java  |   3 +-
 6 files changed, 125 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b2feaf75/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java 
b/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java
index 3e1dcad..cc8ef69 100644
--- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java
+++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.compress.BitmapEncoder;
 import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysml.runtime.functionobjects.Builtin;
 import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
@@ -196,31 +197,32 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                }
                
                //input preparation
+               MatrixBlock a = inputs.get(0);
                SideInput[] b = prepInputMatricesAbstract(inputs);
                double[] scalars = prepInputScalars(scalarObjects);
-               final int m = inputs.get(0).getNumRows();
-               final int n = inputs.get(0).getNumColumns();
+               final int m = a.getNumRows();
+               final int n = a.getNumColumns();
                
                //sparse safe check 
                boolean sparseSafe = isSparseSafe() || (b.length == 0
                                && genexec( 0, b, scalars, m, n, 0, 0 ) == 0);
                
                //result allocation and preparations
-               boolean sparseOut = sparseSafe && 
inputs.get(0).isInSparseFormat()
-                               && _type == CellType.NO_AGG && !(inputs.get(0) 
instanceof CompressedMatrixBlock);
-               out.reset(inputs.get(0).getNumRows(), _type == CellType.NO_AGG ?
-                               inputs.get(0).getNumColumns() : 1, sparseOut);
+               boolean sparseOut = sparseSafe && a.isInSparseFormat()
+                               && _type == CellType.NO_AGG;
+               out.reset(a.getNumRows(), _type == CellType.NO_AGG ?
+                               a.getNumColumns() : 1, sparseOut);
                out.allocateDenseOrSparseBlock();
                
                long lnnz = 0;
                if( k <= 1 ) //SINGLE-THREADED
                {
                        if( inputs.get(0) instanceof CompressedMatrixBlock )
-                               lnnz = 
executeCompressed((CompressedMatrixBlock)inputs.get(0), b, scalars, out, m, n, 
sparseSafe, 0, m);
+                               lnnz = 
executeCompressed((CompressedMatrixBlock)a, b, scalars, out, m, n, sparseSafe, 
0, m);
                        else if( !inputs.get(0).isInSparseFormat() )
-                               lnnz = 
executeDense(inputs.get(0).getDenseBlock(), b, scalars, out, m, n, sparseSafe, 
0, m);
+                               lnnz = executeDense(a.getDenseBlock(), b, 
scalars, out, m, n, sparseSafe, 0, m);
                        else
-                               lnnz = 
executeSparse(inputs.get(0).getSparseBlock(), b, scalars, out, m, n, 
sparseSafe, 0, m);
+                               lnnz = executeSparse(a.getSparseBlock(), b, 
scalars, out, m, n, sparseSafe, 0, m);
                }
                else  //MULTI-THREADED
                {
@@ -229,9 +231,12 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                                ArrayList<ParExecTask> tasks = new 
ArrayList<ParExecTask>();
                                int nk = 
UtilFunctions.roundToNext(Math.min(8*k,m/32), k);
                                int blklen = (int)(Math.ceil((double)m/nk));
+                               if( a instanceof CompressedMatrixBlock && 
sparseOut
+                                       && k/2*BitmapEncoder.BITMAP_BLOCK_SZ < 
m)
+                                       blklen = 
BitmapEncoder.getAlignedBlocksize(blklen);
                                for( int i=0; i<nk & i*blklen<m; i++ )
-                                       tasks.add(new 
ParExecTask(inputs.get(0), b, scalars, out, 
-                                               m, n, sparseSafe, i*blklen, 
Math.min((i+1)*blklen, m))); 
+                                       tasks.add(new ParExecTask(a, b, 
scalars, out, m, n,
+                                               sparseSafe, i*blklen, 
Math.min((i+1)*blklen, m))); 
                                //execute tasks
                                List<Future<Long>> taskret = 
pool.invokeAll(tasks);     
                                pool.shutdown();
@@ -253,7 +258,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
        /////////
        //function dispatch
        
-       private long executeDense(double[] a, SideInput[] b, double[] scalars, 
MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) 
+       private long executeDense(double[] a, SideInput[] b, double[] scalars, 
+                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru) 
                throws DMLRuntimeException 
        {
                double[] c = out.getDenseBlock();
@@ -270,7 +276,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return -1;
        }
        
-       private double executeDenseAndAgg(double[] a, SideInput[] b, double[] 
scalars, int m, int n, boolean sparseSafe, int rl, int ru) throws 
DMLRuntimeException 
+       private double executeDenseAndAgg(double[] a, SideInput[] b, double[] 
scalars, 
+                       int m, int n, boolean sparseSafe, int rl, int ru) 
throws DMLRuntimeException 
        {
                //numerically stable aggregation for sum/sum_sq
                if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ )
@@ -279,7 +286,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                        return executeDenseAggMxx(a, b, scalars, m, n, 
sparseSafe, rl, ru);
        }
        
-       private long executeSparse(SparseBlock sblock, SideInput[] b, double[] 
scalars, MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) 
+       private long executeSparse(SparseBlock sblock, SideInput[] b, double[] 
scalars, 
+                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru) 
                throws DMLRuntimeException 
        {
                if( sparseSafe && sblock == null )
@@ -301,7 +309,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return -1;
        }
        
-       private double executeSparseAndAgg(SparseBlock sblock, SideInput[] b, 
double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) 
+       private double executeSparseAndAgg(SparseBlock sblock, SideInput[] b, 
double[] scalars, 
+                       int m, int n, boolean sparseSafe, int rl, int ru) 
                throws DMLRuntimeException 
        {
                if( sparseSafe && sblock == null )
@@ -313,15 +322,18 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                        return executeSparseAggMxx(sblock, b, scalars, m, n, 
sparseSafe, rl, ru);
        }
        
-       private long executeCompressed(CompressedMatrixBlock a, SideInput[] b, 
double[] scalars, MatrixBlock out, int m, int n, boolean sparseSafe, int rl, 
int ru) 
+       private long executeCompressed(CompressedMatrixBlock a, SideInput[] b, 
double[] scalars, 
+                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru) 
                throws DMLRuntimeException 
        {
-               double[] c = out.getDenseBlock();
-               
                if( _type == CellType.NO_AGG ) {
-                       return executeCompressedNoAgg(a, b, scalars, c, m, n, 
sparseSafe, rl, ru);
+                       long lnnz = executeCompressedNoAgg(a, b, scalars, out, 
m, n, sparseSafe, rl, ru);
+                       if( out.isInSparseFormat() )
+                               out.sortSparseRows(rl, ru);
+                       return lnnz;
                }
                else if( _type == CellType.ROW_AGG ) {
+                       double[] c = out.getDenseBlock();
                        if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ )
                                return executeCompressedRowAggSum(a, b, 
scalars, c, m, n, sparseSafe, rl, ru);
                        else
@@ -330,7 +342,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return -1;
        }
        
-       private double executeCompressedAndAgg(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int 
ru) throws DMLRuntimeException 
+       private double executeCompressedAndAgg(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars, 
+                       int m, int n, boolean sparseSafe, int rl, int ru) 
throws DMLRuntimeException 
        {
                //numerically stable aggregation for sum/sum_sq
                if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ )
@@ -342,7 +355,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
        /////////
        //core operator skeletons for dense, sparse, and compressed
 
-       private long executeDenseNoAgg(double[] a, SideInput[] b, double[] 
scalars, double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
+       private long executeDenseNoAgg(double[] a, SideInput[] b, double[] 
scalars, 
+                       double[] c, int m, int n, boolean sparseSafe, int rl, 
int ru) 
                throws DMLRuntimeException 
        {
                long lnnz = 0;
@@ -357,7 +371,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return lnnz;
        }
        
-       private long executeDenseRowAggSum(double[] a, SideInput[] b, double[] 
scalars, double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
+       private long executeDenseRowAggSum(double[] a, SideInput[] b, double[] 
scalars, 
+                       double[] c, int m, int n, boolean sparseSafe, int rl, 
int ru) 
                throws DMLRuntimeException 
        {
                KahanFunction kplus = (KahanFunction) getAggFunction();
@@ -375,7 +390,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return lnnz;
        }
        
-       private long executeDenseRowAggMxx(double[] a, SideInput[] b, double[] 
scalars, double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
+       private long executeDenseRowAggMxx(double[] a, SideInput[] b, double[] 
scalars, 
+                       double[] c, int m, int n, boolean sparseSafe, int rl, 
int ru) 
                throws DMLRuntimeException 
        {
                double initialVal = (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : 
-Double.MAX_VALUE;
@@ -403,7 +419,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return lnnz;
        }
        
-       private double executeDenseAggSum(double[] a, SideInput[] b, double[] 
scalars, int m, int n, boolean sparseSafe, int rl, int ru) 
+       private double executeDenseAggSum(double[] a, SideInput[] b, double[] 
scalars, 
+                       int m, int n, boolean sparseSafe, int rl, int ru) 
                throws DMLRuntimeException 
        {
                KahanFunction kplus = (KahanFunction) getAggFunction();
@@ -418,7 +435,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return kbuff._sum;
        }
        
-       private double executeDenseAggMxx(double[] a, SideInput[] b, double[] 
scalars, int m, int n, boolean sparseSafe, int rl, int ru) 
+       private double executeDenseAggMxx(double[] a, SideInput[] b, double[] 
scalars, 
+                       int m, int n, boolean sparseSafe, int rl, int ru) 
                throws DMLRuntimeException 
        {
                //safe aggregation for min/max w/ handling of zero entries
@@ -435,7 +453,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return ret;
        }
        
-       private long executeSparseNoAggSparse(SparseBlock sblock, SideInput[] 
b, double[] scalars, MatrixBlock out, int m, int n, boolean sparseSafe, int rl, 
int ru) 
+       private long executeSparseNoAggSparse(SparseBlock sblock, SideInput[] 
b, double[] scalars, 
+                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru) 
                throws DMLRuntimeException 
        {
                //note: sequential scan algorithm for both sparse-safe and 
-unsafe 
@@ -471,7 +490,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return lnnz;
        }
        
-       private long executeSparseNoAggDense(SparseBlock sblock, SideInput[] b, 
double[] scalars, MatrixBlock out, int m, int n, boolean sparseSafe, int rl, 
int ru) 
+       private long executeSparseNoAggDense(SparseBlock sblock, SideInput[] b, 
double[] scalars, 
+                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru) 
                throws DMLRuntimeException 
        {
                //note: sequential scan algorithm for both sparse-safe and 
-unsafe 
@@ -504,7 +524,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return lnnz;
        }
        
-       private long executeSparseRowAggSum(SparseBlock sblock, SideInput[] b, 
double[] scalars, MatrixBlock out, int m, int n, boolean sparseSafe, int rl, 
int ru) 
+       private long executeSparseRowAggSum(SparseBlock sblock, SideInput[] b, 
double[] scalars, 
+                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru) 
                throws DMLRuntimeException 
        {
                KahanFunction kplus = (KahanFunction) getAggFunction();
@@ -542,7 +563,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return lnnz;
        }
        
-       private long executeSparseRowAggMxx(SparseBlock sblock, SideInput[] b, 
double[] scalars, MatrixBlock out, int m, int n, boolean sparseSafe, int rl, 
int ru) 
+       private long executeSparseRowAggMxx(SparseBlock sblock, SideInput[] b, 
double[] scalars, 
+                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru) 
                throws DMLRuntimeException 
        {
                double initialVal = (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : 
-Double.MAX_VALUE;
@@ -580,7 +602,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return lnnz;
        }
        
-       private double executeSparseAggSum(SparseBlock sblock, SideInput[] b, 
double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) 
+       private double executeSparseAggSum(SparseBlock sblock, SideInput[] b, 
double[] scalars, 
+                       int m, int n, boolean sparseSafe, int rl, int ru) 
                throws DMLRuntimeException 
        {
                KahanFunction kplus = (KahanFunction) getAggFunction();
@@ -614,7 +637,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return kbuff._sum;
        }
        
-       private double executeSparseAggMxx(SparseBlock sblock, SideInput[] b, 
double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) 
+       private double executeSparseAggMxx(SparseBlock sblock, SideInput[] b, 
double[] scalars, 
+                       int m, int n, boolean sparseSafe, int rl, int ru) 
                throws DMLRuntimeException 
        {
                double ret = (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : 
-Double.MAX_VALUE;
@@ -649,21 +673,41 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return ret;
        }
        
-       private long executeCompressedNoAgg(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars, double[] c, int m, int n, boolean sparseSafe, 
int rl, int ru) 
+       private long executeCompressedNoAgg(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars, 
+                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru) 
                throws DMLRuntimeException 
        {
+               double[] c = out.getDenseBlock();
+               SparseBlock csblock = out.getSparseBlock();
+               
+               //preallocate sparse rows to avoid reallocations
+               //note: counting nnz requires segment-aligned boundaries, which 
is enforced 
+               //whenever k/2 * BITMAP_BLOCK_SZ < m (i.e., it does not limit 
parallelism)
+               if( out.isInSparseFormat() && 
rl%BitmapEncoder.BITMAP_BLOCK_SZ==0
+                       && ru%BitmapEncoder.BITMAP_BLOCK_SZ==0) {
+                       int[] rnnz = a.countNonZerosPerRow(rl, ru);
+                       for( int i=rl; i<ru; i++ )
+                               csblock.allocate(i, rnnz[i-rl]);
+               }
+               
                long lnnz = 0;
                Iterator<IJV> iter = a.getIterator(rl, ru, !sparseSafe);
                while( iter.hasNext() ) {
                        IJV cell = iter.next();
                        double val = genexec(cell.getV(), b, scalars, m, n, 
cell.getI(), cell.getJ());
-                       c[cell.getI()*n+cell.getJ()] = val; 
+                       if( out.isInSparseFormat() ) {
+                               csblock.allocate(cell.getI());
+                               csblock.append(cell.getI(), cell.getJ(), val);
+                       }
+                       else
+                               c[cell.getI()*n+cell.getJ()] = val; 
                        lnnz += (val!=0) ? 1 : 0;
                }
                return lnnz;
        }
        
-       private long executeCompressedRowAggSum(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars, double[] c, int m, int n, boolean sparseSafe, 
int rl, int ru) 
+       private long executeCompressedRowAggSum(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars, 
+                       double[] c, int m, int n, boolean sparseSafe, int rl, 
int ru) 
                throws DMLRuntimeException 
        {
                KahanFunction kplus = (KahanFunction) getAggFunction();
@@ -682,7 +726,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return lnnz;
        }
        
-       private long executeCompressedRowAggMxx(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars, double[] c, int m, int n, boolean sparseSafe, 
int rl, int ru) 
+       private long executeCompressedRowAggMxx(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars, 
+                       double[] c, int m, int n, boolean sparseSafe, int rl, 
int ru) 
                throws DMLRuntimeException 
        {
                Arrays.fill(c, rl, ru, (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : 
-Double.MAX_VALUE);
@@ -699,7 +744,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return lnnz;
        }
        
-       private double executeCompressedAggSum(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int 
ru) 
+       private double executeCompressedAggSum(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars, 
+                       int m, int n, boolean sparseSafe, int rl, int ru) 
                throws DMLRuntimeException 
        {
                KahanFunction kplus = (KahanFunction) getAggFunction();
@@ -714,7 +760,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return kbuff._sum;
        }
        
-       private double executeCompressedAggMxx(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int 
ru) 
+       private double executeCompressedAggMxx(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars, 
+                       int m, int n, boolean sparseSafe, int rl, int ru) 
                throws DMLRuntimeException 
        {
                //safe aggregation for min/max w/ handling of zero entries
@@ -731,7 +778,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return ret;
        }
        
-       protected abstract double genexec( double a, SideInput[] b, double[] 
scalars, int m, int n, int rowIndex, int colIndex);
+       protected abstract double genexec( double a, SideInput[] b, 
+                       double[] scalars, int m, int n, int rowIndex, int 
colIndex);
        
        private class ParAggTask implements Callable<Double> 
        {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b2feaf75/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java 
b/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java
index a1f7454..f93d8b3 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java
@@ -38,6 +38,11 @@ public class BitmapEncoder
        /** Size of the blocks used in a blocked bitmap representation. */
        public static final int BITMAP_BLOCK_SZ = 65536;
        
+       public static int getAlignedBlocksize(int blklen) {
+               return blklen + ((blklen%BITMAP_BLOCK_SZ != 0) ? 
+                       BITMAP_BLOCK_SZ-blklen%BITMAP_BLOCK_SZ : 0);
+       }
+       
        /**
         * Generate uncompressed bitmaps for a set of columns in an uncompressed
         * matrix block.

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b2feaf75/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
index ac0b803..71be538 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
@@ -782,13 +782,23 @@ public class ColGroupOLE extends ColGroupOffset
                        _ru = ru;
                        _boff = _ptr[k];
                        _blen = len(k);
-                       _bix = 0;
-                       _start = 0; //init first segment
-                       _slen = _data[_boff + _bix];
-                       _spos = 0;
-                       _rpos = _data[_boff + _bix + 1];
-                       while( _rpos < rl )
-                               nextRowOffset();
+                       
+                       //initialize position via segment-aligned skip-scan
+                       int lrl = rl - rl%BitmapEncoder.BITMAP_BLOCK_SZ;
+                       _bix = skipScanVal(k, lrl);
+                       _start = lrl; 
+                       
+                       //move position to actual rl boundary
+                       if( _bix < _blen ) {
+                               _slen = _data[_boff + _bix];
+                               _spos = 0;
+                               _rpos = _data[_boff + _bix + 1];
+                               while( _rpos < rl )
+                                       nextRowOffset();
+                       }
+                       else {
+                               _rpos = _ru;
+                       }
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b2feaf75/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java
index 5fc4a5a..bc0b7f1 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java
@@ -476,7 +476,8 @@ public abstract class ColGroupOffset extends ColGroupValue
                                _rpos = _ru; //end after zero iterator
                                return;
                        }
-                       else if( _cpos+1 >= getNumCols() && !(_viter!=null && 
_viter.hasNext()) ) {
+                       else if( (_rpos< 0 || _cpos+1 >= getNumCols()) 
+                                       && !(_viter!=null && _viter.hasNext()) 
) {
                                do {
                                        _vpos++;
                                        if( _vpos < getNumValues() )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b2feaf75/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index 0fbe608..ca22b63 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -594,9 +594,8 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                try {
                        ExecutorService pool = Executors.newFixedThreadPool( k 
);
                        int rlen = getNumRows();
-                       int seqsz = BitmapEncoder.BITMAP_BLOCK_SZ;
-                       int blklen = (int)(Math.ceil((double)rlen/k));
-                       blklen += (blklen%seqsz != 0)?seqsz-blklen%seqsz:0;
+                       int blklen = BitmapEncoder.getAlignedBlocksize(
+                               (int)(Math.ceil((double)rlen/k)));
                        ArrayList<DecompressTask> tasks = new 
ArrayList<DecompressTask>();
                        for( int i=0; i<k & i*blklen<getNumRows(); i++ )
                                tasks.add(new DecompressTask(_colGroups, ret, 
i*blklen, Math.min((i+1)*blklen,rlen)));
@@ -813,6 +812,13 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                return new ColumnGroupIterator(rl, ru, cgl, cgu, inclZeros);
        }
        
+       public int[] countNonZerosPerRow(int rl, int ru) {
+               int[] rnnz = new int[ru-rl];
+               for (ColGroup grp : _colGroups)
+                       grp.countNonZerosPerRow(rnnz, rl, ru);
+               return rnnz;
+       }
+       
        //////////////////////////////////////////
        // Operations (overwrite existing ops for seamless integration)
 
@@ -1096,9 +1102,8 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                                ExecutorService pool = 
Executors.newFixedThreadPool( op.getNumThreads() );
                                ArrayList<UnaryAggregateTask> tasks = new 
ArrayList<UnaryAggregateTask>();
                                if( op.indexFn instanceof ReduceCol && 
grpParts.length > 0 ) {
-                                       int seqsz = 
BitmapEncoder.BITMAP_BLOCK_SZ;
-                                       int blklen = 
(int)(Math.ceil((double)rlen/op.getNumThreads()));
-                                       blklen += (blklen%seqsz != 
0)?seqsz-blklen%seqsz:0;
+                                       int blklen = 
BitmapEncoder.getAlignedBlocksize(
+                                               
(int)(Math.ceil((double)rlen/op.getNumThreads())));
                                        for( int i=0; i<op.getNumThreads() & 
i*blklen<rlen; i++ )
                                                tasks.add(new 
UnaryAggregateTask(grpParts[0], ret, i*blklen, Math.min((i+1)*blklen,rlen), 
op));
                                }
@@ -1351,9 +1356,8 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                        //compute remaining compressed column groups in parallel
                        ExecutorService pool = Executors.newFixedThreadPool( k 
);
                        int rlen = getNumRows();
-                       int seqsz = BitmapEncoder.BITMAP_BLOCK_SZ;
-                       int blklen = (int)(Math.ceil((double)rlen/k));
-                       blklen += (blklen%seqsz != 0)?seqsz-blklen%seqsz:0;
+                       int blklen = BitmapEncoder.getAlignedBlocksize(
+                               (int)(Math.ceil((double)rlen/k)));
                        ArrayList<RightMatrixMultTask> tasks = new 
ArrayList<RightMatrixMultTask>();
                        for( int i=0; i<k & i*blklen<getNumRows(); i++ )
                                tasks.add(new RightMatrixMultTask(_colGroups, 
vector, result, i*blklen, Math.min((i+1)*blklen,rlen)));

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b2feaf75/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 233350a..ac66241 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -783,7 +783,8 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                if( !sparse || sparseBlock==null )
                        return;         
                for( int i=rl; i<ru; i++ )
-                       sparseBlock.sort(i);
+                       if( !sparseBlock.isEmpty(i) )
+                               sparseBlock.sort(i);
        }
        
        /**

Reply via email to