This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git
The following commit(s) were added to refs/heads/master by this push:
new 0118a3e [MINOR] Fix multi-threaded federated MV multiply, and test
issues
0118a3e is described below
commit 0118a3eef317826cf79bf01471f07a67631cee64
Author: Matthias Boehm <[email protected]>
AuthorDate: Sun May 31 22:41:59 2020 +0200
[MINOR] Fix multi-threaded federated MV multiply, and test issues
So far, the federated matrix-vector multiplications were always executed
in a single-threaded manner, now we execute them according to the local
parallelism configuration at the federated worker.
Also, it seems I introduced a bug of privacy handling during the merge,
which this patch also fixes (e.g., on scalar casts of non-cacheable data
objects).
---
.../federated/FederatedWorkerHandler.java | 8 +++-----
.../cp/AggregateBinaryCPInstruction.java | 23 ++++++----------------
.../instructions/cp/VariableCPInstruction.java | 3 ---
.../gpu/AggregateBinaryGPUInstruction.java | 4 +---
.../instructions/spark/CpmmSPInstruction.java | 12 +++--------
.../instructions/spark/MapmmSPInstruction.java | 21 ++++++--------------
.../instructions/spark/PMapmmSPInstruction.java | 17 ++++------------
.../instructions/spark/PmmSPInstruction.java | 6 +-----
.../instructions/spark/ZipmmSPInstruction.java | 3 +--
.../sysds/runtime/privacy/PrivacyMonitor.java | 2 +-
.../compress/ParCompressedMatrixTest.java | 10 +++-------
11 files changed, 29 insertions(+), 80 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index bba731c..6fe814a 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -38,15 +38,13 @@ import
org.apache.sysds.runtime.controlprogram.caching.FrameObject;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.caching.TensorObject;
import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
-import org.apache.sysds.runtime.functionobjects.Multiply;
-import org.apache.sysds.runtime.functionobjects.Plus;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.instructions.cp.ListObject;
import org.apache.sysds.runtime.io.IOUtilFunctions;
import org.apache.sysds.runtime.matrix.data.LibMatrixAgg;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
@@ -187,8 +185,8 @@ public class FederatedWorkerHandler extends
ChannelInboundHandlerAdapter {
matTo = PrivacyMonitor.handlePrivacy(matTo);
MatrixBlock matBlock1 = matTo.acquireReadAndRelease();
// TODO other datatypes
- AggregateBinaryOperator ab_op = new AggregateBinaryOperator(
- Multiply.getMultiplyFnObject(), new
AggregateOperator(0, Plus.getPlusFnObject()));
+ AggregateBinaryOperator ab_op = InstructionUtils
+
.getMatMultOperator(OptimizerUtils.getConstrainedNumThreads(0));
MatrixBlock result = isMatVecMult ?
matBlock1.aggregateBinaryOperations(matBlock1, vector,
new MatrixBlock(), ab_op) :
vector.aggregateBinaryOperations(vector, matBlock1, new
MatrixBlock(), ab_op);
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java
index 1e3186d..0df8108 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java
@@ -19,17 +19,12 @@
package org.apache.sysds.runtime.instructions.cp;
-import org.apache.sysds.common.Types.DataType;
-import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysds.runtime.functionobjects.Multiply;
-import org.apache.sysds.runtime.functionobjects.Plus;
import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
import org.apache.sysds.runtime.matrix.operators.Operator;
public class AggregateBinaryCPInstruction extends BinaryCPInstruction {
@@ -39,10 +34,6 @@ public class AggregateBinaryCPInstruction extends
BinaryCPInstruction {
}
public static AggregateBinaryCPInstruction parseInstruction( String str
) {
- CPOperand in1 = new CPOperand("", ValueType.UNKNOWN,
DataType.UNKNOWN);
- CPOperand in2 = new CPOperand("", ValueType.UNKNOWN,
DataType.UNKNOWN);
- CPOperand out = new CPOperand("", ValueType.UNKNOWN,
DataType.UNKNOWN);
-
String[] parts =
InstructionUtils.getInstructionPartsWithValueType(str);
String opcode = parts[0];
@@ -50,15 +41,13 @@ public class AggregateBinaryCPInstruction extends
BinaryCPInstruction {
throw new
DMLRuntimeException("AggregateBinaryInstruction.parseInstruction():: Unknown
opcode " + opcode);
}
- InstructionUtils.checkNumFields( parts, 4 );
- in1.split(parts[1]);
- in2.split(parts[2]);
- out.split(parts[3]);
+ InstructionUtils.checkNumFields(parts, 4);
+ CPOperand in1 = new CPOperand(parts[1]);
+ CPOperand in2 = new CPOperand(parts[2]);
+ CPOperand out = new CPOperand(parts[3]);
int k = Integer.parseInt(parts[4]);
-
- AggregateOperator agg = new AggregateOperator(0,
Plus.getPlusFnObject());
- AggregateBinaryOperator aggbin = new
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg, k);
- return new AggregateBinaryCPInstruction(aggbin, in1, in2, out,
opcode, str);
+ AggregateBinaryOperator aggbin =
InstructionUtils.getMatMultOperator(k);
+ return new AggregateBinaryCPInstruction(aggbin, in1, in2, out,
opcode, str);
}
@Override
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
index f40abb0..15e9ccb 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
@@ -58,9 +58,6 @@ import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.meta.MetaData;
import org.apache.sysds.runtime.meta.MetaDataFormat;
import org.apache.sysds.runtime.meta.TensorCharacteristics;
-import org.apache.sysds.runtime.privacy.DMLPrivacyException;
-import org.apache.sysds.runtime.privacy.PrivacyConstraint;
-import org.apache.sysds.runtime.privacy.PrivacyConstraint.PrivacyLevel;
import org.apache.sysds.runtime.privacy.PrivacyMonitor;
import org.apache.sysds.runtime.util.DataConverter;
import org.apache.sysds.runtime.util.HDFSTool;
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/gpu/AggregateBinaryGPUInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/gpu/AggregateBinaryGPUInstruction.java
index eb6ce30..78b05a0 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/gpu/AggregateBinaryGPUInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/gpu/AggregateBinaryGPUInstruction.java
@@ -30,7 +30,6 @@ import org.apache.sysds.runtime.matrix.data.LibMatrixCUDA;
import org.apache.sysds.runtime.matrix.data.LibMatrixCuMatMult;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
import org.apache.sysds.utils.GPUStatistics;
@@ -64,8 +63,7 @@ public class AggregateBinaryGPUInstruction extends
GPUInstruction {
CPOperand out = new CPOperand(parts[3]);
boolean isLeftTransposed = Boolean.parseBoolean(parts[4]);
boolean isRightTransposed = Boolean.parseBoolean(parts[5]);
- AggregateOperator agg = new AggregateOperator(0,
Plus.getPlusFnObject());
- AggregateBinaryOperator aggbin = new
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg, 1);
+ AggregateBinaryOperator aggbin =
InstructionUtils.getMatMultOperator(1);
return new AggregateBinaryGPUInstruction(aggbin, in1, in2, out,
opcode, str, isLeftTransposed, isRightTransposed);
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/CpmmSPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/CpmmSPInstruction.java
index 0592a49..ab98af3 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/spark/CpmmSPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/CpmmSPInstruction.java
@@ -27,8 +27,6 @@ import org.apache.sysds.hops.AggBinaryOp.SparkAggType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysds.runtime.functionobjects.Multiply;
-import org.apache.sysds.runtime.functionobjects.Plus;
import org.apache.sysds.runtime.functionobjects.SwapIndex;
import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
@@ -42,7 +40,6 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.matrix.data.OperationsOnMatrixValues;
import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
import org.apache.sysds.runtime.meta.DataCharacteristics;
@@ -75,8 +72,7 @@ public class CpmmSPInstruction extends BinarySPInstruction {
CPOperand in1 = new CPOperand(parts[1]);
CPOperand in2 = new CPOperand(parts[2]);
CPOperand out = new CPOperand(parts[3]);
- AggregateOperator agg = new AggregateOperator(0,
Plus.getPlusFnObject());
- AggregateBinaryOperator aggbin = new
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
+ AggregateBinaryOperator aggbin =
InstructionUtils.getMatMultOperator(1);
boolean outputEmptyBlocks = Boolean.parseBoolean(parts[4]);
SparkAggType aggtype = SparkAggType.valueOf(parts[5]);
return new CpmmSPInstruction(aggbin, in1, in2, out,
outputEmptyBlocks, aggtype, opcode, str);
@@ -195,8 +191,7 @@ public class CpmmSPInstruction extends BinarySPInstruction {
throws Exception
{
if( _op == null ) { //lazy operator construction
- AggregateOperator agg = new
AggregateOperator(0, Plus.getPlusFnObject());
- _op = new
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
+ _op = InstructionUtils.getMatMultOperator(1);
}
MatrixBlock blkIn1 =
(MatrixBlock)arg0._2()._1().getValue();
@@ -224,8 +219,7 @@ public class CpmmSPInstruction extends BinarySPInstruction {
public MatrixBlock call(Tuple2<MatrixBlock, MatrixBlock> arg0)
throws Exception {
//lazy operator construction
if( _op == null ) {
- AggregateOperator agg = new
AggregateOperator(0, Plus.getPlusFnObject());
- _op = new
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
+ _op = InstructionUtils.getMatMultOperator(1);
_rop = new
ReorgOperator(SwapIndex.getSwapIndexFnObject());
}
//prepare inputs, including transpose of right-hand-side
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/MapmmSPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/MapmmSPInstruction.java
index 0d8ca26..4dce9c1 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/spark/MapmmSPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/MapmmSPInstruction.java
@@ -80,9 +80,8 @@ public class MapmmSPInstruction extends BinarySPInstruction {
boolean outputEmpty = Boolean.parseBoolean(parts[5]);
SparkAggType aggtype = SparkAggType.valueOf(parts[6]);
- AggregateOperator agg = new AggregateOperator(0,
Plus.getPlusFnObject());
- AggregateBinaryOperator aggbin = new
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
- return new MapmmSPInstruction(aggbin, in1, in2, out, type,
outputEmpty, aggtype, opcode, str);
+ AggregateBinaryOperator aggbin =
InstructionUtils.getMatMultOperator(1);
+ return new MapmmSPInstruction(aggbin, in1, in2, out, type,
outputEmpty, aggtype, opcode, str);
}
@Override
@@ -245,14 +244,10 @@ public class MapmmSPInstruction extends
BinarySPInstruction {
private final AggregateBinaryOperator _op;
private final PartitionedBroadcast<MatrixBlock> _pbc;
- public RDDMapMMFunction( CacheType type,
PartitionedBroadcast<MatrixBlock> binput )
- {
+ public RDDMapMMFunction( CacheType type,
PartitionedBroadcast<MatrixBlock> binput ) {
_type = type;
_pbc = binput;
-
- //created operator for reuse
- AggregateOperator agg = new AggregateOperator(0,
Plus.getPlusFnObject());
- _op = new
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
+ _op = InstructionUtils.getMatMultOperator(1);
}
@Override
@@ -412,14 +407,10 @@ public class MapmmSPInstruction extends
BinarySPInstruction {
private final AggregateBinaryOperator _op;
private final PartitionedBroadcast<MatrixBlock> _pbc;
- public RDDFlatMapMMFunction( CacheType type,
PartitionedBroadcast<MatrixBlock> binput )
- {
+ public RDDFlatMapMMFunction( CacheType type,
PartitionedBroadcast<MatrixBlock> binput ) {
_type = type;
_pbc = binput;
-
- //created operator for reuse
- AggregateOperator agg = new AggregateOperator(0,
Plus.getPlusFnObject());
- _op = new
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
+ _op = InstructionUtils.getMatMultOperator(1);
}
@Override
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/PMapmmSPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/PMapmmSPInstruction.java
index e3fe68e..eed76b7 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/spark/PMapmmSPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/PMapmmSPInstruction.java
@@ -29,8 +29,6 @@ import org.apache.sysds.lops.PMapMult;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysds.runtime.functionobjects.Multiply;
-import org.apache.sysds.runtime.functionobjects.Plus;
import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
import org.apache.sysds.runtime.instructions.spark.data.PartitionedBlock;
@@ -40,7 +38,6 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.matrix.data.OperationsOnMatrixValues;
import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import scala.Tuple2;
@@ -68,14 +65,12 @@ public class PMapmmSPInstruction extends
BinarySPInstruction {
CPOperand in1 = new CPOperand(parts[1]);
CPOperand in2 = new CPOperand(parts[2]);
CPOperand out = new CPOperand(parts[3]);
-
- AggregateOperator agg = new AggregateOperator(0,
Plus.getPlusFnObject());
- AggregateBinaryOperator aggbin = new
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
+ AggregateBinaryOperator aggbin =
InstructionUtils.getMatMultOperator(1);
return new PMapmmSPInstruction(aggbin, in1, in2, out,
opcode, str);
}
else {
throw new
DMLRuntimeException("PMapmmSPInstruction.parseInstruction():: Unknown opcode "
+ opcode);
- }
+ }
}
@Override
@@ -162,14 +157,10 @@ public class PMapmmSPInstruction extends
BinarySPInstruction {
private Broadcast<PartitionedBlock<MatrixBlock>> _pbc = null;
private long _offset = -1;
- public PMapMMFunction( Broadcast<PartitionedBlock<MatrixBlock>>
binput, long offset )
- {
+ public PMapMMFunction( Broadcast<PartitionedBlock<MatrixBlock>>
binput, long offset ) {
_pbc = binput;
_offset = offset;
-
- //created operator for reuse
- AggregateOperator agg = new AggregateOperator(0,
Plus.getPlusFnObject());
- _op = new
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
+ _op = InstructionUtils.getMatMultOperator(1);
}
@Override
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/PmmSPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/PmmSPInstruction.java
index b87c904..cbaf347 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/spark/PmmSPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/PmmSPInstruction.java
@@ -28,8 +28,6 @@ import org.apache.sysds.lops.PMMJ;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysds.runtime.functionobjects.Multiply;
-import org.apache.sysds.runtime.functionobjects.Plus;
import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
import org.apache.sysds.runtime.instructions.spark.data.PartitionedBroadcast;
@@ -37,7 +35,6 @@ import
org.apache.sysds.runtime.instructions.spark.utils.RDDAggregateUtils;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.util.UtilFunctions;
@@ -66,8 +63,7 @@ public class PmmSPInstruction extends BinarySPInstruction {
CPOperand nrow = new CPOperand(parts[3]);
CPOperand out = new CPOperand(parts[4]);
CacheType type = CacheType.valueOf(parts[5]);
- AggregateOperator agg = new AggregateOperator(0,
Plus.getPlusFnObject());
- AggregateBinaryOperator aggbin = new
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
+ AggregateBinaryOperator aggbin =
InstructionUtils.getMatMultOperator(1);
return new PmmSPInstruction(aggbin, in1, in2, out,
nrow, type, opcode, str);
}
else {
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/ZipmmSPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/ZipmmSPInstruction.java
index 7bf9e8b..e76c5a4 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/spark/ZipmmSPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/ZipmmSPInstruction.java
@@ -59,8 +59,7 @@ public class ZipmmSPInstruction extends BinarySPInstruction {
CPOperand in2 = new CPOperand(parts[2]);
CPOperand out = new CPOperand(parts[3]);
boolean tRewrite = Boolean.parseBoolean(parts[4]);
- AggregateOperator agg = new AggregateOperator(0,
Plus.getPlusFnObject());
- AggregateBinaryOperator aggbin = new
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), agg);
+ AggregateBinaryOperator aggbin =
InstructionUtils.getMatMultOperator(1);
return new ZipmmSPInstruction(aggbin, in1, in2, out,
tRewrite, opcode, str);
}
diff --git a/src/main/java/org/apache/sysds/runtime/privacy/PrivacyMonitor.java
b/src/main/java/org/apache/sysds/runtime/privacy/PrivacyMonitor.java
index 118a153..ee88bf4 100644
--- a/src/main/java/org/apache/sysds/runtime/privacy/PrivacyMonitor.java
+++ b/src/main/java/org/apache/sysds/runtime/privacy/PrivacyMonitor.java
@@ -85,7 +85,7 @@ public class PrivacyMonitor
* @param input variable for which the privacy constraint is checked
*/
public static void handlePrivacyScalarOutput(CPOperand input,
ExecutionContext ec) {
- Data data = ec.getCacheableData(input);
+ Data data = ec.getVariable(input);
if ( data != null && (data instanceof CacheableData<?>)){
PrivacyConstraint privacyConstraintIn =
((CacheableData<?>) data).getPrivacyConstraint();
if ( privacyConstraintIn != null &&
(privacyConstraintIn.getPrivacyLevel() == PrivacyLevel.Private) ){
diff --git
a/src/test/java/org/apache/sysds/test/component/compress/ParCompressedMatrixTest.java
b/src/test/java/org/apache/sysds/test/component/compress/ParCompressedMatrixTest.java
index 80b82bb..e86c269 100644
---
a/src/test/java/org/apache/sysds/test/component/compress/ParCompressedMatrixTest.java
+++
b/src/test/java/org/apache/sysds/test/component/compress/ParCompressedMatrixTest.java
@@ -24,11 +24,9 @@ import org.apache.sysds.lops.MapMultChain.ChainType;
import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
import org.apache.sysds.runtime.compress.CompressionSettings;
import
org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysds.runtime.functionobjects.Multiply;
-import org.apache.sysds.runtime.functionobjects.Plus;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
import org.apache.sysds.runtime.util.DataConverter;
import org.apache.sysds.test.TestUtils;
@@ -160,8 +158,7 @@ public class ParCompressedMatrixTest extends
AbstractCompressedUnaryTests {
.convertToMatrixBlock(TestUtils.generateTestMatrix(cols, 1, 1, 1, 1.0, 3));
// matrix-vector uncompressed
- AggregateOperator aop = new AggregateOperator(0,
Plus.getPlusFnObject());
- AggregateBinaryOperator abop = new
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), aop, k);
+ AggregateBinaryOperator abop =
InstructionUtils.getMatMultOperator(k);
MatrixBlock ret1 = mb.aggregateBinaryOperations(mb,
vector, new MatrixBlock(), abop);
// matrix-vector compressed
@@ -188,8 +185,7 @@ public class ParCompressedMatrixTest extends
AbstractCompressedUnaryTests {
.convertToMatrixBlock(TestUtils.generateTestMatrix(1, rows, 1, 1, 1.0, 3));
// Make Operator
- AggregateOperator aop = new AggregateOperator(0,
Plus.getPlusFnObject());
- AggregateBinaryOperator abop = new
AggregateBinaryOperator(Multiply.getMultiplyFnObject(), aop, k);
+ AggregateBinaryOperator abop =
InstructionUtils.getMatMultOperator(k);
// vector-matrix uncompressed
MatrixBlock ret1 = mb.aggregateBinaryOperations(vector,
mb, new MatrixBlock(), abop);