Repository: systemml
Updated Branches:
  refs/heads/master ce240af57 -> 2610a79d2


[SYSTEMML-1845] Performance codegen cellwise w/ multiple sparse inputs 

This patch makes a couple of compiler and runtime changes to
significantly improve performance of cellwise codegen operations with
multiple sparse inputs. This includes:

(1) Allow cellwise binary operations over multiple sparse inputs in
opening and fuse conditions of cellwise templates.

(2) Improved determination of sparse-safeness properties of cellwise
operations.

(3) Sparse iterator side inputs: In order to avoid unnecessary binary
search on lookups over sparse side inputs, we now use special iterators
that maintain the current position and only advanced if necessary, with
potential skips over entire blocks of rows.

Together these changes led to significant performance improvements. For
example, on a scenario of 20 iterations of sum(X*Y*Z), where X, Y, and Z
are sparse 1M x 1K, sp=0.1 matrices, the baselines (w/o and w/ existing
fused operators) took 90s and 7.4s whereas, the codegen operations took
529s. With the three changes above, the performance improved to 82s,
11.7s, and 5.9s respectively.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/2610a79d
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/2610a79d
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/2610a79d

Branch: refs/heads/master
Commit: 2610a79d22c69e68a21e7098767cf27024d549b8
Parents: ce240af
Author: Matthias Boehm <[email protected]>
Authored: Wed Aug 16 18:41:03 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Wed Aug 16 19:49:28 2017 -0700

----------------------------------------------------------------------
 .../hops/codegen/template/TemplateCell.java     |  11 +-
 .../hops/codegen/template/TemplateUtils.java    |  10 +
 .../sysml/runtime/codegen/SpoofCellwise.java    | 241 ++++++++++---------
 .../runtime/codegen/SpoofMultiAggregate.java    |   8 +-
 .../sysml/runtime/codegen/SpoofOperator.java    |  73 +++++-
 5 files changed, 207 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/2610a79d/src/main/java/org/apache/sysml/hops/codegen/template/TemplateCell.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/hops/codegen/template/TemplateCell.java 
b/src/main/java/org/apache/sysml/hops/codegen/template/TemplateCell.java
index c9d97b9..65bad08 100644
--- a/src/main/java/org/apache/sysml/hops/codegen/template/TemplateCell.java
+++ b/src/main/java/org/apache/sysml/hops/codegen/template/TemplateCell.java
@@ -135,7 +135,8 @@ public class TemplateCell extends TemplateBase
                tpl.setCellType(TemplateUtils.getCellType(hop));
                tpl.setAggOp(TemplateUtils.getAggOp(hop));
                tpl.setSparseSafe((HopRewriteUtils.isBinary(hop, OpOp2.MULT) && 
hop.getInput().contains(sinHops[0]))
-                               || (HopRewriteUtils.isBinary(hop, OpOp2.DIV) && 
hop.getInput().get(0) == sinHops[0]));
+                               || (HopRewriteUtils.isBinary(hop, OpOp2.DIV) && 
hop.getInput().get(0) == sinHops[0])
+                               || TemplateUtils.rIsBinaryOnly(tpl.getOutput(), 
BinType.MULT));
                tpl.setRequiresCastDtm(hop instanceof AggBinaryOp);
                tpl.setBeginLine(hop.getBeginLine());
                
@@ -279,7 +280,7 @@ public class TemplateCell extends TemplateBase
                //prepare indicators for binary operations
                boolean isBinaryMatrixScalar = false;
                boolean isBinaryMatrixVector = false;
-               boolean isBinaryMatrixMatrixDense = false;
+               boolean isBinaryMatrixMatrix = false;
                if( hop instanceof BinaryOp && hop.getDataType().isMatrix() ) {
                        Hop left = hop.getInput().get(0);
                        Hop right = hop.getInput().get(1);
@@ -290,8 +291,8 @@ public class TemplateCell extends TemplateBase
                        isBinaryMatrixVector = hop.dimsKnown() 
                                && ((ldt.isMatrix() && 
TemplateUtils.isVectorOrScalar(right)) 
                                || (rdt.isMatrix() && 
TemplateUtils.isVectorOrScalar(left)) );
-                       isBinaryMatrixMatrixDense = hop.dimsKnown() && 
HopRewriteUtils.isEqualSize(left, right)
-                               && ldt.isMatrix() && rdt.isMatrix() && 
!HopRewriteUtils.isSparse(left) && !HopRewriteUtils.isSparse(right);
+                       isBinaryMatrixMatrix = hop.dimsKnown() && 
HopRewriteUtils.isEqualSize(left, right)
+                               && ldt.isMatrix() && rdt.isMatrix();
                }
                                
                //prepare indicators for ternary operations
@@ -309,7 +310,7 @@ public class TemplateCell extends TemplateBase
                
                //check supported unary, binary, ternary operations
                return hop.getDataType() == DataType.MATRIX && 
TemplateUtils.isOperationSupported(hop) && (hop instanceof UnaryOp 
-                               || isBinaryMatrixScalar || isBinaryMatrixVector 
|| isBinaryMatrixMatrixDense 
+                               || isBinaryMatrixScalar || isBinaryMatrixVector 
|| isBinaryMatrixMatrix
                                || isTernaryVectorScalarVector || 
isTernaryMatrixScalarMatrixDense
                                || (hop instanceof ParameterizedBuiltinOp && 
((ParameterizedBuiltinOp)hop).getOp()==ParamBuiltinOp.REPLACE));   
        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/2610a79d/src/main/java/org/apache/sysml/hops/codegen/template/TemplateUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/hops/codegen/template/TemplateUtils.java 
b/src/main/java/org/apache/sysml/hops/codegen/template/TemplateUtils.java
index 21061f2..5b739ee 100644
--- a/src/main/java/org/apache/sysml/hops/codegen/template/TemplateUtils.java
+++ b/src/main/java/org/apache/sysml/hops/codegen/template/TemplateUtils.java
@@ -247,6 +247,16 @@ public class TemplateUtils
                        && ArrayUtils.contains(types, 
((CNodeBinary)node).getType());
        }
        
+       public static boolean rIsBinaryOnly(CNode node, BinType...types) {
+               if( !(isBinary(node, types) || node instanceof CNodeData 
+                       || (node instanceof CNodeUnary && 
((CNodeUnary)node).getType().isScalarLookup())) )
+                       return false;
+               boolean ret = true;
+               for( CNode c : node.getInput() )
+                       ret &= rIsBinaryOnly(c, types);
+               return ret;
+       }
+       
        public static boolean isTernary(CNode node, TernaryType...types) {
                return node instanceof CNodeTernary
                        && ArrayUtils.contains(types, 
((CNodeTernary)node).getType());

http://git-wip-us.apache.org/repos/asf/systemml/blob/2610a79d/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java 
b/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java
index 25cafe7..63168e6 100644
--- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java
+++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java
@@ -106,7 +106,7 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                                                + "aggregation type: 
"+_aggOp.name());
                }
        }
-               
+       
        @Override
        public ScalarObject execute(ArrayList<MatrixBlock> inputs, 
ArrayList<ScalarObject> scalarObjects, int k) 
                throws DMLRuntimeException 
@@ -125,8 +125,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                final int m = inputs.get(0).getNumRows();
                final int n = inputs.get(0).getNumColumns();
                
-               //sparse safe check 
-               boolean sparseSafe = isSparseSafe() || (b.length == 0 
+               //sparse safe check
+               boolean sparseSafe = isSparseSafe() || (b.length == 0
                                && genexec( 0, b, scalars, m, n, 0, 0 ) == 0);
                
                double ret = 0;
@@ -147,11 +147,11 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                                int nk = 
UtilFunctions.roundToNext(Math.min(8*k,m/32), k);
                                int blklen = (int)(Math.ceil((double)m/nk));
                                for( int i=0; i<nk & i*blklen<m; i++ )
-                                       tasks.add(new ParAggTask(inputs.get(0), 
b, scalars, m, n, sparseSafe, i*blklen, Math.min((i+1)*blklen, m))); 
+                                       tasks.add(new ParAggTask(inputs.get(0), 
b, scalars, m, n, sparseSafe, i*blklen, Math.min((i+1)*blklen, m)));
                                //execute tasks
-                               List<Future<Double>> taskret = 
pool.invokeAll(tasks);   
+                               List<Future<Double>> taskret = 
pool.invokeAll(tasks);
                                pool.shutdown();
-                       
+                               
                                //aggregate partial results
                                ValueFunction vfun = getAggFunction();
                                if( vfun instanceof KahanFunction ) {
@@ -159,7 +159,7 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                                        KahanPlus kplus = 
KahanPlus.getKahanPlusFnObject();
                                        for( Future<Double> task : taskret )
                                                kplus.execute2(kbuff, 
task.get());
-                                       ret = kbuff._sum;       
+                                       ret = kbuff._sum;
                                }
                                else {
                                        for( Future<Double> task : taskret )
@@ -180,14 +180,14 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
        }
 
        @Override
-       public void execute(ArrayList<MatrixBlock> inputs, 
ArrayList<ScalarObject> scalarObjects, MatrixBlock out) 
+       public void execute(ArrayList<MatrixBlock> inputs, 
ArrayList<ScalarObject> scalarObjects, MatrixBlock out)
                throws DMLRuntimeException
        {
                execute(inputs, scalarObjects, out, 1);
        }
        
        @Override
-       public void execute(ArrayList<MatrixBlock> inputs, 
ArrayList<ScalarObject> scalarObjects, MatrixBlock out, int k)       
+       public void execute(ArrayList<MatrixBlock> inputs, 
ArrayList<ScalarObject> scalarObjects, MatrixBlock out, int k)
                throws DMLRuntimeException
        {
                //sanity check
@@ -242,9 +242,9 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                                        blklen = 
BitmapEncoder.getAlignedBlocksize(blklen);
                                for( int i=0; i<nk & i*blklen<m; i++ )
                                        tasks.add(new ParExecTask(a, b, 
scalars, out, m, n,
-                                               sparseSafe, i*blklen, 
Math.min((i+1)*blklen, m))); 
+                                               sparseSafe, i*blklen, 
Math.min((i+1)*blklen, m)));
                                //execute tasks
-                               List<Future<Long>> taskret = 
pool.invokeAll(tasks);     
+                               List<Future<Long>> taskret = 
pool.invokeAll(tasks);
                                pool.shutdown();
                                
                                //aggregate nnz and error handling
@@ -286,21 +286,22 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                throws DMLRuntimeException 
        {
                double[] c = out.getDenseBlock();
+               SideInput[] lb = createSparseSideInputs(b);
                
                if( _type == CellType.NO_AGG ) {
-                       return executeDenseNoAgg(a, b, scalars, c, m, n, 
sparseSafe, rl, ru);
+                       return executeDenseNoAgg(a, lb, scalars, c, m, n, 
sparseSafe, rl, ru);
                }
                else if( _type == CellType.ROW_AGG ) {
                        if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ )
-                               return executeDenseRowAggSum(a, b, scalars, c, 
m, n, sparseSafe, rl, ru);
+                               return executeDenseRowAggSum(a, lb, scalars, c, 
m, n, sparseSafe, rl, ru);
                        else
-                               return executeDenseRowAggMxx(a, b, scalars, c, 
m, n, sparseSafe, rl, ru);
+                               return executeDenseRowAggMxx(a, lb, scalars, c, 
m, n, sparseSafe, rl, ru);
                }
                else if( _type == CellType.COL_AGG ) {
                        if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ )
-                               return executeDenseColAggSum(a, b, scalars, c, 
m, n, sparseSafe, rl, ru);
+                               return executeDenseColAggSum(a, lb, scalars, c, 
m, n, sparseSafe, rl, ru);
                        else
-                               return executeDenseColAggMxx(a, b, scalars, c, 
m, n, sparseSafe, rl, ru);
+                               return executeDenseColAggMxx(a, lb, scalars, c, 
m, n, sparseSafe, rl, ru);
                }
                return -1;
        }
@@ -308,58 +309,62 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
        private double executeDenseAndAgg(double[] a, SideInput[] b, double[] 
scalars, 
                        int m, int n, boolean sparseSafe, int rl, int ru) 
throws DMLRuntimeException 
        {
+               SideInput[] lb = createSparseSideInputs(b);
+               
                //numerically stable aggregation for sum/sum_sq
                if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ )
-                       return executeDenseAggSum(a, b, scalars, m, n, 
sparseSafe, rl, ru);
+                       return executeDenseAggSum(a, lb, scalars, m, n, 
sparseSafe, rl, ru);
                else
-                       return executeDenseAggMxx(a, b, scalars, m, n, 
sparseSafe, rl, ru);
+                       return executeDenseAggMxx(a, lb, scalars, m, n, 
sparseSafe, rl, ru);
        }
        
-       private long executeSparse(SparseBlock sblock, SideInput[] b, double[] 
scalars, 
-                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru) 
-               throws DMLRuntimeException 
+       private long executeSparse(SparseBlock sblock, SideInput[] b, double[] 
scalars,
+                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru)
+               throws DMLRuntimeException
        {
                if( sparseSafe && sblock == null )
                        return 0;
                
+               SideInput[] lb = createSparseSideInputs(b);
                if( _type == CellType.NO_AGG ) {
-                       if( out.isInSparseFormat() ) 
-                               return executeSparseNoAggSparse(sblock, b, 
scalars, out, m, n, sparseSafe, rl, ru);
+                       if( out.isInSparseFormat() )
+                               return executeSparseNoAggSparse(sblock, lb, 
scalars, out, m, n, sparseSafe, rl, ru);
                        else
-                               return executeSparseNoAggDense(sblock, b, 
scalars, out, m, n, sparseSafe, rl, ru);
+                               return executeSparseNoAggDense(sblock, lb, 
scalars, out, m, n, sparseSafe, rl, ru);
                }
                else if( _type == CellType.ROW_AGG ) {
                        if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ )
-                               return executeSparseRowAggSum(sblock, b, 
scalars, out, m, n, sparseSafe, rl, ru);
+                               return executeSparseRowAggSum(sblock, lb, 
scalars, out, m, n, sparseSafe, rl, ru);
                        else
-                               return executeSparseRowAggMxx(sblock, b, 
scalars, out, m, n, sparseSafe, rl, ru);
+                               return executeSparseRowAggMxx(sblock, lb, 
scalars, out, m, n, sparseSafe, rl, ru);
                }
                else if( _type == CellType.COL_AGG ) {
                        if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ )
-                               return executeSparseColAggSum(sblock, b, 
scalars, out, m, n, sparseSafe, rl, ru);
+                               return executeSparseColAggSum(sblock, lb, 
scalars, out, m, n, sparseSafe, rl, ru);
                        else
-                               return executeSparseColAggMxx(sblock, b, 
scalars, out, m, n, sparseSafe, rl, ru);
+                               return executeSparseColAggMxx(sblock, lb, 
scalars, out, m, n, sparseSafe, rl, ru);
                }
                
                return -1;
        }
        
-       private double executeSparseAndAgg(SparseBlock sblock, SideInput[] b, 
double[] scalars, 
-                       int m, int n, boolean sparseSafe, int rl, int ru) 
-               throws DMLRuntimeException 
+       private double executeSparseAndAgg(SparseBlock sblock, SideInput[] b, 
double[] scalars,
+                       int m, int n, boolean sparseSafe, int rl, int ru)
+               throws DMLRuntimeException
        {
                if( sparseSafe && sblock == null )
                        return 0;
                
+               SideInput[] lb = createSparseSideInputs(b);
                if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ )
-                       return executeSparseAggSum(sblock, b, scalars, m, n, 
sparseSafe, rl, ru);
+                       return executeSparseAggSum(sblock, lb, scalars, m, n, 
sparseSafe, rl, ru);
                else
-                       return executeSparseAggMxx(sblock, b, scalars, m, n, 
sparseSafe, rl, ru);
+                       return executeSparseAggMxx(sblock, lb, scalars, m, n, 
sparseSafe, rl, ru);
        }
        
-       private long executeCompressed(CompressedMatrixBlock a, SideInput[] b, 
double[] scalars, 
-                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru) 
-               throws DMLRuntimeException 
+       private long executeCompressed(CompressedMatrixBlock a, SideInput[] b, 
double[] scalars,
+                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru)
+               throws DMLRuntimeException
        {
                if( _type == CellType.NO_AGG ) {
                        long lnnz = executeCompressedNoAgg(a, b, scalars, out, 
m, n, sparseSafe, rl, ru);
@@ -384,7 +389,7 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return -1;
        }
        
-       private double executeCompressedAndAgg(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars, 
+       private double executeCompressedAndAgg(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars,
                        int m, int n, boolean sparseSafe, int rl, int ru) 
throws DMLRuntimeException 
        {
                //numerically stable aggregation for sum/sum_sq
@@ -397,24 +402,24 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
        /////////
        //core operator skeletons for dense, sparse, and compressed
 
-       private long executeDenseNoAgg(double[] a, SideInput[] b, double[] 
scalars, 
-                       double[] c, int m, int n, boolean sparseSafe, int rl, 
int ru) 
-               throws DMLRuntimeException 
+       private long executeDenseNoAgg(double[] a, SideInput[] b, double[] 
scalars,
+                       double[] c, int m, int n, boolean sparseSafe, int rl, 
int ru)
+               throws DMLRuntimeException
        {
                long lnnz = 0;
-               for( int i=rl, ix=rl*n; i<ru; i++ ) 
+               for( int i=rl, ix=rl*n; i<ru; i++ )
                        for( int j=0; j<n; j++, ix++ ) {
                                double aval = (a != null) ? a[ix] : 0;
                                if( aval != 0 || !sparseSafe) {
-                                       c[ix] = genexec( aval, b, scalars, m, 
n, i, j); 
+                                       c[ix] = genexec( aval, b, scalars, m, 
n, i, j);
                                        lnnz += (c[ix]!=0) ? 1 : 0;
                                }
                        }
                return lnnz;
        }
        
-       private long executeDenseRowAggSum(double[] a, SideInput[] b, double[] 
scalars, 
-               double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
+       private long executeDenseRowAggSum(double[] a, SideInput[] b, double[] 
scalars,
+               double[] c, int m, int n, boolean sparseSafe, int rl, int ru)
        {
                KahanFunction kplus = (KahanFunction) getAggFunction();
                KahanObject kbuff = new KahanObject(0, 0);
@@ -431,15 +436,15 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return lnnz;
        }
        
-       private long executeDenseRowAggMxx(double[] a, SideInput[] b, double[] 
scalars, 
-                       double[] c, int m, int n, boolean sparseSafe, int rl, 
int ru) 
+       private long executeDenseRowAggMxx(double[] a, SideInput[] b, double[] 
scalars,
+                       double[] c, int m, int n, boolean sparseSafe, int rl, 
int ru)
                throws DMLRuntimeException 
        {
                double initialVal = (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : 
-Double.MAX_VALUE;
                ValueFunction vfun = getAggFunction();
                long lnnz = 0;
                if( a == null && !sparseSafe ) { //empty
-                       for( int i=rl; i<ru; i++ ) { 
+                       for( int i=rl; i<ru; i++ ) {
                                double tmp = initialVal;
                                for( int j=0; j<n; j++ )
                                        tmp = vfun.execute(tmp, genexec(0, b, 
scalars, m, n, i, j));
@@ -460,8 +465,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return lnnz;
        }
        
-       private long executeDenseColAggSum(double[] a, SideInput[] b, double[] 
scalars, 
-               double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
+       private long executeDenseColAggSum(double[] a, SideInput[] b, double[] 
scalars,
+               double[] c, int m, int n, boolean sparseSafe, int rl, int ru)
        {
                KahanFunction kplus = (KahanFunction) getAggFunction();
                KahanObject kbuff = new KahanObject(0, 0);
@@ -480,8 +485,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return -1;
        }
        
-       private long executeDenseColAggMxx(double[] a, SideInput[] b, double[] 
scalars, 
-                       double[] c, int m, int n, boolean sparseSafe, int rl, 
int ru) 
+       private long executeDenseColAggMxx(double[] a, SideInput[] b, double[] 
scalars,
+                       double[] c, int m, int n, boolean sparseSafe, int rl, 
int ru)
                throws DMLRuntimeException 
        {
                double initialVal = (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : 
-Double.MAX_VALUE;
@@ -509,8 +514,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return -1;
        }
        
-       private double executeDenseAggSum(double[] a, SideInput[] b, double[] 
scalars, 
-                       int m, int n, boolean sparseSafe, int rl, int ru) 
+       private double executeDenseAggSum(double[] a, SideInput[] b, double[] 
scalars,
+                       int m, int n, boolean sparseSafe, int rl, int ru)
                throws DMLRuntimeException 
        {
                KahanFunction kplus = (KahanFunction) getAggFunction();
@@ -525,8 +530,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return kbuff._sum;
        }
        
-       private double executeDenseAggMxx(double[] a, SideInput[] b, double[] 
scalars, 
-                       int m, int n, boolean sparseSafe, int rl, int ru) 
+       private double executeDenseAggMxx(double[] a, SideInput[] b, double[] 
scalars,
+                       int m, int n, boolean sparseSafe, int rl, int ru)
                throws DMLRuntimeException 
        {
                //safe aggregation for min/max w/ handling of zero entries
@@ -543,12 +548,12 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return ret;
        }
        
-       private long executeSparseNoAggSparse(SparseBlock sblock, SideInput[] 
b, double[] scalars, 
-                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru) 
-               throws DMLRuntimeException 
+       private long executeSparseNoAggSparse(SparseBlock sblock, SideInput[] 
b, double[] scalars,
+                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru)
+               throws DMLRuntimeException
        {
-               //note: sequential scan algorithm for both sparse-safe and 
-unsafe 
-               //in order to avoid binary search for sparse-unsafe 
+               //note: sequential scan algorithm for both sparse-safe and 
-unsafe
+               //in order to avoid binary search for sparse-unsafe
                SparseBlock c = out.getSparseBlock();
                long lnnz = 0;
                for(int i=rl; i<ru; i++) {
@@ -580,12 +585,12 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return lnnz;
        }
        
-       private long executeSparseNoAggDense(SparseBlock sblock, SideInput[] b, 
double[] scalars, 
-                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru) 
-               throws DMLRuntimeException 
+       private long executeSparseNoAggDense(SparseBlock sblock, SideInput[] b, 
double[] scalars,
+                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru)
+               throws DMLRuntimeException
        {
-               //note: sequential scan algorithm for both sparse-safe and 
-unsafe 
-               //in order to avoid binary search for sparse-unsafe 
+               //note: sequential scan algorithm for both sparse-safe and 
-unsafe
+               //in order to avoid binary search for sparse-unsafe
                double[] c = out.getDenseBlock();
                long lnnz = 0;
                for(int i=rl, cix=rl*n; i<ru; i++, cix+=n) {
@@ -614,15 +619,15 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return lnnz;
        }
        
-       private long executeSparseRowAggSum(SparseBlock sblock, SideInput[] b, 
double[] scalars, 
-                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru) 
+       private long executeSparseRowAggSum(SparseBlock sblock, SideInput[] b, 
double[] scalars,
+                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru)
                throws DMLRuntimeException 
        {
                KahanFunction kplus = (KahanFunction) getAggFunction();
                KahanObject kbuff = new KahanObject(0, 0);
 
-               //note: sequential scan algorithm for both sparse-safe and 
-unsafe 
-               //in order to avoid binary search for sparse-unsafe 
+               //note: sequential scan algorithm for both sparse-safe and 
-unsafe
+               //in order to avoid binary search for sparse-unsafe
                double[] c = out.getDenseBlock();
                long lnnz = 0;
                for(int i=rl; i<ru; i++) {
@@ -653,9 +658,9 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return lnnz;
        }
        
-       private long executeSparseRowAggMxx(SparseBlock sblock, SideInput[] b, 
double[] scalars, 
-                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru) 
-               throws DMLRuntimeException 
+       private long executeSparseRowAggMxx(SparseBlock sblock, SideInput[] b, 
double[] scalars,
+                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru)
+               throws DMLRuntimeException
        {
                double initialVal = (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : 
-Double.MAX_VALUE;
                ValueFunction vfun = getAggFunction();
@@ -692,16 +697,16 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return lnnz;
        }
 
-       private long executeSparseColAggSum(SparseBlock sblock, SideInput[] b, 
double[] scalars, 
-                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru) 
-               throws DMLRuntimeException 
+       private long executeSparseColAggSum(SparseBlock sblock, SideInput[] b, 
double[] scalars,
+                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru)
+               throws DMLRuntimeException
        {
                KahanFunction kplus = (KahanFunction) getAggFunction();
                KahanObject kbuff = new KahanObject(0, 0);
                double[] corr = new double[n];
                
-               //note: sequential scan algorithm for both sparse-safe and 
-unsafe 
-               //in order to avoid binary search for sparse-unsafe 
+               //note: sequential scan algorithm for both sparse-safe and 
-unsafe
+               //in order to avoid binary search for sparse-unsafe
                double[] c = out.getDenseBlock();
                for(int i=rl; i<ru; i++) {
                        kbuff.set(0, 0);
@@ -741,9 +746,9 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return -1;
        }
        
-       private long executeSparseColAggMxx(SparseBlock sblock, SideInput[] b, 
double[] scalars, 
-                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru) 
-               throws DMLRuntimeException 
+       private long executeSparseColAggMxx(SparseBlock sblock, SideInput[] b, 
double[] scalars,
+                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru)
+               throws DMLRuntimeException
        {
                double initialVal = (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : 
-Double.MAX_VALUE;
                ValueFunction vfun = getAggFunction();
@@ -751,8 +756,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                Arrays.fill(c, initialVal);
                int[] count = new int[n];
                
-               //note: sequential scan algorithm for both sparse-safe and 
-unsafe 
-               //in order to avoid binary search for sparse-unsafe 
+               //note: sequential scan algorithm for both sparse-safe and 
-unsafe
+               //in order to avoid binary search for sparse-unsafe
                for(int i=rl; i<ru; i++) {
                        int lastj = -1;
                        //handle non-empty rows
@@ -783,13 +788,13 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return -1;
        }
        
-       private double executeSparseAggSum(SparseBlock sblock, SideInput[] b, 
double[] scalars, 
-                       int m, int n, boolean sparseSafe, int rl, int ru) 
-               throws DMLRuntimeException 
+       private double executeSparseAggSum(SparseBlock sblock, SideInput[] b, 
double[] scalars,
+                       int m, int n, boolean sparseSafe, int rl, int ru)
+               throws DMLRuntimeException
        {
                KahanFunction kplus = (KahanFunction) getAggFunction();
                KahanObject kbuff = new KahanObject(0, 0);
-
+               
                //note: sequential scan algorithm for both sparse-safe and 
-unsafe 
                //in order to avoid binary search for sparse-unsafe 
                for(int i=rl; i<ru; i++) {
@@ -818,16 +823,16 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return kbuff._sum;
        }
        
-       private double executeSparseAggMxx(SparseBlock sblock, SideInput[] b, 
double[] scalars, 
-                       int m, int n, boolean sparseSafe, int rl, int ru) 
-               throws DMLRuntimeException 
+       private double executeSparseAggMxx(SparseBlock sblock, SideInput[] b, 
double[] scalars,
+                       int m, int n, boolean sparseSafe, int rl, int ru)
+               throws DMLRuntimeException
        {
                double ret = (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : 
-Double.MAX_VALUE;
                ret = (sparseSafe && sblock.size() < (long)m*n) ? 0 : ret;
                ValueFunction vfun = getAggFunction();
                
-               //note: sequential scan algorithm for both sparse-safe and 
-unsafe 
-               //in order to avoid binary search for sparse-unsafe 
+               //note: sequential scan algorithm for both sparse-safe and 
-unsafe
+               //in order to avoid binary search for sparse-unsafe
                for(int i=rl; i<ru; i++) {
                        int lastj = -1;
                        //handle non-empty rows
@@ -854,15 +859,15 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return ret;
        }
        
-       private long executeCompressedNoAgg(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars, 
-                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru) 
-               throws DMLRuntimeException 
+       private long executeCompressedNoAgg(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars,
+                       MatrixBlock out, int m, int n, boolean sparseSafe, int 
rl, int ru)
+               throws DMLRuntimeException
        {
                double[] c = out.getDenseBlock();
                SparseBlock csblock = out.getSparseBlock();
                
                //preallocate sparse rows to avoid reallocations
-               //note: counting nnz requires segment-aligned boundaries, which 
is enforced 
+               //note: counting nnz requires segment-aligned boundaries, which 
is enforced
                //whenever k/2 * BITMAP_BLOCK_SZ > m (i.e., it does not limit 
parallelism)
                if( out.isInSparseFormat() && 
rl%BitmapEncoder.BITMAP_BLOCK_SZ==0
                        && ru%BitmapEncoder.BITMAP_BLOCK_SZ==0) {
@@ -881,15 +886,15 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                                csblock.append(cell.getI(), cell.getJ(), val);
                        }
                        else
-                               c[cell.getI()*n+cell.getJ()] = val; 
+                               c[cell.getI()*n+cell.getJ()] = val;
                        lnnz += (val!=0) ? 1 : 0;
                }
                return lnnz;
        }
        
-       private long executeCompressedRowAggSum(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars, 
-                       double[] c, int m, int n, boolean sparseSafe, int rl, 
int ru) 
-               throws DMLRuntimeException 
+       private long executeCompressedRowAggSum(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars,
+                       double[] c, int m, int n, boolean sparseSafe, int rl, 
int ru)
+               throws DMLRuntimeException
        {
                KahanFunction kplus = (KahanFunction) getAggFunction();
                KahanObject kbuff = new KahanObject(0, 0);
@@ -907,9 +912,9 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return lnnz;
        }
        
-       private long executeCompressedRowAggMxx(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars, 
-                       double[] c, int m, int n, boolean sparseSafe, int rl, 
int ru) 
-               throws DMLRuntimeException 
+       private long executeCompressedRowAggMxx(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars,
+                       double[] c, int m, int n, boolean sparseSafe, int rl, 
int ru)
+               throws DMLRuntimeException
        {
                Arrays.fill(c, rl, ru, (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : 
-Double.MAX_VALUE);
                ValueFunction vfun = getAggFunction();
@@ -925,8 +930,8 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return lnnz;
        }
        
-       private long executeCompressedColAggSum(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars, 
-               double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
+       private long executeCompressedColAggSum(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars,
+               double[] c, int m, int n, boolean sparseSafe, int rl, int ru)
        {
                KahanFunction kplus = (KahanFunction) getAggFunction();
                KahanObject kbuff = new KahanObject(0, 0);
@@ -944,9 +949,9 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return -1;
        }
        
-       private long executeCompressedColAggMxx(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars, 
-                       double[] c, int m, int n, boolean sparseSafe, int rl, 
int ru) 
-               throws DMLRuntimeException 
+       private long executeCompressedColAggMxx(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars,
+                       double[] c, int m, int n, boolean sparseSafe, int rl, 
int ru)
+               throws DMLRuntimeException
        {
                Arrays.fill(c, rl, ru, (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : 
-Double.MAX_VALUE);
                ValueFunction vfun = getAggFunction();
@@ -962,9 +967,9 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return lnnz;
        }
        
-       private double executeCompressedAggSum(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars, 
-                       int m, int n, boolean sparseSafe, int rl, int ru) 
-               throws DMLRuntimeException 
+       private double executeCompressedAggSum(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars,
+                       int m, int n, boolean sparseSafe, int rl, int ru)
+               throws DMLRuntimeException
        {
                KahanFunction kplus = (KahanFunction) getAggFunction();
                KahanObject kbuff = new KahanObject(0, 0);
@@ -972,19 +977,19 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                Iterator<IJV> iter = a.getIterator(rl, ru, !sparseSafe);
                while( iter.hasNext() ) {
                        IJV cell = iter.next();
-                       double val = genexec(cell.getV(), b, scalars, m, n, 
cell.getI(), cell.getJ()); 
+                       double val = genexec(cell.getV(), b, scalars, m, n, 
cell.getI(), cell.getJ());
                        kplus.execute2(kbuff, val);
                }
                return kbuff._sum;
        }
        
-       private double executeCompressedAggMxx(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars, 
-                       int m, int n, boolean sparseSafe, int rl, int ru) 
-               throws DMLRuntimeException 
+       private double executeCompressedAggMxx(CompressedMatrixBlock a, 
SideInput[] b, double[] scalars,
+                       int m, int n, boolean sparseSafe, int rl, int ru)
+               throws DMLRuntimeException
        {
                //safe aggregation for min/max w/ handling of zero entries
                //note: sparse safe with zero value as min/max handled outside
-               double ret = (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : 
-Double.MAX_VALUE; 
+               double ret = (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : 
-Double.MAX_VALUE;
                ValueFunction vfun = getAggFunction();
                
                Iterator<IJV> iter = a.getIterator(rl, ru, !sparseSafe);
@@ -996,7 +1001,7 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                return ret;
        }
        
-       protected abstract double genexec( double a, SideInput[] b, 
+       protected abstract double genexec( double a, SideInput[] b,
                        double[] scalars, int m, int n, int rowIndex, int 
colIndex);
        
        private class ParAggTask implements Callable<Double> 
@@ -1033,7 +1038,7 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                }
        }
 
-       private class ParExecTask implements Callable<Long> 
+       private class ParExecTask implements Callable<Long>
        {
                private final MatrixBlock _a;
                private final SideInput[] _b;
@@ -1045,7 +1050,7 @@ public abstract class SpoofCellwise extends SpoofOperator 
implements Serializabl
                private final int _rl;
                private final int _ru;
 
-               protected ParExecTask( MatrixBlock a, SideInput[] b, double[] 
scalars, MatrixBlock c, 
+               protected ParExecTask( MatrixBlock a, SideInput[] b, double[] 
scalars, MatrixBlock c,
                                int rlen, int clen, boolean sparseSafe, int rl, 
int ru ) {
                        _a = a;
                        _b = b;

http://git-wip-us.apache.org/repos/asf/systemml/blob/2610a79d/src/main/java/org/apache/sysml/runtime/codegen/SpoofMultiAggregate.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/codegen/SpoofMultiAggregate.java 
b/src/main/java/org/apache/sysml/runtime/codegen/SpoofMultiAggregate.java
index c3755d4..679c964 100644
--- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofMultiAggregate.java
+++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofMultiAggregate.java
@@ -136,11 +136,13 @@ public abstract class SpoofMultiAggregate extends 
SpoofOperator implements Seria
        
        private void executeDense(double[] a, SideInput[] b, double[] scalars, 
double[] c, int m, int n, int rl, int ru) throws DMLRuntimeException 
        {
+               SideInput[] lb = createSparseSideInputs(b);
+               
                //core dense aggregation operation
                for( int i=rl, ix=rl*n; i<ru; i++ ) { 
                        for( int j=0; j<n; j++, ix++ ) {
                                double in = (a != null) ? a[ix] : 0;
-                               genexec( in, b, scalars, c, m, n, i, j );
+                               genexec( in, lb, scalars, c, m, n, i, j );
                        }
                }
        }
@@ -148,11 +150,13 @@ public abstract class SpoofMultiAggregate extends 
SpoofOperator implements Seria
        private void executeSparse(SparseBlock sblock, SideInput[] b, double[] 
scalars, double[] c, int m, int n, int rl, int ru) 
                throws DMLRuntimeException 
        {
+               SideInput[] lb = createSparseSideInputs(b);
+               
                //core dense aggregation operation
                for( int i=rl; i<ru; i++ )
                        for( int j=0; j<n; j++ ) {
                                double in = (sblock != null) ? sblock.get(i, j) 
: 0;
-                               genexec( in, b, scalars, c, m, n, i, j );
+                               genexec( in, lb, scalars, c, m, n, i, j );
                        }
        }
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/2610a79d/src/main/java/org/apache/sysml/runtime/codegen/SpoofOperator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofOperator.java 
b/src/main/java/org/apache/sysml/runtime/codegen/SpoofOperator.java
index cff640c..fe32839 100644
--- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofOperator.java
+++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofOperator.java
@@ -37,24 +37,24 @@ public abstract class SpoofOperator implements Serializable
        private static final long serialVersionUID = 3834006998853573319L;
        private static final Log LOG = 
LogFactory.getLog(SpoofOperator.class.getName());
        
-       public abstract void execute(ArrayList<MatrixBlock> inputs, 
ArrayList<ScalarObject> scalars, MatrixBlock out) 
+       public abstract void execute(ArrayList<MatrixBlock> inputs, 
ArrayList<ScalarObject> scalars, MatrixBlock out)
                throws DMLRuntimeException;
        
-       public void execute(ArrayList<MatrixBlock> inputs, 
ArrayList<ScalarObject> scalars, MatrixBlock out, int k) 
+       public void execute(ArrayList<MatrixBlock> inputs, 
ArrayList<ScalarObject> scalars, MatrixBlock out, int k)
                throws DMLRuntimeException 
        {
                //default implementation serial execution
                execute(inputs, scalars, out);
        }
        
-       public abstract String getSpoofType(); 
+       public abstract String getSpoofType();
        
        public ScalarObject execute(ArrayList<MatrixBlock> inputs, 
ArrayList<ScalarObject> scalars) throws DMLRuntimeException {
                throw new RuntimeException("Invalid invocation in base class.");
        }
        
-       public ScalarObject execute(ArrayList<MatrixBlock> inputs, 
ArrayList<ScalarObject> scalars, int k) 
-               throws DMLRuntimeException 
+       public ScalarObject execute(ArrayList<MatrixBlock> inputs, 
ArrayList<ScalarObject> scalars, int k)
+               throws DMLRuntimeException
        {
                //default implementation serial execution
                return execute(inputs, scalars);
@@ -76,13 +76,13 @@ public abstract class SpoofOperator implements Serializable
                return prepInputMatrices(inputs, 1, inputs.size()-1, denseOnly, 
tB1);
        }
        
-       protected SideInput[] prepInputMatrices(ArrayList<MatrixBlock> inputs, 
int offset, int len, boolean denseOnly, boolean tB1) 
-               throws DMLRuntimeException 
+       protected SideInput[] prepInputMatrices(ArrayList<MatrixBlock> inputs, 
int offset, int len, boolean denseOnly, boolean tB1)
+               throws DMLRuntimeException
        {
-               SideInput[] b = new SideInput[len]; 
+               SideInput[] b = new SideInput[len];
                for(int i=offset; i<offset+len; i++) {
                        //decompress if necessary
-                       if( inputs.get(i) instanceof CompressedMatrixBlock ) 
+                       if( inputs.get(i) instanceof CompressedMatrixBlock )
                                inputs.set(i, 
((CompressedMatrixBlock)inputs.get(i)).decompress());
                        //transpose if necessary
                        int clen = inputs.get(i).getNumColumns();
@@ -113,6 +113,26 @@ public abstract class SpoofOperator implements Serializable
                return b;
        }
        
+       protected SideInput[] createSparseSideInputs(SideInput[] input) {
+               //determine if there are sparse side inputs
+               boolean containsSparse = false;
+               for( int i=0; i<input.length; i++ ) {
+                       SideInput tmp = input[i];
+                       containsSparse |= (tmp.mdat != null && 
tmp.mdat.isInSparseFormat() 
+                               && !tmp.mdat.isEmptyBlock(false) && tmp.clen > 
1);
+               }
+               if( !containsSparse )
+                       return input;
+               SideInput[] ret = new SideInput[input.length];
+               for( int i=0; i<input.length; i++ ) {
+                       SideInput tmp = input[i];
+                       ret[i] = (tmp.mdat != null && 
tmp.mdat.isInSparseFormat()
+                               && !tmp.mdat.isEmptyBlock(false) && tmp.clen > 
1) ?
+                               new SideInputSparse(tmp) : tmp;
+               }
+               return ret;
+       }
+       
        public double[][] getDenseMatrices(SideInput[] inputs) {
                double[][] ret = new double[inputs.length][];
                for( int i=0; i<inputs.length; i++ )
@@ -121,7 +141,7 @@ public abstract class SpoofOperator implements Serializable
        }
        
        protected double[] prepInputScalars(ArrayList<ScalarObject> 
scalarObjects) {
-               double[] scalars = new double[scalarObjects.size()]; 
+               double[] scalars = new double[scalarObjects.size()];
                for(int i=0; i < scalarObjects.size(); i++)
                        scalars[i] = scalarObjects.get(i).getDoubleValue();
                return scalars;
@@ -156,7 +176,7 @@ public abstract class SpoofOperator implements Serializable
        
        protected static double getValue(SideInput data, int rowIndex) {
                //note: wrapper sideinput guaranteed to exist
-               return (data.ddat!=null) ? data.ddat[rowIndex] : 
+               return (data.ddat!=null) ? data.ddat[rowIndex] :
                        (data.mdat!=null) ? data.mdat.quickGetValue(rowIndex, 
0) : 0;
        }
        
@@ -169,6 +189,7 @@ public abstract class SpoofOperator implements Serializable
        protected static double getValue(SideInput data, int n, int rowIndex, 
int colIndex) {
                //note: wrapper sideinput guaranteed to exist
                return (data.ddat!=null) ? data.ddat[rowIndex*n+colIndex] : 
+                       (data instanceof SideInputSparse) ? 
((SideInputSparse)data).next(rowIndex, colIndex) :
                        (data.mdat!=null) ? data.mdat.quickGetValue(rowIndex, 
colIndex) : 0;
        }
        
@@ -196,4 +217,34 @@ public abstract class SpoofOperator implements Serializable
                        clen = clength;
                }
        }
+       
+       public static class SideInputSparse extends SideInput {
+               private int currRowIndex = -1;
+               private int currColPos = 0;
+               private int currLen = 0;
+               private int[] indexes;
+               private double[] values;
+               
+               public SideInputSparse(SideInput in) {
+                       super(in.ddat, in.mdat, in.clen);
+               }
+               public double next(int rowIndex, int colIndex) {
+                       if( mdat.getSparseBlock().isEmpty(rowIndex) )
+                               return 0;
+                       //move to next row if necessary
+                       if( rowIndex > currRowIndex ) {
+                               currRowIndex = rowIndex;
+                               currColPos = 
mdat.getSparseBlock().pos(currRowIndex);
+                               currLen = 
mdat.getSparseBlock().size(currRowIndex) + currColPos;
+                               indexes = 
mdat.getSparseBlock().indexes(currRowIndex);
+                               values = 
mdat.getSparseBlock().values(currRowIndex);
+                       }
+                       //move to next colpos if necessary
+                       while( currColPos < currLen && 
indexes[currColPos]<colIndex )
+                               currColPos ++;
+                       //return current value or zero
+                       return (currColPos < currLen && 
indexes[currColPos]==colIndex) ?
+                               values[currColPos] : 0;
+               }
+       }
 }

Reply via email to