Repository: incubator-systemml Updated Branches: refs/heads/master 184e02dac -> 21b96855b
[SYSTEMML-1255] New cp/spark column-wise ternary aggregates, tests Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/21b96855 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/21b96855 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/21b96855 Branch: refs/heads/master Commit: 21b96855b4e6f36b379cc3ebc5bb95435158d35a Parents: 184e02d Author: Matthias Boehm <[email protected]> Authored: Thu Feb 16 22:08:45 2017 -0800 Committer: Matthias Boehm <[email protected]> Committed: Fri Feb 17 13:55:02 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/sysml/hops/AggUnaryOp.java | 18 +- .../org/apache/sysml/lops/TernaryAggregate.java | 24 +- .../runtime/compress/CompressedMatrixBlock.java | 5 +- .../instructions/CPInstructionParser.java | 3 +- .../runtime/instructions/InstructionUtils.java | 16 ++ .../instructions/SPInstructionParser.java | 2 +- .../cp/AggregateTernaryCPInstruction.java | 32 ++- .../spark/AggregateTernarySPInstruction.java | 128 +++++---- .../spark/AggregateUnarySPInstruction.java | 28 +- .../spark/ComputationSPInstruction.java | 27 ++ .../sysml/runtime/matrix/data/LibMatrixAgg.java | 191 ++++++++----- .../sysml/runtime/matrix/data/MatrixBlock.java | 22 +- .../operators/AggregateTernaryOperator.java | 57 ++++ .../functions/ternary/TernaryAggregateTest.java | 267 +++++++++++++++++++ .../functions/ternary/TernaryAggregateC.R | 33 +++ .../functions/ternary/TernaryAggregateC.dml | 30 +++ .../functions/ternary/TernaryAggregateRC.R | 34 +++ .../functions/ternary/TernaryAggregateRC.dml | 31 +++ .../functions/ternary/ZPackageSuite.java | 3 +- 19 files changed, 754 insertions(+), 197 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/hops/AggUnaryOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/AggUnaryOp.java b/src/main/java/org/apache/sysml/hops/AggUnaryOp.java index 9964981..fce424c 100644 --- a/src/main/java/org/apache/sysml/hops/AggUnaryOp.java +++ b/src/main/java/org/apache/sysml/hops/AggUnaryOp.java @@ -119,7 +119,7 @@ public class AggUnaryOp extends Hop implements MultiThreadedHop if ( et == ExecType.CP ) { Lop agg1 = null; - if( isTernaryAggregateRewriteApplicable() ) { + if( isTernaryAggregateRewriteApplicable(et) ) { agg1 = constructLopsTernaryAggregateRewrite(et); } else if( isUnaryAggregateOuterCPRewriteApplicable() ) @@ -245,7 +245,7 @@ public class AggUnaryOp extends Hop implements MultiThreadedHop DirectionTypes dir = HopsDirection2Lops.get(_direction); //unary aggregate - if( isTernaryAggregateRewriteApplicable() ) + if( isTernaryAggregateRewriteApplicable(et) ) { Lop aggregate = constructLopsTernaryAggregateRewrite(et); setOutputDimensions(aggregate); //0x0 (scalar) @@ -488,14 +488,15 @@ public class AggUnaryOp extends Hop implements MultiThreadedHop return SparkAggType.MULTI_BLOCK; } - private boolean isTernaryAggregateRewriteApplicable() throws HopsException + private boolean isTernaryAggregateRewriteApplicable(ExecType et) + throws HopsException { boolean ret = false; //currently we support only sum over binary multiply but potentially //it can be generalized to any RC aggregate over two common binary operations - if( OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES && - _direction == Direction.RowCol && _op == AggOp.SUM ) + if( OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES && _op == AggOp.SUM && + (_direction == Direction.RowCol || _direction == Direction.Col) ) { Hop input1 = getInput().get(0); if( input1.getParent().size() == 1 && //sum single consumer @@ -639,7 +640,6 @@ public class AggUnaryOp extends Hop implements MultiThreadedHop Hop input11 = input1.getInput().get(0); Hop input12 = input1.getInput().get(1); - Lop ret = null; Lop in1 = null; Lop in2 = null; Lop in3 = null; @@ -668,10 +668,10 @@ public class AggUnaryOp extends Hop implements MultiThreadedHop // The execution type of a unary aggregate instruction should depend on the execution type of inputs to avoid OOM // Since we only support matrix-vector and not vector-matrix, checking the execution type of input1 should suffice. ExecType et_input = input1.optFindExecType(); - ret = new TernaryAggregate(in1, in2, in3, Aggregate.OperationTypes.KahanSum, - Binary.OperationTypes.MULTIPLY, DataType.SCALAR, ValueType.DOUBLE, et_input, k); + DirectionTypes dir = HopsDirection2Lops.get(_direction); - return ret; + return new TernaryAggregate(in1, in2, in3, Aggregate.OperationTypes.KahanSum, + Binary.OperationTypes.MULTIPLY, dir, getDataType(), ValueType.DOUBLE, et_input, k); } @Override http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/lops/TernaryAggregate.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/TernaryAggregate.java b/src/main/java/org/apache/sysml/lops/TernaryAggregate.java index 0932424..08600e4 100644 --- a/src/main/java/org/apache/sysml/lops/TernaryAggregate.java +++ b/src/main/java/org/apache/sysml/lops/TernaryAggregate.java @@ -21,22 +21,24 @@ package org.apache.sysml.lops; import org.apache.sysml.lops.LopProperties.ExecLocation; import org.apache.sysml.lops.LopProperties.ExecType; +import org.apache.sysml.lops.PartialAggregate.DirectionTypes; import org.apache.sysml.lops.compile.JobType; import org.apache.sysml.parser.Expression.*; public class TernaryAggregate extends Lop { - - private static final String OPCODE = "tak+*"; + public static final String OPCODE_RC = "tak+*"; + public static final String OPCODE_C = "tack+*"; //NOTE: currently only used for ta+* //private Aggregate.OperationTypes _aggOp = null; //private Binary.OperationTypes _binOp = null; + private DirectionTypes _direction; //optional attribute for cp private int _numThreads = -1; - public TernaryAggregate(Lop input1, Lop input2, Lop input3, Aggregate.OperationTypes aggOp, Binary.OperationTypes binOp, DataType dt, ValueType vt, ExecType et, int k ) + public TernaryAggregate(Lop input1, Lop input2, Lop input3, Aggregate.OperationTypes aggOp, Binary.OperationTypes binOp, DirectionTypes direction, DataType dt, ValueType vt, ExecType et, int k ) { super(Lop.Type.TernaryAggregate, dt, vt); @@ -50,6 +52,7 @@ public class TernaryAggregate extends Lop input2.addOutput(this); input3.addOutput(this); + _direction = direction; _numThreads = k; boolean breaksAlignment = false; @@ -60,9 +63,8 @@ public class TernaryAggregate extends Lop } @Override - public String toString() - { - return "Operation: "+OPCODE; + public String toString() { + return "Operation: "+getOpCode(); } @Override @@ -72,7 +74,7 @@ public class TernaryAggregate extends Lop StringBuilder sb = new StringBuilder(); sb.append( getExecType() ); sb.append( OPERAND_DELIMITOR ); - sb.append( OPCODE ); + sb.append( getOpCode() ); sb.append( OPERAND_DELIMITOR ); sb.append( getInputs().get(0).prepInputOperand(input1)); sb.append( OPERAND_DELIMITOR ); @@ -89,4 +91,12 @@ public class TernaryAggregate extends Lop return sb.toString(); } + + private String getOpCode() { + switch( _direction ) { + case RowCol: return OPCODE_RC; + case Col: return OPCODE_C; + default: throw new RuntimeException("Unsupported aggregation direction: "+_direction); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java index 84c4812..e345fca 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java @@ -77,6 +77,7 @@ import org.apache.sysml.runtime.matrix.data.SparseBlock; import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue; import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator; import org.apache.sysml.runtime.matrix.operators.AggregateOperator; +import org.apache.sysml.runtime.matrix.operators.AggregateTernaryOperator; import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator; import org.apache.sysml.runtime.matrix.operators.BinaryOperator; import org.apache.sysml.runtime.matrix.operators.CMOperator; @@ -1917,13 +1918,13 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable } @Override - public ScalarObject aggregateTernaryOperations(MatrixBlock m1, MatrixBlock m2, MatrixBlock m3, AggregateBinaryOperator op) + public MatrixBlock aggregateTernaryOperations(MatrixBlock m1, MatrixBlock m2, MatrixBlock m3, MatrixBlock ret, AggregateTernaryOperator op, boolean inCP) throws DMLRuntimeException { printDecompressWarning("aggregateTernaryOperations"); MatrixBlock left = isCompressed() ? decompress() : this; MatrixBlock right1 = getUncompressed(m2); MatrixBlock right2 = getUncompressed(m3); - return left.aggregateTernaryOperations(left, right1, right2, op); + return left.aggregateTernaryOperations(left, right1, right2, ret, op, inCP); } @Override http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java index 9cf56d9..f3c1605 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java @@ -78,7 +78,8 @@ public class CPInstructionParser extends InstructionParser String2CPInstructionType = new HashMap<String, CPINSTRUCTION_TYPE>(); String2CPInstructionType.put( "ba+*" , CPINSTRUCTION_TYPE.AggregateBinary); - String2CPInstructionType.put( "tak+*" , CPINSTRUCTION_TYPE.AggregateTernary); + String2CPInstructionType.put( "tak+*" , CPINSTRUCTION_TYPE.AggregateTernary); + String2CPInstructionType.put( "tack+*" , CPINSTRUCTION_TYPE.AggregateTernary); String2CPInstructionType.put( "uak+" , CPINSTRUCTION_TYPE.AggregateUnary); String2CPInstructionType.put( "uark+" , CPINSTRUCTION_TYPE.AggregateUnary); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java index 6ef39f1..2a990d4 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java @@ -48,6 +48,7 @@ import org.apache.sysml.runtime.functionobjects.Divide; import org.apache.sysml.runtime.functionobjects.Equals; import org.apache.sysml.runtime.functionobjects.GreaterThan; import org.apache.sysml.runtime.functionobjects.GreaterThanEquals; +import org.apache.sysml.runtime.functionobjects.IndexFunction; import org.apache.sysml.runtime.functionobjects.IntegerDivide; import org.apache.sysml.runtime.functionobjects.KahanPlus; import org.apache.sysml.runtime.functionobjects.KahanPlusSq; @@ -76,6 +77,7 @@ import org.apache.sysml.runtime.instructions.gpu.GPUInstruction.GPUINSTRUCTION_T import org.apache.sysml.runtime.instructions.mr.MRInstruction.MRINSTRUCTION_TYPE; import org.apache.sysml.runtime.instructions.spark.SPInstruction.SPINSTRUCTION_TYPE; import org.apache.sysml.runtime.matrix.operators.AggregateOperator; +import org.apache.sysml.runtime.matrix.operators.AggregateTernaryOperator; import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator; import org.apache.sysml.runtime.matrix.operators.BinaryOperator; import org.apache.sysml.runtime.matrix.operators.LeftScalarOperator; @@ -372,6 +374,20 @@ public class InstructionUtils return aggun; } + public static AggregateTernaryOperator parseAggregateTernaryOperator(String opcode) { + return parseAggregateTernaryOperator(opcode, 1); + } + + public static AggregateTernaryOperator parseAggregateTernaryOperator(String opcode, int numThreads) { + CorrectionLocationType corr = opcode.equalsIgnoreCase("tak+*") ? + CorrectionLocationType.LASTCOLUMN : CorrectionLocationType.LASTROW; + AggregateOperator agg = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject(), true, corr); + IndexFunction ixfun = opcode.equalsIgnoreCase("tak+*") ? + ReduceAll.getReduceAllFnObject() : ReduceRow.getReduceRowFnObject(); + + return new AggregateTernaryOperator(Multiply.getMultiplyFnObject(), agg, ixfun, numThreads); + } + public static AggregateOperator parseAggregateOperator(String opcode, String corrExists, String corrLoc) { AggregateOperator agg = null; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java index 9008f09..fa05de3 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java @@ -131,7 +131,7 @@ public class SPInstructionParser extends InstructionParser //ternary aggregate operators String2SPInstructionType.put( "tak+*" , SPINSTRUCTION_TYPE.AggregateTernary); - + String2SPInstructionType.put( "tack+*" , SPINSTRUCTION_TYPE.AggregateTernary); String2SPInstructionType.put( "rangeReIndex" , SPINSTRUCTION_TYPE.MatrixIndexing); String2SPInstructionType.put( "leftIndex" , SPINSTRUCTION_TYPE.MatrixIndexing); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/instructions/cp/AggregateTernaryCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/AggregateTernaryCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/AggregateTernaryCPInstruction.java index b88c062..147436e 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/AggregateTernaryCPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/AggregateTernaryCPInstruction.java @@ -21,18 +21,15 @@ package org.apache.sysml.runtime.instructions.cp; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; -import org.apache.sysml.runtime.functionobjects.KahanPlus; -import org.apache.sysml.runtime.functionobjects.Multiply; import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.matrix.data.MatrixBlock; -import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator; -import org.apache.sysml.runtime.matrix.operators.AggregateOperator; +import org.apache.sysml.runtime.matrix.operators.AggregateTernaryOperator; import org.apache.sysml.runtime.matrix.operators.Operator; public class AggregateTernaryCPInstruction extends ComputationCPInstruction -{ - public AggregateTernaryCPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand in3, - CPOperand out, String opcode, String istr ) +{ + public AggregateTernaryCPInstruction(Operator op, CPOperand in1, CPOperand in2, + CPOperand in3, CPOperand out, String opcode, String istr ) { super(op, in1, in2, in3, out, opcode, istr); _cptype = CPINSTRUCTION_TYPE.AggregateTernary; @@ -44,7 +41,7 @@ public class AggregateTernaryCPInstruction extends ComputationCPInstruction String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); String opcode = parts[0]; - if ( opcode.equalsIgnoreCase("tak+*")) { + if ( opcode.equalsIgnoreCase("tak+*") || opcode.equalsIgnoreCase("tack+*") ) { InstructionUtils.checkNumFields( parts, 5 ); CPOperand in1 = new CPOperand(parts[1]); @@ -52,16 +49,13 @@ public class AggregateTernaryCPInstruction extends ComputationCPInstruction CPOperand in3 = new CPOperand(parts[3]); CPOperand out = new CPOperand(parts[4]); int numThreads = Integer.parseInt(parts[5]); - - AggregateOperator agg = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject()); - AggregateBinaryOperator op = new AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg, numThreads); + AggregateTernaryOperator op = InstructionUtils.parseAggregateTernaryOperator(opcode, numThreads); return new AggregateTernaryCPInstruction(op, in1, in2, in3, out, opcode, str); } else { - throw new DMLRuntimeException("AggregateTertiaryInstruction.parseInstruction():: Unknown opcode " + opcode); - } - + throw new DMLRuntimeException("AggregateTernaryInstruction.parseInstruction():: Unknown opcode " + opcode); + } } @Override @@ -73,14 +67,18 @@ public class AggregateTernaryCPInstruction extends ComputationCPInstruction MatrixBlock matBlock3 = input3.isLiteral() ? null : //matrix or literal 1 ec.getMatrixInput(input3.getName()); - AggregateBinaryOperator ab_op = (AggregateBinaryOperator) _optr; - ScalarObject ret = matBlock1.aggregateTernaryOperations(matBlock1, matBlock2, matBlock3, ab_op); + AggregateTernaryOperator ab_op = (AggregateTernaryOperator) _optr; + MatrixBlock ret = matBlock1.aggregateTernaryOperations( + matBlock1, matBlock2, matBlock3, new MatrixBlock(), ab_op, true); //release inputs/outputs ec.releaseMatrixInput(input1.getName()); ec.releaseMatrixInput(input2.getName()); if( !input3.isLiteral() ) ec.releaseMatrixInput(input3.getName()); - ec.setScalarOutput(output.getName(), ret); + if( output.getDataType().isScalar() ) + ec.setScalarOutput(output.getName(), new DoubleObject(ret.quickGetValue(0, 0))); + else + ec.setMatrixOutput(output.getName(), ret); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java index 4a7b130..2a305be 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java @@ -20,32 +20,29 @@ package org.apache.sysml.runtime.instructions.spark; import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; -import org.apache.sysml.runtime.functionobjects.KahanPlus; -import org.apache.sysml.runtime.functionobjects.Multiply; +import org.apache.sysml.runtime.functionobjects.ReduceAll; import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.cp.DoubleObject; -import org.apache.sysml.runtime.instructions.cp.ScalarObject; +import org.apache.sysml.runtime.instructions.spark.functions.AggregateDropCorrectionFunction; import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils; +import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; -import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator; -import org.apache.sysml.runtime.matrix.operators.AggregateOperator; +import org.apache.sysml.runtime.matrix.operators.AggregateTernaryOperator; import org.apache.sysml.runtime.matrix.operators.Operator; public class AggregateTernarySPInstruction extends ComputationSPInstruction { - - public AggregateTernarySPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand in3, - CPOperand out, String opcode, String istr ) + public AggregateTernarySPInstruction(Operator op, CPOperand in1, CPOperand in2, + CPOperand in3, CPOperand out, String opcode, String istr ) { super(op, in1, in2, in3, out, opcode, istr); _sptype = SPINSTRUCTION_TYPE.AggregateTernary; @@ -57,21 +54,19 @@ public class AggregateTernarySPInstruction extends ComputationSPInstruction String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); String opcode = parts[0]; - if ( opcode.equalsIgnoreCase("tak+*")) { - InstructionUtils.checkNumFields ( parts, 4 ); + if ( opcode.equalsIgnoreCase("tak+*") || opcode.equalsIgnoreCase("tack+*") ) { + InstructionUtils.checkNumFields( parts, 4 ); CPOperand in1 = new CPOperand(parts[1]); CPOperand in2 = new CPOperand(parts[2]); CPOperand in3 = new CPOperand(parts[3]); CPOperand out = new CPOperand(parts[4]); - - AggregateOperator agg = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject()); - AggregateBinaryOperator op = new AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg); + AggregateTernaryOperator op = InstructionUtils.parseAggregateTernaryOperator(opcode); return new AggregateTernarySPInstruction(op, in1, in2, in3, out, opcode, str); } else { - throw new DMLRuntimeException("AggregateTertiaryInstruction.parseInstruction():: Unknown opcode " + opcode); + throw new DMLRuntimeException("AggregateTernaryInstruction.parseInstruction():: Unknown opcode " + opcode); } } @@ -81,88 +76,109 @@ public class AggregateTernarySPInstruction extends ComputationSPInstruction { SparkExecutionContext sec = (SparkExecutionContext)ec; - //get input + //get inputs + MatrixCharacteristics mcIn = sec.getMatrixCharacteristics( input1.getName() ); JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() ); JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = sec.getBinaryBlockRDDHandleForVariable( input2.getName() ); JavaPairRDD<MatrixIndexes,MatrixBlock> in3 = input3.isLiteral() ? null : //matrix or literal 1 sec.getBinaryBlockRDDHandleForVariable( input3.getName() ); //execute aggregate ternary operation - AggregateBinaryOperator aggop = (AggregateBinaryOperator) _optr; - JavaRDD<MatrixBlock> out = null; + AggregateTernaryOperator aggop = (AggregateTernaryOperator) _optr; + JavaPairRDD<MatrixIndexes,MatrixBlock> out = null; if( in3 != null ) { //3 inputs - out = in1.join( in2 ).join( in3 ).values() - .map(new RDDAggregateTernaryFunction(aggop)); + out = in1.join( in2 ).join( in3 ) + .mapToPair(new RDDAggregateTernaryFunction(aggop)); } else { //2 inputs (third is literal 1) - out = in1.join( in2 ).values() - .map(new RDDAggregateTernaryFunction2(aggop)); + out = in1.join( in2 ) + .mapToPair(new RDDAggregateTernaryFunction2(aggop)); + } + + //aggregate partial results + if( aggop.indexFn instanceof ReduceAll ) //tak+* + { + //aggregate and create output (no lineage because scalar) + MatrixBlock tmp = RDDAggregateUtils.sumStable(out.values()); + DoubleObject ret = new DoubleObject(tmp.getValue(0, 0)); + sec.setVariable(output.getName(), ret); + } + else if( mcIn.dimsKnown() && mcIn.getCols()<=mcIn.getColsPerBlock() ) //tack+* single block + { + //single block aggregation and drop correction + MatrixBlock ret = RDDAggregateUtils.aggStable(out, aggop.aggOp); + ret.dropLastRowsOrColums(aggop.aggOp.correctionLocation); + + //put output block into symbol table (no lineage because single block) + //this also includes implicit maintenance of matrix characteristics + sec.setMatrixOutput(output.getName(), ret); + } + else //tack+* multi block + { + //multi-block aggregation and drop correction + out = RDDAggregateUtils.aggByKeyStable(out, aggop.aggOp); + out = out.mapValues( new AggregateDropCorrectionFunction(aggop.aggOp) ); + + //put output RDD handle into symbol table + updateUnaryAggOutputMatrixCharacteristics(sec, aggop.indexFn); + sec.setRDDHandleForVariable(output.getName(), out); + sec.addLineageRDD(output.getName(), input1.getName()); + sec.addLineageRDD(output.getName(), input2.getName()); + if( in3 != null ) + sec.addLineageRDD(output.getName(), input3.getName()); } - - //aggregate and create output (no lineage because scalar) - MatrixBlock tmp = RDDAggregateUtils.sumStable(out); - DoubleObject ret = new DoubleObject(tmp.getValue(0, 0)); - sec.setVariable(output.getName(), ret); } private static class RDDAggregateTernaryFunction - implements Function<Tuple2<Tuple2<MatrixBlock,MatrixBlock>,MatrixBlock>, MatrixBlock> + implements PairFunction<Tuple2<MatrixIndexes, Tuple2<Tuple2<MatrixBlock,MatrixBlock>,MatrixBlock>>, MatrixIndexes, MatrixBlock> { private static final long serialVersionUID = 6410232464410434210L; - private AggregateBinaryOperator _aggop = null; + private final AggregateTernaryOperator _aggop; - public RDDAggregateTernaryFunction( AggregateBinaryOperator aggop ) - { - _aggop = aggop; + public RDDAggregateTernaryFunction( AggregateTernaryOperator aggop ) { + _aggop = aggop; } @Override - public MatrixBlock call(Tuple2<Tuple2<MatrixBlock, MatrixBlock>, MatrixBlock> arg0) + public Tuple2<MatrixIndexes,MatrixBlock> call(Tuple2<MatrixIndexes,Tuple2<Tuple2<MatrixBlock, MatrixBlock>, MatrixBlock>> arg0) throws Exception { //get inputs - MatrixBlock in1 = arg0._1()._1(); - MatrixBlock in2 = arg0._1()._2(); - MatrixBlock in3 = arg0._2(); + MatrixIndexes ix = arg0._1(); + MatrixBlock in1 = arg0._2()._1()._1(); + MatrixBlock in2 = arg0._2()._1()._2(); + MatrixBlock in3 = arg0._2()._2(); //execute aggregate ternary operation - ScalarObject ret = in1.aggregateTernaryOperations(in1, in2, in3, _aggop); - - //create output matrix block (w/ correction) - MatrixBlock out = new MatrixBlock(2,1,false); - out.quickSetValue(0, 0, ret.getDoubleValue()); - return out; + return new Tuple2<MatrixIndexes, MatrixBlock>(new MatrixIndexes(1, ix.getColumnIndex()), + in1.aggregateTernaryOperations(in1, in2, in3, new MatrixBlock(), _aggop, false)); } } private static class RDDAggregateTernaryFunction2 - implements Function<Tuple2<MatrixBlock,MatrixBlock>, MatrixBlock> + implements PairFunction<Tuple2<MatrixIndexes,Tuple2<MatrixBlock,MatrixBlock>>, MatrixIndexes, MatrixBlock> { private static final long serialVersionUID = -6615412819746331700L; - private AggregateBinaryOperator _aggop = null; + private final AggregateTernaryOperator _aggop; - public RDDAggregateTernaryFunction2( AggregateBinaryOperator aggop ) - { + public RDDAggregateTernaryFunction2( AggregateTernaryOperator aggop ) { _aggop = aggop; } @Override - public MatrixBlock call(Tuple2<MatrixBlock, MatrixBlock> arg0) + public Tuple2<MatrixIndexes,MatrixBlock> call(Tuple2<MatrixIndexes,Tuple2<MatrixBlock, MatrixBlock>> arg0) throws Exception { //get inputs - MatrixBlock in1 = arg0._1(); - MatrixBlock in2 = arg0._2(); + MatrixIndexes ix = arg0._1(); + MatrixBlock in1 = arg0._2()._1(); + MatrixBlock in2 = arg0._2()._2(); //execute aggregate ternary operation - ScalarObject ret = in1.aggregateTernaryOperations(in1, in2, null, _aggop); - - //create output matrix block (w/ correction) - MatrixBlock out = new MatrixBlock(2,1,false); - out.quickSetValue(0, 0, ret.getDoubleValue()); - return out; + return new Tuple2<MatrixIndexes,MatrixBlock>(new MatrixIndexes(1, ix.getColumnIndex()), + in1.aggregateTernaryOperations(in1, in2, null, new MatrixBlock(), _aggop, false)); } } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java index 54ca925..eb9324f 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java @@ -31,9 +31,6 @@ import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; -import org.apache.sysml.runtime.functionobjects.ReduceAll; -import org.apache.sysml.runtime.functionobjects.ReduceCol; -import org.apache.sysml.runtime.functionobjects.ReduceRow; import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.spark.functions.AggregateDropCorrectionFunction; @@ -130,35 +127,12 @@ public class AggregateUnarySPInstruction extends UnarySPInstruction } //put output RDD handle into symbol table - updateUnaryAggOutputMatrixCharacteristics(sec); + updateUnaryAggOutputMatrixCharacteristics(sec, auop.indexFn); sec.setRDDHandleForVariable(output.getName(), out); sec.addLineageRDD(output.getName(), input1.getName()); } } - protected void updateUnaryAggOutputMatrixCharacteristics(SparkExecutionContext sec) - throws DMLRuntimeException - { - AggregateUnaryOperator auop = (AggregateUnaryOperator)_optr; - - MatrixCharacteristics mc1 = sec.getMatrixCharacteristics(input1.getName()); - MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName()); - if(!mcOut.dimsKnown()) { - if(!mc1.dimsKnown()) { - throw new DMLRuntimeException("The output dimensions are not specified and cannot be inferred from input:" + mc1.toString() + " " + mcOut.toString()); - } - else { - //infer statistics from input based on operator - if( auop.indexFn instanceof ReduceAll ) - mcOut.set(1, 1, mc1.getRowsPerBlock(), mc1.getColsPerBlock()); - else if (auop.indexFn instanceof ReduceCol) - mcOut.set(mc1.getRows(), 1, mc1.getRowsPerBlock(), mc1.getColsPerBlock()); - else if (auop.indexFn instanceof ReduceRow) - mcOut.set(1, mc1.getCols(), mc1.getRowsPerBlock(), mc1.getColsPerBlock()); - } - } - } - private static class RDDUAggFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock> { private static final long serialVersionUID = 2672082409287856038L; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/instructions/spark/ComputationSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ComputationSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ComputationSPInstruction.java index 7a152a6..3d26b4c 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ComputationSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ComputationSPInstruction.java @@ -21,6 +21,10 @@ package org.apache.sysml.runtime.instructions.spark; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; +import org.apache.sysml.runtime.functionobjects.IndexFunction; +import org.apache.sysml.runtime.functionobjects.ReduceAll; +import org.apache.sysml.runtime.functionobjects.ReduceCol; +import org.apache.sysml.runtime.functionobjects.ReduceRow; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.operators.Operator; @@ -87,4 +91,27 @@ public abstract class ComputationSPInstruction extends SPInstruction { sec.getMatrixCharacteristics(output.getName()).set(mcIn1.getRows(), mcIn1.getCols(), mcIn1.getRowsPerBlock(), mcIn1.getRowsPerBlock()); } } + + protected void updateUnaryAggOutputMatrixCharacteristics(SparkExecutionContext sec, IndexFunction ixFn) + throws DMLRuntimeException + { + MatrixCharacteristics mc1 = sec.getMatrixCharacteristics(input1.getName()); + MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName()); + if( mcOut.dimsKnown() ) + return; + + if(!mc1.dimsKnown()) { + throw new DMLRuntimeException("The output dimensions are not specified and " + + "cannot be inferred from input:" + mc1.toString() + " " + mcOut.toString()); + } + else { + //infer statistics from input based on operator + if( ixFn instanceof ReduceAll ) + mcOut.set(1, 1, mc1.getRowsPerBlock(), mc1.getColsPerBlock()); + else if( ixFn instanceof ReduceCol ) + mcOut.set(mc1.getRows(), 1, mc1.getRowsPerBlock(), mc1.getColsPerBlock()); + else if( ixFn instanceof ReduceRow ) + mcOut.set(1, mc1.getCols(), mc1.getRowsPerBlock(), mc1.getColsPerBlock()); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java index 914480b..404f440 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java @@ -21,6 +21,7 @@ package org.apache.sysml.runtime.matrix.data; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -48,6 +49,7 @@ import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CM_COV_Object; import org.apache.sysml.runtime.instructions.cp.KahanObject; import org.apache.sysml.runtime.matrix.operators.AggregateOperator; +import org.apache.sysml.runtime.matrix.operators.AggregateTernaryOperator; import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator; import org.apache.sysml.runtime.matrix.operators.CMOperator; import org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes; @@ -388,64 +390,76 @@ public class LibMatrixAgg return out; } - public static double aggregateTernary(MatrixBlock in1, MatrixBlock in2, MatrixBlock in3) + public static MatrixBlock aggregateTernary(MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, MatrixBlock ret, AggregateTernaryOperator op) + throws DMLRuntimeException { //early abort if any block is empty if( in1.isEmptyBlock(false) || in2.isEmptyBlock(false) || in3!=null&&in3.isEmptyBlock(false) ) { - return 0; + return ret; } - + //Timing time = new Timing(true); - double val = -1; + //allocate output arrays (if required) + ret.reset(ret.rlen, ret.clen, false); //always dense + ret.allocateDenseBlock(); + + IndexFunction ixFn = op.indexFn; if( !in1.sparse && !in2.sparse && (in3==null||!in3.sparse) ) //DENSE - val = aggregateTernaryDense(in1, in2, in3, 0, in1.rlen); + aggregateTernaryDense(in1, in2, in3, ret, ixFn, 0, in1.rlen); else //GENERAL CASE - val = aggregateTernaryGeneric(in1, in2, in3, 0, in1.rlen); + aggregateTernaryGeneric(in1, in2, in3, ret, ixFn, 0, in1.rlen); - //System.out.println("tak+ ("+in1.rlen+","+in1.sparse+","+in2.sparse+","+in3.sparse+") in "+time.stop()+"ms."); + //cleanup output and change representation (if necessary) + ret.recomputeNonZeros(); + ret.examSparsity(); - return val; + //System.out.println("tak+ ("+in1.rlen+","+in1.sparse+","+in2.sparse+","+in3.sparse+") in "+time.stop()+"ms."); + + return ret; } - public static double aggregateTernary(MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, int k) + public static MatrixBlock aggregateTernary(MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, MatrixBlock ret, AggregateTernaryOperator op, int k) throws DMLRuntimeException { //fall back to sequential version if necessary - if( k <= 1 || in1.rlen/3 < PAR_NUMCELL_THRESHOLD ) { - return aggregateTernary(in1, in2, in3); + if( k <= 1 || in1.nonZeros+in2.nonZeros < PAR_NUMCELL_THRESHOLD || in1.rlen <= k/2 + || (!(op.indexFn instanceof ReduceCol) && ret.clen*8*k > PAR_INTERMEDIATE_SIZE_THRESHOLD) ) { + return aggregateTernary(in1, in2, in3, ret, op); } //early abort if any block is empty if( in1.isEmptyBlock(false) || in2.isEmptyBlock(false) || in3!=null&&in3.isEmptyBlock(false) ) { - return 0; + return ret; } //Timing time = new Timing(true); - double val = -1; try { ExecutorService pool = Executors.newFixedThreadPool( k ); ArrayList<AggTernaryTask> tasks = new ArrayList<AggTernaryTask>(); int blklen = (int)(Math.ceil((double)in1.rlen/k)); + IndexFunction ixFn = op.indexFn; for( int i=0; i<k & i*blklen<in1.rlen; i++ ) - tasks.add( new AggTernaryTask(in1, in2, in3, i*blklen, Math.min((i+1)*blklen, in1.rlen))); - pool.invokeAll(tasks); + tasks.add( new AggTernaryTask(in1, in2, in3, ret, ixFn, i*blklen, Math.min((i+1)*blklen, in1.rlen))); + List<Future<MatrixBlock>> rtasks = pool.invokeAll(tasks); pool.shutdown(); - //aggregate partial results - KahanObject kbuff = new KahanObject(0, 0); - KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); - for( AggTernaryTask task : tasks ) - kplus.execute2(kbuff, task.getResult()); - val = kbuff._sum; + //aggregate partial results and error handling + ret.copy(rtasks.get(0).get()); //for init + for( int i=1; i<rtasks.size(); i++ ) + aggregateFinalResult(op.aggOp, ret, rtasks.get(i).get()); } catch(Exception ex) { throw new DMLRuntimeException(ex); } - //System.out.println("tak+ k="+k+" ("+in1.rlen+","+in1.sparse+","+in2.sparse+","+in3.sparse+") in "+time.stop()+"ms."); + //cleanup output and change representation (if necessary) + ret.recomputeNonZeros(); + ret.examSparsity(); - return val; + //System.out.println("tak+ k="+k+" ("+in1.rlen+","+in1.sparse+","+in2.sparse+","+in3.sparse+") in "+time.stop()+"ms."); + + return ret; } public static void groupedAggregate(MatrixBlock groups, MatrixBlock target, MatrixBlock weights, MatrixBlock result, int numGroups, Operator op) @@ -642,49 +656,70 @@ public class LibMatrixAgg out.binaryOperationsInPlace(laop.increOp, partout); } - private static double aggregateTernaryDense(MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, int rl, int ru) + private static void aggregateTernaryDense(MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, MatrixBlock ret, IndexFunction ixFn, int rl, int ru) { //compute block operations KahanObject kbuff = new KahanObject(0, 0); KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); double[] a = in1.denseBlock; - double[] b = in2.denseBlock; + double[] b1 = in2.denseBlock; + double[] b2 = (in3!=null) ? in3.denseBlock : null; //if null, literal 1 final int n = in1.clen; - if( in3 != null ) //3 inputs + if( ixFn instanceof ReduceAll ) //tak+* { - double[] c = in3.denseBlock; - for( int i=rl, ix=rl*n; i<ru; i++ ) for( int j=0; j<n; j++, ix++ ) { - double val = a[ix] * b[ix] * c[ix]; + double b2val = (b2 != null) ? b2[ix] : 1; + double val = a[ix] * b1[ix] * b2val; kplus.execute2( kbuff, val ); } + ret.quickSetValue(0, 0, kbuff._sum); + ret.quickSetValue(0, 1, kbuff._correction); } - else //2 inputs (third: literal 1) + else //tack+* { - for( int i=rl, ix=rl*n; i<ru; i++ ) + double[] c = ret.getDenseBlock(); + for( int i=rl, ix=rl*n; i<ru; i++ ) for( int j=0; j<n; j++, ix++ ) { - double val = a[ix] * b[ix]; - kplus.execute2( kbuff, val ); + double b2val = (b2 != null) ? b2[ix] : 1; + double val = a[ix] * b1[ix] * b2val; + kbuff._sum = c[j]; + kbuff._correction = c[j+n]; + kplus.execute2(kbuff, val); + c[j] = kbuff._sum; + c[j+n] = kbuff._correction; } } - - return kbuff._sum; } - private static double aggregateTernaryGeneric(MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, int rl, int ru) + private static void aggregateTernaryGeneric(MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, MatrixBlock ret, IndexFunction ixFn, int rl, int ru) { //compute block operations KahanObject kbuff = new KahanObject(0, 0); KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); + + //guaranteed to have at least one sparse input, sort by nnz, assume num cells if + //(potentially incorrect) in dense representation, keep null at end via stable sort + MatrixBlock[] blocks = new MatrixBlock[]{in1, in2, in3}; + Arrays.sort(blocks, new Comparator<MatrixBlock>() { + @Override + public int compare(MatrixBlock o1, MatrixBlock o2) { + long nnz1 = (o1!=null && o1.sparse) ? o1.nonZeros : Long.MAX_VALUE; + long nnz2 = (o2!=null && o2.sparse) ? o2.nonZeros : Long.MAX_VALUE; + return Long.compare(nnz1, nnz2); + } + }); + MatrixBlock lin1 = blocks[0]; + MatrixBlock lin2 = blocks[1]; + MatrixBlock lin3 = blocks[2]; + + SparseBlock a = lin1.sparseBlock; final int n = in1.clen; - if( in1.sparse ) + if( ixFn instanceof ReduceAll ) //tak+* { - SparseBlock a = in1.sparseBlock; - for( int i=rl; i<ru; i++ ) if( !a.isEmpty(i) ) { int apos = a.pos(i); @@ -693,28 +728,40 @@ public class LibMatrixAgg double[] avals = a.values(i); for( int j=apos; j<apos+alen; j++ ) { double val1 = avals[j]; - double val2 = in2.quickGetValue(i, aix[j]); + double val2 = lin2.quickGetValue(i, aix[j]); double val = val1 * val2; - if( val != 0 && in3 != null ) - val *= in3.quickGetValue(i, aix[j]); + if( val != 0 && lin3 != null ) + val *= lin3.quickGetValue(i, aix[j]); kplus.execute2( kbuff, val ); } } + ret.quickSetValue(0, 0, kbuff._sum); + ret.quickSetValue(0, 1, kbuff._correction); } - else //generic case + else //tack+* { + double[] c = ret.getDenseBlock(); for( int i=rl; i<ru; i++ ) - for( int j=0; j<n; j++ ){ - double val1 = in1.quickGetValue(i, j); - double val2 = in2.quickGetValue(i, j); - double val = val1 * val2; - if( in3 != null ) - val *= in3.quickGetValue(i, j); - kplus.execute2( kbuff, val ); - } + if( !a.isEmpty(i) ) { + int apos = a.pos(i); + int alen = a.size(i); + int[] aix = a.indexes(i); + double[] avals = a.values(i); + for( int j=apos; j<apos+alen; j++ ) { + int colIx = aix[j]; + double val1 = avals[j]; + double val2 = lin2.quickGetValue(i, colIx); + double val = val1 * val2; + if( val != 0 && lin3 != null ) + val *= lin3.quickGetValue(i, colIx); + kbuff._sum = c[colIx]; + kbuff._correction = c[colIx+n]; + kplus.execute2( kbuff, val ); + c[colIx] = kbuff._sum; + c[colIx+n] = kbuff._correction; + } + } } - - return kbuff._sum; } @@ -3492,37 +3539,43 @@ public class LibMatrixAgg } } - private static class AggTernaryTask extends AggTask + private static class AggTernaryTask implements Callable<MatrixBlock> { - private MatrixBlock _in1 = null; - private MatrixBlock _in2 = null; - private MatrixBlock _in3 = null; - private double _ret = -1; - private int _rl = -1; - private int _ru = -1; + private final MatrixBlock _in1; + private final MatrixBlock _in2; + private final MatrixBlock _in3; + private MatrixBlock _ret = null; + private final IndexFunction _ixFn; + private final int _rl; + private final int _ru; - protected AggTernaryTask( MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, int rl, int ru ) + protected AggTernaryTask( MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, MatrixBlock ret, IndexFunction ixFn, int rl, int ru ) throws DMLRuntimeException { _in1 = in1; _in2 = in2; - _in3 = in3; + _in3 = in3; + _ret = ret; + _ixFn = ixFn; _rl = rl; _ru = ru; } @Override - public Object call() throws DMLRuntimeException + public MatrixBlock call() throws DMLRuntimeException { + //thead-local allocation for partial aggregation + _ret = new MatrixBlock(_ret.rlen, _ret.clen, false); + _ret.allocateDenseBlock(); + if( !_in1.sparse && !_in2.sparse && (_in3==null||!_in3.sparse) ) //DENSE - _ret = aggregateTernaryDense(_in1, _in2, _in3, _rl, _ru); + aggregateTernaryDense(_in1, _in2, _in3, _ret, _ixFn, _rl, _ru); else //GENERAL CASE - _ret = aggregateTernaryGeneric(_in1, _in2, _in3, _rl, _ru); + aggregateTernaryGeneric(_in1, _in2, _in3, _ret, _ixFn, _rl, _ru); + + //recompute non-zeros of partial result + _ret.recomputeNonZeros(); - return null; - } - - public double getResult() { return _ret; } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index fb4f196..5fc69ce 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -59,7 +59,6 @@ import org.apache.sysml.runtime.functionobjects.RevIndex; import org.apache.sysml.runtime.functionobjects.SortIndex; import org.apache.sysml.runtime.functionobjects.SwapIndex; import org.apache.sysml.runtime.instructions.cp.CM_COV_Object; -import org.apache.sysml.runtime.instructions.cp.DoubleObject; import org.apache.sysml.runtime.instructions.cp.KahanObject; import org.apache.sysml.runtime.instructions.cp.ScalarObject; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; @@ -68,6 +67,7 @@ import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator; import org.apache.sysml.runtime.matrix.operators.AggregateOperator; +import org.apache.sysml.runtime.matrix.operators.AggregateTernaryOperator; import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator; import org.apache.sysml.runtime.matrix.operators.BinaryOperator; import org.apache.sysml.runtime.matrix.operators.CMOperator; @@ -4886,7 +4886,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab return ret; } - public ScalarObject aggregateTernaryOperations(MatrixBlock m1, MatrixBlock m2, MatrixBlock m3, AggregateBinaryOperator op) + public MatrixBlock aggregateTernaryOperations(MatrixBlock m1, MatrixBlock m2, MatrixBlock m3, MatrixBlock ret, AggregateTernaryOperator op, boolean inCP) throws DMLRuntimeException { //check input dimensions and operators @@ -4895,15 +4895,23 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab if( !( op.aggOp.increOp.fn instanceof KahanPlus && op.binaryFn instanceof Multiply) ) throw new DMLRuntimeException("Unsupported operator for aggregate tertiary operations."); + //create output matrix block w/ corrections + int rl = (op.indexFn instanceof ReduceRow) ? 2 : 1; + int cl = (op.indexFn instanceof ReduceRow) ? m1.clen : 2; + if( ret == null ) + ret = new MatrixBlock(rl, cl, false); + else + ret.reset(rl, cl, false); + //execute ternary aggregate function - double val = -1; if( op.getNumThreads() > 1 ) - val = LibMatrixAgg.aggregateTernary(m1, m2, m3, op.getNumThreads()); + ret = LibMatrixAgg.aggregateTernary(m1, m2, m3, ret, op, op.getNumThreads()); else - val = LibMatrixAgg.aggregateTernary(m1, m2, m3); + ret = LibMatrixAgg.aggregateTernary(m1, m2, m3, ret, op); - //create output - return new DoubleObject(val); + if(op.aggOp.correctionExists && inCP) + ret.dropLastRowsOrColums(op.aggOp.correctionLocation); + return ret; } public MatrixBlock uaggouterchainOperations(MatrixBlock mbLeft, MatrixBlock mbRight, MatrixBlock mbOut, BinaryOperator bOp, AggregateUnaryOperator uaggOp) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/matrix/operators/AggregateTernaryOperator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/operators/AggregateTernaryOperator.java b/src/main/java/org/apache/sysml/runtime/matrix/operators/AggregateTernaryOperator.java new file mode 100644 index 0000000..7ffdaff --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/matrix/operators/AggregateTernaryOperator.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.sysml.runtime.matrix.operators; + +import java.io.Serializable; + +import org.apache.sysml.runtime.functionobjects.IndexFunction; +import org.apache.sysml.runtime.functionobjects.ValueFunction; + + +public class AggregateTernaryOperator extends Operator implements Serializable +{ + private static final long serialVersionUID = 4251745081160216784L; + + public ValueFunction binaryFn; + public AggregateOperator aggOp; + public IndexFunction indexFn; + private int k; //num threads + + public AggregateTernaryOperator(ValueFunction inner, AggregateOperator outer, IndexFunction ixfun) { + //default degree of parallelism is 1 (e.g., for distributed operations) + this( inner, outer, ixfun, 1 ); + } + + public AggregateTernaryOperator(ValueFunction inner, AggregateOperator outer, IndexFunction ixfun, int numThreads) + { + binaryFn = inner; + aggOp = outer; + indexFn = ixfun; + k = numThreads; + + //so far we only support sum-product and its sparse-safe + sparseSafe = true; + } + + public int getNumThreads() { + return k; + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/test/java/org/apache/sysml/test/integration/functions/ternary/TernaryAggregateTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/ternary/TernaryAggregateTest.java b/src/test/java/org/apache/sysml/test/integration/functions/ternary/TernaryAggregateTest.java new file mode 100644 index 0000000..6168025 --- /dev/null +++ b/src/test/java/org/apache/sysml/test/integration/functions/ternary/TernaryAggregateTest.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.test.integration.functions.ternary; + +import java.util.HashMap; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.sysml.api.DMLScript; +import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; +import org.apache.sysml.hops.OptimizerUtils; +import org.apache.sysml.lops.LopProperties.ExecType; +import org.apache.sysml.runtime.instructions.Instruction; +import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex; +import org.apache.sysml.test.integration.AutomatedTestBase; +import org.apache.sysml.test.integration.TestConfiguration; +import org.apache.sysml.test.utils.TestUtils; +import org.apache.sysml.utils.Statistics; + +/** + * + */ +public class TernaryAggregateTest extends AutomatedTestBase +{ + private final static String TEST_NAME1 = "TernaryAggregateRC"; + private final static String TEST_NAME2 = "TernaryAggregateC"; + + private final static String TEST_DIR = "functions/ternary/"; + private final static String TEST_CLASS_DIR = TEST_DIR + TernaryAggregateTest.class.getSimpleName() + "/"; + private final static double eps = 1e-8; + + private final static int rows = 1111; + private final static int cols = 1011; + + private final static double sparsity1 = 0.7; + private final static double sparsity2 = 0.3; + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "R" }) ); + addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] { "R" }) ); + } + + @Test + public void testTernaryAggregateRCDenseVectorCP() { + runTernaryAggregateTest(TEST_NAME1, false, true, true, ExecType.CP); + } + + @Test + public void testTernaryAggregateRCSparseVectorCP() { + runTernaryAggregateTest(TEST_NAME1, true, true, true, ExecType.CP); + } + + @Test + public void testTernaryAggregateRCDenseMatrixCP() { + runTernaryAggregateTest(TEST_NAME1, false, false, true, ExecType.CP); + } + + @Test + public void testTernaryAggregateRCSparseMatrixCP() { + runTernaryAggregateTest(TEST_NAME1, true, false, true, ExecType.CP); + } + + @Test + public void testTernaryAggregateRCDenseVectorSP() { + runTernaryAggregateTest(TEST_NAME1, false, true, true, ExecType.SPARK); + } + + @Test + public void testTernaryAggregateRCSparseVectorSP() { + runTernaryAggregateTest(TEST_NAME1, true, true, true, ExecType.SPARK); + } + + @Test + public void testTernaryAggregateRCDenseMatrixSP() { + runTernaryAggregateTest(TEST_NAME1, false, false, true, ExecType.SPARK); + } + + @Test + public void testTernaryAggregateRCSparseMatrixSP() { + runTernaryAggregateTest(TEST_NAME1, true, false, true, ExecType.SPARK); + } + + @Test + public void testTernaryAggregateRCDenseVectorMR() { + runTernaryAggregateTest(TEST_NAME1, false, true, true, ExecType.MR); + } + + @Test + public void testTernaryAggregateRCSparseVectorMR() { + runTernaryAggregateTest(TEST_NAME1, true, true, true, ExecType.MR); + } + + @Test + public void testTernaryAggregateRCDenseMatrixMR() { + runTernaryAggregateTest(TEST_NAME1, false, false, true, ExecType.MR); + } + + @Test + public void testTernaryAggregateRCSparseMatrixMR() { + runTernaryAggregateTest(TEST_NAME1, true, false, true, ExecType.MR); + } + + @Test + public void testTernaryAggregateCDenseVectorCP() { + runTernaryAggregateTest(TEST_NAME2, false, true, true, ExecType.CP); + } + + @Test + public void testTernaryAggregateCSparseVectorCP() { + runTernaryAggregateTest(TEST_NAME2, true, true, true, ExecType.CP); + } + + @Test + public void testTernaryAggregateCDenseMatrixCP() { + runTernaryAggregateTest(TEST_NAME2, false, false, true, ExecType.CP); + } + + @Test + public void testTernaryAggregateCSparseMatrixCP() { + runTernaryAggregateTest(TEST_NAME2, true, false, true, ExecType.CP); + } + + @Test + public void testTernaryAggregateCDenseVectorSP() { + runTernaryAggregateTest(TEST_NAME2, false, true, true, ExecType.SPARK); + } + + @Test + public void testTernaryAggregateCSparseVectorSP() { + runTernaryAggregateTest(TEST_NAME2, true, true, true, ExecType.SPARK); + } + + @Test + public void testTernaryAggregateCDenseMatrixSP() { + runTernaryAggregateTest(TEST_NAME2, false, false, true, ExecType.SPARK); + } + + @Test + public void testTernaryAggregateCSparseMatrixSP() { + runTernaryAggregateTest(TEST_NAME2, true, false, true, ExecType.SPARK); + } + + //additional tests to check default without rewrites + + @Test + public void testTernaryAggregateRCDenseVectorCPNoRewrite() { + runTernaryAggregateTest(TEST_NAME2, false, true, false, ExecType.CP); + } + + @Test + public void testTernaryAggregateRCSparseVectorCPNoRewrite() { + runTernaryAggregateTest(TEST_NAME2, true, true, false, ExecType.CP); + } + + @Test + public void testTernaryAggregateRCDenseMatrixCPNoRewrite() { + runTernaryAggregateTest(TEST_NAME2, false, false, false, ExecType.CP); + } + + @Test + public void testTernaryAggregateRCSparseMatrixCPNoRewrite() { + runTernaryAggregateTest(TEST_NAME2, true, false, false, ExecType.CP); + } + + @Test + public void testTernaryAggregateCDenseVectorCPNoRewrite() { + runTernaryAggregateTest(TEST_NAME2, false, true, false, ExecType.CP); + } + + @Test + public void testTernaryAggregateCSparseVectorCPNoRewrite() { + runTernaryAggregateTest(TEST_NAME2, true, true, false, ExecType.CP); + } + + @Test + public void testTernaryAggregateCDenseMatrixCPNoRewrite() { + runTernaryAggregateTest(TEST_NAME2, false, false, false, ExecType.CP); + } + + @Test + public void testTernaryAggregateCSparseMatrixCPNoRewrite() { + runTernaryAggregateTest(TEST_NAME2, true, false, false, ExecType.CP); + } + + + + private void runTernaryAggregateTest(String testname, boolean sparse, boolean vectors, boolean rewrites, ExecType et) + { + //rtplatform for MR + RUNTIME_PLATFORM platformOld = rtplatform; + switch( et ){ + case MR: rtplatform = RUNTIME_PLATFORM.HADOOP; break; + case SPARK: rtplatform = RUNTIME_PLATFORM.SPARK; break; + default: rtplatform = RUNTIME_PLATFORM.HYBRID; break; + } + + boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG; + if( rtplatform == RUNTIME_PLATFORM.SPARK ) + DMLScript.USE_LOCAL_SPARK_CONFIG = true; + + boolean rewritesOld = OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES; + + try + { + TestConfiguration config = getTestConfiguration(testname); + loadTestConfiguration(config); + + OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES = rewrites; + + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + testname + ".dml"; + programArgs = new String[]{"-stats","-args", input("A"), output("R")}; + + fullRScriptName = HOME + testname + ".R"; + rCmd = "Rscript" + " " + fullRScriptName + " " + + inputDir() + " " + expectedDir(); + + //generate actual dataset + double sparsity = sparse ? sparsity2 : sparsity1; + double[][] A = getRandomMatrix(vectors ? rows*cols : rows, + vectors ? 1 : cols, 0, 1, sparsity, 17); + writeInputMatrixWithMTD("A", A, true); + + //run test cases + runTest(true, false, null, -1); + runRScript(true); + + //compare output matrices + HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R"); + HashMap<CellIndex, Double> rfile = readRMatrixFromFS("R"); + TestUtils.compareMatrices(dmlfile, rfile, eps, "Stat-DML", "Stat-R"); + + //check for rewritten patterns in statistics output + if( rewrites && et != ExecType.MR ) { + String opcode = ((et == ExecType.SPARK) ? Instruction.SP_INST_PREFIX : "") + + (((testname.equals(TEST_NAME1) || vectors ) ? "tak+*" : "tack+*")); + Assert.assertEquals(new Boolean(true), new Boolean( + Statistics.getCPHeavyHitterOpCodes().contains(opcode))); + } + } + finally { + rtplatform = platformOld; + DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; + OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES = rewritesOld; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/test/scripts/functions/ternary/TernaryAggregateC.R ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/ternary/TernaryAggregateC.R b/src/test/scripts/functions/ternary/TernaryAggregateC.R new file mode 100644 index 0000000..8e83ebf --- /dev/null +++ b/src/test/scripts/functions/ternary/TernaryAggregateC.R @@ -0,0 +1,33 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +args <- commandArgs(TRUE) +options(digits=22) + +library("Matrix") + +A = as.matrix(readMM(paste(args[1], "A.mtx", sep=""))) +B = A * 2; +C = A * 3; + +R = t(as.matrix(colSums(A * B * C))); + +writeMM(as(R, "CsparseMatrix"), paste(args[2], "R", sep="")); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/test/scripts/functions/ternary/TernaryAggregateC.dml ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/ternary/TernaryAggregateC.dml b/src/test/scripts/functions/ternary/TernaryAggregateC.dml new file mode 100644 index 0000000..a171ff4 --- /dev/null +++ b/src/test/scripts/functions/ternary/TernaryAggregateC.dml @@ -0,0 +1,30 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +A = read($1); +B = A * 2; +C = A * 3; + +if(1==1){} + +R = colSums(A * B * C); + +write(R, $2); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/test/scripts/functions/ternary/TernaryAggregateRC.R ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/ternary/TernaryAggregateRC.R b/src/test/scripts/functions/ternary/TernaryAggregateRC.R new file mode 100644 index 0000000..96e793e --- /dev/null +++ b/src/test/scripts/functions/ternary/TernaryAggregateRC.R @@ -0,0 +1,34 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +args <- commandArgs(TRUE) +options(digits=22) + +library("Matrix") + +A = as.matrix(readMM(paste(args[1], "A.mtx", sep=""))) +B = A * 2; +C = A * 3; + +s = sum(A * B * C); +R = as.matrix(s); + +writeMM(as(R, "CsparseMatrix"), paste(args[2], "R", sep="")); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/test/scripts/functions/ternary/TernaryAggregateRC.dml ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/ternary/TernaryAggregateRC.dml b/src/test/scripts/functions/ternary/TernaryAggregateRC.dml new file mode 100644 index 0000000..485570d --- /dev/null +++ b/src/test/scripts/functions/ternary/TernaryAggregateRC.dml @@ -0,0 +1,31 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +A = read($1); +B = A * 2; +C = A * 3; + +if(1==1){} + +s = sum(A * B * C); +R = as.matrix(s); + +write(R, $2); \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/test_suites/java/org/apache/sysml/test/integration/functions/ternary/ZPackageSuite.java ---------------------------------------------------------------------- diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/ternary/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/ternary/ZPackageSuite.java index 83f217d..784177d 100644 --- a/src/test_suites/java/org/apache/sysml/test/integration/functions/ternary/ZPackageSuite.java +++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/ternary/ZPackageSuite.java @@ -31,7 +31,8 @@ import org.junit.runners.Suite; CTableMatrixIgnoreZerosTest.class, CTableSequenceTest.class, QuantileWeightsTest.class, - TableOutputTest.class + TableOutputTest.class, + TernaryAggregateTest.class, })
