This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 42143ec  [SYSTEMDS-2921] Federated cumulative aggregates (e.g., cumsum)
42143ec is described below

commit 42143ec14618332cce9ae44129829d3bfff86762
Author: Olga <[email protected]>
AuthorDate: Mon Apr 5 16:24:17 2021 +0200

    [SYSTEMDS-2921] Federated cumulative aggregates (e.g., cumsum)
    
    Closes #1216.
---
 .../controlprogram/federated/FederationUtils.java  |  36 ++--
 .../runtime/instructions/InstructionUtils.java     |  16 +-
 .../runtime/instructions/fed/FEDInstruction.java   |   1 +
 .../instructions/fed/FEDInstructionUtils.java      |   3 +-
 .../fed/UnaryMatrixFEDInstruction.java             | 218 +++++++++++++++++++--
 .../sysds/runtime/matrix/data/MatrixBlock.java     |   4 +-
 .../primitives/FederatedFullCumulativeTest.java    | 211 ++++++++++++++++++++
 .../federated/cumulative/FederatedCummaxTest.dml   |  33 ++++
 .../cumulative/FederatedCummaxTestReference.dml    |  26 +++
 .../federated/cumulative/FederatedCumminTest.dml   |  33 ++++
 .../cumulative/FederatedCumminTestReference.dml    |  26 +++
 .../federated/cumulative/FederatedCumprodTest.dml  |  33 ++++
 .../cumulative/FederatedCumprodTestReference.dml   |  26 +++
 .../federated/cumulative/FederatedCumsumTest.dml   |  33 ++++
 .../cumulative/FederatedCumsumTestReference.dml    |  26 +++
 .../cumulative/FederatedCumsumprodTest.dml         |  32 +++
 .../FederatedCumsumprodTestReference.dml           |  25 +++
 17 files changed, 751 insertions(+), 31 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
index 31a7136..dc2fae5 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
@@ -66,20 +66,7 @@ public class FederationUtils {
        }
 
        public static FederatedRequest callInstruction(String inst, CPOperand 
varOldOut, CPOperand[] varOldIn, long[] varNewIn) {
-               //TODO better and safe replacement of operand names --> 
instruction utils
-               long id = getNextFedDataID();
-               String linst = inst.replace(ExecType.SPARK.name(), 
ExecType.CP.name());
-               linst = linst.replace(
-                       
Lop.OPERAND_DELIMITOR+varOldOut.getName()+Lop.DATATYPE_PREFIX,
-                       
Lop.OPERAND_DELIMITOR+String.valueOf(id)+Lop.DATATYPE_PREFIX);
-               for(int i=0; i<varOldIn.length; i++)
-                       if( varOldIn[i] != null ) {
-                               linst = linst.replace(
-                                       
Lop.OPERAND_DELIMITOR+varOldIn[i].getName()+Lop.DATATYPE_PREFIX,
-                                       
Lop.OPERAND_DELIMITOR+String.valueOf(varNewIn[i])+Lop.DATATYPE_PREFIX);
-                               linst = 
linst.replace("="+varOldIn[i].getName(), "="+String.valueOf(varNewIn[i])); 
//parameterized
-                       }
-               return new FederatedRequest(RequestType.EXEC_INST, id, linst);
+               return callInstruction(inst, varOldOut, getNextFedDataID(), 
varOldIn, varNewIn);
        }
 
        public static FederatedRequest[] callInstruction(String[] inst, 
CPOperand varOldOut, CPOperand[] varOldIn, long[] varNewIn) {
@@ -89,10 +76,14 @@ public class FederationUtils {
                for(int j=0; j<inst.length; j++) {
                        for(int i = 0; i < varOldIn.length; i++) {
                                linst[j] = 
linst[j].replace(ExecType.SPARK.name(), ExecType.CP.name());
-                               linst[j] = 
linst[j].replace(Lop.OPERAND_DELIMITOR + varOldOut.getName() + 
Lop.DATATYPE_PREFIX, Lop.OPERAND_DELIMITOR + String.valueOf(id) + 
Lop.DATATYPE_PREFIX);
+                               linst[j] = linst[j].replace(
+                                       Lop.OPERAND_DELIMITOR + 
varOldOut.getName() + Lop.DATATYPE_PREFIX,
+                                       Lop.OPERAND_DELIMITOR + 
String.valueOf(id) + Lop.DATATYPE_PREFIX);
 
                                if(varOldIn[i] != null) {
-                                       linst[j] = 
linst[j].replace(Lop.OPERAND_DELIMITOR + varOldIn[i].getName() + 
Lop.DATATYPE_PREFIX, Lop.OPERAND_DELIMITOR + String.valueOf(varNewIn[i]) + 
Lop.DATATYPE_PREFIX);
+                                       linst[j] = linst[j].replace(
+                                               Lop.OPERAND_DELIMITOR + 
varOldIn[i].getName() + Lop.DATATYPE_PREFIX,
+                                               Lop.OPERAND_DELIMITOR + 
String.valueOf(varNewIn[i]) + Lop.DATATYPE_PREFIX);
                                        linst[j] = linst[j].replace("=" + 
varOldIn[i].getName(), "=" + String.valueOf(varNewIn[i])); //parameterized
                                }
                        }
@@ -101,6 +92,19 @@ public class FederationUtils {
                return fr;
        }
 
+       public static FederatedRequest callInstruction(String inst, CPOperand 
varOldOut, long outputId, CPOperand[] varOldIn, long[] varNewIn) {
+               String linst = InstructionUtils.replaceOperand(inst, 0, 
ExecType.CP.name());
+               linst = 
linst.replace(Lop.OPERAND_DELIMITOR+varOldOut.getName()+Lop.DATATYPE_PREFIX, 
Lop.OPERAND_DELIMITOR+outputId+Lop.DATATYPE_PREFIX);
+               for(int i=0; i<varOldIn.length; i++)
+                       if( varOldIn[i] != null ) {
+                               linst = linst.replace(
+                                       
Lop.OPERAND_DELIMITOR+varOldIn[i].getName()+Lop.DATATYPE_PREFIX,
+                                       
Lop.OPERAND_DELIMITOR+(varNewIn[i])+Lop.DATATYPE_PREFIX);
+                               linst = 
linst.replace("="+varOldIn[i].getName(), "="+(varNewIn[i])); //parameterized
+                       }
+               return new FederatedRequest(RequestType.EXEC_INST, outputId, 
linst);
+       }
+
        public static MatrixBlock aggAdd(Future<FederatedResponse>[] ffr) {
                try {
                        SimpleOperator op = new 
SimpleOperator(Plus.getPlusFnObject());
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
index 9245132..77b53f9 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
@@ -1000,7 +1000,11 @@ public class InstructionUtils
        public static String createLiteralOperand(String val, ValueType vt) {
                return InstructionUtils.concatOperandParts(val, 
DataType.SCALAR.name(), vt.name(), "true");
        }
-       
+
+       public static String createOperand(CPOperand operand) {
+               return InstructionUtils.concatOperandParts(operand.getName(), 
operand.getDataType().name(), operand.getValueType().name());
+       }
+
        public static String replaceOperand(String instStr, int operand, String 
newValue) {
                //split instruction and check for correctness
                String[] parts = instStr.split(Lop.OPERAND_DELIMITOR);
@@ -1048,4 +1052,14 @@ public class InstructionUtils
                        sb.append(inputs[i]);
                return sb.toString();
        }
+
+       public static String constructTernaryString(String instString, 
CPOperand op1, CPOperand op2, CPOperand op3, CPOperand out) {
+               return concatOperands(constructBinaryInstString(instString, 
"ifelse", op1, op2, op3), createOperand(out));
+       }
+
+       public static String constructBinaryInstString(String instString, 
String opcode, CPOperand op1, CPOperand op2, CPOperand out) {
+               String[] parts = instString.split(Lop.OPERAND_DELIMITOR);
+               parts[1] = opcode;
+               return InstructionUtils.concatOperands(parts[0], parts[1], 
createOperand(op1), createOperand(op2), createOperand(out));
+       }
 }
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstruction.java
index 8f58a8b..1d3c54c 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstruction.java
@@ -32,6 +32,7 @@ public abstract class FEDInstruction extends Instruction {
                AggregateTernary,
                Append,
                Binary,
+               CumulativeAggregate,
                Init,
                MultiReturnParameterizedBuiltin,
                MMChain,
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
index 214023f..7439b6f 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
@@ -127,7 +127,8 @@ public class FEDInstructionUtils {
                                        ((AggregateUnaryCPInstruction) 
instruction).getAUType() == AggregateUnaryCPInstruction.AUType.DEFAULT)
                                        fedinst = 
AggregateUnaryFEDInstruction.parseInstruction(inst.getInstructionString());
                                else if(inst instanceof 
UnaryMatrixCPInstruction && mo1.isFederated()) {
-                                       
if(UnaryMatrixFEDInstruction.isValidOpcode(inst.getOpcode()))
+                                       
if(UnaryMatrixFEDInstruction.isValidOpcode(inst.getOpcode()) &&
+                                               
!(inst.getOpcode().equalsIgnoreCase("ucumk+*") && mo1.isFederated(FType.COL)))
                                                fedinst = 
UnaryMatrixFEDInstruction.parseInstruction(inst.getInstructionString());
                                }
                        }
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/UnaryMatrixFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/UnaryMatrixFEDInstruction.java
index c6e0b2c..1f5cdd2 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/UnaryMatrixFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/UnaryMatrixFEDInstruction.java
@@ -19,17 +19,23 @@
 
 package org.apache.sysds.runtime.instructions.fed;
 
+import java.util.concurrent.Future;
+
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
 import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
 import org.apache.sysds.runtime.functionobjects.Builtin;
 import org.apache.sysds.runtime.functionobjects.ValueFunction;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
 import org.apache.sysds.runtime.matrix.data.LibCommonsMath;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.Operator;
 import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
 
@@ -39,8 +45,7 @@ public class UnaryMatrixFEDInstruction extends 
UnaryFEDInstruction {
        }
        
        public static boolean isValidOpcode(String opcode) {
-               return !LibCommonsMath.isSupportedUnaryOperation(opcode)
-                       && !opcode.startsWith("ucum"); //ucumk+ ucum* ucumk+* 
ucummin ucummax
+               return !LibCommonsMath.isSupportedUnaryOperation(opcode);
        }
 
        public static UnaryMatrixFEDInstruction parseInstruction(String str) {
@@ -50,7 +55,7 @@ public class UnaryMatrixFEDInstruction extends 
UnaryFEDInstruction {
                String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
                String opcode;
                opcode = parts[0];
-               if( opcode.equalsIgnoreCase("exp") && parts.length == 5) {
+               if( (opcode.equalsIgnoreCase("exp") || 
opcode.startsWith("ucum")) && parts.length == 5) {
                        in.split(parts[1]);
                        out.split(parts[2]);
                        ValueFunction func = Builtin.getBuiltinFnObject(opcode);
@@ -64,16 +69,207 @@ public class UnaryMatrixFEDInstruction extends 
UnaryFEDInstruction {
        @Override 
        public void processInstruction(ExecutionContext ec) {
                MatrixObject mo1 = ec.getMatrixObject(input1);
+               if(getOpcode().startsWith("ucum") && 
mo1.isFederated(FederationMap.FType.ROW))
+                       processCumulativeInstruction(ec, mo1);
+               else {
+                       //federated execution on arbitrary row/column partitions
+                       //(only assumption for sparse-unsafe: fed mapping 
covers entire matrix)
+                       FederatedRequest fr1 = 
FederationUtils.callInstruction(instString, output,
+                               new CPOperand[] {input1}, new long[] 
{mo1.getFedMapping().getID()});
+                       mo1.getFedMapping().execute(getTID(), true, fr1);
+
+                       setOutputFedMapping(ec, mo1, fr1.getID());
+               }
+       }
+
+       public void processCumulativeInstruction(ExecutionContext ec, 
MatrixObject mo1) {
+               String opcode = getOpcode();
+               MatrixObject out;
+               if(opcode.equalsIgnoreCase("ucumk+*")) {
+                       FederatedRequest fr1 = 
FederationUtils.callInstruction(instString, output,
+                               new CPOperand[] {input1}, new long[] 
{mo1.getFedMapping().getID()});
+                       FederatedRequest fr2 = new 
FederatedRequest(FederatedRequest.RequestType.GET_VAR, fr1.getID());
+                       Future<FederatedResponse>[] tmp = 
mo1.getFedMapping().execute(getTID(), true, fr1, fr2);
+                       out = setOutputFedMapping(ec, mo1, fr1.getID());
+
+                       MatrixBlock scalingValues = getScalars(mo1, tmp);
+                       setScalingValues(ec, mo1, out, scalingValues);
+               }
+               else {
+                       String colAgg = opcode.replace("ucum", "uac");
+                       String agg2 = opcode.replace(opcode.contains("ucumk")? 
"ucumk" :"ucum", "");
+
+                       double init = opcode.equalsIgnoreCase("ucumk+") ? 0.0:
+                               opcode.equalsIgnoreCase("ucum*") ? 1.0 :
+                               opcode.equalsIgnoreCase("ucummin") ? 
Double.MAX_VALUE : -Double.MAX_VALUE;
+
+                       Future<FederatedResponse>[] tmp = 
modifyAndGetInstruction(colAgg, mo1);
+                       MatrixBlock scalingValues = getResultBlock(tmp, 
(int)mo1.getNumColumns(), opcode, init);
+
+                       out = ec.getMatrixObject(output);
+                       setScalingValues(agg2, ec, mo1, out, scalingValues, 
init);
+               }
+               processCumulative(out);
+       }
+
+       private Future<FederatedResponse>[] modifyAndGetInstruction(String 
newInst, MatrixObject mo1) {
+               String modifiedInstString = 
InstructionUtils.replaceOperand(instString, 1, newInst);
+
+               FederatedRequest fr1 = 
FederationUtils.callInstruction(modifiedInstString, output,
+                       new CPOperand[] {input1}, new long[] 
{mo1.getFedMapping().getID()});
+               FederatedRequest fr2 = new 
FederatedRequest(FederatedRequest.RequestType.GET_VAR, fr1.getID());
+               return mo1.getFedMapping().execute(getTID(), true, fr1, fr2);
+       }
+
+       private void processCumulative(MatrixObject out) {
+               String modifiedInstString = 
InstructionUtils.replaceOperand(instString, 2, 
InstructionUtils.createOperand(output));
+
+               FederatedRequest fr4 = 
FederationUtils.callInstruction(modifiedInstString, output, 
out.getFedMapping().getID(),
+                       new CPOperand[] {output}, new long[] 
{out.getFedMapping().getID()});
+               out.getFedMapping().execute(getTID(), true, fr4);
+
+               
out.setFedMapping(out.getFedMapping().copyWithNewID(fr4.getID()));
+
+               // modify fed ranges since ucumk+* output is always nx1
+               if(getOpcode().equalsIgnoreCase("ucumk+*")) {
+                       out.getDataCharacteristics().set(out.getNumRows(), 1L, 
(int) out.getBlocksize());
+                       for(int i = 0; i < 
out.getFedMapping().getFederatedRanges().length; i++)
+                               
out.getFedMapping().getFederatedRanges()[i].setEndDim(1, 1);
+               } else {
+                       out.getDataCharacteristics().set(out.getNumRows(), 
out.getNumColumns(), (int) out.getBlocksize());
+               }
+       }
+
+       private static MatrixBlock getResultBlock(Future<FederatedResponse>[] 
tmp, int cols, String opcode, double init) {
+               //TODO perf simple rbind, as the first row (init) is anyway not 
transferred
                
-               //federated execution on arbitrary row/column partitions
-               //(only assumption for sparse-unsafe: fed mapping covers entire 
matrix)
-               FederatedRequest fr1 = 
FederationUtils.callInstruction(instString, output,
-                       new CPOperand[]{input1}, new 
long[]{mo1.getFedMapping().getID()});
-               mo1.getFedMapping().execute(getTID(), true, fr1);
+               //collect row vectors into local matrix
+               MatrixBlock res = new MatrixBlock(tmp.length, cols, init);
+               for(int i = 0; i < tmp.length-1; i++)
+                       try {
+                               res.copy(i+1, i+1, 0, cols-1, ((MatrixBlock) 
tmp[i].get().getData()[0]), true);
+                       }
+                       catch(Exception e) {
+                               throw new DMLRuntimeException("Federated Get 
data failed with exception on UnaryMatrixFEDInstruction", e);
+                       }
+
+               //local cumulative aggregate
+               return res.unaryOperations(
+                       new UnaryOperator(Builtin.getBuiltinFnObject(opcode)),
+                       new MatrixBlock());
+       }
+
+       private MatrixBlock getScalars(MatrixObject mo1, 
Future<FederatedResponse>[] tmp) {
+               MatrixBlock[] aggRes = getAggMatrices(mo1);
+               MatrixBlock prod = aggRes[0];
+               MatrixBlock firstValues = aggRes[1];
+               for(int i = 0; i < tmp.length; i++)
+                       try {
+                               MatrixBlock curr = ((MatrixBlock) 
tmp[i].get().getData()[0]);
+                               prod.setValue(i, 0, 
curr.getValue(curr.getNumRows()-1, 0));
+                       }
+                       catch(Exception e) {
+                               throw new DMLRuntimeException("Federated Get 
data failed with exception on UnaryMatrixFEDInstruction", e);
+                       }
+
+               // aggregate sumprod to get scalars
+               MatrixBlock a = new MatrixBlock(tmp.length, 1, 0.0);
+               a.copy(1, a.getNumRows()-1, 0, 0,
+                       prod.unaryOperations(new 
UnaryOperator(Builtin.getBuiltinFnObject("ucumk+*")), new MatrixBlock())
+                               .slice(0, prod.getNumRows()-2), true);
+
+               // compute  B11 = B11 + B12 ⊙ a
+               MatrixBlock B = firstValues.slice(0, 
firstValues.getNumRows()-1,1, 1)
+                       
.binaryOperations(InstructionUtils.parseBinaryOperator("*"), a, new 
MatrixBlock());
+               return 
B.binaryOperationsInPlace(InstructionUtils.parseBinaryOperator("+"), 
firstValues.slice(0,firstValues.getNumRows()-1,0,0));
+       }
+
+       private MatrixBlock[] getAggMatrices(MatrixObject mo1) {
+               Future<FederatedResponse>[] tmp = 
modifyAndGetInstruction("ucum*", mo1);
+
+               // slice and return prod and first value
+               MatrixBlock prod = new MatrixBlock(tmp.length, 2, 0.0);
+               MatrixBlock firstValues = new MatrixBlock(tmp.length, 2, 0.0);
+               for(int i = 0; i < tmp.length; i++)
+                       try {
+                               MatrixBlock curr = ((MatrixBlock) 
tmp[i].get().getData()[0]);
+                               prod.setValue(i, 1, 
curr.getValue(curr.getNumRows()-1, 1));
+                               firstValues.copy(i, i, 0,1, curr.slice(0, 0), 
true);
+                       }
+                       catch(Exception e) {
+                               throw new DMLRuntimeException("Federated Get 
data failed with exception on UnaryMatrixFEDInstruction", e);
+                       }
+               return new MatrixBlock[] {prod, firstValues};
+       }
+
+       private void setScalingValues(ExecutionContext ec, MatrixObject mo1, 
MatrixObject out, MatrixBlock scalingValues) {
+               MatrixBlock condition = new MatrixBlock((int) mo1.getNumRows(), 
(int) mo1.getNumColumns(), 1.0);
+               MatrixBlock mb2 = new MatrixBlock((int) mo1.getNumRows(), (int) 
mo1.getNumColumns(), 0.0);
+
+               for(int i = 0; i < scalingValues.getNumRows()-1; i++) {
+                       int step = (int) 
mo1.getFedMapping().getFederatedRanges()[i + 1].getBeginDims()[0];
+                       condition.setValue(step, 0, 0.0);
+                       mb2.setValue(step, 0, scalingValues.getValue(i + 1, 0));
+               }
+
+               MatrixObject cond = 
ExecutionContext.createMatrixObject(condition);
+               long condID = FederationUtils.getNextFedDataID();
+               ec.setVariable(String.valueOf(condID), cond);
+
+               MatrixObject mo2 = ExecutionContext.createMatrixObject(mb2);
+               long varID2 = FederationUtils.getNextFedDataID();
+               ec.setVariable(String.valueOf(varID2), mo2);
+
+               CPOperand opCond = new CPOperand(String.valueOf(condID), 
ValueType.FP64, DataType.MATRIX);
+               CPOperand op2 = new CPOperand(String.valueOf(varID2), 
ValueType.FP64, DataType.MATRIX);
+
+               String ternaryInstString = 
InstructionUtils.constructTernaryString(instString, opCond, input1, op2, 
output);
+
+               FederatedRequest[] fr1 = 
mo1.getFedMapping().broadcastSliced(cond, false);
+               FederatedRequest[] fr2 = 
mo1.getFedMapping().broadcastSliced(mo2, false);
+               FederatedRequest fr3 = 
FederationUtils.callInstruction(ternaryInstString, output,
+                       new CPOperand[] {input1, opCond, op2}, new long[] 
{mo1.getFedMapping().getID(), fr1[0].getID(), fr2[0].getID()});
+               //TODO perf no need to execute here, we can piggyback the 
requests onto the final cumagg
+               mo1.getFedMapping().execute(getTID(), true, fr1, fr2, fr3);
+
+               
out.setFedMapping(mo1.getFedMapping().copyWithNewID(fr3.getID()));
+
+               ec.removeVariable(opCond.getName());
+               ec.removeVariable(op2.getName());
+       }
+
+       private void setScalingValues(String opcode, ExecutionContext ec, 
MatrixObject mo1, MatrixObject out, MatrixBlock scalingValues, double init) {
+               //TODO perf improvement (currently this creates a sliced 
broadcast in the size of the original matrix
+               //but sparse w/ strategically placed offsets, but would need to 
be dense for dense prod/cumsum)
                
-               //set characteristics and fed mapp
+               //allocated large matrix of init value and placed offset rows 
in first row of every partition
+               MatrixBlock mb2 = new MatrixBlock((int) mo1.getNumRows(), (int) 
mo1.getNumColumns(), init);
+               for(int i = 1; i < scalingValues.getNumRows(); i++) {
+                       int step = (int) 
mo1.getFedMapping().getFederatedRanges()[i].getBeginDims()[0];
+                       mb2.copy(step, step, 0, (int)(mo1.getNumColumns()-1), 
scalingValues.slice(i, i), true);
+               }
+
+               MatrixObject mo2 = ExecutionContext.createMatrixObject(mb2);
+               long varID2 = FederationUtils.getNextFedDataID();
+               ec.setVariable(String.valueOf(varID2), mo2);
+               CPOperand op2 = new CPOperand(String.valueOf(varID2), 
ValueType.FP64, DataType.MATRIX);
+
+               String modifiedInstString = 
InstructionUtils.constructBinaryInstString(instString, opcode, input1, op2, 
output);
+
+               FederatedRequest[] fr1 = 
mo1.getFedMapping().broadcastSliced(mo2, false);
+               FederatedRequest fr2 = 
FederationUtils.callInstruction(modifiedInstString, output,
+                       new CPOperand[] {input1, op2}, new long[] 
{mo1.getFedMapping().getID(), fr1[0].getID()});
+               mo1.getFedMapping().execute(getTID(), true, fr1, fr2);
+
+               
out.setFedMapping(mo1.getFedMapping().copyWithNewID(fr2.getID()));
+
+               ec.removeVariable(op2.getName());
+       }
+
+       private MatrixObject setOutputFedMapping(ExecutionContext ec, 
MatrixObject fedMapObj, long fedOutputID) {
                MatrixObject out = ec.getMatrixObject(output);
-               out.getDataCharacteristics().set(mo1.getNumRows(), 
mo1.getNumColumns(), (int)mo1.getBlocksize());
-               
out.setFedMapping(mo1.getFedMapping().copyWithNewID(fr1.getID()));
+               
out.getDataCharacteristics().set(fedMapObj.getDataCharacteristics());
+               
out.setFedMapping(fedMapObj.getFedMapping().copyWithNewID(fedOutputID));
+               return out;
        }
 }
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index 615b28d..efd86a3 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -1466,9 +1466,9 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
         * only done if 'awareDestNZ=true', 
         * 
         * @param rl row lower index, 0-based
-        * @param ru row upper index, 0-based
+        * @param ru row upper index, 0-based, inclusive
         * @param cl column lower index, 0-based
-        * @param cu column upper index, 0-based
+        * @param cu column upper index, 0-based, inclusive
         * @param src matrix block
         * @param awareDestNZ
         *           true, forces (1) to remove existing non-zeros in the index 
range of the 
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullCumulativeTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullCumulativeTest.java
new file mode 100644
index 0000000..336f45e
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFullCumulativeTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.primitives;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.lops.LopProperties.ExecType;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
[email protected]
+public class FederatedFullCumulativeTest extends AutomatedTestBase {
+       private final static String TEST_NAME1 = "FederatedCumsumTest";
+       private final static String TEST_NAME2 = "FederatedCumprodTest";
+       private final static String TEST_NAME3 = "FederatedCummaxTest";
+       private final static String TEST_NAME4 = "FederatedCumminTest";
+       private final static String TEST_NAME5 = "FederatedCumsumprodTest";
+
+       private final static String TEST_DIR = 
"functions/federated/cumulative/";
+       private static final String TEST_CLASS_DIR = TEST_DIR + 
FederatedFullCumulativeTest.class.getSimpleName() + "/";
+
+       private final static int blocksize = 1024;
+       @Parameterized.Parameter()
+       public int rows;
+       @Parameterized.Parameter(1)
+       public int cols;
+       @Parameterized.Parameter(2)
+       public boolean rowPartitioned;
+
+       @Parameterized.Parameters
+       public static Collection<Object[]> data() {
+               return Arrays.asList(
+                       new Object[][] {
+                               {240, 4, true},
+                               {240, 4, false},
+                       });
+       }
+
+       private enum OpType {
+               SUM, PROD, SUMPROD, MAX, MIN
+       }
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME1, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"S"}));
+               addTestConfiguration(TEST_NAME2, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"S"}));
+               addTestConfiguration(TEST_NAME3, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {"S"}));
+               addTestConfiguration(TEST_NAME4, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME4, new String[] {"S"}));
+               addTestConfiguration(TEST_NAME5, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME5, new String[] {"S"}));
+       }
+
+       @Test
+       public void testSumDenseMatrixCP() { runCumOperationTest(OpType.SUM, 
ExecType.CP); }
+
+// FIXME offset handling has some remaining issues
+//     @Test
+//     public void testProdDenseMatrixCP() {
+//             runCumOperationTest(OpType.PROD, ExecType.CP);
+//     }
+
+       @Test
+       public void testMaxDenseMatrixCP() {
+               runCumOperationTest(OpType.MAX, ExecType.CP);
+       }
+
+       @Test
+       public void testMinDenseMatrixCP() {
+               runCumOperationTest(OpType.MIN, ExecType.CP);
+       }
+
+// FIXME offset handling has some remaining issues
+//     @Test
+//     public void testSumprodDenseMatrixCP() {
+//             runCumOperationTest(OpType.SUMPROD, ExecType.CP);
+//     }
+
+       private void runCumOperationTest(OpType type, ExecType instType) {
+               ExecMode platformOld = setExecMode(instType);
+
+               String TEST_NAME = null;
+               switch(type) {
+                       case SUM:
+                               TEST_NAME = TEST_NAME1;
+                               break;
+                       case PROD:
+                               TEST_NAME = TEST_NAME2;
+                               break;
+                       case MAX:
+                               TEST_NAME = TEST_NAME3;
+                               break;
+                       case MIN:
+                               TEST_NAME = TEST_NAME4;
+                               break;
+                       case SUMPROD:
+                               TEST_NAME = TEST_NAME5;
+                               break;
+               }
+
+               getAndLoadTestConfiguration(TEST_NAME);
+               String HOME = SCRIPT_DIR + TEST_DIR;
+
+               // write input matrices
+               int r = rows;
+               int c = cols / 4;
+               if(rowPartitioned) {
+                       r = rows / 4;
+                       c = cols;
+               }
+
+               double[][] X1 = getRandomMatrix(r, c, 1, 3, 1, 3);
+               double[][] X2 = getRandomMatrix(r, c, 1, 3, 1, 7);
+               double[][] X3 = getRandomMatrix(r, c, 1, 3, 1, 8);
+               double[][] X4 = getRandomMatrix(r, c, 1, 3, 1, 9);
+
+               MatrixCharacteristics mc = new MatrixCharacteristics(r, c, 
blocksize, r * c);
+               writeInputMatrixWithMTD("X1", X1, false, mc);
+               writeInputMatrixWithMTD("X2", X2, false, mc);
+               writeInputMatrixWithMTD("X3", X3, false, mc);
+               writeInputMatrixWithMTD("X4", X4, false, mc);
+
+               // empty script name because we don't execute any script, just start the worker
+               fullDMLScriptName = "";
+               int port1 = getRandomAvailablePort();
+               int port2 = getRandomAvailablePort();
+               int port3 = getRandomAvailablePort();
+               int port4 = getRandomAvailablePort();
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
+               Thread t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S);
+               Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
+               Thread t4 = startLocalFedWorkerThread(port4);
+
+               TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
+               loadTestConfiguration(config);
+
+               // Run reference dml script with normal matrix
+               fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+               programArgs = new String[] {"-stats", "100", "-args", 
input("X1"), input("X2"), input("X3"), input("X4"),
+                       expected("S"), 
Boolean.toString(rowPartitioned).toUpperCase()};
+               runTest(true, false, null, -1);
+
+               // Run actual dml script with federated matrix
+
+               fullDMLScriptName = HOME + TEST_NAME + ".dml";
+               programArgs = new String[] {"-stats", "100", "-nvargs",
+                       "in_X1=" + TestUtils.federatedAddress(port1, 
input("X1")),
+                       "in_X2=" + TestUtils.federatedAddress(port2, 
input("X2")),
+                       "in_X3=" + TestUtils.federatedAddress(port3, 
input("X3")),
+                       "in_X4=" + TestUtils.federatedAddress(port4, 
input("X4")), "rows=" + rows, "cols=" + cols,
+                       "rP=" + Boolean.toString(rowPartitioned).toUpperCase(), 
"out_S=" + output("S")};
+
+               runTest(true, false, null, -1);
+
+               // compare via files
+               compareResults(1e-6);
+
+               switch(type) {
+                       case SUM:
+                               
Assert.assertTrue(heavyHittersContainsString("fed_ucumk+"));
+                               break;
+                       case PROD:
+                               
Assert.assertTrue(heavyHittersContainsString("fed_ucum*"));
+                               break;
+                       case MAX:
+                               
Assert.assertTrue(heavyHittersContainsString("fed_ucummax"));
+                               break;
+                       case MIN:
+                               
Assert.assertTrue(heavyHittersContainsString("fed_ucummin"));
+                               break;
+                       case SUMPROD:
+                               
Assert.assertTrue(heavyHittersContainsString("ucumk+*"));
+                               break;
+               }
+
+               // check that the federated input files still exist
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
+
+               TestUtils.shutdownThreads(t1, t2, t3, t4);
+               resetExecMode(platformOld);
+       }
+}
diff --git 
a/src/test/scripts/functions/federated/cumulative/FederatedCummaxTest.dml 
b/src/test/scripts/functions/federated/cumulative/FederatedCummaxTest.dml
new file mode 100644
index 0000000..9f0f40c
--- /dev/null
+++ b/src/test/scripts/functions/federated/cumulative/FederatedCummaxTest.dml
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), 
list(2*$rows/4, $cols),
+               list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), 
list($rows, $cols)));
+} else {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), 
list($rows, $cols/2),
+               list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 
3*($cols/4)), list($rows, $cols)));
+}
+
+s = cummax(A);
+write(s, $out_S);
diff --git 
a/src/test/scripts/functions/federated/cumulative/FederatedCummaxTestReference.dml
 
b/src/test/scripts/functions/federated/cumulative/FederatedCummaxTestReference.dml
new file mode 100644
index 0000000..558c843
--- /dev/null
+++ 
b/src/test/scripts/functions/federated/cumulative/FederatedCummaxTestReference.dml
@@ -0,0 +1,26 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+
+s = cummax(A);
+write(s, $5);
diff --git 
a/src/test/scripts/functions/federated/cumulative/FederatedCumminTest.dml 
b/src/test/scripts/functions/federated/cumulative/FederatedCumminTest.dml
new file mode 100644
index 0000000..f088e6b
--- /dev/null
+++ b/src/test/scripts/functions/federated/cumulative/FederatedCumminTest.dml
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), 
list(2*$rows/4, $cols),
+               list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), 
list($rows, $cols)));
+} else {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), 
list($rows, $cols/2),
+               list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 
3*($cols/4)), list($rows, $cols)));
+}
+
+s = cummin(A);
+write(s, $out_S);
diff --git 
a/src/test/scripts/functions/federated/cumulative/FederatedCumminTestReference.dml
 
b/src/test/scripts/functions/federated/cumulative/FederatedCumminTestReference.dml
new file mode 100644
index 0000000..e65f584
--- /dev/null
+++ 
b/src/test/scripts/functions/federated/cumulative/FederatedCumminTestReference.dml
@@ -0,0 +1,26 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+
+s = cummin(A);
+write(s, $5);
diff --git 
a/src/test/scripts/functions/federated/cumulative/FederatedCumprodTest.dml 
b/src/test/scripts/functions/federated/cumulative/FederatedCumprodTest.dml
new file mode 100644
index 0000000..167cbbf
--- /dev/null
+++ b/src/test/scripts/functions/federated/cumulative/FederatedCumprodTest.dml
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), 
list(2*$rows/4, $cols),
+               list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), 
list($rows, $cols)));
+} else {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), 
list($rows, $cols/2),
+               list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 
3*($cols/4)), list($rows, $cols)));
+}
+
+s = cumprod(A);
+write(s, $out_S);
diff --git 
a/src/test/scripts/functions/federated/cumulative/FederatedCumprodTestReference.dml
 
b/src/test/scripts/functions/federated/cumulative/FederatedCumprodTestReference.dml
new file mode 100644
index 0000000..a6c7311
--- /dev/null
+++ 
b/src/test/scripts/functions/federated/cumulative/FederatedCumprodTestReference.dml
@@ -0,0 +1,26 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+
+s = cumprod(A);
+write(s, $5);
diff --git 
a/src/test/scripts/functions/federated/cumulative/FederatedCumsumTest.dml 
b/src/test/scripts/functions/federated/cumulative/FederatedCumsumTest.dml
new file mode 100644
index 0000000..757a9d7
--- /dev/null
+++ b/src/test/scripts/functions/federated/cumulative/FederatedCumsumTest.dml
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), 
list(2*$rows/4, $cols),
+               list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), 
list($rows, $cols)));
+} else {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), 
list($rows, $cols/2),
+               list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 
3*($cols/4)), list($rows, $cols)));
+}
+
+s = cumsum(A);
+write(s, $out_S);
diff --git 
a/src/test/scripts/functions/federated/cumulative/FederatedCumsumTestReference.dml
 
b/src/test/scripts/functions/federated/cumulative/FederatedCumsumTestReference.dml
new file mode 100644
index 0000000..c496905
--- /dev/null
+++ 
b/src/test/scripts/functions/federated/cumulative/FederatedCumsumTestReference.dml
@@ -0,0 +1,26 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+
+s = cumsum(A);
+write(s, $5);
diff --git 
a/src/test/scripts/functions/federated/cumulative/FederatedCumsumprodTest.dml 
b/src/test/scripts/functions/federated/cumulative/FederatedCumsumprodTest.dml
new file mode 100644
index 0000000..43978d9
--- /dev/null
+++ 
b/src/test/scripts/functions/federated/cumulative/FederatedCumsumprodTest.dml
@@ -0,0 +1,32 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if ($rP) {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), 
list(2*$rows/4, $cols),
+               list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), 
list($rows, $cols)));
+} else {
+    A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+            ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), 
list($rows, $cols/2),
+               list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 
3*($cols/4)), list($rows, $cols)));
+}
+s = cumsumprod(A[,1:2]);
+write(s, $out_S);
diff --git 
a/src/test/scripts/functions/federated/cumulative/FederatedCumsumprodTestReference.dml
 
b/src/test/scripts/functions/federated/cumulative/FederatedCumsumprodTestReference.dml
new file mode 100644
index 0000000..5c73b14
--- /dev/null
+++ 
b/src/test/scripts/functions/federated/cumulative/FederatedCumsumprodTestReference.dml
@@ -0,0 +1,25 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+if($6) { A = rbind(read($1), read($2), read($3), read($4)); }
+else { A = cbind(read($1), read($2), read($3), read($4)); }
+s = cumsumprod(A[,1:2]);
+write(s, $5);

Reply via email to