[SYSTEMML-1788] Extended codegen cell-wise ops (column aggregation)
This patch extends the code generator's cell-wise template (compiler/runtime) with column aggregations for sum, sumsq, min, and max. Although row-wise templates also cover column aggregations with sum, the cell-wise template is more efficient when no row aggregations are required, because it does not materialize vector intermediates and is hence more cache-friendly for large numbers of columns.
Furthermore, this patch also includes some minor cleanups of the core matrix block data structure and operations. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/45367829 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/45367829 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/45367829 Branch: refs/heads/master Commit: 45367829a9b47dfbacaa5631770453ee76c3a1e9 Parents: d7e4c5a Author: Matthias Boehm <mboe...@gmail.com> Authored: Fri Jul 21 21:04:00 2017 -0700 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Sat Jul 22 13:53:16 2017 -0700 ---------------------------------------------------------------------- .../sysml/hops/codegen/cplan/CNodeCell.java | 1 + .../hops/codegen/template/TemplateCell.java | 7 +- .../hops/codegen/template/TemplateUtils.java | 12 +- .../sysml/runtime/codegen/SpoofCellwise.java | 243 ++++++++++++++++++- .../instructions/spark/SpoofSPInstruction.java | 16 +- .../sysml/runtime/matrix/data/MatrixBlock.java | 56 ++--- .../functions/codegen/CellwiseTmplTest.java | 44 +++- .../functions/codegen/RowAggTmplTest.java | 2 +- .../scripts/functions/codegen/cellwisetmpl15.R | 31 +++ .../functions/codegen/cellwisetmpl15.dml | 27 +++ .../scripts/functions/codegen/cellwisetmpl16.R | 30 +++ .../functions/codegen/cellwisetmpl16.dml | 27 +++ .../scripts/functions/codegen/rowAggPattern14.R | 2 +- .../functions/codegen/rowAggPattern14.dml | 2 +- 14 files changed, 433 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/45367829/src/main/java/org/apache/sysml/hops/codegen/cplan/CNodeCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/codegen/cplan/CNodeCell.java b/src/main/java/org/apache/sysml/hops/codegen/cplan/CNodeCell.java index 062e9a0..36cf56f 100644 --- 
a/src/main/java/org/apache/sysml/hops/codegen/cplan/CNodeCell.java +++ b/src/main/java/org/apache/sysml/hops/codegen/cplan/CNodeCell.java @@ -148,6 +148,7 @@ public class CNodeCell extends CNodeTpl switch( _type ) { case NO_AGG: return SpoofOutputDimsType.INPUT_DIMS; case ROW_AGG: return SpoofOutputDimsType.ROW_DIMS; + case COL_AGG: return SpoofOutputDimsType.COLUMN_DIMS_COLS; case FULL_AGG: return SpoofOutputDimsType.SCALAR; default: throw new RuntimeException("Unsupported cell type: "+_type.toString()); http://git-wip-us.apache.org/repos/asf/systemml/blob/45367829/src/main/java/org/apache/sysml/hops/codegen/template/TemplateCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/codegen/template/TemplateCell.java b/src/main/java/org/apache/sysml/hops/codegen/template/TemplateCell.java index c73216e..68f7412 100644 --- a/src/main/java/org/apache/sysml/hops/codegen/template/TemplateCell.java +++ b/src/main/java/org/apache/sysml/hops/codegen/template/TemplateCell.java @@ -31,7 +31,6 @@ import org.apache.sysml.hops.DataOp; import org.apache.sysml.hops.Hop; import org.apache.sysml.hops.UnaryOp; import org.apache.sysml.hops.Hop.AggOp; -import org.apache.sysml.hops.Hop.Direction; import org.apache.sysml.hops.Hop.OpOp2; import org.apache.sysml.hops.Hop.ParamBuiltinOp; import org.apache.sysml.hops.IndexingOp; @@ -82,8 +81,7 @@ public class TemplateCell extends TemplateBase @Override public boolean fuse(Hop hop, Hop input) { return !isClosed() && (isValidOperation(hop) - || (HopRewriteUtils.isAggUnaryOp(hop, SUPPORTED_AGG) - && ((AggUnaryOp) hop).getDirection()!= Direction.Col) + || HopRewriteUtils.isAggUnaryOp(hop, SUPPORTED_AGG) || (HopRewriteUtils.isMatrixMultiply(hop) && hop.getDim1()==1 && hop.getDim2()==1) && HopRewriteUtils.isTransposeOperation(hop.getInput().get(0)) @@ -102,8 +100,7 @@ public class TemplateCell extends TemplateBase @Override public CloseType close(Hop hop) { //need to close 
cell tpl after aggregation, see fuse for exact properties - if( (HopRewriteUtils.isAggUnaryOp(hop, SUPPORTED_AGG) - && ((AggUnaryOp) hop).getDirection()!= Direction.Col) + if( HopRewriteUtils.isAggUnaryOp(hop, SUPPORTED_AGG) || (HopRewriteUtils.isMatrixMultiply(hop) && hop.getDim1()==1 && hop.getDim2()==1) ) return CloseType.CLOSED_VALID; else if( hop instanceof AggUnaryOp || hop instanceof AggBinaryOp ) http://git-wip-us.apache.org/repos/asf/systemml/blob/45367829/src/main/java/org/apache/sysml/hops/codegen/template/TemplateUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/codegen/template/TemplateUtils.java b/src/main/java/org/apache/sysml/hops/codegen/template/TemplateUtils.java index 647c9d3..402f9fe 100644 --- a/src/main/java/org/apache/sysml/hops/codegen/template/TemplateUtils.java +++ b/src/main/java/org/apache/sysml/hops/codegen/template/TemplateUtils.java @@ -167,9 +167,15 @@ public class TemplateUtils } public static CellType getCellType(Hop hop) { - return (hop instanceof AggBinaryOp) ? CellType.FULL_AGG : - (hop instanceof AggUnaryOp) ? ((((AggUnaryOp) hop).getDirection() == Direction.RowCol) ? - CellType.FULL_AGG : CellType.ROW_AGG) : CellType.NO_AGG; + if( hop instanceof AggBinaryOp ) + return CellType.FULL_AGG; + else if( hop instanceof AggUnaryOp ) + switch( ((AggUnaryOp)hop).getDirection() ) { + case Row: return CellType.ROW_AGG; + case Col: return CellType.COL_AGG; + case RowCol: return CellType.FULL_AGG; + } + return CellType.NO_AGG; } public static RowType getRowType(Hop output, Hop... 
inputs) { http://git-wip-us.apache.org/repos/asf/systemml/blob/45367829/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java b/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java index 15de508..08032af 100644 --- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java +++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java @@ -43,6 +43,7 @@ import org.apache.sysml.runtime.instructions.cp.DoubleObject; import org.apache.sysml.runtime.instructions.cp.KahanObject; import org.apache.sysml.runtime.instructions.cp.ScalarObject; import org.apache.sysml.runtime.matrix.data.IJV; +import org.apache.sysml.runtime.matrix.data.LibMatrixMult; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.SparseBlock; import org.apache.sysml.runtime.util.UtilFunctions; @@ -56,6 +57,7 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl NO_AGG, FULL_AGG, ROW_AGG, + COL_AGG, } //redefinition of Hop.AggOp for cleaner imports in generate class @@ -208,10 +210,14 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl && genexec( 0, b, scalars, m, n, 0, 0 ) == 0); //result allocation and preparations - boolean sparseOut = sparseSafe && a.isInSparseFormat() - && _type == CellType.NO_AGG; - out.reset(a.getNumRows(), _type == CellType.NO_AGG ? 
- a.getNumColumns() : 1, sparseOut); + boolean sparseOut = _type == CellType.NO_AGG + && sparseSafe && a.isInSparseFormat(); + switch( _type ) { + case NO_AGG: out.reset(m, n, sparseOut); break; + case ROW_AGG: out.reset(m, 1, false); break; + case COL_AGG: out.reset(1, n, false); break; + default: throw new DMLRuntimeException("Invalid cell type: "+_type); + } out.allocateDenseOrSparseBlock(); long lnnz = 0; @@ -244,6 +250,23 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl //aggregate nnz and error handling for( Future<Long> task : taskret ) lnnz += task.get(); + if( _type == CellType.COL_AGG ) { + //aggregate partial results + double[] c = out.getDenseBlock(); + ValueFunction vfun = getAggFunction(); + if( vfun instanceof KahanFunction ) { + for( ParExecTask task : tasks ) + LibMatrixMult.vectAdd(task.getResult().getDenseBlock(), c, 0, 0, n); + } + else { + for( ParExecTask task : tasks ) { + double[] tmp = task.getResult().getDenseBlock(); + for(int j=0; j<n; j++) + c[j] = vfun.execute(c[j], tmp[j]); + } + } + lnnz = out.recomputeNonZeros(); + } } catch(Exception ex) { throw new DMLRuntimeException(ex); @@ -273,6 +296,12 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl else return executeDenseRowAggMxx(a, b, scalars, c, m, n, sparseSafe, rl, ru); } + else if( _type == CellType.COL_AGG ) { + if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ ) + return executeDenseColAggSum(a, b, scalars, c, m, n, sparseSafe, rl, ru); + else + return executeDenseColAggMxx(a, b, scalars, c, m, n, sparseSafe, rl, ru); + } return -1; } @@ -305,6 +334,12 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl else return executeSparseRowAggMxx(sblock, b, scalars, out, m, n, sparseSafe, rl, ru); } + else if( _type == CellType.COL_AGG ) { + if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ ) + return executeSparseColAggSum(sblock, b, scalars, out, m, n, sparseSafe, rl, ru); + else + return 
executeSparseColAggMxx(sblock, b, scalars, out, m, n, sparseSafe, rl, ru); + } return -1; } @@ -339,6 +374,13 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl else return executeCompressedRowAggMxx(a, b, scalars, c, m, n, sparseSafe, rl, ru); } + else if( _type == CellType.COL_AGG ) { + double[] c = out.getDenseBlock(); + if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ ) + return executeCompressedColAggSum(a, b, scalars, c, m, n, sparseSafe, rl, ru); + else + return executeCompressedColAggMxx(a, b, scalars, c, m, n, sparseSafe, rl, ru); + } return -1; } @@ -372,8 +414,7 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl } private long executeDenseRowAggSum(double[] a, SideInput[] b, double[] scalars, - double[] c, int m, int n, boolean sparseSafe, int rl, int ru) - throws DMLRuntimeException + double[] c, int m, int n, boolean sparseSafe, int rl, int ru) { KahanFunction kplus = (KahanFunction) getAggFunction(); KahanObject kbuff = new KahanObject(0, 0); @@ -401,7 +442,7 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl for( int i=rl; i<ru; i++ ) { double tmp = initialVal; for( int j=0; j<n; j++ ) - tmp = vfun.execute(tmp, genexec( 0, b, scalars, m, n, i, j )); + tmp = vfun.execute(tmp, genexec(0, b, scalars, m, n, i, j)); lnnz += ((c[i] = tmp)!=0) ? 1 : 0; } } @@ -410,7 +451,7 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl double tmp = initialVal; for( int j=0; j<n; j++, ix++ ) if( a[ix] != 0 || !sparseSafe) - tmp = vfun.execute(tmp, genexec( a[ix], b, scalars, m, n, i, j )); + tmp = vfun.execute(tmp, genexec(a[ix], b, scalars, m, n, i, j)); if( sparseSafe && UtilFunctions.containsZero(a, ix-n, n) ) tmp = vfun.execute(tmp, 0); lnnz += ((c[i] = tmp)!=0) ? 
1 : 0; @@ -419,6 +460,55 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl return lnnz; } + private long executeDenseColAggSum(double[] a, SideInput[] b, double[] scalars, + double[] c, int m, int n, boolean sparseSafe, int rl, int ru) + { + KahanFunction kplus = (KahanFunction) getAggFunction(); + KahanObject kbuff = new KahanObject(0, 0); + double[] corr = new double[n]; + + for( int i=rl, ix=rl*n; i<ru; i++ ) + for( int j=0; j<n; j++, ix++ ) { + double aval = (a != null) ? a[ix] : 0; + if( aval != 0 || !sparseSafe) { + kbuff.set(c[j], corr[j]); + kplus.execute2(kbuff, genexec(aval, b, scalars, m, n, i, j)); + c[j] = kbuff._sum; + corr[j] = kbuff._correction; + } + } + return -1; + } + + private long executeDenseColAggMxx(double[] a, SideInput[] b, double[] scalars, + double[] c, int m, int n, boolean sparseSafe, int rl, int ru) + throws DMLRuntimeException + { + double initialVal = (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : -Double.MAX_VALUE; + ValueFunction vfun = getAggFunction(); + Arrays.fill(c, initialVal); + + if( a == null && !sparseSafe ) { //empty + for( int i=rl; i<ru; i++ ) + for( int j=0; j<n; j++ ) + c[j] = vfun.execute(c[j], genexec(0, b, scalars, m, n, i, j)); + } + else if( a != null ) { //general case + int[] counts = new int[n]; + for( int i=rl, ix=rl*n; i<ru; i++ ) + for( int j=0; j<n; j++, ix++ ) + if( a[ix] != 0 || !sparseSafe) { + c[j] = vfun.execute(c[j], genexec(a[ix], b, scalars, m, n, i, j)); + counts[j] ++; + } + if( sparseSafe ) + for(int j=0; j<n; j++) + if( counts[j] != ru-rl ) + c[j] = vfun.execute(c[j], 0); + } + return -1; + } + private double executeDenseAggSum(double[] a, SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) throws DMLRuntimeException @@ -601,6 +691,97 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl } return lnnz; } + + private long executeSparseColAggSum(SparseBlock sblock, SideInput[] b, double[] scalars, + 
MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) + throws DMLRuntimeException + { + KahanFunction kplus = (KahanFunction) getAggFunction(); + KahanObject kbuff = new KahanObject(0, 0); + double[] corr = new double[n]; + + //note: sequential scan algorithm for both sparse-safe and -unsafe + //in order to avoid binary search for sparse-unsafe + double[] c = out.getDenseBlock(); + for(int i=rl; i<ru; i++) { + kbuff.set(0, 0); + int lastj = -1; + //handle non-empty rows + if( sblock != null && !sblock.isEmpty(i) ) { + int apos = sblock.pos(i); + int alen = sblock.size(i); + int[] aix = sblock.indexes(i); + double[] avals = sblock.values(i); + for(int k=apos; k<apos+alen; k++) { + //process zeros before current non-zero + if( !sparseSafe ) + for(int j=lastj+1; j<aix[k]; j++) { + kbuff.set(c[aix[j]], corr[aix[j]]); + kplus.execute2(kbuff, genexec(0, b, scalars, m, n, i, j)); + c[aix[j]] = kbuff._sum; + corr[aix[j]] = kbuff._correction; + } + //process current non-zero + lastj = aix[k]; + kbuff.set(c[aix[k]], corr[aix[k]]); + kplus.execute2(kbuff, genexec(avals[k], b, scalars, m, n, i, lastj)); + c[aix[k]] = kbuff._sum; + corr[aix[k]] = kbuff._correction; + } + } + //process empty rows or remaining zeros + if( !sparseSafe ) + for(int j=lastj+1; j<n; j++) { + kbuff.set(c[j], corr[j]); + kplus.execute2(kbuff, genexec(0, b, scalars, m, n, i, j)); + c[j] = kbuff._sum; + corr[j] = kbuff._correction; + } + } + return -1; + } + + private long executeSparseColAggMxx(SparseBlock sblock, SideInput[] b, double[] scalars, + MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) + throws DMLRuntimeException + { + double initialVal = (_aggOp==AggOp.MIN) ? 
Double.MAX_VALUE : -Double.MAX_VALUE; + ValueFunction vfun = getAggFunction(); + double[] c = out.getDenseBlock(); + Arrays.fill(c, initialVal); + int[] count = new int[n]; + + //note: sequential scan algorithm for both sparse-safe and -unsafe + //in order to avoid binary search for sparse-unsafe + for(int i=rl; i<ru; i++) { + int lastj = -1; + //handle non-empty rows + if( sblock != null && !sblock.isEmpty(i) ) { + int apos = sblock.pos(i); + int alen = sblock.size(i); + int[] aix = sblock.indexes(i); + double[] avals = sblock.values(i); + for(int k=apos; k<apos+alen; k++) { + //process zeros before current non-zero + if( !sparseSafe ) + for(int j=lastj+1; j<aix[k]; j++) { + c[aix[j]] = vfun.execute(c[aix[j]], genexec(0, b, scalars, m, n, i, j)); + count[aix[j]] ++; + } + //process current non-zero + lastj = aix[k]; + c[aix[k]] = vfun.execute(c[aix[k]], genexec(avals[k], b, scalars, m, n, i, lastj)); + count[aix[k]] ++; + } + } + //process empty rows or remaining zeros + if( !sparseSafe ) + for(int j=lastj+1; j<n; j++) + c[j] = vfun.execute(c[j], genexec(0, b, scalars, m, n, i, j)); + } + + return -1; + } private double executeSparseAggSum(SparseBlock sblock, SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) @@ -744,6 +925,43 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl return lnnz; } + private long executeCompressedColAggSum(CompressedMatrixBlock a, SideInput[] b, double[] scalars, + double[] c, int m, int n, boolean sparseSafe, int rl, int ru) + { + KahanFunction kplus = (KahanFunction) getAggFunction(); + KahanObject kbuff = new KahanObject(0, 0); + double[] corr = new double[n]; + + Iterator<IJV> iter = a.getIterator(rl, ru, !sparseSafe); + while( iter.hasNext() ) { + IJV cell = iter.next(); + double val = genexec(cell.getV(), b, scalars, m, n, cell.getI(), cell.getJ()); + kbuff.set(c[cell.getJ()], corr[cell.getJ()]); + kplus.execute2(kbuff, val); + c[cell.getJ()] = kbuff._sum; + 
corr[cell.getJ()] = kbuff._correction; + } + return -1; + } + + private long executeCompressedColAggMxx(CompressedMatrixBlock a, SideInput[] b, double[] scalars, + double[] c, int m, int n, boolean sparseSafe, int rl, int ru) + throws DMLRuntimeException + { + Arrays.fill(c, rl, ru, (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : -Double.MAX_VALUE); + ValueFunction vfun = getAggFunction(); + long lnnz = 0; + Iterator<IJV> iter = a.getIterator(rl, ru, !sparseSafe); + while( iter.hasNext() ) { + IJV cell = iter.next(); + double val = genexec(cell.getV(), b, scalars, m, n, cell.getI(), cell.getJ()); + c[cell.getI()] = vfun.execute(c[cell.getI()], val); + } + for( int i=rl; i<ru; i++ ) + lnnz += (c[i]!=0) ? 1 : 0; + return lnnz; + } + private double executeCompressedAggSum(CompressedMatrixBlock a, SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) throws DMLRuntimeException @@ -820,7 +1038,7 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl private final MatrixBlock _a; private final SideInput[] _b; private final double[] _scalars; - private final MatrixBlock _c; + private MatrixBlock _c; private final int _rlen; private final int _clen; private final boolean _safe; @@ -842,12 +1060,17 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl @Override public Long call() throws DMLRuntimeException { + _c = (_type==CellType.COL_AGG)? 
new MatrixBlock(1,_clen, false) : _c; if( _a instanceof CompressedMatrixBlock ) return executeCompressed((CompressedMatrixBlock)_a, _b, _scalars, _c, _rlen, _clen, _safe, _rl, _ru); else if( !_a.isInSparseFormat() ) return executeDense(_a.getDenseBlock(), _b, _scalars, _c, _rlen, _clen, _safe, _rl, _ru); else - return executeSparse(_a.getSparseBlock(), _b, _scalars, _c, _rlen, _clen, _safe, _rl, _ru); + return executeSparse(_a.getSparseBlock(), _b, _scalars, _c, _rlen, _clen, _safe, _rl, _ru); + } + + public MatrixBlock getResult() { + return _c; } } } http://git-wip-us.apache.org/repos/asf/systemml/blob/45367829/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java index 1d360a1..eae5560 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java @@ -128,11 +128,17 @@ public class SpoofSPInstruction extends SPInstruction AggregateOperator aggop = getAggregateOperator(op.getAggOp()); if( _out.getDataType()==DataType.MATRIX ) { - out = in.mapPartitionsToPair(new CellwiseFunction(_class.getName(), _classBytes, bcMatrices, scalars), true); - if( op.getCellType()==CellType.ROW_AGG && mcIn.getCols() > mcIn.getColsPerBlock() ) { + //execute codegen block operation + out = in.mapPartitionsToPair(new CellwiseFunction( + _class.getName(), _classBytes, bcMatrices, scalars), true); + + if( (op.getCellType()==CellType.ROW_AGG && mcIn.getCols() > mcIn.getColsPerBlock()) + || (op.getCellType()==CellType.COL_AGG && mcIn.getRows() > mcIn.getRowsPerBlock())) { //TODO investigate if some other side effect of correct blocks - if( out.partitions().size() > mcIn.getNumRowBlocks() ) - out = 
RDDAggregateUtils.aggByKeyStable(out, aggop, (int)mcIn.getNumRowBlocks(), false); + long numBlocks = (op.getCellType()==CellType.ROW_AGG ) ? + mcIn.getNumRowBlocks() : mcIn.getNumColBlocks(); + if( out.partitions().size() > numBlocks ) + out = RDDAggregateUtils.aggByKeyStable(out, aggop, (int)numBlocks, false); else out = RDDAggregateUtils.aggByKeyStable(out, aggop, false); } @@ -405,6 +411,8 @@ public class SpoofSPInstruction extends SPInstruction else { if(((SpoofCellwise)_op).getCellType()==CellType.ROW_AGG) ixOut = new MatrixIndexes(ixOut.getRowIndex(), 1); + else if(((SpoofCellwise)_op).getCellType()==CellType.COL_AGG) + ixOut = new MatrixIndexes(1, ixOut.getColumnIndex()); _op.execute(inputs, _scalars, blkOut); } ret.add(new Tuple2<MatrixIndexes,MatrixBlock>(ixOut, blkOut)); http://git-wip-us.apache.org/repos/asf/systemml/blob/45367829/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index 835e491..d5306cb 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -36,7 +36,6 @@ import org.apache.commons.math3.random.Well1024a; import org.apache.hadoop.io.DataInputBuffer; import org.apache.sysml.api.DMLScript; import org.apache.sysml.conf.ConfigurationManager; -import org.apache.sysml.hops.Hop.OpOp2; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.lops.MMTSJ.MMTSJType; import org.apache.sysml.lops.MapMultChain.ChainType; @@ -89,7 +88,6 @@ import org.apache.sysml.runtime.util.IndexRange; import org.apache.sysml.runtime.util.UtilFunctions; import org.apache.sysml.utils.GPUStatistics; import org.apache.sysml.utils.NativeHelper; -import org.apache.sysml.utils.Statistics; @@ -110,9 +108,6 @@ public class 
MatrixBlock extends MatrixValue implements CacheBlock, Externalizab //basic header (int rlen, int clen, byte type) public static final int HEADER_SIZE = 9; - //internal stats flag for matrix block internals //TODO remove - private static final boolean DISPLAY_STATISTICS = false; - public enum BlockType{ EMPTY_BLOCK, ULTRA_SPARSE_BLOCK, //ultra sparse representation, in-mem same as sparse @@ -324,17 +319,12 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab recomputeNonZeros(); } - public boolean isAllocated() - { - if( sparse ) - return (sparseBlock!=null); - else - return (denseBlock!=null); + public boolean isAllocated() { + return sparse ? (sparseBlock!=null) + : (denseBlock!=null); } - public void allocateDenseBlock() - throws RuntimeException - { + public void allocateDenseBlock() { allocateDenseBlock( true ); } @@ -345,24 +335,22 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab allocateDenseBlock(); } - @SuppressWarnings("unused") public void allocateDenseBlock(boolean clearNNZ) - throws RuntimeException { long limit = (long)rlen * clen; //check max size constraint (16GB dense), since java arrays are limited to 2^(32-1) elements) if( limit > Integer.MAX_VALUE ) { String execType = OptimizerUtils.isSparkExecutionMode() ? "SPARK" : "MR"; - throw new RuntimeException("Dense in-memory matrix block ("+rlen+"x"+clen+") exceeds supported size of "+Integer.MAX_VALUE+" elements (16GB). " + - "Please, report this issue and reduce the JVM heapsize to execute this operation in "+execType+"."); + throw new RuntimeException("Dense in-memory matrix block ("+rlen+"x"+clen+") " + + "exceeds supported size of "+Integer.MAX_VALUE+" elements (16GB). 
" + + "Please, report this issue and reduce the JVM heapsize to execute " + + "this operation in "+execType+"."); } //allocate block if non-existing or too small (guaranteed to be 0-initialized), if(denseBlock == null || denseBlock.length < limit) { - long start = DISPLAY_STATISTICS && DMLScript.STATISTICS ? System.nanoTime() : 0; denseBlock = new double[(int)limit]; - Statistics.allocateDoubleArrTime += DISPLAY_STATISTICS && DMLScript.STATISTICS ? (System.nanoTime() - start) : 0; } //clear nnz if necessary @@ -1008,11 +996,9 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab * @param opcode extended opcode * @throws DMLRuntimeException if DMLRuntimeException occurs */ - @SuppressWarnings("unused") public void examSparsity(String opcode) throws DMLRuntimeException { - long start = DISPLAY_STATISTICS && DMLScript.STATISTICS ? System.nanoTime() : 0; //determine target representation boolean sparseDst = evalSparseFormatInMemory(); @@ -1026,8 +1012,6 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab sparseToDense(opcode); else if( !sparse && sparseDst ) denseToSparse(opcode); - - Statistics.examSparsityTime += DISPLAY_STATISTICS && DMLScript.STATISTICS ? (System.nanoTime() - start) : 0; } /** @@ -1187,25 +1171,20 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab * of the entire matrix block. * */ - @SuppressWarnings("unused") - public void recomputeNonZeros() - { - if( sparse && sparseBlock!=null ) //SPARSE (max long) - { + public long recomputeNonZeros() { + if( sparse && sparseBlock!=null ) { //SPARSE (max long) //note: rlen might be <= sparseBlock.numRows() nonZeros = sparseBlock.size(0, sparseBlock.numRows()); } - else if( !sparse && denseBlock!=null ) //DENSE (max int) - { - long start = DISPLAY_STATISTICS && DMLScript.STATISTICS ? 
System.nanoTime() : 0; + else if( !sparse && denseBlock!=null ) { //DENSE (max int) double[] a = denseBlock; final int limit=rlen*clen; int nnz = 0; for(int i=0; i<limit; i++) nnz += (a[i]!=0) ? 1 : 0; nonZeros = nnz; - Statistics.recomputeNNZTime += DISPLAY_STATISTICS && DMLScript.STATISTICS ? (System.nanoTime() - start) : 0; } + return nonZeros; } /** @@ -2552,10 +2531,10 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab nz2 = nz2 * m; //compute output sparsity consistent w/ the hop compiler - OpOp2 bop = op.getBinaryOperatorOpOp2(); double sp1 = OptimizerUtils.getSparsity(m, n, nz1); double sp2 = OptimizerUtils.getSparsity(m, n, nz2); - double spout = OptimizerUtils.getBinaryOpSparsity(sp1, sp2, bop, true); + double spout = OptimizerUtils.getBinaryOpSparsity( + sp1, sp2, op.getBinaryOperatorOpOp2(), true); estnnz = UtilFunctions.toLong(spout * m * n); } @@ -5808,11 +5787,10 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab //////// // Misc methods - private static MatrixBlock checkType(MatrixValue block) - throws RuntimeException - { + private static MatrixBlock checkType(MatrixValue block) { if( block!=null && !(block instanceof MatrixBlock)) - throw new RuntimeException("Unsupported matrix value: "+block.getClass().getSimpleName()); + throw new RuntimeException("Unsupported matrix value: " + + block.getClass().getSimpleName()); return (MatrixBlock) block; } http://git-wip-us.apache.org/repos/asf/systemml/blob/45367829/src/test/java/org/apache/sysml/test/integration/functions/codegen/CellwiseTmplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/codegen/CellwiseTmplTest.java b/src/test/java/org/apache/sysml/test/integration/functions/codegen/CellwiseTmplTest.java index fbd456f..701a367 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/codegen/CellwiseTmplTest.java +++ 
b/src/test/java/org/apache/sysml/test/integration/functions/codegen/CellwiseTmplTest.java @@ -50,6 +50,9 @@ public class CellwiseTmplTest extends AutomatedTestBase private static final String TEST_NAME12 = TEST_NAME+12; //((X/3) %% 0.6) + ((X/3) %/% 0.6) private static final String TEST_NAME13 = TEST_NAME+13; //min(X + 7 * Y) large private static final String TEST_NAME14 = TEST_NAME+14; //-2 * X + t(Y); t(Y) is rowvector + private static final String TEST_NAME15 = TEST_NAME+15; //colMins(2*log(X)) + private static final String TEST_NAME16 = TEST_NAME+16; //colSums(2*log(X)); + private static final String TEST_DIR = "functions/codegen/"; private static final String TEST_CLASS_DIR = TEST_DIR + CellwiseTmplTest.class.getSimpleName() + "/"; @@ -62,7 +65,7 @@ public class CellwiseTmplTest extends AutomatedTestBase @Override public void setUp() { TestUtils.clearAssertionInformation(); - for( int i=1; i<=14; i++ ) { + for( int i=1; i<=16; i++ ) { addTestConfiguration( TEST_NAME+i, new TestConfiguration( TEST_CLASS_DIR, TEST_NAME+i, new String[] {String.valueOf(i)}) ); } @@ -255,6 +258,37 @@ public class CellwiseTmplTest extends AutomatedTestBase testCodegenIntegration( TEST_NAME14, true, ExecType.SPARK ); } + @Test + public void testCodegenCellwiseRewrite15() { + testCodegenIntegration( TEST_NAME15, true, ExecType.CP ); + } + + @Test + public void testCodegenCellwise15() { + testCodegenIntegration( TEST_NAME15, false, ExecType.CP ); + } + + @Test + public void testCodegenCellwiseRewrite15_sp() { + testCodegenIntegration( TEST_NAME15, true, ExecType.SPARK ); + } + + @Test + public void testCodegenCellwiseRewrite16() { + testCodegenIntegration( TEST_NAME16, true, ExecType.CP ); + } + + @Test + public void testCodegenCellwise16() { + testCodegenIntegration( TEST_NAME16, false, ExecType.CP ); + } + + @Test + public void testCodegenCellwiseRewrite16_sp() { + testCodegenIntegration( TEST_NAME16, true, ExecType.SPARK ); + } + + private void testCodegenIntegration( String 
testname, boolean rewrites, ExecType instType ) { boolean oldRewrites = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION; @@ -281,7 +315,7 @@ public class CellwiseTmplTest extends AutomatedTestBase String HOME = SCRIPT_DIR + TEST_DIR; fullDMLScriptName = HOME + testname + ".dml"; - programArgs = new String[]{"-explain", "runtime", "-stats", "-args", output("S") }; + programArgs = new String[]{"-explain", "hops", "-stats", "-args", output("S") }; fullRScriptName = HOME + testname + ".R"; rCmd = getRCmd(inputDir(), expectedDir()); @@ -313,7 +347,11 @@ public class CellwiseTmplTest extends AutomatedTestBase else if( testname.equals(TEST_NAME10) ) //ensure min/max is fused Assert.assertTrue(!heavyHittersContainsSubString("uamin","uamax")); else if( testname.equals(TEST_NAME11) ) //ensure replace is fused - Assert.assertTrue(!heavyHittersContainsSubString("replace")); + Assert.assertTrue(!heavyHittersContainsSubString("replace")); + else if( testname.equals(TEST_NAME15) ) + Assert.assertTrue(!heavyHittersContainsSubString("uacmin")); + else if( testname.equals(TEST_NAME16) ) + Assert.assertTrue(!heavyHittersContainsSubString("uack+")); } finally { rtplatform = platformOld; http://git-wip-us.apache.org/repos/asf/systemml/blob/45367829/src/test/java/org/apache/sysml/test/integration/functions/codegen/RowAggTmplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/codegen/RowAggTmplTest.java b/src/test/java/org/apache/sysml/test/integration/functions/codegen/RowAggTmplTest.java index 2092f22..6d25130 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/codegen/RowAggTmplTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/codegen/RowAggTmplTest.java @@ -49,7 +49,7 @@ public class RowAggTmplTest extends AutomatedTestBase private static final String TEST_NAME11 = TEST_NAME+"11"; //y - X %*% v private static final String TEST_NAME12 = 
TEST_NAME+"12"; //Y=(X>=v); R=Y/rowSums(Y) private static final String TEST_NAME13 = TEST_NAME+"13"; //rowSums(X)+rowSums(Y) - private static final String TEST_NAME14 = TEST_NAME+"14"; //colSums(max(floor(round(abs(min(sign(X+Y),1)))),7)) + private static final String TEST_NAME14 = TEST_NAME+"14"; //colSums(max(floor(round(abs(min(sign(X+Y),rowSums(X))))),7)) private static final String TEST_NAME15 = TEST_NAME+"15"; //systemml nn - softmax backward private static final String TEST_NAME16 = TEST_NAME+"16"; //Y=X-rowIndexMax(X); R=Y/rowSums(Y) private static final String TEST_NAME17 = TEST_NAME+"17"; //MLogreg - vector-matrix w/ indexing http://git-wip-us.apache.org/repos/asf/systemml/blob/45367829/src/test/scripts/functions/codegen/cellwisetmpl15.R ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/codegen/cellwisetmpl15.R b/src/test/scripts/functions/codegen/cellwisetmpl15.R new file mode 100644 index 0000000..ac7da9a --- /dev/null +++ b/src/test/scripts/functions/codegen/cellwisetmpl15.R @@ -0,0 +1,31 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- + +args<-commandArgs(TRUE) +options(digits=22) +library("Matrix") +library("matrixStats") + +X = matrix(seq(7, 1100*200+6), 1100, 200, byrow=TRUE); + +R = t(colMins(2*log(X))); + +writeMM(as(R,"CsparseMatrix"), paste(args[2], "S", sep="")); http://git-wip-us.apache.org/repos/asf/systemml/blob/45367829/src/test/scripts/functions/codegen/cellwisetmpl15.dml ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/codegen/cellwisetmpl15.dml b/src/test/scripts/functions/codegen/cellwisetmpl15.dml new file mode 100644 index 0000000..e543671 --- /dev/null +++ b/src/test/scripts/functions/codegen/cellwisetmpl15.dml @@ -0,0 +1,27 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- + +X = matrix(seq(7, 1100*200+6), 1100, 200); +if(1==1){} + +R = colMins(2*log(X)); + +write(R, $1) http://git-wip-us.apache.org/repos/asf/systemml/blob/45367829/src/test/scripts/functions/codegen/cellwisetmpl16.R ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/codegen/cellwisetmpl16.R b/src/test/scripts/functions/codegen/cellwisetmpl16.R new file mode 100644 index 0000000..d8fa3b9 --- /dev/null +++ b/src/test/scripts/functions/codegen/cellwisetmpl16.R @@ -0,0 +1,30 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- + +args<-commandArgs(TRUE) +options(digits=22) +library("Matrix") + +X = matrix(seq(7, 1100*200+6), 1100, 200, byrow=TRUE); + +R = t(colSums(2*log(X))); + +writeMM(as(R,"CsparseMatrix"), paste(args[2], "S", sep="")); http://git-wip-us.apache.org/repos/asf/systemml/blob/45367829/src/test/scripts/functions/codegen/cellwisetmpl16.dml ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/codegen/cellwisetmpl16.dml b/src/test/scripts/functions/codegen/cellwisetmpl16.dml new file mode 100644 index 0000000..1fb07b5 --- /dev/null +++ b/src/test/scripts/functions/codegen/cellwisetmpl16.dml @@ -0,0 +1,27 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- + +X = matrix(seq(7, 1100*200+6), 1100, 200); +if(1==1){} + +R = colSums(2*log(X)); + +write(R, $1) http://git-wip-us.apache.org/repos/asf/systemml/blob/45367829/src/test/scripts/functions/codegen/rowAggPattern14.R ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/codegen/rowAggPattern14.R b/src/test/scripts/functions/codegen/rowAggPattern14.R index 34589e1..60f7714 100644 --- a/src/test/scripts/functions/codegen/rowAggPattern14.R +++ b/src/test/scripts/functions/codegen/rowAggPattern14.R @@ -28,7 +28,7 @@ library("matrixStats") X = matrix(seq(1,1500), 150, 10, byrow=TRUE); y = seq(1,150); -Z = pmax(floor(round(abs(pmin(sign(X+y),1)))),7); +Z = pmax(floor(round(abs(pmin(sign(X+y),rowSums(X)%*%matrix(1,1,10))))),7); R = t(colSums(Z)); writeMM(as(R, "CsparseMatrix"), paste(args[2], "S", sep="")); http://git-wip-us.apache.org/repos/asf/systemml/blob/45367829/src/test/scripts/functions/codegen/rowAggPattern14.dml ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/codegen/rowAggPattern14.dml b/src/test/scripts/functions/codegen/rowAggPattern14.dml index f13c1ff..b47df7e 100644 --- a/src/test/scripts/functions/codegen/rowAggPattern14.dml +++ b/src/test/scripts/functions/codegen/rowAggPattern14.dml @@ -22,7 +22,7 @@ X = matrix(seq(1,1500), rows=150, cols=10); y = seq(1,150); -Z = max(floor(round(abs(min(sign(X+y),1)))),7) +Z = max(floor(round(abs(min(sign(X+y),rowSums(X))))),7) R = colSums(Z); write(R, $1)