Repository: incubator-systemml
Updated Branches:
  refs/heads/master 184e02dac -> 21b96855b


[SYSTEMML-1255] New cp/spark column-wise ternary aggregates, tests 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/21b96855
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/21b96855
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/21b96855

Branch: refs/heads/master
Commit: 21b96855b4e6f36b379cc3ebc5bb95435158d35a
Parents: 184e02d
Author: Matthias Boehm <[email protected]>
Authored: Thu Feb 16 22:08:45 2017 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Fri Feb 17 13:55:02 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/sysml/hops/AggUnaryOp.java  |  18 +-
 .../org/apache/sysml/lops/TernaryAggregate.java |  24 +-
 .../runtime/compress/CompressedMatrixBlock.java |   5 +-
 .../instructions/CPInstructionParser.java       |   3 +-
 .../runtime/instructions/InstructionUtils.java  |  16 ++
 .../instructions/SPInstructionParser.java       |   2 +-
 .../cp/AggregateTernaryCPInstruction.java       |  32 ++-
 .../spark/AggregateTernarySPInstruction.java    | 128 +++++----
 .../spark/AggregateUnarySPInstruction.java      |  28 +-
 .../spark/ComputationSPInstruction.java         |  27 ++
 .../sysml/runtime/matrix/data/LibMatrixAgg.java | 191 ++++++++-----
 .../sysml/runtime/matrix/data/MatrixBlock.java  |  22 +-
 .../operators/AggregateTernaryOperator.java     |  57 ++++
 .../functions/ternary/TernaryAggregateTest.java | 267 +++++++++++++++++++
 .../functions/ternary/TernaryAggregateC.R       |  33 +++
 .../functions/ternary/TernaryAggregateC.dml     |  30 +++
 .../functions/ternary/TernaryAggregateRC.R      |  34 +++
 .../functions/ternary/TernaryAggregateRC.dml    |  31 +++
 .../functions/ternary/ZPackageSuite.java        |   3 +-
 19 files changed, 754 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/hops/AggUnaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/AggUnaryOp.java b/src/main/java/org/apache/sysml/hops/AggUnaryOp.java
index 9964981..fce424c 100644
--- a/src/main/java/org/apache/sysml/hops/AggUnaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/AggUnaryOp.java
@@ -119,7 +119,7 @@ public class AggUnaryOp extends Hop implements 
MultiThreadedHop
                        if ( et == ExecType.CP ) 
                        {
                                Lop agg1 = null;
-                               if( isTernaryAggregateRewriteApplicable() ) {
+                               if( isTernaryAggregateRewriteApplicable(et) ) {
                                        agg1 = 
constructLopsTernaryAggregateRewrite(et);
                                }
                                else if( 
isUnaryAggregateOuterCPRewriteApplicable() )
@@ -245,7 +245,7 @@ public class AggUnaryOp extends Hop implements 
MultiThreadedHop
                                DirectionTypes dir = 
HopsDirection2Lops.get(_direction);
 
                                //unary aggregate
-                               if( isTernaryAggregateRewriteApplicable() ) 
+                               if( isTernaryAggregateRewriteApplicable(et) ) 
                                {
                                        Lop aggregate = 
constructLopsTernaryAggregateRewrite(et);
                                        setOutputDimensions(aggregate); //0x0 
(scalar)
@@ -488,14 +488,15 @@ public class AggUnaryOp extends Hop implements 
MultiThreadedHop
                        return SparkAggType.MULTI_BLOCK;
        }
 
-       private boolean isTernaryAggregateRewriteApplicable() throws 
HopsException 
+       private boolean isTernaryAggregateRewriteApplicable(ExecType et) 
+               throws HopsException 
        {
                boolean ret = false;
                
                //currently we support only sum over binary multiply but 
potentially 
                //it can be generalized to any RC aggregate over two common 
binary operations
-               if( OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES &&
-                       _direction == Direction.RowCol && _op == AggOp.SUM ) 
+               if( OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES && _op == 
AggOp.SUM &&
+                       (_direction == Direction.RowCol || _direction == 
Direction.Col)  ) 
                {
                        Hop input1 = getInput().get(0);
                        if( input1.getParent().size() == 1 && //sum single 
consumer
@@ -639,7 +640,6 @@ public class AggUnaryOp extends Hop implements 
MultiThreadedHop
                Hop input11 = input1.getInput().get(0);
                Hop input12 = input1.getInput().get(1);
                
-               Lop ret = null;
                Lop in1 = null;
                Lop in2 = null;
                Lop in3 = null;
@@ -668,10 +668,10 @@ public class AggUnaryOp extends Hop implements 
MultiThreadedHop
                // The execution type of a unary aggregate instruction should 
depend on the execution type of inputs to avoid OOM
                // Since we only support matrix-vector and not vector-matrix, 
checking the execution type of input1 should suffice.
                ExecType et_input = input1.optFindExecType();
-               ret = new TernaryAggregate(in1, in2, in3, 
Aggregate.OperationTypes.KahanSum, 
-                               Binary.OperationTypes.MULTIPLY, 
DataType.SCALAR, ValueType.DOUBLE, et_input, k);
+               DirectionTypes dir = HopsDirection2Lops.get(_direction);
                
-               return ret;
+               return new TernaryAggregate(in1, in2, in3, 
Aggregate.OperationTypes.KahanSum, 
+                               Binary.OperationTypes.MULTIPLY, dir, 
getDataType(), ValueType.DOUBLE, et_input, k);
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/lops/TernaryAggregate.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/TernaryAggregate.java b/src/main/java/org/apache/sysml/lops/TernaryAggregate.java
index 0932424..08600e4 100644
--- a/src/main/java/org/apache/sysml/lops/TernaryAggregate.java
+++ b/src/main/java/org/apache/sysml/lops/TernaryAggregate.java
@@ -21,22 +21,24 @@ package org.apache.sysml.lops;
 
 import org.apache.sysml.lops.LopProperties.ExecLocation;
 import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.lops.PartialAggregate.DirectionTypes;
 import org.apache.sysml.lops.compile.JobType;
 import org.apache.sysml.parser.Expression.*;
 
 public class TernaryAggregate extends Lop 
 {
-       
-       private static final String OPCODE = "tak+*";
+       public static final String OPCODE_RC = "tak+*";
+       public static final String OPCODE_C = "tack+*";
        
        //NOTE: currently only used for ta+*
        //private Aggregate.OperationTypes _aggOp = null;
        //private Binary.OperationTypes _binOp = null;
+       private DirectionTypes _direction;
        
        //optional attribute for cp
        private int _numThreads = -1;
 
-       public TernaryAggregate(Lop input1, Lop input2, Lop input3, 
Aggregate.OperationTypes aggOp, Binary.OperationTypes binOp, DataType dt, 
ValueType vt, ExecType et, int k ) 
+       public TernaryAggregate(Lop input1, Lop input2, Lop input3, 
Aggregate.OperationTypes aggOp, Binary.OperationTypes binOp, DirectionTypes 
direction, DataType dt, ValueType vt, ExecType et, int k ) 
        {
                super(Lop.Type.TernaryAggregate, dt, vt);
                
@@ -50,6 +52,7 @@ public class TernaryAggregate extends Lop
                input2.addOutput(this);
                input3.addOutput(this);
                
+               _direction = direction;
                _numThreads = k;
                
                boolean breaksAlignment = false;
@@ -60,9 +63,8 @@ public class TernaryAggregate extends Lop
        }
        
        @Override
-       public String toString()
-       {
-               return "Operation: "+OPCODE;            
+       public String toString() {
+               return "Operation: "+getOpCode();
        }
        
        @Override
@@ -72,7 +74,7 @@ public class TernaryAggregate extends Lop
                StringBuilder sb = new StringBuilder();
                sb.append( getExecType() );
                sb.append( OPERAND_DELIMITOR );
-               sb.append( OPCODE );
+               sb.append( getOpCode() );
                sb.append( OPERAND_DELIMITOR );
                sb.append( getInputs().get(0).prepInputOperand(input1));
                sb.append( OPERAND_DELIMITOR );
@@ -89,4 +91,12 @@ public class TernaryAggregate extends Lop
                
                return sb.toString();
        }
+       
+       private String getOpCode() {
+               switch( _direction ) {
+                       case RowCol: return OPCODE_RC;
+                       case Col: return OPCODE_C;
+                       default: throw new RuntimeException("Unsupported 
aggregation direction: "+_direction);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index 84c4812..e345fca 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -77,6 +77,7 @@ import org.apache.sysml.runtime.matrix.data.SparseBlock;
 import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysml.runtime.matrix.operators.AggregateTernaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.CMOperator;
@@ -1917,13 +1918,13 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
        }
 
        @Override
-       public ScalarObject aggregateTernaryOperations(MatrixBlock m1, 
MatrixBlock m2, MatrixBlock m3, AggregateBinaryOperator op)
+       public MatrixBlock aggregateTernaryOperations(MatrixBlock m1, 
MatrixBlock m2, MatrixBlock m3, MatrixBlock ret, AggregateTernaryOperator op, 
boolean inCP)
                        throws DMLRuntimeException {
                printDecompressWarning("aggregateTernaryOperations");
                MatrixBlock left = isCompressed() ? decompress() : this;
                MatrixBlock right1 = getUncompressed(m2);
                MatrixBlock right2 = getUncompressed(m3);
-               return left.aggregateTernaryOperations(left, right1, right2, 
op);
+               return left.aggregateTernaryOperations(left, right1, right2, 
ret, op, inCP);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
index 9cf56d9..f3c1605 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
@@ -78,7 +78,8 @@ public class CPInstructionParser extends InstructionParser
                String2CPInstructionType = new HashMap<String, 
CPINSTRUCTION_TYPE>();
 
                String2CPInstructionType.put( "ba+*"    , 
CPINSTRUCTION_TYPE.AggregateBinary);
-               String2CPInstructionType.put( "tak+*"           , 
CPINSTRUCTION_TYPE.AggregateTernary);
+               String2CPInstructionType.put( "tak+*"   , 
CPINSTRUCTION_TYPE.AggregateTernary);
+               String2CPInstructionType.put( "tack+*"  , 
CPINSTRUCTION_TYPE.AggregateTernary);
                
                String2CPInstructionType.put( "uak+"    , 
CPINSTRUCTION_TYPE.AggregateUnary);
                String2CPInstructionType.put( "uark+"   , 
CPINSTRUCTION_TYPE.AggregateUnary);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
index 6ef39f1..2a990d4 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
@@ -48,6 +48,7 @@ import org.apache.sysml.runtime.functionobjects.Divide;
 import org.apache.sysml.runtime.functionobjects.Equals;
 import org.apache.sysml.runtime.functionobjects.GreaterThan;
 import org.apache.sysml.runtime.functionobjects.GreaterThanEquals;
+import org.apache.sysml.runtime.functionobjects.IndexFunction;
 import org.apache.sysml.runtime.functionobjects.IntegerDivide;
 import org.apache.sysml.runtime.functionobjects.KahanPlus;
 import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
@@ -76,6 +77,7 @@ import 
org.apache.sysml.runtime.instructions.gpu.GPUInstruction.GPUINSTRUCTION_T
 import 
org.apache.sysml.runtime.instructions.mr.MRInstruction.MRINSTRUCTION_TYPE;
 import 
org.apache.sysml.runtime.instructions.spark.SPInstruction.SPINSTRUCTION_TYPE;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysml.runtime.matrix.operators.AggregateTernaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.LeftScalarOperator;
@@ -372,6 +374,20 @@ public class InstructionUtils
                return aggun;
        }
 
+       public static AggregateTernaryOperator 
parseAggregateTernaryOperator(String opcode) {
+               return parseAggregateTernaryOperator(opcode, 1);
+       }
+       
+       public static AggregateTernaryOperator 
parseAggregateTernaryOperator(String opcode, int numThreads) {
+               CorrectionLocationType corr = opcode.equalsIgnoreCase("tak+*") 
? 
+                               CorrectionLocationType.LASTCOLUMN : 
CorrectionLocationType.LASTROW;
+               AggregateOperator agg = new AggregateOperator(0, 
KahanPlus.getKahanPlusFnObject(), true, corr);
+               IndexFunction ixfun = opcode.equalsIgnoreCase("tak+*") ? 
+                       ReduceAll.getReduceAllFnObject() : 
ReduceRow.getReduceRowFnObject();                                    
+               
+               return new 
AggregateTernaryOperator(Multiply.getMultiplyFnObject(), agg, ixfun, 
numThreads);
+       }
+       
        public static AggregateOperator parseAggregateOperator(String opcode, 
String corrExists, String corrLoc)
        {
                AggregateOperator agg = null;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
index 9008f09..fa05de3 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
@@ -131,7 +131,7 @@ public class SPInstructionParser extends InstructionParser
                
                //ternary aggregate operators
                String2SPInstructionType.put( "tak+*"      , 
SPINSTRUCTION_TYPE.AggregateTernary);
-
+               String2SPInstructionType.put( "tack+*"     , 
SPINSTRUCTION_TYPE.AggregateTernary);
                
                String2SPInstructionType.put( "rangeReIndex"    , 
SPINSTRUCTION_TYPE.MatrixIndexing);
                String2SPInstructionType.put( "leftIndex"       , 
SPINSTRUCTION_TYPE.MatrixIndexing);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/instructions/cp/AggregateTernaryCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/AggregateTernaryCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/AggregateTernaryCPInstruction.java
index b88c062..147436e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/AggregateTernaryCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/AggregateTernaryCPInstruction.java
@@ -21,18 +21,15 @@ package org.apache.sysml.runtime.instructions.cp;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysml.runtime.functionobjects.KahanPlus;
-import org.apache.sysml.runtime.functionobjects.Multiply;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysml.runtime.matrix.operators.AggregateTernaryOperator;
 import org.apache.sysml.runtime.matrix.operators.Operator;
 
 public class AggregateTernaryCPInstruction extends ComputationCPInstruction
-{
-       public AggregateTernaryCPInstruction(Operator op, CPOperand in1, 
CPOperand in2, CPOperand in3, 
-                                         CPOperand out, String opcode, String 
istr  )
+{      
+       public AggregateTernaryCPInstruction(Operator op, CPOperand in1, 
CPOperand in2, 
+               CPOperand in3, CPOperand out, String opcode, String istr  )
        {
                super(op, in1, in2, in3, out, opcode, istr);
                _cptype = CPINSTRUCTION_TYPE.AggregateTernary;
@@ -44,7 +41,7 @@ public class AggregateTernaryCPInstruction extends 
ComputationCPInstruction
                String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
                String opcode = parts[0];
                
-               if ( opcode.equalsIgnoreCase("tak+*")) {
+               if ( opcode.equalsIgnoreCase("tak+*") || 
opcode.equalsIgnoreCase("tack+*") ) {
                        InstructionUtils.checkNumFields( parts, 5 );
                        
                        CPOperand in1 = new CPOperand(parts[1]);
@@ -52,16 +49,13 @@ public class AggregateTernaryCPInstruction extends 
ComputationCPInstruction
                        CPOperand in3 = new CPOperand(parts[3]);
                        CPOperand out = new CPOperand(parts[4]);
                        int numThreads = Integer.parseInt(parts[5]);
-                               
-                       AggregateOperator agg = new AggregateOperator(0, 
KahanPlus.getKahanPlusFnObject());
-                       AggregateBinaryOperator op = new 
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg, numThreads);
                        
+                       AggregateTernaryOperator op = 
InstructionUtils.parseAggregateTernaryOperator(opcode, numThreads);
                        return new AggregateTernaryCPInstruction(op, in1, in2, 
in3, out, opcode, str);
                } 
                else {
-                       throw new 
DMLRuntimeException("AggregateTertiaryInstruction.parseInstruction():: Unknown 
opcode " + opcode);
-               }
-               
+                       throw new 
DMLRuntimeException("AggregateTernaryInstruction.parseInstruction():: Unknown 
opcode " + opcode);
+               }               
        }
        
        @Override
@@ -73,14 +67,18 @@ public class AggregateTernaryCPInstruction extends 
ComputationCPInstruction
         MatrixBlock matBlock3 = input3.isLiteral() ? null : //matrix or 
literal 1
                                                        
ec.getMatrixInput(input3.getName());
                        
-               AggregateBinaryOperator ab_op = (AggregateBinaryOperator) _optr;
-               ScalarObject ret = 
matBlock1.aggregateTernaryOperations(matBlock1, matBlock2, matBlock3, ab_op);
+               AggregateTernaryOperator ab_op = (AggregateTernaryOperator) 
_optr;
+               MatrixBlock ret = matBlock1.aggregateTernaryOperations(
+                               matBlock1, matBlock2, matBlock3, new 
MatrixBlock(), ab_op, true);
                        
                //release inputs/outputs
                ec.releaseMatrixInput(input1.getName());
                ec.releaseMatrixInput(input2.getName());
                if( !input3.isLiteral() )
                        ec.releaseMatrixInput(input3.getName());
-               ec.setScalarOutput(output.getName(), ret);
+               if( output.getDataType().isScalar() )
+                       ec.setScalarOutput(output.getName(), new 
DoubleObject(ret.quickGetValue(0, 0)));
+               else
+                       ec.setMatrixOutput(output.getName(), ret);      
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java
index 4a7b130..2a305be 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java
@@ -20,32 +20,29 @@
 package org.apache.sysml.runtime.instructions.spark;
 
 import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
 
 import scala.Tuple2;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.KahanPlus;
-import org.apache.sysml.runtime.functionobjects.Multiply;
+import org.apache.sysml.runtime.functionobjects.ReduceAll;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.cp.DoubleObject;
-import org.apache.sysml.runtime.instructions.cp.ScalarObject;
+import 
org.apache.sysml.runtime.instructions.spark.functions.AggregateDropCorrectionFunction;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysml.runtime.matrix.operators.AggregateTernaryOperator;
 import org.apache.sysml.runtime.matrix.operators.Operator;
 
 public class AggregateTernarySPInstruction extends ComputationSPInstruction
 {
-       
-       public AggregateTernarySPInstruction(Operator op, CPOperand in1, 
CPOperand in2, CPOperand in3, 
-                                                    CPOperand out, String 
opcode, String istr )
+       public AggregateTernarySPInstruction(Operator op, CPOperand in1, 
CPOperand in2, 
+               CPOperand in3, CPOperand out, String opcode, String istr )
        {
                super(op, in1, in2, in3, out, opcode, istr);
                _sptype = SPINSTRUCTION_TYPE.AggregateTernary;
@@ -57,21 +54,19 @@ public class AggregateTernarySPInstruction extends 
ComputationSPInstruction
                String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
                String opcode = parts[0];
                
-               if ( opcode.equalsIgnoreCase("tak+*")) {
-                       InstructionUtils.checkNumFields ( parts, 4 );
+               if ( opcode.equalsIgnoreCase("tak+*") || 
opcode.equalsIgnoreCase("tack+*") ) {
+                       InstructionUtils.checkNumFields( parts, 4 );
                        
                        CPOperand in1 = new CPOperand(parts[1]);
                        CPOperand in2 = new CPOperand(parts[2]);
                        CPOperand in3 = new CPOperand(parts[3]);
                        CPOperand out = new CPOperand(parts[4]);
-
-                       AggregateOperator agg = new AggregateOperator(0, 
KahanPlus.getKahanPlusFnObject());
-                       AggregateBinaryOperator op = new 
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
                        
+                       AggregateTernaryOperator op = 
InstructionUtils.parseAggregateTernaryOperator(opcode);
                        return new AggregateTernarySPInstruction(op, in1, in2, 
in3, out, opcode, str);
                } 
                else {
-                       throw new 
DMLRuntimeException("AggregateTertiaryInstruction.parseInstruction():: Unknown 
opcode " + opcode);
+                       throw new 
DMLRuntimeException("AggregateTernaryInstruction.parseInstruction():: Unknown 
opcode " + opcode);
                }
        }
        
@@ -81,88 +76,109 @@ public class AggregateTernarySPInstruction extends 
ComputationSPInstruction
        {       
                SparkExecutionContext sec = (SparkExecutionContext)ec;
                
-               //get input
+               //get inputs
+               MatrixCharacteristics mcIn = sec.getMatrixCharacteristics( 
input1.getName() );
                JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
                JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = 
sec.getBinaryBlockRDDHandleForVariable( input2.getName() );
                JavaPairRDD<MatrixIndexes,MatrixBlock> in3 = input3.isLiteral() 
? null : //matrix or literal 1
                                                                                
                         sec.getBinaryBlockRDDHandleForVariable( 
input3.getName() );
                
                //execute aggregate ternary operation
-               AggregateBinaryOperator aggop = (AggregateBinaryOperator) _optr;
-               JavaRDD<MatrixBlock> out = null;
+               AggregateTernaryOperator aggop = (AggregateTernaryOperator) 
_optr;
+               JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
                if( in3 != null ) { //3 inputs
-                       out = in1.join( in2 ).join( in3 ).values()
-                                    .map(new 
RDDAggregateTernaryFunction(aggop));
+                       out = in1.join( in2 ).join( in3 )
+                                    .mapToPair(new 
RDDAggregateTernaryFunction(aggop));
                }
                else { //2 inputs (third is literal 1)
-                       out = in1.join( in2 ).values()
-                                        .map(new 
RDDAggregateTernaryFunction2(aggop));                         
+                       out = in1.join( in2 )
+                                        .mapToPair(new 
RDDAggregateTernaryFunction2(aggop));                           
+               }
+               
+               //aggregate partial results
+               if( aggop.indexFn instanceof ReduceAll ) //tak+*
+               {
+                       //aggregate and create output (no lineage because 
scalar)          
+                       MatrixBlock tmp = 
RDDAggregateUtils.sumStable(out.values());
+                       DoubleObject ret = new DoubleObject(tmp.getValue(0, 0));
+                       sec.setVariable(output.getName(), ret); 
+               }
+               else if( mcIn.dimsKnown() && 
mcIn.getCols()<=mcIn.getColsPerBlock() ) //tack+* single block
+               {
+                       //single block aggregation and drop correction
+                       MatrixBlock ret = RDDAggregateUtils.aggStable(out, 
aggop.aggOp);
+                       
ret.dropLastRowsOrColums(aggop.aggOp.correctionLocation);
+                       
+                       //put output block into symbol table (no lineage 
because single block)
+                       //this also includes implicit maintenance of matrix 
characteristics
+                       sec.setMatrixOutput(output.getName(), ret);             
+               }
+               else //tack+* multi block
+               {
+                       //multi-block aggregation and drop correction
+                       out = RDDAggregateUtils.aggByKeyStable(out, 
aggop.aggOp);
+                       out = out.mapValues( new 
AggregateDropCorrectionFunction(aggop.aggOp) );
+
+                       //put output RDD handle into symbol table
+                       updateUnaryAggOutputMatrixCharacteristics(sec, 
aggop.indexFn);
+                       sec.setRDDHandleForVariable(output.getName(), out);     
+                       sec.addLineageRDD(output.getName(), input1.getName());
+                       sec.addLineageRDD(output.getName(), input2.getName());
+                       if( in3 != null )
+                               sec.addLineageRDD(output.getName(), 
input3.getName());
                }
-                               
-               //aggregate and create output (no lineage because scalar)       
   
-               MatrixBlock tmp = RDDAggregateUtils.sumStable(out);
-               DoubleObject ret = new DoubleObject(tmp.getValue(0, 0));
-               sec.setVariable(output.getName(), ret);
        }
 
        private static class RDDAggregateTernaryFunction 
-               implements 
Function<Tuple2<Tuple2<MatrixBlock,MatrixBlock>,MatrixBlock>, MatrixBlock>
+               implements PairFunction<Tuple2<MatrixIndexes, 
Tuple2<Tuple2<MatrixBlock,MatrixBlock>,MatrixBlock>>, MatrixIndexes, 
MatrixBlock>
        {
                private static final long serialVersionUID = 
6410232464410434210L;
                
-               private AggregateBinaryOperator _aggop = null;
+               private final AggregateTernaryOperator _aggop;
                
-               public RDDAggregateTernaryFunction( AggregateBinaryOperator 
aggop ) 
-               {
-                       _aggop = aggop;         
+               public RDDAggregateTernaryFunction( AggregateTernaryOperator 
aggop ) {
+                       _aggop = aggop;
                }
        
                @Override
-               public MatrixBlock call(Tuple2<Tuple2<MatrixBlock, 
MatrixBlock>, MatrixBlock> arg0)
+               public Tuple2<MatrixIndexes,MatrixBlock> 
call(Tuple2<MatrixIndexes,Tuple2<Tuple2<MatrixBlock, MatrixBlock>, 
MatrixBlock>> arg0)
                        throws Exception 
                {
                        //get inputs
-                       MatrixBlock in1 = arg0._1()._1();
-                       MatrixBlock in2 = arg0._1()._2();
-                       MatrixBlock in3 = arg0._2();
+                       MatrixIndexes ix = arg0._1();
+                       MatrixBlock in1 = arg0._2()._1()._1();
+                       MatrixBlock in2 = arg0._2()._1()._2();
+                       MatrixBlock in3 = arg0._2()._2();
                        
                        //execute aggregate ternary operation
-                       ScalarObject ret = in1.aggregateTernaryOperations(in1, 
in2, in3, _aggop);
-                       
-                       //create output matrix block (w/ correction)
-                       MatrixBlock out = new MatrixBlock(2,1,false);
-                       out.quickSetValue(0, 0, ret.getDoubleValue());
-                       return out;
+                       return new Tuple2<MatrixIndexes, MatrixBlock>(new 
MatrixIndexes(1, ix.getColumnIndex()),
+                               in1.aggregateTernaryOperations(in1, in2, in3, 
new MatrixBlock(), _aggop, false));
                }
        }
 
        private static class RDDAggregateTernaryFunction2 
-               implements Function<Tuple2<MatrixBlock,MatrixBlock>, 
MatrixBlock>
+               implements 
PairFunction<Tuple2<MatrixIndexes,Tuple2<MatrixBlock,MatrixBlock>>, 
MatrixIndexes, MatrixBlock>
        {
                private static final long serialVersionUID = 
-6615412819746331700L;
                
-               private AggregateBinaryOperator _aggop = null;
+               private final AggregateTernaryOperator _aggop;
                
-               public RDDAggregateTernaryFunction2( AggregateBinaryOperator 
aggop ) 
-               {
+               public RDDAggregateTernaryFunction2( AggregateTernaryOperator 
aggop ) {
                        _aggop = aggop;         
                }
        
                @Override
-               public MatrixBlock call(Tuple2<MatrixBlock, MatrixBlock> arg0)
+               public Tuple2<MatrixIndexes,MatrixBlock> 
call(Tuple2<MatrixIndexes,Tuple2<MatrixBlock, MatrixBlock>> arg0)
                        throws Exception 
                {
                        //get inputs
-                       MatrixBlock in1 = arg0._1();
-                       MatrixBlock in2 = arg0._2();
+                       MatrixIndexes ix = arg0._1();
+                       MatrixBlock in1 = arg0._2()._1();
+                       MatrixBlock in2 = arg0._2()._2();
                        
                        //execute aggregate ternary operation
-                       ScalarObject ret = in1.aggregateTernaryOperations(in1, 
in2, null, _aggop);
-                       
-                       //create output matrix block (w/ correction)
-                       MatrixBlock out = new MatrixBlock(2,1,false);
-                       out.quickSetValue(0, 0, ret.getDoubleValue());
-                       return out;
+                       return new Tuple2<MatrixIndexes,MatrixBlock>(new 
MatrixIndexes(1, ix.getColumnIndex()),
+                               in1.aggregateTernaryOperations(in1, in2, null, 
new MatrixBlock(), _aggop, false));
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java
index 54ca925..eb9324f 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java
@@ -31,9 +31,6 @@ import 
org.apache.sysml.lops.PartialAggregate.CorrectionLocationType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.ReduceAll;
-import org.apache.sysml.runtime.functionobjects.ReduceCol;
-import org.apache.sysml.runtime.functionobjects.ReduceRow;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import 
org.apache.sysml.runtime.instructions.spark.functions.AggregateDropCorrectionFunction;
@@ -130,35 +127,12 @@ public class AggregateUnarySPInstruction extends 
UnarySPInstruction
                        }
                        
                        //put output RDD handle into symbol table
-                       updateUnaryAggOutputMatrixCharacteristics(sec);
+                       updateUnaryAggOutputMatrixCharacteristics(sec, 
auop.indexFn);
                        sec.setRDDHandleForVariable(output.getName(), out);     
                        sec.addLineageRDD(output.getName(), input1.getName());
                }               
        }
 
-       protected void 
updateUnaryAggOutputMatrixCharacteristics(SparkExecutionContext sec) 
-               throws DMLRuntimeException
-       {
-               AggregateUnaryOperator auop = (AggregateUnaryOperator)_optr;
-               
-               MatrixCharacteristics mc1 = 
sec.getMatrixCharacteristics(input1.getName());
-               MatrixCharacteristics mcOut = 
sec.getMatrixCharacteristics(output.getName());
-               if(!mcOut.dimsKnown()) {
-                       if(!mc1.dimsKnown()) {
-                               throw new DMLRuntimeException("The output 
dimensions are not specified and cannot be inferred from input:" + 
mc1.toString() + " " + mcOut.toString());
-                       }
-                       else {
-                               //infer statistics from input based on operator
-                               if( auop.indexFn instanceof ReduceAll )
-                                       mcOut.set(1, 1, mc1.getRowsPerBlock(), 
mc1.getColsPerBlock());
-                               else if (auop.indexFn instanceof ReduceCol)
-                                       mcOut.set(mc1.getRows(), 1, 
mc1.getRowsPerBlock(), mc1.getColsPerBlock());
-                               else if (auop.indexFn instanceof ReduceRow)
-                                       mcOut.set(1, mc1.getCols(), 
mc1.getRowsPerBlock(), mc1.getColsPerBlock());
-                       }
-               }
-       }
-
        private static class RDDUAggFunction implements 
PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock> 
        {
                private static final long serialVersionUID = 
2672082409287856038L;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/instructions/spark/ComputationSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ComputationSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ComputationSPInstruction.java
index 7a152a6..3d26b4c 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ComputationSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ComputationSPInstruction.java
@@ -21,6 +21,10 @@ package org.apache.sysml.runtime.instructions.spark;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.functionobjects.IndexFunction;
+import org.apache.sysml.runtime.functionobjects.ReduceAll;
+import org.apache.sysml.runtime.functionobjects.ReduceCol;
+import org.apache.sysml.runtime.functionobjects.ReduceRow;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.operators.Operator;
@@ -87,4 +91,27 @@ public abstract class ComputationSPInstruction extends 
SPInstruction {
                                
sec.getMatrixCharacteristics(output.getName()).set(mcIn1.getRows(), 
mcIn1.getCols(), mcIn1.getRowsPerBlock(), mcIn1.getRowsPerBlock());
                }
        }
+       
+       protected void 
updateUnaryAggOutputMatrixCharacteristics(SparkExecutionContext sec, 
IndexFunction ixFn) 
+               throws DMLRuntimeException
+       {
+               MatrixCharacteristics mc1 = 
sec.getMatrixCharacteristics(input1.getName());
+               MatrixCharacteristics mcOut = 
sec.getMatrixCharacteristics(output.getName());
+               if( mcOut.dimsKnown() )
+                       return;
+               
+               if(!mc1.dimsKnown()) {
+                       throw new DMLRuntimeException("The output dimensions 
are not specified and "
+                               + "cannot be inferred from input:" + 
mc1.toString() + " " + mcOut.toString());
+               }
+               else {
+                       //infer statistics from input based on operator
+                       if( ixFn instanceof ReduceAll )
+                               mcOut.set(1, 1, mc1.getRowsPerBlock(), 
mc1.getColsPerBlock());
+                       else if( ixFn instanceof ReduceCol )
+                               mcOut.set(mc1.getRows(), 1, 
mc1.getRowsPerBlock(), mc1.getColsPerBlock());
+                       else if( ixFn instanceof ReduceRow )
+                               mcOut.set(1, mc1.getCols(), 
mc1.getRowsPerBlock(), mc1.getColsPerBlock());
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
index 914480b..404f440 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
@@ -21,6 +21,7 @@ package org.apache.sysml.runtime.matrix.data;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -48,6 +49,7 @@ import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
 import org.apache.sysml.runtime.instructions.cp.KahanObject;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysml.runtime.matrix.operators.AggregateTernaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysml.runtime.matrix.operators.CMOperator;
 import 
org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes;
@@ -388,64 +390,76 @@ public class LibMatrixAgg
                return out;
        }
 
-       public static double aggregateTernary(MatrixBlock in1, MatrixBlock in2, 
MatrixBlock in3)
+       public static MatrixBlock aggregateTernary(MatrixBlock in1, MatrixBlock 
in2, MatrixBlock in3, MatrixBlock ret, AggregateTernaryOperator op) 
+               throws DMLRuntimeException
        {
                //early abort if any block is empty
                if( in1.isEmptyBlock(false) || in2.isEmptyBlock(false) || 
in3!=null&&in3.isEmptyBlock(false) ) {
-                       return 0;
+                       return ret;
                }
-                               
+               
                //Timing time = new Timing(true);
                
-               double val = -1;
+               //allocate output arrays (if required)
+               ret.reset(ret.rlen, ret.clen, false); //always dense
+               ret.allocateDenseBlock();
+               
+               IndexFunction ixFn = op.indexFn;
                if( !in1.sparse && !in2.sparse && (in3==null||!in3.sparse) ) 
//DENSE
-                       val = aggregateTernaryDense(in1, in2, in3, 0, in1.rlen);
+                       aggregateTernaryDense(in1, in2, in3, ret, ixFn, 0, 
in1.rlen);
                else //GENERAL CASE
-                       val = aggregateTernaryGeneric(in1, in2, in3, 0, 
in1.rlen);
+                       aggregateTernaryGeneric(in1, in2, in3, ret, ixFn, 0, 
in1.rlen);
                
-               //System.out.println("tak+ 
("+in1.rlen+","+in1.sparse+","+in2.sparse+","+in3.sparse+") in 
"+time.stop()+"ms.");
+               //cleanup output and change representation (if necessary)
+               ret.recomputeNonZeros();
+               ret.examSparsity();
                
-               return val;                     
+               //System.out.println("tak+ 
("+in1.rlen+","+in1.sparse+","+in2.sparse+","+in3.sparse+") in 
"+time.stop()+"ms.");
+       
+               return ret;
        }
 
-       public static double aggregateTernary(MatrixBlock in1, MatrixBlock in2, 
MatrixBlock in3, int k) 
+       public static MatrixBlock aggregateTernary(MatrixBlock in1, MatrixBlock 
in2, MatrixBlock in3, MatrixBlock ret, AggregateTernaryOperator op, int k) 
                throws DMLRuntimeException
        {               
                //fall back to sequential version if necessary
-               if( k <= 1 || in1.rlen/3 < PAR_NUMCELL_THRESHOLD ) {
-                       return aggregateTernary(in1, in2, in3);
+               if( k <= 1 || in1.nonZeros+in2.nonZeros < PAR_NUMCELL_THRESHOLD 
|| in1.rlen <= k/2 
+                       || (!(op.indexFn instanceof ReduceCol) &&  ret.clen*8*k 
> PAR_INTERMEDIATE_SIZE_THRESHOLD) ) {
+                       return aggregateTernary(in1, in2, in3, ret, op);
                }
                
                //early abort if any block is empty
                if( in1.isEmptyBlock(false) || in2.isEmptyBlock(false) || 
in3!=null&&in3.isEmptyBlock(false) ) {
-                       return 0;
+                       return ret;
                }
                        
                //Timing time = new Timing(true);
                
-               double val = -1;
                try {
                        ExecutorService pool = Executors.newFixedThreadPool( k 
);
                        ArrayList<AggTernaryTask> tasks = new 
ArrayList<AggTernaryTask>();
                        int blklen = (int)(Math.ceil((double)in1.rlen/k));
+                       IndexFunction ixFn = op.indexFn;
                        for( int i=0; i<k & i*blklen<in1.rlen; i++ )
-                               tasks.add( new AggTernaryTask(in1, in2, in3, 
i*blklen, Math.min((i+1)*blklen, in1.rlen)));
-                       pool.invokeAll(tasks);  
+                               tasks.add( new AggTernaryTask(in1, in2, in3, 
ret, ixFn, i*blklen, Math.min((i+1)*blklen, in1.rlen)));
+                       List<Future<MatrixBlock>> rtasks = 
pool.invokeAll(tasks);       
                        pool.shutdown();
-                       //aggregate partial results
-                       KahanObject kbuff = new KahanObject(0, 0);
-                       KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
-                       for( AggTernaryTask task : tasks )
-                               kplus.execute2(kbuff, task.getResult());
-                       val = kbuff._sum;
+                       //aggregate partial results and error handling
+                       ret.copy(rtasks.get(0).get()); //for init
+                       for( int i=1; i<rtasks.size(); i++ )
+                               aggregateFinalResult(op.aggOp, ret, 
rtasks.get(i).get());
                }
                catch(Exception ex) {
                        throw new DMLRuntimeException(ex);
                }
                
-               //System.out.println("tak+ k="+k+" 
("+in1.rlen+","+in1.sparse+","+in2.sparse+","+in3.sparse+") in 
"+time.stop()+"ms.");
+               //cleanup output and change representation (if necessary)
+               ret.recomputeNonZeros();
+               ret.examSparsity();
                
-               return val;                     
+               //System.out.println("tak+ k="+k+" 
("+in1.rlen+","+in1.sparse+","+in2.sparse+","+in3.sparse+") in 
"+time.stop()+"ms."); 
+       
+               return ret;
        }
 
        public static void groupedAggregate(MatrixBlock groups, MatrixBlock 
target, MatrixBlock weights, MatrixBlock result, int numGroups, Operator op) 
@@ -642,49 +656,70 @@ public class LibMatrixAgg
                        out.binaryOperationsInPlace(laop.increOp, partout);
        }
 
-       private static double aggregateTernaryDense(MatrixBlock in1, 
MatrixBlock in2, MatrixBlock in3, int rl, int ru)
+       private static void aggregateTernaryDense(MatrixBlock in1, MatrixBlock 
in2, MatrixBlock in3, MatrixBlock ret, IndexFunction ixFn, int rl, int ru)
        {
                //compute block operations
                KahanObject kbuff = new KahanObject(0, 0);
                KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
                
                double[] a = in1.denseBlock;
-               double[] b = in2.denseBlock;
+               double[] b1 = in2.denseBlock;
+               double[] b2 = (in3!=null) ? in3.denseBlock : null; //if null, 
literal 1
                final int n = in1.clen;
                
-               if( in3 != null ) //3 inputs
+               if( ixFn instanceof ReduceAll ) //tak+*
                {
-                       double[] c = in3.denseBlock;
-                       
                        for( int i=rl, ix=rl*n; i<ru; i++ ) 
                                for( int j=0; j<n; j++, ix++ ) {
-                                       double val = a[ix] * b[ix] * c[ix];
+                                       double b2val = (b2 != null) ? b2[ix] : 
1;
+                                       double val = a[ix] * b1[ix] * b2val;
                                        kplus.execute2( kbuff, val );
                                }
+                       ret.quickSetValue(0, 0, kbuff._sum);
+                       ret.quickSetValue(0, 1, kbuff._correction);
                }
-               else //2 inputs (third: literal 1)
+               else //tack+*
                {
-                       for( int i=rl, ix=rl*n; i<ru; i++ ) 
+                       double[] c = ret.getDenseBlock();
+                       for( int i=rl, ix=rl*n; i<ru; i++ )
                                for( int j=0; j<n; j++, ix++ ) {
-                                       double val = a[ix] * b[ix];
-                                       kplus.execute2( kbuff, val );
+                                       double b2val = (b2 != null) ? b2[ix] : 
1;
+                                       double val = a[ix] * b1[ix] * b2val;
+                                       kbuff._sum = c[j];
+                                       kbuff._correction = c[j+n];
+                                       kplus.execute2(kbuff, val);
+                                       c[j] = kbuff._sum;
+                                       c[j+n] = kbuff._correction;
                                }
                }
-               
-               return kbuff._sum;
        }
 
-       private static double aggregateTernaryGeneric(MatrixBlock in1, 
MatrixBlock in2, MatrixBlock in3, int rl, int ru)
+       private static void aggregateTernaryGeneric(MatrixBlock in1, 
MatrixBlock in2, MatrixBlock in3, MatrixBlock ret, IndexFunction ixFn, int rl, 
int ru)
        {               
                //compute block operations
                KahanObject kbuff = new KahanObject(0, 0);
                KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
+               
+               //guaranteed to have at least one sparse input: sort by nnz, 
ranking inputs in 
+               //(potentially incorrect) dense representation last, and keep null at 
the end via stable sort
+               MatrixBlock[] blocks = new MatrixBlock[]{in1, in2, in3};
+               Arrays.sort(blocks, new Comparator<MatrixBlock>() {
+                       @Override
+                       public int compare(MatrixBlock o1, MatrixBlock o2) {
+                               long nnz1 = (o1!=null && o1.sparse) ? 
o1.nonZeros : Long.MAX_VALUE;
+                               long nnz2 = (o2!=null && o2.sparse) ? 
o2.nonZeros : Long.MAX_VALUE;
+                               return Long.compare(nnz1, nnz2);
+                       }
+               });
+               MatrixBlock lin1 = blocks[0];
+               MatrixBlock lin2 = blocks[1];
+               MatrixBlock lin3 = blocks[2];
+               
+               SparseBlock a = lin1.sparseBlock;
                final int n = in1.clen;
                
-               if( in1.sparse )
+               if( ixFn instanceof ReduceAll ) //tak+*
                {
-                       SparseBlock a = in1.sparseBlock;
-                       
                        for( int i=rl; i<ru; i++ )
                                if( !a.isEmpty(i) ) {
                                        int apos = a.pos(i);
@@ -693,28 +728,40 @@ public class LibMatrixAgg
                                        double[] avals = a.values(i);
                                        for( int j=apos; j<apos+alen; j++ ) {
                                                double val1 = avals[j];
-                                               double val2 = 
in2.quickGetValue(i, aix[j]);
+                                               double val2 = 
lin2.quickGetValue(i, aix[j]);
                                                double val = val1 * val2;
-                                               if( val != 0 && in3 != null )
-                                                       val *= 
in3.quickGetValue(i, aix[j]);
+                                               if( val != 0 && lin3 != null )
+                                                       val *= 
lin3.quickGetValue(i, aix[j]);
                                                kplus.execute2( kbuff, val );   
                                                
                                        }
                                }       
+                       ret.quickSetValue(0, 0, kbuff._sum);
+                       ret.quickSetValue(0, 1, kbuff._correction);
                }
-               else //generic case
+               else //tack+*
                {
+                       double[] c = ret.getDenseBlock();
                        for( int i=rl; i<ru; i++ )
-                               for( int j=0; j<n; j++ ){
-                                       double val1 = in1.quickGetValue(i, j);
-                                       double val2 = in2.quickGetValue(i, j);
-                                       double val = val1 * val2;
-                                       if( in3 != null )
-                                               val *= in3.quickGetValue(i, j);
-                                       kplus.execute2( kbuff, val );           
-                               }
+                               if( !a.isEmpty(i) ) {
+                                       int apos = a.pos(i);
+                                       int alen = a.size(i);
+                                       int[] aix = a.indexes(i);
+                                       double[] avals = a.values(i);
+                                       for( int j=apos; j<apos+alen; j++ ) {
+                                               int colIx = aix[j];
+                                               double val1 = avals[j];
+                                               double val2 = 
lin2.quickGetValue(i, colIx);
+                                               double val = val1 * val2;
+                                               if( val != 0 && lin3 != null )
+                                                       val *= 
lin3.quickGetValue(i, colIx);
+                                               kbuff._sum = c[colIx];
+                                               kbuff._correction = c[colIx+n];
+                                               kplus.execute2( kbuff, val );   
+                                               c[colIx] = kbuff._sum;
+                                               c[colIx+n] = kbuff._correction; 
+                                       }
+                               }       
                }
-       
-               return kbuff._sum;
        }
        
 
@@ -3492,37 +3539,43 @@ public class LibMatrixAgg
                }               
        }
 
-       private static class AggTernaryTask extends AggTask 
+       private static class AggTernaryTask implements Callable<MatrixBlock>
        {
-               private MatrixBlock _in1  = null;
-               private MatrixBlock _in2  = null;
-               private MatrixBlock _in3  = null;
-               private double _ret = -1;
-               private int _rl = -1;
-               private int _ru = -1;
+               private final MatrixBlock _in1;
+               private final MatrixBlock _in2;
+               private final MatrixBlock _in3;
+               private MatrixBlock _ret = null;
+               private final IndexFunction _ixFn;
+               private final int _rl;
+               private final int _ru;
 
-               protected AggTernaryTask( MatrixBlock in1, MatrixBlock in2, 
MatrixBlock in3, int rl, int ru ) 
+               protected AggTernaryTask( MatrixBlock in1, MatrixBlock in2, 
MatrixBlock in3, MatrixBlock ret, IndexFunction ixFn, int rl, int ru ) 
                        throws DMLRuntimeException
                {
                        _in1 = in1;     
                        _in2 = in2;     
-                       _in3 = in3;                             
+                       _in3 = in3;             
+                       _ret = ret;
+                       _ixFn = ixFn;
                        _rl = rl;
                        _ru = ru;
                }
                
                @Override
-               public Object call() throws DMLRuntimeException
+               public MatrixBlock call() throws DMLRuntimeException
                {
+                       //thread-local allocation for partial aggregation
+                       _ret = new MatrixBlock(_ret.rlen, _ret.clen, false);
+                       _ret.allocateDenseBlock();
+                       
                        if( !_in1.sparse && !_in2.sparse && 
(_in3==null||!_in3.sparse) ) //DENSE
-                               _ret = aggregateTernaryDense(_in1, _in2, _in3, 
_rl, _ru);
+                               aggregateTernaryDense(_in1, _in2, _in3, _ret, 
_ixFn, _rl, _ru);
                        else //GENERAL CASE
-                               _ret = aggregateTernaryGeneric(_in1, _in2, 
_in3, _rl, _ru);
+                               aggregateTernaryGeneric(_in1, _in2, _in3, _ret, 
_ixFn, _rl, _ru);
+                       
+                       //recompute non-zeros of partial result
+                       _ret.recomputeNonZeros();
                        
-                       return null;
-               }
-               
-               public double getResult() {
                        return _ret;
                }
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index fb4f196..5fc69ce 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -59,7 +59,6 @@ import org.apache.sysml.runtime.functionobjects.RevIndex;
 import org.apache.sysml.runtime.functionobjects.SortIndex;
 import org.apache.sysml.runtime.functionobjects.SwapIndex;
 import org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
-import org.apache.sysml.runtime.instructions.cp.DoubleObject;
 import org.apache.sysml.runtime.instructions.cp.KahanObject;
 import org.apache.sysml.runtime.instructions.cp.ScalarObject;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
@@ -68,6 +67,7 @@ import 
org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
 import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysml.runtime.matrix.operators.AggregateTernaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.CMOperator;
@@ -4886,7 +4886,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                return ret;
        }
 
-       public ScalarObject aggregateTernaryOperations(MatrixBlock m1, 
MatrixBlock m2, MatrixBlock m3, AggregateBinaryOperator op) 
+       public MatrixBlock aggregateTernaryOperations(MatrixBlock m1, 
MatrixBlock m2, MatrixBlock m3, MatrixBlock ret, AggregateTernaryOperator op, 
boolean inCP) 
                throws DMLRuntimeException
        {
                //check input dimensions and operators
@@ -4895,15 +4895,23 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                if( !( op.aggOp.increOp.fn instanceof KahanPlus && op.binaryFn 
instanceof Multiply) )
                        throw new DMLRuntimeException("Unsupported operator for 
aggregate tertiary operations.");
                
+               //create output matrix block w/ corrections
+               int rl = (op.indexFn instanceof ReduceRow) ? 2 : 1;
+               int cl = (op.indexFn instanceof ReduceRow) ? m1.clen : 2;
+               if( ret == null )
+                       ret = new MatrixBlock(rl, cl, false);
+               else
+                       ret.reset(rl, cl, false);
+                               
                //execute ternary aggregate function
-               double val = -1;
                if( op.getNumThreads() > 1 )
-                       val = LibMatrixAgg.aggregateTernary(m1, m2, m3, 
op.getNumThreads());
+                       ret = LibMatrixAgg.aggregateTernary(m1, m2, m3, ret, 
op, op.getNumThreads());
                else
-                       val = LibMatrixAgg.aggregateTernary(m1, m2, m3);
+                       ret = LibMatrixAgg.aggregateTernary(m1, m2, m3, ret, 
op);
                
-               //create output
-               return new DoubleObject(val);
+               if(op.aggOp.correctionExists && inCP)
+                       ret.dropLastRowsOrColums(op.aggOp.correctionLocation);
+               return ret;
        }
 
        public MatrixBlock  uaggouterchainOperations(MatrixBlock mbLeft, 
MatrixBlock mbRight, MatrixBlock mbOut, BinaryOperator bOp, 
AggregateUnaryOperator uaggOp) 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/main/java/org/apache/sysml/runtime/matrix/operators/AggregateTernaryOperator.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/operators/AggregateTernaryOperator.java
 
b/src/main/java/org/apache/sysml/runtime/matrix/operators/AggregateTernaryOperator.java
new file mode 100644
index 0000000..7ffdaff
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/matrix/operators/AggregateTernaryOperator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.sysml.runtime.matrix.operators;
+
+import java.io.Serializable;
+
+import org.apache.sysml.runtime.functionobjects.IndexFunction;
+import org.apache.sysml.runtime.functionobjects.ValueFunction;
+
+
+public class AggregateTernaryOperator extends Operator implements Serializable
+{
+       private static final long serialVersionUID = 4251745081160216784L;
+       
+       public ValueFunction binaryFn;
+       public AggregateOperator aggOp;
+       public IndexFunction indexFn;
+       private int k; //number of threads (degree of parallelism)
+       
+       public AggregateTernaryOperator(ValueFunction inner, AggregateOperator 
outer, IndexFunction ixfun) {
+               //default degree of parallelism is 1 (e.g., for distributed 
operations)
+               this( inner, outer, ixfun, 1 );
+       }
+       
+       public AggregateTernaryOperator(ValueFunction inner, AggregateOperator 
outer, IndexFunction ixfun, int numThreads)
+       {
+               binaryFn = inner;
+               aggOp = outer;
+               indexFn = ixfun;
+               k = numThreads;
+               
+               //so far we only support sum-product, and it is sparse-safe
+               sparseSafe = true;
+       }
+       
+       public int getNumThreads() {
+               return k;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/test/java/org/apache/sysml/test/integration/functions/ternary/TernaryAggregateTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/ternary/TernaryAggregateTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/ternary/TernaryAggregateTest.java
new file mode 100644
index 0000000..6168025
--- /dev/null
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/ternary/TernaryAggregateTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.ternary;
+
+import java.util.HashMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.runtime.instructions.Instruction;
+import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+import org.apache.sysml.utils.Statistics;
+
+/**
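+ * Integration tests for the ternary aggregates sum(A*B*C) (TernaryAggregateRC)
+ * and colSums(A*B*C) (TernaryAggregateC), with and without sum-product rewrites.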
+ */
+public class TernaryAggregateTest extends AutomatedTestBase 
+{
+       private final static String TEST_NAME1 = "TernaryAggregateRC";
+       private final static String TEST_NAME2 = "TernaryAggregateC";
+       
+       private final static String TEST_DIR = "functions/ternary/";
+       private final static String TEST_CLASS_DIR = TEST_DIR + 
TernaryAggregateTest.class.getSimpleName() + "/";
+       private final static double eps = 1e-8;
+       
+       private final static int rows = 1111;
+       private final static int cols = 1011;
+       
+       private final static double sparsity1 = 0.7;
+       private final static double sparsity2 = 0.3;
+       
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME1, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "R" }) ); 
+               addTestConfiguration(TEST_NAME2, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] { "R" }) ); 
+       }
+
+       @Test
+       public void testTernaryAggregateRCDenseVectorCP() {
+               runTernaryAggregateTest(TEST_NAME1, false, true, true, 
ExecType.CP);
+       }
+       
+       @Test
+       public void testTernaryAggregateRCSparseVectorCP() {
+               runTernaryAggregateTest(TEST_NAME1, true, true, true, 
ExecType.CP);
+       }
+       
+       @Test
+       public void testTernaryAggregateRCDenseMatrixCP() {
+               runTernaryAggregateTest(TEST_NAME1, false, false, true, 
ExecType.CP);
+       }
+       
+       @Test
+       public void testTernaryAggregateRCSparseMatrixCP() {
+               runTernaryAggregateTest(TEST_NAME1, true, false, true, 
ExecType.CP);
+       }
+       
+       @Test
+       public void testTernaryAggregateRCDenseVectorSP() {
+               runTernaryAggregateTest(TEST_NAME1, false, true, true, 
ExecType.SPARK);
+       }
+       
+       @Test
+       public void testTernaryAggregateRCSparseVectorSP() {
+               runTernaryAggregateTest(TEST_NAME1, true, true, true, 
ExecType.SPARK);
+       }
+       
+       @Test
+       public void testTernaryAggregateRCDenseMatrixSP() {
+               runTernaryAggregateTest(TEST_NAME1, false, false, true, 
ExecType.SPARK);
+       }
+       
+       @Test
+       public void testTernaryAggregateRCSparseMatrixSP() {
+               runTernaryAggregateTest(TEST_NAME1, true, false, true, 
ExecType.SPARK);
+       }
+       
+       @Test
+       public void testTernaryAggregateRCDenseVectorMR() {
+               runTernaryAggregateTest(TEST_NAME1, false, true, true, 
ExecType.MR);
+       }
+       
+       @Test
+       public void testTernaryAggregateRCSparseVectorMR() {
+               runTernaryAggregateTest(TEST_NAME1, true, true, true, 
ExecType.MR);
+       }
+       
+       @Test
+       public void testTernaryAggregateRCDenseMatrixMR() {
+               runTernaryAggregateTest(TEST_NAME1, false, false, true, 
ExecType.MR);
+       }
+       
+       @Test
+       public void testTernaryAggregateRCSparseMatrixMR() {
+               runTernaryAggregateTest(TEST_NAME1, true, false, true, 
ExecType.MR);
+       }
+       
+       @Test
+       public void testTernaryAggregateCDenseVectorCP() {
+               runTernaryAggregateTest(TEST_NAME2, false, true, true, 
ExecType.CP);
+       }
+       
+       @Test
+       public void testTernaryAggregateCSparseVectorCP() {
+               runTernaryAggregateTest(TEST_NAME2, true, true, true, 
ExecType.CP);
+       }
+       
+       @Test
+       public void testTernaryAggregateCDenseMatrixCP() {
+               runTernaryAggregateTest(TEST_NAME2, false, false, true, 
ExecType.CP);
+       }
+       
+       @Test
+       public void testTernaryAggregateCSparseMatrixCP() {
+               runTernaryAggregateTest(TEST_NAME2, true, false, true, 
ExecType.CP);
+       }
+       
+       @Test
+       public void testTernaryAggregateCDenseVectorSP() {
+               runTernaryAggregateTest(TEST_NAME2, false, true, true, 
ExecType.SPARK);
+       }
+       
+       @Test
+       public void testTernaryAggregateCSparseVectorSP() {
+               runTernaryAggregateTest(TEST_NAME2, true, true, true, 
ExecType.SPARK);
+       }
+       
+       @Test
+       public void testTernaryAggregateCDenseMatrixSP() {
+               runTernaryAggregateTest(TEST_NAME2, false, false, true, 
ExecType.SPARK);
+       }
+       
+       @Test
+       public void testTernaryAggregateCSparseMatrixSP() {
+               runTernaryAggregateTest(TEST_NAME2, true, false, true, 
ExecType.SPARK);
+       }
+       
+       //additional tests to check default without rewrites
+       
+       //full aggregate sum(A*B*C), dense vector input, CP, sum-product rewrites disabled
+       @Test
+       public void testTernaryAggregateRCDenseVectorCPNoRewrite() {
+               runTernaryAggregateTest(TEST_NAME1, false, true, false, 
ExecType.CP);
+       }
+       
+       //full aggregate sum(A*B*C), sparse vector input, CP, sum-product rewrites disabled
+       @Test
+       public void testTernaryAggregateRCSparseVectorCPNoRewrite() {
+               runTernaryAggregateTest(TEST_NAME1, true, true, false, 
ExecType.CP);
+       }
+       
+       //full aggregate sum(A*B*C), dense matrix input, CP, sum-product rewrites disabled
+       @Test
+       public void testTernaryAggregateRCDenseMatrixCPNoRewrite() {
+               runTernaryAggregateTest(TEST_NAME1, false, false, false, 
ExecType.CP);
+       }
+       
+       //full aggregate sum(A*B*C), sparse matrix input, CP, sum-product rewrites disabled
+       @Test
+       public void testTernaryAggregateRCSparseMatrixCPNoRewrite() {
+               runTernaryAggregateTest(TEST_NAME1, true, false, false, 
ExecType.CP);
+       }
+       
+       @Test
+       public void testTernaryAggregateCDenseVectorCPNoRewrite() {
+               runTernaryAggregateTest(TEST_NAME2, false, true, false, 
ExecType.CP);
+       }
+       
+       @Test
+       public void testTernaryAggregateCSparseVectorCPNoRewrite() {
+               runTernaryAggregateTest(TEST_NAME2, true, true, false, 
ExecType.CP);
+       }
+       
+       @Test
+       public void testTernaryAggregateCDenseMatrixCPNoRewrite() {
+               runTernaryAggregateTest(TEST_NAME2, false, false, false, 
ExecType.CP);
+       }
+       
+       @Test
+       public void testTernaryAggregateCSparseMatrixCPNoRewrite() {
+               runTernaryAggregateTest(TEST_NAME2, true, false, false, 
ExecType.CP);
+       }
+       
+       
+       
+       private void runTernaryAggregateTest(String testname, boolean sparse, 
boolean vectors, boolean rewrites, ExecType et)
+       {
+               //set runtime platform according to the requested execution type
+               RUNTIME_PLATFORM platformOld = rtplatform;
+               switch( et ){
+                       case MR: rtplatform = RUNTIME_PLATFORM.HADOOP; break;
+                       case SPARK: rtplatform = RUNTIME_PLATFORM.SPARK; break;
+                       default: rtplatform = RUNTIME_PLATFORM.HYBRID; break;
+               }
+       
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               if( rtplatform == RUNTIME_PLATFORM.SPARK )
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+       
+               boolean rewritesOld = OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES;
+               
+               try
+               {
+                       TestConfiguration config = 
getTestConfiguration(testname);
+                       loadTestConfiguration(config);
+                       
+                       OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES = rewrites;
+                       
+                       String HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = HOME + testname + ".dml";
+                       programArgs = new String[]{"-stats","-args", 
input("A"), output("R")};
+                       
+                       fullRScriptName = HOME + testname + ".R";
+                       rCmd = "Rscript" + " " + fullRScriptName + " " + 
+                               inputDir() + " " + expectedDir();
+       
+                       //generate actual dataset
+                       double sparsity = sparse ? sparsity2 : sparsity1;
+                       double[][] A = getRandomMatrix(vectors ? rows*cols : 
rows, 
+                                       vectors ? 1 : cols, 0, 1, sparsity, 
17); 
+                       writeInputMatrixWithMTD("A", A, true);
+                       
+                       //run test cases
+                       runTest(true, false, null, -1); 
+                       runRScript(true); 
+                       
+                       //compare output matrices 
+                       HashMap<CellIndex, Double> dmlfile = 
readDMLMatrixFromHDFS("R");
+                       HashMap<CellIndex, Double> rfile  = 
readRMatrixFromFS("R");
+                       TestUtils.compareMatrices(dmlfile, rfile, eps, 
"Stat-DML", "Stat-R");
+                       
+                       //check for rewritten patterns in statistics output
+                       if( rewrites && et != ExecType.MR ) {
+                               String opcode = ((et == ExecType.SPARK) ? 
Instruction.SP_INST_PREFIX : "") + 
+                                       (((testname.equals(TEST_NAME1) || 
vectors ) ? "tak+*" : "tack+*"));
+                               Assert.assertEquals(new Boolean(true), new 
Boolean(
+                                       
Statistics.getCPHeavyHitterOpCodes().contains(opcode)));
+                       }
+               }
+               finally {
+                       rtplatform = platformOld;
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+                       OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES = rewritesOld;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/test/scripts/functions/ternary/TernaryAggregateC.R
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/ternary/TernaryAggregateC.R 
b/src/test/scripts/functions/ternary/TernaryAggregateC.R
new file mode 100644
index 0000000..8e83ebf
--- /dev/null
+++ b/src/test/scripts/functions/ternary/TernaryAggregateC.R
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+args <- commandArgs(TRUE)
+options(digits=22)
+
+library("Matrix")
+
+A = as.matrix(readMM(paste(args[1], "A.mtx", sep="")))
+B = A * 2;
+C = A * 3;
+
+R = t(as.matrix(colSums(A * B * C)));
+
+writeMM(as(R, "CsparseMatrix"), paste(args[2], "R", sep="")); 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/test/scripts/functions/ternary/TernaryAggregateC.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/ternary/TernaryAggregateC.dml 
b/src/test/scripts/functions/ternary/TernaryAggregateC.dml
new file mode 100644
index 0000000..a171ff4
--- /dev/null
+++ b/src/test/scripts/functions/ternary/TernaryAggregateC.dml
@@ -0,0 +1,30 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+A = read($1);
+B = A * 2;
+C = A * 3;
+
+if(1==1){}
+
+R = colSums(A * B * C);
+
+write(R, $2);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/test/scripts/functions/ternary/TernaryAggregateRC.R
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/ternary/TernaryAggregateRC.R 
b/src/test/scripts/functions/ternary/TernaryAggregateRC.R
new file mode 100644
index 0000000..96e793e
--- /dev/null
+++ b/src/test/scripts/functions/ternary/TernaryAggregateRC.R
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+args <- commandArgs(TRUE)
+options(digits=22)
+
+library("Matrix")
+
+A = as.matrix(readMM(paste(args[1], "A.mtx", sep="")))
+B = A * 2;
+C = A * 3;
+
+s = sum(A * B * C);
+R = as.matrix(s);
+
+writeMM(as(R, "CsparseMatrix"), paste(args[2], "R", sep="")); 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/test/scripts/functions/ternary/TernaryAggregateRC.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/ternary/TernaryAggregateRC.dml 
b/src/test/scripts/functions/ternary/TernaryAggregateRC.dml
new file mode 100644
index 0000000..485570d
--- /dev/null
+++ b/src/test/scripts/functions/ternary/TernaryAggregateRC.dml
@@ -0,0 +1,31 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+A = read($1);
+B = A * 2;
+C = A * 3;
+
+if(1==1){}
+
+s = sum(A * B * C);
+R = as.matrix(s);
+
+write(R, $2);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/21b96855/src/test_suites/java/org/apache/sysml/test/integration/functions/ternary/ZPackageSuite.java
----------------------------------------------------------------------
diff --git 
a/src/test_suites/java/org/apache/sysml/test/integration/functions/ternary/ZPackageSuite.java
 
b/src/test_suites/java/org/apache/sysml/test/integration/functions/ternary/ZPackageSuite.java
index 83f217d..784177d 100644
--- 
a/src/test_suites/java/org/apache/sysml/test/integration/functions/ternary/ZPackageSuite.java
+++ 
b/src/test_suites/java/org/apache/sysml/test/integration/functions/ternary/ZPackageSuite.java
@@ -31,7 +31,8 @@ import org.junit.runners.Suite;
        CTableMatrixIgnoreZerosTest.class,
        CTableSequenceTest.class,
        QuantileWeightsTest.class,
-       TableOutputTest.class
+       TableOutputTest.class,
+       TernaryAggregateTest.class,
 })
 
 

Reply via email to