This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git


The following commit(s) were added to refs/heads/master by this push:
     new 0118a3e  [MINOR] Fix multi-threaded federated MV multiply, and test 
issues
0118a3e is described below

commit 0118a3eef317826cf79bf01471f07a67631cee64
Author: Matthias Boehm <[email protected]>
AuthorDate: Sun May 31 22:41:59 2020 +0200

    [MINOR] Fix multi-threaded federated MV multiply, and test issues
    
    So far, the federated matrix-vector multiplications were always executed
    in a single-threaded manner; now we execute them according to the local
    parallelism configuration at the federated worker.
    
    Also, it seems I introduced a bug in privacy handling during the merge,
    which this patch also fixes (e.g., on scalar casts of non-cacheable data
    objects).
---
 .../federated/FederatedWorkerHandler.java          |  8 +++-----
 .../cp/AggregateBinaryCPInstruction.java           | 23 ++++++----------------
 .../instructions/cp/VariableCPInstruction.java     |  3 ---
 .../gpu/AggregateBinaryGPUInstruction.java         |  4 +---
 .../instructions/spark/CpmmSPInstruction.java      | 12 +++--------
 .../instructions/spark/MapmmSPInstruction.java     | 21 ++++++--------------
 .../instructions/spark/PMapmmSPInstruction.java    | 17 ++++------------
 .../instructions/spark/PmmSPInstruction.java       |  6 +-----
 .../instructions/spark/ZipmmSPInstruction.java     |  3 +--
 .../sysds/runtime/privacy/PrivacyMonitor.java      |  2 +-
 .../compress/ParCompressedMatrixTest.java          | 10 +++-------
 11 files changed, 29 insertions(+), 80 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index bba731c..6fe814a 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -38,15 +38,13 @@ import 
org.apache.sysds.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.caching.TensorObject;
 import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
-import org.apache.sysds.runtime.functionobjects.Multiply;
-import org.apache.sysds.runtime.functionobjects.Plus;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.cp.ListObject;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
 import org.apache.sysds.runtime.matrix.data.LibMatrixAgg;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
 import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
@@ -187,8 +185,8 @@ public class FederatedWorkerHandler extends 
ChannelInboundHandlerAdapter {
                matTo = PrivacyMonitor.handlePrivacy(matTo);
                MatrixBlock matBlock1 = matTo.acquireReadAndRelease();
                // TODO other datatypes
-               AggregateBinaryOperator ab_op = new AggregateBinaryOperator(
-                       Multiply.getMultiplyFnObject(), new 
AggregateOperator(0, Plus.getPlusFnObject()));
+               AggregateBinaryOperator ab_op = InstructionUtils
+                       
.getMatMultOperator(OptimizerUtils.getConstrainedNumThreads(0));
                MatrixBlock result = isMatVecMult ?
                        matBlock1.aggregateBinaryOperations(matBlock1, vector, 
new MatrixBlock(), ab_op) :
                        vector.aggregateBinaryOperations(vector, matBlock1, new 
MatrixBlock(), ab_op);
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java
index 1e3186d..0df8108 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java
@@ -19,17 +19,12 @@
 
 package org.apache.sysds.runtime.instructions.cp;
 
-import org.apache.sysds.common.Types.DataType;
-import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysds.runtime.functionobjects.Multiply;
-import org.apache.sysds.runtime.functionobjects.Plus;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
 import org.apache.sysds.runtime.matrix.operators.Operator;
 
 public class AggregateBinaryCPInstruction extends BinaryCPInstruction {
@@ -39,10 +34,6 @@ public class AggregateBinaryCPInstruction extends 
BinaryCPInstruction {
        }
 
        public static AggregateBinaryCPInstruction parseInstruction( String str 
) {
-               CPOperand in1 = new CPOperand("", ValueType.UNKNOWN, 
DataType.UNKNOWN);
-               CPOperand in2 = new CPOperand("", ValueType.UNKNOWN, 
DataType.UNKNOWN);
-               CPOperand out = new CPOperand("", ValueType.UNKNOWN, 
DataType.UNKNOWN);
-
                String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
                String opcode = parts[0];
 
@@ -50,15 +41,13 @@ public class AggregateBinaryCPInstruction extends 
BinaryCPInstruction {
                        throw new 
DMLRuntimeException("AggregateBinaryInstruction.parseInstruction():: Unknown 
opcode " + opcode);
                }
                
-               InstructionUtils.checkNumFields( parts, 4 );
-               in1.split(parts[1]);
-               in2.split(parts[2]);
-               out.split(parts[3]);
+               InstructionUtils.checkNumFields(parts, 4);
+               CPOperand in1 = new CPOperand(parts[1]);
+               CPOperand in2 = new CPOperand(parts[2]);
+               CPOperand out = new CPOperand(parts[3]);
                int k = Integer.parseInt(parts[4]);
-               
-               AggregateOperator agg = new AggregateOperator(0, 
Plus.getPlusFnObject());
-               AggregateBinaryOperator aggbin = new 
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg, k);
-               return new AggregateBinaryCPInstruction(aggbin, in1, in2, out, 
opcode, str);    
+               AggregateBinaryOperator aggbin = 
InstructionUtils.getMatMultOperator(k);
+               return new AggregateBinaryCPInstruction(aggbin, in1, in2, out, 
opcode, str);
        }
        
        @Override
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
index f40abb0..15e9ccb 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
@@ -58,9 +58,6 @@ import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.runtime.meta.MetaData;
 import org.apache.sysds.runtime.meta.MetaDataFormat;
 import org.apache.sysds.runtime.meta.TensorCharacteristics;
-import org.apache.sysds.runtime.privacy.DMLPrivacyException;
-import org.apache.sysds.runtime.privacy.PrivacyConstraint;
-import org.apache.sysds.runtime.privacy.PrivacyConstraint.PrivacyLevel;
 import org.apache.sysds.runtime.privacy.PrivacyMonitor;
 import org.apache.sysds.runtime.util.DataConverter;
 import org.apache.sysds.runtime.util.HDFSTool;
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/gpu/AggregateBinaryGPUInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/gpu/AggregateBinaryGPUInstruction.java
index eb6ce30..78b05a0 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/gpu/AggregateBinaryGPUInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/gpu/AggregateBinaryGPUInstruction.java
@@ -30,7 +30,6 @@ import org.apache.sysds.runtime.matrix.data.LibMatrixCUDA;
 import org.apache.sysds.runtime.matrix.data.LibMatrixCuMatMult;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
 import org.apache.sysds.runtime.matrix.operators.Operator;
 import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
 import org.apache.sysds.utils.GPUStatistics;
@@ -64,8 +63,7 @@ public class AggregateBinaryGPUInstruction extends 
GPUInstruction {
                CPOperand out = new CPOperand(parts[3]);
                boolean isLeftTransposed = Boolean.parseBoolean(parts[4]);
                boolean isRightTransposed = Boolean.parseBoolean(parts[5]);
-               AggregateOperator agg = new AggregateOperator(0, 
Plus.getPlusFnObject());
-               AggregateBinaryOperator aggbin = new 
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg, 1);
+               AggregateBinaryOperator aggbin = 
InstructionUtils.getMatMultOperator(1);
                return new AggregateBinaryGPUInstruction(aggbin, in1, in2, out, 
opcode, str, isLeftTransposed, isRightTransposed);      
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/CpmmSPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/CpmmSPInstruction.java
index 0592a49..ab98af3 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/CpmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/CpmmSPInstruction.java
@@ -27,8 +27,6 @@ import org.apache.sysds.hops.AggBinaryOp.SparkAggType;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysds.runtime.functionobjects.Multiply;
-import org.apache.sysds.runtime.functionobjects.Plus;
 import org.apache.sysds.runtime.functionobjects.SwapIndex;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
@@ -42,7 +40,6 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.matrix.data.OperationsOnMatrixValues;
 import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
 import org.apache.sysds.runtime.matrix.operators.Operator;
 import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
@@ -75,8 +72,7 @@ public class CpmmSPInstruction extends BinarySPInstruction {
                CPOperand in1 = new CPOperand(parts[1]);
                CPOperand in2 = new CPOperand(parts[2]);
                CPOperand out = new CPOperand(parts[3]);
-               AggregateOperator agg = new AggregateOperator(0, 
Plus.getPlusFnObject());
-               AggregateBinaryOperator aggbin = new 
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
+               AggregateBinaryOperator aggbin = 
InstructionUtils.getMatMultOperator(1);
                boolean outputEmptyBlocks = Boolean.parseBoolean(parts[4]);
                SparkAggType aggtype = SparkAggType.valueOf(parts[5]);
                return new CpmmSPInstruction(aggbin, in1, in2, out, 
outputEmptyBlocks, aggtype, opcode, str);
@@ -195,8 +191,7 @@ public class CpmmSPInstruction extends BinarySPInstruction {
                        throws Exception
                {
                        if( _op == null ) { //lazy operator construction
-                               AggregateOperator agg = new 
AggregateOperator(0, Plus.getPlusFnObject());
-                               _op = new 
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
+                               _op = InstructionUtils.getMatMultOperator(1);
                        }
                        
                        MatrixBlock blkIn1 = 
(MatrixBlock)arg0._2()._1().getValue();
@@ -224,8 +219,7 @@ public class CpmmSPInstruction extends BinarySPInstruction {
                public MatrixBlock call(Tuple2<MatrixBlock, MatrixBlock> arg0) 
throws Exception {
                         //lazy operator construction
                        if( _op == null ) {
-                               AggregateOperator agg = new 
AggregateOperator(0, Plus.getPlusFnObject());
-                               _op = new 
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
+                               _op = InstructionUtils.getMatMultOperator(1);
                                _rop = new 
ReorgOperator(SwapIndex.getSwapIndexFnObject());
                        }
                        //prepare inputs, including transpose of right-hand-side
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/MapmmSPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/MapmmSPInstruction.java
index 0d8ca26..4dce9c1 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/MapmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/MapmmSPInstruction.java
@@ -80,9 +80,8 @@ public class MapmmSPInstruction extends BinarySPInstruction {
                boolean outputEmpty = Boolean.parseBoolean(parts[5]);
                SparkAggType aggtype = SparkAggType.valueOf(parts[6]);
                
-               AggregateOperator agg = new AggregateOperator(0, 
Plus.getPlusFnObject());
-               AggregateBinaryOperator aggbin = new 
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
-               return new MapmmSPInstruction(aggbin, in1, in2, out, type, 
outputEmpty, aggtype, opcode, str);          
+               AggregateBinaryOperator aggbin = 
InstructionUtils.getMatMultOperator(1);
+               return new MapmmSPInstruction(aggbin, in1, in2, out, type, 
outputEmpty, aggtype, opcode, str);
        }
        
        @Override
@@ -245,14 +244,10 @@ public class MapmmSPInstruction extends 
BinarySPInstruction {
                private final AggregateBinaryOperator _op;
                private final PartitionedBroadcast<MatrixBlock> _pbc;
                
-               public RDDMapMMFunction( CacheType type, 
PartitionedBroadcast<MatrixBlock> binput )
-               {
+               public RDDMapMMFunction( CacheType type, 
PartitionedBroadcast<MatrixBlock> binput ) {
                        _type = type;
                        _pbc = binput;
-                       
-                       //created operator for reuse
-                       AggregateOperator agg = new AggregateOperator(0, 
Plus.getPlusFnObject());
-                       _op = new 
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
+                       _op = InstructionUtils.getMatMultOperator(1);
                }
                
                @Override
@@ -412,14 +407,10 @@ public class MapmmSPInstruction extends 
BinarySPInstruction {
                private final AggregateBinaryOperator _op;
                private final PartitionedBroadcast<MatrixBlock> _pbc;
                
-               public RDDFlatMapMMFunction( CacheType type, 
PartitionedBroadcast<MatrixBlock> binput )
-               {
+               public RDDFlatMapMMFunction( CacheType type, 
PartitionedBroadcast<MatrixBlock> binput ) {
                        _type = type;
                        _pbc = binput;
-                       
-                       //created operator for reuse
-                       AggregateOperator agg = new AggregateOperator(0, 
Plus.getPlusFnObject());
-                       _op = new 
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
+                       _op = InstructionUtils.getMatMultOperator(1);
                }
                
                @Override
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/PMapmmSPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/PMapmmSPInstruction.java
index e3fe68e..eed76b7 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/PMapmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/PMapmmSPInstruction.java
@@ -29,8 +29,6 @@ import org.apache.sysds.lops.PMapMult;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysds.runtime.functionobjects.Multiply;
-import org.apache.sysds.runtime.functionobjects.Plus;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
 import org.apache.sysds.runtime.instructions.spark.data.PartitionedBlock;
@@ -40,7 +38,6 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.matrix.data.OperationsOnMatrixValues;
 import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
 import org.apache.sysds.runtime.matrix.operators.Operator;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
 import scala.Tuple2;
@@ -68,14 +65,12 @@ public class PMapmmSPInstruction extends 
BinarySPInstruction {
                        CPOperand in1 = new CPOperand(parts[1]);
                        CPOperand in2 = new CPOperand(parts[2]);
                        CPOperand out = new CPOperand(parts[3]);
-                       
-                       AggregateOperator agg = new AggregateOperator(0, 
Plus.getPlusFnObject());
-                       AggregateBinaryOperator aggbin = new 
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
+                       AggregateBinaryOperator aggbin = 
InstructionUtils.getMatMultOperator(1);
                        return new PMapmmSPInstruction(aggbin, in1, in2, out, 
opcode, str);
                } 
                else {
                        throw new 
DMLRuntimeException("PMapmmSPInstruction.parseInstruction():: Unknown opcode " 
+ opcode);
-               }               
+               }
        }
        
        @Override
@@ -162,14 +157,10 @@ public class PMapmmSPInstruction extends 
BinarySPInstruction {
                private Broadcast<PartitionedBlock<MatrixBlock>> _pbc = null;
                private long _offset = -1;
                
-               public PMapMMFunction( Broadcast<PartitionedBlock<MatrixBlock>> 
binput, long offset )
-               {
+               public PMapMMFunction( Broadcast<PartitionedBlock<MatrixBlock>> 
binput, long offset ) {
                        _pbc = binput;
                        _offset = offset;
-                       
-                       //created operator for reuse
-                       AggregateOperator agg = new AggregateOperator(0, 
Plus.getPlusFnObject());
-                       _op = new 
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
+                       _op = InstructionUtils.getMatMultOperator(1);
                }
 
                @Override
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/PmmSPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/PmmSPInstruction.java
index b87c904..cbaf347 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/PmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/PmmSPInstruction.java
@@ -28,8 +28,6 @@ import org.apache.sysds.lops.PMMJ;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysds.runtime.functionobjects.Multiply;
-import org.apache.sysds.runtime.functionobjects.Plus;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
 import org.apache.sysds.runtime.instructions.spark.data.PartitionedBroadcast;
@@ -37,7 +35,6 @@ import 
org.apache.sysds.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
 import org.apache.sysds.runtime.matrix.operators.Operator;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
 import org.apache.sysds.runtime.util.UtilFunctions;
@@ -66,8 +63,7 @@ public class PmmSPInstruction extends BinarySPInstruction {
                        CPOperand nrow = new CPOperand(parts[3]);
                        CPOperand out = new CPOperand(parts[4]);
                        CacheType type = CacheType.valueOf(parts[5]);
-                       AggregateOperator agg = new AggregateOperator(0, 
Plus.getPlusFnObject());
-                       AggregateBinaryOperator aggbin = new 
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
+                       AggregateBinaryOperator aggbin = 
InstructionUtils.getMatMultOperator(1);
                        return new PmmSPInstruction(aggbin, in1, in2, out, 
nrow, type, opcode, str);
                } 
                else {
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/ZipmmSPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/ZipmmSPInstruction.java
index 7bf9e8b..e76c5a4 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/ZipmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/ZipmmSPInstruction.java
@@ -59,8 +59,7 @@ public class ZipmmSPInstruction extends BinarySPInstruction {
                        CPOperand in2 = new CPOperand(parts[2]);
                        CPOperand out = new CPOperand(parts[3]);
                        boolean tRewrite = Boolean.parseBoolean(parts[4]);
-                       AggregateOperator agg = new AggregateOperator(0, 
Plus.getPlusFnObject());
-                       AggregateBinaryOperator aggbin = new 
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
+                       AggregateBinaryOperator aggbin = 
InstructionUtils.getMatMultOperator(1);
                        
                        return new ZipmmSPInstruction(aggbin, in1, in2, out, 
tRewrite, opcode, str);
                } 
diff --git a/src/main/java/org/apache/sysds/runtime/privacy/PrivacyMonitor.java 
b/src/main/java/org/apache/sysds/runtime/privacy/PrivacyMonitor.java
index 118a153..ee88bf4 100644
--- a/src/main/java/org/apache/sysds/runtime/privacy/PrivacyMonitor.java
+++ b/src/main/java/org/apache/sysds/runtime/privacy/PrivacyMonitor.java
@@ -85,7 +85,7 @@ public class PrivacyMonitor
         * @param input variable for which the privacy constraint is checked
         */
        public static void handlePrivacyScalarOutput(CPOperand input, 
ExecutionContext ec) {
-               Data data = ec.getCacheableData(input);
+               Data data = ec.getVariable(input);
                if ( data != null && (data instanceof CacheableData<?>)){
                        PrivacyConstraint privacyConstraintIn = 
((CacheableData<?>) data).getPrivacyConstraint();
                        if ( privacyConstraintIn != null && 
(privacyConstraintIn.getPrivacyLevel() == PrivacyLevel.Private) ){
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/ParCompressedMatrixTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/ParCompressedMatrixTest.java
index 80b82bb..e86c269 100644
--- 
a/src/test/java/org/apache/sysds/test/component/compress/ParCompressedMatrixTest.java
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/ParCompressedMatrixTest.java
@@ -24,11 +24,9 @@ import org.apache.sysds.lops.MapMultChain.ChainType;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysds.runtime.compress.CompressionSettings;
 import 
org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysds.runtime.functionobjects.Multiply;
-import org.apache.sysds.runtime.functionobjects.Plus;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
 import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysds.runtime.util.DataConverter;
 import org.apache.sysds.test.TestUtils;
@@ -160,8 +158,7 @@ public class ParCompressedMatrixTest extends 
AbstractCompressedUnaryTests {
                                
.convertToMatrixBlock(TestUtils.generateTestMatrix(cols, 1, 1, 1, 1.0, 3));
 
                        // matrix-vector uncompressed
-                       AggregateOperator aop = new AggregateOperator(0, 
Plus.getPlusFnObject());
-                       AggregateBinaryOperator abop = new 
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), aop, k);
+                       AggregateBinaryOperator abop = 
InstructionUtils.getMatMultOperator(k);
                        MatrixBlock ret1 = mb.aggregateBinaryOperations(mb, 
vector, new MatrixBlock(), abop);
 
                        // matrix-vector compressed
@@ -188,8 +185,7 @@ public class ParCompressedMatrixTest extends 
AbstractCompressedUnaryTests {
                                
.convertToMatrixBlock(TestUtils.generateTestMatrix(1, rows, 1, 1, 1.0, 3));
 
                        // Make Operator
-                       AggregateOperator aop = new AggregateOperator(0, 
Plus.getPlusFnObject());
-                       AggregateBinaryOperator abop = new 
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), aop, k);
+                       AggregateBinaryOperator abop = 
InstructionUtils.getMatMultOperator(k);
 
                        // vector-matrix uncompressed
                        MatrixBlock ret1 = mb.aggregateBinaryOperations(vector, 
mb, new MatrixBlock(), abop);

Reply via email to