Repository: systemml Updated Branches: refs/heads/master ce240af57 -> 2610a79d2
[SYSTEMML-1845] Performance codegen cellwise w/ multiple sparse inputs This patch makes a couple of compiler and runtime changes to significantly improve performance of cellwise codegen operations with multiple sparse inputs. This includes: (1) Allow cellwise binary operations over multiple sparse inputs in opening and fuse conditions of cellwise templates. (2) Improved determination of sparse-safeness properties of cellwise operations. (3) Sparse iterator side inputs: In order to avoid unnecessary binary search on lookups over sparse side inputs, we now use special iterators that maintain the current position and advance only if necessary, with potential skips over entire blocks of rows. Together, these changes led to significant performance improvements. For example, on a scenario of 20 iterations of sum(X*Y*Z), where X, Y, and Z are sparse 1M x 1K, sp=0.1 matrices, the baselines (w/o and w/ existing fused operators) took 90s and 7.4s, whereas the codegen operations took 529s. With the three changes above, the performance improved to 82s, 11.7s, and 5.9s, respectively. 
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/2610a79d Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/2610a79d Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/2610a79d Branch: refs/heads/master Commit: 2610a79d22c69e68a21e7098767cf27024d549b8 Parents: ce240af Author: Matthias Boehm <[email protected]> Authored: Wed Aug 16 18:41:03 2017 -0700 Committer: Matthias Boehm <[email protected]> Committed: Wed Aug 16 19:49:28 2017 -0700 ---------------------------------------------------------------------- .../hops/codegen/template/TemplateCell.java | 11 +- .../hops/codegen/template/TemplateUtils.java | 10 + .../sysml/runtime/codegen/SpoofCellwise.java | 241 ++++++++++--------- .../runtime/codegen/SpoofMultiAggregate.java | 8 +- .../sysml/runtime/codegen/SpoofOperator.java | 73 +++++- 5 files changed, 207 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/2610a79d/src/main/java/org/apache/sysml/hops/codegen/template/TemplateCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/codegen/template/TemplateCell.java b/src/main/java/org/apache/sysml/hops/codegen/template/TemplateCell.java index c9d97b9..65bad08 100644 --- a/src/main/java/org/apache/sysml/hops/codegen/template/TemplateCell.java +++ b/src/main/java/org/apache/sysml/hops/codegen/template/TemplateCell.java @@ -135,7 +135,8 @@ public class TemplateCell extends TemplateBase tpl.setCellType(TemplateUtils.getCellType(hop)); tpl.setAggOp(TemplateUtils.getAggOp(hop)); tpl.setSparseSafe((HopRewriteUtils.isBinary(hop, OpOp2.MULT) && hop.getInput().contains(sinHops[0])) - || (HopRewriteUtils.isBinary(hop, OpOp2.DIV) && hop.getInput().get(0) == sinHops[0])); + || (HopRewriteUtils.isBinary(hop, OpOp2.DIV) && hop.getInput().get(0) == sinHops[0]) + || 
TemplateUtils.rIsBinaryOnly(tpl.getOutput(), BinType.MULT)); tpl.setRequiresCastDtm(hop instanceof AggBinaryOp); tpl.setBeginLine(hop.getBeginLine()); @@ -279,7 +280,7 @@ public class TemplateCell extends TemplateBase //prepare indicators for binary operations boolean isBinaryMatrixScalar = false; boolean isBinaryMatrixVector = false; - boolean isBinaryMatrixMatrixDense = false; + boolean isBinaryMatrixMatrix = false; if( hop instanceof BinaryOp && hop.getDataType().isMatrix() ) { Hop left = hop.getInput().get(0); Hop right = hop.getInput().get(1); @@ -290,8 +291,8 @@ public class TemplateCell extends TemplateBase isBinaryMatrixVector = hop.dimsKnown() && ((ldt.isMatrix() && TemplateUtils.isVectorOrScalar(right)) || (rdt.isMatrix() && TemplateUtils.isVectorOrScalar(left)) ); - isBinaryMatrixMatrixDense = hop.dimsKnown() && HopRewriteUtils.isEqualSize(left, right) - && ldt.isMatrix() && rdt.isMatrix() && !HopRewriteUtils.isSparse(left) && !HopRewriteUtils.isSparse(right); + isBinaryMatrixMatrix = hop.dimsKnown() && HopRewriteUtils.isEqualSize(left, right) + && ldt.isMatrix() && rdt.isMatrix(); } //prepare indicators for ternary operations @@ -309,7 +310,7 @@ public class TemplateCell extends TemplateBase //check supported unary, binary, ternary operations return hop.getDataType() == DataType.MATRIX && TemplateUtils.isOperationSupported(hop) && (hop instanceof UnaryOp - || isBinaryMatrixScalar || isBinaryMatrixVector || isBinaryMatrixMatrixDense + || isBinaryMatrixScalar || isBinaryMatrixVector || isBinaryMatrixMatrix || isTernaryVectorScalarVector || isTernaryMatrixScalarMatrixDense || (hop instanceof ParameterizedBuiltinOp && ((ParameterizedBuiltinOp)hop).getOp()==ParamBuiltinOp.REPLACE)); } http://git-wip-us.apache.org/repos/asf/systemml/blob/2610a79d/src/main/java/org/apache/sysml/hops/codegen/template/TemplateUtils.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/hops/codegen/template/TemplateUtils.java b/src/main/java/org/apache/sysml/hops/codegen/template/TemplateUtils.java index 21061f2..5b739ee 100644 --- a/src/main/java/org/apache/sysml/hops/codegen/template/TemplateUtils.java +++ b/src/main/java/org/apache/sysml/hops/codegen/template/TemplateUtils.java @@ -247,6 +247,16 @@ public class TemplateUtils && ArrayUtils.contains(types, ((CNodeBinary)node).getType()); } + public static boolean rIsBinaryOnly(CNode node, BinType...types) { + if( !(isBinary(node, types) || node instanceof CNodeData + || (node instanceof CNodeUnary && ((CNodeUnary)node).getType().isScalarLookup())) ) + return false; + boolean ret = true; + for( CNode c : node.getInput() ) + ret &= rIsBinaryOnly(c, types); + return ret; + } + public static boolean isTernary(CNode node, TernaryType...types) { return node instanceof CNodeTernary && ArrayUtils.contains(types, ((CNodeTernary)node).getType()); http://git-wip-us.apache.org/repos/asf/systemml/blob/2610a79d/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java b/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java index 25cafe7..63168e6 100644 --- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java +++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java @@ -106,7 +106,7 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl + "aggregation type: "+_aggOp.name()); } } - + @Override public ScalarObject execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, int k) throws DMLRuntimeException @@ -125,8 +125,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl final int m = inputs.get(0).getNumRows(); final int n = inputs.get(0).getNumColumns(); - //sparse safe check - boolean sparseSafe = 
isSparseSafe() || (b.length == 0 + //sparse safe check + boolean sparseSafe = isSparseSafe() || (b.length == 0 && genexec( 0, b, scalars, m, n, 0, 0 ) == 0); double ret = 0; @@ -147,11 +147,11 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl int nk = UtilFunctions.roundToNext(Math.min(8*k,m/32), k); int blklen = (int)(Math.ceil((double)m/nk)); for( int i=0; i<nk & i*blklen<m; i++ ) - tasks.add(new ParAggTask(inputs.get(0), b, scalars, m, n, sparseSafe, i*blklen, Math.min((i+1)*blklen, m))); + tasks.add(new ParAggTask(inputs.get(0), b, scalars, m, n, sparseSafe, i*blklen, Math.min((i+1)*blklen, m))); //execute tasks - List<Future<Double>> taskret = pool.invokeAll(tasks); + List<Future<Double>> taskret = pool.invokeAll(tasks); pool.shutdown(); - + //aggregate partial results ValueFunction vfun = getAggFunction(); if( vfun instanceof KahanFunction ) { @@ -159,7 +159,7 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); for( Future<Double> task : taskret ) kplus.execute2(kbuff, task.get()); - ret = kbuff._sum; + ret = kbuff._sum; } else { for( Future<Double> task : taskret ) @@ -180,14 +180,14 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl } @Override - public void execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, MatrixBlock out) + public void execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, MatrixBlock out) throws DMLRuntimeException { execute(inputs, scalarObjects, out, 1); } @Override - public void execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, MatrixBlock out, int k) + public void execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, MatrixBlock out, int k) throws DMLRuntimeException { //sanity check @@ -242,9 +242,9 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl 
blklen = BitmapEncoder.getAlignedBlocksize(blklen); for( int i=0; i<nk & i*blklen<m; i++ ) tasks.add(new ParExecTask(a, b, scalars, out, m, n, - sparseSafe, i*blklen, Math.min((i+1)*blklen, m))); + sparseSafe, i*blklen, Math.min((i+1)*blklen, m))); //execute tasks - List<Future<Long>> taskret = pool.invokeAll(tasks); + List<Future<Long>> taskret = pool.invokeAll(tasks); pool.shutdown(); //aggregate nnz and error handling @@ -286,21 +286,22 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl throws DMLRuntimeException { double[] c = out.getDenseBlock(); + SideInput[] lb = createSparseSideInputs(b); if( _type == CellType.NO_AGG ) { - return executeDenseNoAgg(a, b, scalars, c, m, n, sparseSafe, rl, ru); + return executeDenseNoAgg(a, lb, scalars, c, m, n, sparseSafe, rl, ru); } else if( _type == CellType.ROW_AGG ) { if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ ) - return executeDenseRowAggSum(a, b, scalars, c, m, n, sparseSafe, rl, ru); + return executeDenseRowAggSum(a, lb, scalars, c, m, n, sparseSafe, rl, ru); else - return executeDenseRowAggMxx(a, b, scalars, c, m, n, sparseSafe, rl, ru); + return executeDenseRowAggMxx(a, lb, scalars, c, m, n, sparseSafe, rl, ru); } else if( _type == CellType.COL_AGG ) { if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ ) - return executeDenseColAggSum(a, b, scalars, c, m, n, sparseSafe, rl, ru); + return executeDenseColAggSum(a, lb, scalars, c, m, n, sparseSafe, rl, ru); else - return executeDenseColAggMxx(a, b, scalars, c, m, n, sparseSafe, rl, ru); + return executeDenseColAggMxx(a, lb, scalars, c, m, n, sparseSafe, rl, ru); } return -1; } @@ -308,58 +309,62 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl private double executeDenseAndAgg(double[] a, SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) throws DMLRuntimeException { + SideInput[] lb = createSparseSideInputs(b); + //numerically stable aggregation for sum/sum_sq if( 
_aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ ) - return executeDenseAggSum(a, b, scalars, m, n, sparseSafe, rl, ru); + return executeDenseAggSum(a, lb, scalars, m, n, sparseSafe, rl, ru); else - return executeDenseAggMxx(a, b, scalars, m, n, sparseSafe, rl, ru); + return executeDenseAggMxx(a, lb, scalars, m, n, sparseSafe, rl, ru); } - private long executeSparse(SparseBlock sblock, SideInput[] b, double[] scalars, - MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) - throws DMLRuntimeException + private long executeSparse(SparseBlock sblock, SideInput[] b, double[] scalars, + MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) + throws DMLRuntimeException { if( sparseSafe && sblock == null ) return 0; + SideInput[] lb = createSparseSideInputs(b); if( _type == CellType.NO_AGG ) { - if( out.isInSparseFormat() ) - return executeSparseNoAggSparse(sblock, b, scalars, out, m, n, sparseSafe, rl, ru); + if( out.isInSparseFormat() ) + return executeSparseNoAggSparse(sblock, lb, scalars, out, m, n, sparseSafe, rl, ru); else - return executeSparseNoAggDense(sblock, b, scalars, out, m, n, sparseSafe, rl, ru); + return executeSparseNoAggDense(sblock, lb, scalars, out, m, n, sparseSafe, rl, ru); } else if( _type == CellType.ROW_AGG ) { if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ ) - return executeSparseRowAggSum(sblock, b, scalars, out, m, n, sparseSafe, rl, ru); + return executeSparseRowAggSum(sblock, lb, scalars, out, m, n, sparseSafe, rl, ru); else - return executeSparseRowAggMxx(sblock, b, scalars, out, m, n, sparseSafe, rl, ru); + return executeSparseRowAggMxx(sblock, lb, scalars, out, m, n, sparseSafe, rl, ru); } else if( _type == CellType.COL_AGG ) { if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ ) - return executeSparseColAggSum(sblock, b, scalars, out, m, n, sparseSafe, rl, ru); + return executeSparseColAggSum(sblock, lb, scalars, out, m, n, sparseSafe, rl, ru); else - return executeSparseColAggMxx(sblock, b, scalars, out, m, n, 
sparseSafe, rl, ru); + return executeSparseColAggMxx(sblock, lb, scalars, out, m, n, sparseSafe, rl, ru); } return -1; } - private double executeSparseAndAgg(SparseBlock sblock, SideInput[] b, double[] scalars, - int m, int n, boolean sparseSafe, int rl, int ru) - throws DMLRuntimeException + private double executeSparseAndAgg(SparseBlock sblock, SideInput[] b, double[] scalars, + int m, int n, boolean sparseSafe, int rl, int ru) + throws DMLRuntimeException { if( sparseSafe && sblock == null ) return 0; + SideInput[] lb = createSparseSideInputs(b); if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ ) - return executeSparseAggSum(sblock, b, scalars, m, n, sparseSafe, rl, ru); + return executeSparseAggSum(sblock, lb, scalars, m, n, sparseSafe, rl, ru); else - return executeSparseAggMxx(sblock, b, scalars, m, n, sparseSafe, rl, ru); + return executeSparseAggMxx(sblock, lb, scalars, m, n, sparseSafe, rl, ru); } - private long executeCompressed(CompressedMatrixBlock a, SideInput[] b, double[] scalars, - MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) - throws DMLRuntimeException + private long executeCompressed(CompressedMatrixBlock a, SideInput[] b, double[] scalars, + MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) + throws DMLRuntimeException { if( _type == CellType.NO_AGG ) { long lnnz = executeCompressedNoAgg(a, b, scalars, out, m, n, sparseSafe, rl, ru); @@ -384,7 +389,7 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl return -1; } - private double executeCompressedAndAgg(CompressedMatrixBlock a, SideInput[] b, double[] scalars, + private double executeCompressedAndAgg(CompressedMatrixBlock a, SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) throws DMLRuntimeException { //numerically stable aggregation for sum/sum_sq @@ -397,24 +402,24 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl ///////// //core operator skeletons for 
dense, sparse, and compressed - private long executeDenseNoAgg(double[] a, SideInput[] b, double[] scalars, - double[] c, int m, int n, boolean sparseSafe, int rl, int ru) - throws DMLRuntimeException + private long executeDenseNoAgg(double[] a, SideInput[] b, double[] scalars, + double[] c, int m, int n, boolean sparseSafe, int rl, int ru) + throws DMLRuntimeException { long lnnz = 0; - for( int i=rl, ix=rl*n; i<ru; i++ ) + for( int i=rl, ix=rl*n; i<ru; i++ ) for( int j=0; j<n; j++, ix++ ) { double aval = (a != null) ? a[ix] : 0; if( aval != 0 || !sparseSafe) { - c[ix] = genexec( aval, b, scalars, m, n, i, j); + c[ix] = genexec( aval, b, scalars, m, n, i, j); lnnz += (c[ix]!=0) ? 1 : 0; } } return lnnz; } - private long executeDenseRowAggSum(double[] a, SideInput[] b, double[] scalars, - double[] c, int m, int n, boolean sparseSafe, int rl, int ru) + private long executeDenseRowAggSum(double[] a, SideInput[] b, double[] scalars, + double[] c, int m, int n, boolean sparseSafe, int rl, int ru) { KahanFunction kplus = (KahanFunction) getAggFunction(); KahanObject kbuff = new KahanObject(0, 0); @@ -431,15 +436,15 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl return lnnz; } - private long executeDenseRowAggMxx(double[] a, SideInput[] b, double[] scalars, - double[] c, int m, int n, boolean sparseSafe, int rl, int ru) + private long executeDenseRowAggMxx(double[] a, SideInput[] b, double[] scalars, + double[] c, int m, int n, boolean sparseSafe, int rl, int ru) throws DMLRuntimeException { double initialVal = (_aggOp==AggOp.MIN) ? 
Double.MAX_VALUE : -Double.MAX_VALUE; ValueFunction vfun = getAggFunction(); long lnnz = 0; if( a == null && !sparseSafe ) { //empty - for( int i=rl; i<ru; i++ ) { + for( int i=rl; i<ru; i++ ) { double tmp = initialVal; for( int j=0; j<n; j++ ) tmp = vfun.execute(tmp, genexec(0, b, scalars, m, n, i, j)); @@ -460,8 +465,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl return lnnz; } - private long executeDenseColAggSum(double[] a, SideInput[] b, double[] scalars, - double[] c, int m, int n, boolean sparseSafe, int rl, int ru) + private long executeDenseColAggSum(double[] a, SideInput[] b, double[] scalars, + double[] c, int m, int n, boolean sparseSafe, int rl, int ru) { KahanFunction kplus = (KahanFunction) getAggFunction(); KahanObject kbuff = new KahanObject(0, 0); @@ -480,8 +485,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl return -1; } - private long executeDenseColAggMxx(double[] a, SideInput[] b, double[] scalars, - double[] c, int m, int n, boolean sparseSafe, int rl, int ru) + private long executeDenseColAggMxx(double[] a, SideInput[] b, double[] scalars, + double[] c, int m, int n, boolean sparseSafe, int rl, int ru) throws DMLRuntimeException { double initialVal = (_aggOp==AggOp.MIN) ? 
Double.MAX_VALUE : -Double.MAX_VALUE; @@ -509,8 +514,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl return -1; } - private double executeDenseAggSum(double[] a, SideInput[] b, double[] scalars, - int m, int n, boolean sparseSafe, int rl, int ru) + private double executeDenseAggSum(double[] a, SideInput[] b, double[] scalars, + int m, int n, boolean sparseSafe, int rl, int ru) throws DMLRuntimeException { KahanFunction kplus = (KahanFunction) getAggFunction(); @@ -525,8 +530,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl return kbuff._sum; } - private double executeDenseAggMxx(double[] a, SideInput[] b, double[] scalars, - int m, int n, boolean sparseSafe, int rl, int ru) + private double executeDenseAggMxx(double[] a, SideInput[] b, double[] scalars, + int m, int n, boolean sparseSafe, int rl, int ru) throws DMLRuntimeException { //safe aggregation for min/max w/ handling of zero entries @@ -543,12 +548,12 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl return ret; } - private long executeSparseNoAggSparse(SparseBlock sblock, SideInput[] b, double[] scalars, - MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) - throws DMLRuntimeException + private long executeSparseNoAggSparse(SparseBlock sblock, SideInput[] b, double[] scalars, + MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) + throws DMLRuntimeException { - //note: sequential scan algorithm for both sparse-safe and -unsafe - //in order to avoid binary search for sparse-unsafe + //note: sequential scan algorithm for both sparse-safe and -unsafe + //in order to avoid binary search for sparse-unsafe SparseBlock c = out.getSparseBlock(); long lnnz = 0; for(int i=rl; i<ru; i++) { @@ -580,12 +585,12 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl return lnnz; } - private long executeSparseNoAggDense(SparseBlock sblock, SideInput[] b, 
double[] scalars, - MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) - throws DMLRuntimeException + private long executeSparseNoAggDense(SparseBlock sblock, SideInput[] b, double[] scalars, + MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) + throws DMLRuntimeException { - //note: sequential scan algorithm for both sparse-safe and -unsafe - //in order to avoid binary search for sparse-unsafe + //note: sequential scan algorithm for both sparse-safe and -unsafe + //in order to avoid binary search for sparse-unsafe double[] c = out.getDenseBlock(); long lnnz = 0; for(int i=rl, cix=rl*n; i<ru; i++, cix+=n) { @@ -614,15 +619,15 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl return lnnz; } - private long executeSparseRowAggSum(SparseBlock sblock, SideInput[] b, double[] scalars, - MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) + private long executeSparseRowAggSum(SparseBlock sblock, SideInput[] b, double[] scalars, + MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) throws DMLRuntimeException { KahanFunction kplus = (KahanFunction) getAggFunction(); KahanObject kbuff = new KahanObject(0, 0); - //note: sequential scan algorithm for both sparse-safe and -unsafe - //in order to avoid binary search for sparse-unsafe + //note: sequential scan algorithm for both sparse-safe and -unsafe + //in order to avoid binary search for sparse-unsafe double[] c = out.getDenseBlock(); long lnnz = 0; for(int i=rl; i<ru; i++) { @@ -653,9 +658,9 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl return lnnz; } - private long executeSparseRowAggMxx(SparseBlock sblock, SideInput[] b, double[] scalars, - MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) - throws DMLRuntimeException + private long executeSparseRowAggMxx(SparseBlock sblock, SideInput[] b, double[] scalars, + MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int 
ru) + throws DMLRuntimeException { double initialVal = (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : -Double.MAX_VALUE; ValueFunction vfun = getAggFunction(); @@ -692,16 +697,16 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl return lnnz; } - private long executeSparseColAggSum(SparseBlock sblock, SideInput[] b, double[] scalars, - MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) - throws DMLRuntimeException + private long executeSparseColAggSum(SparseBlock sblock, SideInput[] b, double[] scalars, + MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) + throws DMLRuntimeException { KahanFunction kplus = (KahanFunction) getAggFunction(); KahanObject kbuff = new KahanObject(0, 0); double[] corr = new double[n]; - //note: sequential scan algorithm for both sparse-safe and -unsafe - //in order to avoid binary search for sparse-unsafe + //note: sequential scan algorithm for both sparse-safe and -unsafe + //in order to avoid binary search for sparse-unsafe double[] c = out.getDenseBlock(); for(int i=rl; i<ru; i++) { kbuff.set(0, 0); @@ -741,9 +746,9 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl return -1; } - private long executeSparseColAggMxx(SparseBlock sblock, SideInput[] b, double[] scalars, - MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) - throws DMLRuntimeException + private long executeSparseColAggMxx(SparseBlock sblock, SideInput[] b, double[] scalars, + MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) + throws DMLRuntimeException { double initialVal = (_aggOp==AggOp.MIN) ? 
Double.MAX_VALUE : -Double.MAX_VALUE; ValueFunction vfun = getAggFunction(); @@ -751,8 +756,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl Arrays.fill(c, initialVal); int[] count = new int[n]; - //note: sequential scan algorithm for both sparse-safe and -unsafe - //in order to avoid binary search for sparse-unsafe + //note: sequential scan algorithm for both sparse-safe and -unsafe + //in order to avoid binary search for sparse-unsafe for(int i=rl; i<ru; i++) { int lastj = -1; //handle non-empty rows @@ -783,13 +788,13 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl return -1; } - private double executeSparseAggSum(SparseBlock sblock, SideInput[] b, double[] scalars, - int m, int n, boolean sparseSafe, int rl, int ru) - throws DMLRuntimeException + private double executeSparseAggSum(SparseBlock sblock, SideInput[] b, double[] scalars, + int m, int n, boolean sparseSafe, int rl, int ru) + throws DMLRuntimeException { KahanFunction kplus = (KahanFunction) getAggFunction(); KahanObject kbuff = new KahanObject(0, 0); - + //note: sequential scan algorithm for both sparse-safe and -unsafe //in order to avoid binary search for sparse-unsafe for(int i=rl; i<ru; i++) { @@ -818,16 +823,16 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl return kbuff._sum; } - private double executeSparseAggMxx(SparseBlock sblock, SideInput[] b, double[] scalars, - int m, int n, boolean sparseSafe, int rl, int ru) - throws DMLRuntimeException + private double executeSparseAggMxx(SparseBlock sblock, SideInput[] b, double[] scalars, + int m, int n, boolean sparseSafe, int rl, int ru) + throws DMLRuntimeException { double ret = (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : -Double.MAX_VALUE; ret = (sparseSafe && sblock.size() < (long)m*n) ? 
0 : ret; ValueFunction vfun = getAggFunction(); - //note: sequential scan algorithm for both sparse-safe and -unsafe - //in order to avoid binary search for sparse-unsafe + //note: sequential scan algorithm for both sparse-safe and -unsafe + //in order to avoid binary search for sparse-unsafe for(int i=rl; i<ru; i++) { int lastj = -1; //handle non-empty rows @@ -854,15 +859,15 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl return ret; } - private long executeCompressedNoAgg(CompressedMatrixBlock a, SideInput[] b, double[] scalars, - MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) - throws DMLRuntimeException + private long executeCompressedNoAgg(CompressedMatrixBlock a, SideInput[] b, double[] scalars, + MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) + throws DMLRuntimeException { double[] c = out.getDenseBlock(); SparseBlock csblock = out.getSparseBlock(); //preallocate sparse rows to avoid reallocations - //note: counting nnz requires segment-aligned boundaries, which is enforced + //note: counting nnz requires segment-aligned boundaries, which is enforced //whenever k/2 * BITMAP_BLOCK_SZ > m (i.e., it does not limit parallelism) if( out.isInSparseFormat() && rl%BitmapEncoder.BITMAP_BLOCK_SZ==0 && ru%BitmapEncoder.BITMAP_BLOCK_SZ==0) { @@ -881,15 +886,15 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl csblock.append(cell.getI(), cell.getJ(), val); } else - c[cell.getI()*n+cell.getJ()] = val; + c[cell.getI()*n+cell.getJ()] = val; lnnz += (val!=0) ? 
1 : 0; } return lnnz; } - private long executeCompressedRowAggSum(CompressedMatrixBlock a, SideInput[] b, double[] scalars, - double[] c, int m, int n, boolean sparseSafe, int rl, int ru) - throws DMLRuntimeException + private long executeCompressedRowAggSum(CompressedMatrixBlock a, SideInput[] b, double[] scalars, + double[] c, int m, int n, boolean sparseSafe, int rl, int ru) + throws DMLRuntimeException { KahanFunction kplus = (KahanFunction) getAggFunction(); KahanObject kbuff = new KahanObject(0, 0); @@ -907,9 +912,9 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl return lnnz; } - private long executeCompressedRowAggMxx(CompressedMatrixBlock a, SideInput[] b, double[] scalars, - double[] c, int m, int n, boolean sparseSafe, int rl, int ru) - throws DMLRuntimeException + private long executeCompressedRowAggMxx(CompressedMatrixBlock a, SideInput[] b, double[] scalars, + double[] c, int m, int n, boolean sparseSafe, int rl, int ru) + throws DMLRuntimeException { Arrays.fill(c, rl, ru, (_aggOp==AggOp.MIN) ? 
Double.MAX_VALUE : -Double.MAX_VALUE); ValueFunction vfun = getAggFunction(); @@ -925,8 +930,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl return lnnz; } - private long executeCompressedColAggSum(CompressedMatrixBlock a, SideInput[] b, double[] scalars, - double[] c, int m, int n, boolean sparseSafe, int rl, int ru) + private long executeCompressedColAggSum(CompressedMatrixBlock a, SideInput[] b, double[] scalars, + double[] c, int m, int n, boolean sparseSafe, int rl, int ru) { KahanFunction kplus = (KahanFunction) getAggFunction(); KahanObject kbuff = new KahanObject(0, 0); @@ -944,9 +949,9 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl return -1; } - private long executeCompressedColAggMxx(CompressedMatrixBlock a, SideInput[] b, double[] scalars, - double[] c, int m, int n, boolean sparseSafe, int rl, int ru) - throws DMLRuntimeException + private long executeCompressedColAggMxx(CompressedMatrixBlock a, SideInput[] b, double[] scalars, + double[] c, int m, int n, boolean sparseSafe, int rl, int ru) + throws DMLRuntimeException { Arrays.fill(c, rl, ru, (_aggOp==AggOp.MIN) ? 
Double.MAX_VALUE : -Double.MAX_VALUE); ValueFunction vfun = getAggFunction(); @@ -962,9 +967,9 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl return lnnz; } - private double executeCompressedAggSum(CompressedMatrixBlock a, SideInput[] b, double[] scalars, - int m, int n, boolean sparseSafe, int rl, int ru) - throws DMLRuntimeException + private double executeCompressedAggSum(CompressedMatrixBlock a, SideInput[] b, double[] scalars, + int m, int n, boolean sparseSafe, int rl, int ru) + throws DMLRuntimeException { KahanFunction kplus = (KahanFunction) getAggFunction(); KahanObject kbuff = new KahanObject(0, 0); @@ -972,19 +977,19 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl Iterator<IJV> iter = a.getIterator(rl, ru, !sparseSafe); while( iter.hasNext() ) { IJV cell = iter.next(); - double val = genexec(cell.getV(), b, scalars, m, n, cell.getI(), cell.getJ()); + double val = genexec(cell.getV(), b, scalars, m, n, cell.getI(), cell.getJ()); kplus.execute2(kbuff, val); } return kbuff._sum; } - private double executeCompressedAggMxx(CompressedMatrixBlock a, SideInput[] b, double[] scalars, - int m, int n, boolean sparseSafe, int rl, int ru) - throws DMLRuntimeException + private double executeCompressedAggMxx(CompressedMatrixBlock a, SideInput[] b, double[] scalars, + int m, int n, boolean sparseSafe, int rl, int ru) + throws DMLRuntimeException { //safe aggregation for min/max w/ handling of zero entries //note: sparse safe with zero value as min/max handled outside - double ret = (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : -Double.MAX_VALUE; + double ret = (_aggOp==AggOp.MIN) ? 
Double.MAX_VALUE : -Double.MAX_VALUE; ValueFunction vfun = getAggFunction(); Iterator<IJV> iter = a.getIterator(rl, ru, !sparseSafe); @@ -996,7 +1001,7 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl return ret; } - protected abstract double genexec( double a, SideInput[] b, + protected abstract double genexec( double a, SideInput[] b, double[] scalars, int m, int n, int rowIndex, int colIndex); private class ParAggTask implements Callable<Double> @@ -1033,7 +1038,7 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl } } - private class ParExecTask implements Callable<Long> + private class ParExecTask implements Callable<Long> { private final MatrixBlock _a; private final SideInput[] _b; @@ -1045,7 +1050,7 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl private final int _rl; private final int _ru; - protected ParExecTask( MatrixBlock a, SideInput[] b, double[] scalars, MatrixBlock c, + protected ParExecTask( MatrixBlock a, SideInput[] b, double[] scalars, MatrixBlock c, int rlen, int clen, boolean sparseSafe, int rl, int ru ) { _a = a; _b = b; http://git-wip-us.apache.org/repos/asf/systemml/blob/2610a79d/src/main/java/org/apache/sysml/runtime/codegen/SpoofMultiAggregate.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofMultiAggregate.java b/src/main/java/org/apache/sysml/runtime/codegen/SpoofMultiAggregate.java index c3755d4..679c964 100644 --- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofMultiAggregate.java +++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofMultiAggregate.java @@ -136,11 +136,13 @@ public abstract class SpoofMultiAggregate extends SpoofOperator implements Seria private void executeDense(double[] a, SideInput[] b, double[] scalars, double[] c, int m, int n, int rl, int ru) throws DMLRuntimeException { + SideInput[] lb = 
createSparseSideInputs(b); + //core dense aggregation operation for( int i=rl, ix=rl*n; i<ru; i++ ) { for( int j=0; j<n; j++, ix++ ) { double in = (a != null) ? a[ix] : 0; - genexec( in, b, scalars, c, m, n, i, j ); + genexec( in, lb, scalars, c, m, n, i, j ); } } } @@ -148,11 +150,13 @@ public abstract class SpoofMultiAggregate extends SpoofOperator implements Seria private void executeSparse(SparseBlock sblock, SideInput[] b, double[] scalars, double[] c, int m, int n, int rl, int ru) throws DMLRuntimeException { + SideInput[] lb = createSparseSideInputs(b); + //core dense aggregation operation for( int i=rl; i<ru; i++ ) for( int j=0; j<n; j++ ) { double in = (sblock != null) ? sblock.get(i, j) : 0; - genexec( in, b, scalars, c, m, n, i, j ); + genexec( in, lb, scalars, c, m, n, i, j ); } } http://git-wip-us.apache.org/repos/asf/systemml/blob/2610a79d/src/main/java/org/apache/sysml/runtime/codegen/SpoofOperator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofOperator.java b/src/main/java/org/apache/sysml/runtime/codegen/SpoofOperator.java index cff640c..fe32839 100644 --- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofOperator.java +++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofOperator.java @@ -37,24 +37,24 @@ public abstract class SpoofOperator implements Serializable private static final long serialVersionUID = 3834006998853573319L; private static final Log LOG = LogFactory.getLog(SpoofOperator.class.getName()); - public abstract void execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalars, MatrixBlock out) + public abstract void execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalars, MatrixBlock out) throws DMLRuntimeException; - public void execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalars, MatrixBlock out, int k) + public void execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalars, 
MatrixBlock out, int k) throws DMLRuntimeException { //default implementation serial execution execute(inputs, scalars, out); } - public abstract String getSpoofType(); + public abstract String getSpoofType(); public ScalarObject execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalars) throws DMLRuntimeException { throw new RuntimeException("Invalid invocation in base class."); } - public ScalarObject execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalars, int k) - throws DMLRuntimeException + public ScalarObject execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalars, int k) + throws DMLRuntimeException { //default implementation serial execution return execute(inputs, scalars); @@ -76,13 +76,13 @@ public abstract class SpoofOperator implements Serializable return prepInputMatrices(inputs, 1, inputs.size()-1, denseOnly, tB1); } - protected SideInput[] prepInputMatrices(ArrayList<MatrixBlock> inputs, int offset, int len, boolean denseOnly, boolean tB1) - throws DMLRuntimeException + protected SideInput[] prepInputMatrices(ArrayList<MatrixBlock> inputs, int offset, int len, boolean denseOnly, boolean tB1) + throws DMLRuntimeException { - SideInput[] b = new SideInput[len]; + SideInput[] b = new SideInput[len]; for(int i=offset; i<offset+len; i++) { //decompress if necessary - if( inputs.get(i) instanceof CompressedMatrixBlock ) + if( inputs.get(i) instanceof CompressedMatrixBlock ) inputs.set(i, ((CompressedMatrixBlock)inputs.get(i)).decompress()); //transpose if necessary int clen = inputs.get(i).getNumColumns(); @@ -113,6 +113,26 @@ public abstract class SpoofOperator implements Serializable return b; } + protected SideInput[] createSparseSideInputs(SideInput[] input) { + //determine if there are sparse side inputs + boolean containsSparse = false; + for( int i=0; i<input.length; i++ ) { + SideInput tmp = input[i]; + containsSparse |= (tmp.mdat != null && tmp.mdat.isInSparseFormat() + && !tmp.mdat.isEmptyBlock(false) 
&& tmp.clen > 1); + } + if( !containsSparse ) + return input; + SideInput[] ret = new SideInput[input.length]; + for( int i=0; i<input.length; i++ ) { + SideInput tmp = input[i]; + ret[i] = (tmp.mdat != null && tmp.mdat.isInSparseFormat() + && !tmp.mdat.isEmptyBlock(false) && tmp.clen > 1) ? + new SideInputSparse(tmp) : tmp; + } + return ret; + } + public double[][] getDenseMatrices(SideInput[] inputs) { double[][] ret = new double[inputs.length][]; for( int i=0; i<inputs.length; i++ ) @@ -121,7 +141,7 @@ public abstract class SpoofOperator implements Serializable } protected double[] prepInputScalars(ArrayList<ScalarObject> scalarObjects) { - double[] scalars = new double[scalarObjects.size()]; + double[] scalars = new double[scalarObjects.size()]; for(int i=0; i < scalarObjects.size(); i++) scalars[i] = scalarObjects.get(i).getDoubleValue(); return scalars; @@ -156,7 +176,7 @@ public abstract class SpoofOperator implements Serializable protected static double getValue(SideInput data, int rowIndex) { //note: wrapper sideinput guaranteed to exist - return (data.ddat!=null) ? data.ddat[rowIndex] : + return (data.ddat!=null) ? data.ddat[rowIndex] : (data.mdat!=null) ? data.mdat.quickGetValue(rowIndex, 0) : 0; } @@ -169,6 +189,7 @@ public abstract class SpoofOperator implements Serializable protected static double getValue(SideInput data, int n, int rowIndex, int colIndex) { //note: wrapper sideinput guaranteed to exist return (data.ddat!=null) ? data.ddat[rowIndex*n+colIndex] : + (data instanceof SideInputSparse) ? ((SideInputSparse)data).next(rowIndex, colIndex) : (data.mdat!=null) ? 
data.mdat.quickGetValue(rowIndex, colIndex) : 0; } @@ -196,4 +217,34 @@ public abstract class SpoofOperator implements Serializable clen = clength; } } + + public static class SideInputSparse extends SideInput { + private int currRowIndex = -1; + private int currColPos = 0; + private int currLen = 0; + private int[] indexes; + private double[] values; + + public SideInputSparse(SideInput in) { + super(in.ddat, in.mdat, in.clen); + } + public double next(int rowIndex, int colIndex) { + if( mdat.getSparseBlock().isEmpty(rowIndex) ) + return 0; + //move to next row if necessary + if( rowIndex > currRowIndex ) { + currRowIndex = rowIndex; + currColPos = mdat.getSparseBlock().pos(currRowIndex); + currLen = mdat.getSparseBlock().size(currRowIndex) + currColPos; + indexes = mdat.getSparseBlock().indexes(currRowIndex); + values = mdat.getSparseBlock().values(currRowIndex); + } + //move to next colpos if necessary + while( currColPos < currLen && indexes[currColPos]<colIndex ) + currColPos ++; + //return current value or zero + return (currColPos < currLen && indexes[currColPos]==colIndex) ? + values[currColPos] : 0; + } + } }
