This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new c07cd15 [SYSTEMDS-2925] Federated left indexing
c07cd15 is described below
commit c07cd15ba100d4133548336e91082922bf160337
Author: Olga <[email protected]>
AuthorDate: Wed Apr 7 00:51:10 2021 +0200
[SYSTEMDS-2925] Federated left indexing
Closes #1219
---
.../controlprogram/federated/FederationMap.java | 34 +++-
.../instructions/cp/VariableCPInstruction.java | 7 +-
.../instructions/fed/FEDInstructionUtils.java | 3 +-
.../instructions/fed/IndexingFEDInstruction.java | 141 ++++++++++++++-
.../primitives/FederatedLeftIndexTest.java | 201 +++++++++++++++++++++
.../federated/FederatedLeftIndexFrameFullTest.dml | 45 +++++
.../FederatedLeftIndexFrameFullTestReference.dml | 41 +++++
.../federated/FederatedLeftIndexFullTest.dml | 42 +++++
.../FederatedLeftIndexFullTestReference.dml | 38 ++++
9 files changed, 535 insertions(+), 17 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
index 13e9fb7..9afa295 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
@@ -40,6 +40,7 @@ import
org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.Reques
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.instructions.cp.ScalarObject;
import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.CommonThreadPool;
import org.apache.sysds.runtime.util.IndexRange;
@@ -172,6 +173,22 @@ public class FederationMap {
return ret;
}
+	/**
+	 * Creates one PUT_VAR request per index range, each carrying the slice of the
+	 * given data selected by that range. All requests share one newly drawn data ID.
+	 * If this map is of type FULL, no slicing takes place and a single regular
+	 * broadcast request is returned instead.
+	 *
+	 * @param data    local data object to slice and ship
+	 * @param isFrame true to slice into a FrameBlock, false to slice into a MatrixBlock
+	 * @param ix      slice bounds, one 4-element array per request; the elements are
+	 *                handed to slice in order (the left-indexing caller passes
+	 *                row lower, row upper, column lower, column upper)
+	 * @return the created requests, in the order of ix
+	 */
+	public FederatedRequest[] broadcastSliced(CacheableData<?> data, boolean isFrame, int[][] ix) {
+		if( _type == FType.FULL )
+			return new FederatedRequest[]{broadcast(data)};
+
+		// one shared id for all slices; the input block is read once up front
+		final long sliceId = FederationUtils.getNextFedDataID();
+		final CacheBlock block = data.acquireReadAndRelease();
+
+		// sequential block slicing and federated request creation
+		FederatedRequest[] requests = new FederatedRequest[ix.length];
+		for( int k = 0; k < ix.length; k++ ) {
+			int[] bounds = ix[k];
+			CacheBlock target = isFrame ? new FrameBlock() : new MatrixBlock();
+			requests[k] = new FederatedRequest(RequestType.PUT_VAR, sliceId,
+				block.slice(bounds[0], bounds[1], bounds[2], bounds[3], target));
+		}
+		return requests;
+	}
+
public boolean isAligned(FederationMap that, boolean transposed) {
// determines if the two federated data are aligned row/column
partitions
// at the same federated site (which allows for purely
federated operation)
@@ -214,16 +231,19 @@ public class FederationMap {
}
@SuppressWarnings("unchecked")
- public Future<FederatedResponse>[] execute(long tid, boolean wait,
FederatedRequest[] frSlices1, FederatedRequest[] frSlices2, FederatedRequest...
fr) {
+ public Future<FederatedResponse>[] execute(long tid, boolean wait,
FederatedRange[] fedRange1, FederatedRequest elseFr, FederatedRequest[]
frSlices1, FederatedRequest[] frSlices2, FederatedRequest... fr) {
// executes step1[] - step 2 - ... step4 (only first step
federated-data-specific)
setThreadID(tid, frSlices1, fr);
setThreadID(tid, frSlices2, fr);
List<Future<FederatedResponse>> ret = new ArrayList<>();
int pos = 0;
for(Entry<FederatedRange, FederatedData> e :
_fedMap.entrySet()) {
- FederatedRequest[] newFr = (frSlices1!=null) ?
- ((frSlices2!=null)? (addAll(frSlices2[pos],
addAll(frSlices1[pos++], fr))) : addAll(frSlices1[pos++], fr)) : fr;
- ret.add(e.getValue().executeFederatedOperation(newFr));
+ if(Arrays.asList(fedRange1).contains(e.getKey())) {
+ FederatedRequest[] newFr = (frSlices1 != null)
? ((frSlices2 != null) ? (addAll(frSlices2[pos],
+ addAll(frSlices1[pos++], fr))) :
addAll(frSlices1[pos++], fr)) : fr;
+
ret.add(e.getValue().executeFederatedOperation(newFr));
+ }
+ else
ret.add(e.getValue().executeFederatedOperation(elseFr));
}
// prepare results (future federated responses), with optional
wait to ensure the
@@ -232,7 +252,11 @@ public class FederationMap {
FederationUtils.waitFor(ret);
return ret.toArray(new Future[0]);
}
-
+
+	/**
+	 * Executes the given requests on every federated range of this map.
+	 * Delegates to the range-restricted variant with all ranges of the map selected,
+	 * so its fallback request is never used and is passed as null.
+	 *
+	 * @param tid       thread id that is set on the requests
+	 * @param wait      true to wait for the responses before returning
+	 * @param frSlices1 range-specific requests (one per range, in map order), or null
+	 * @param frSlices2 second set of range-specific requests, or null
+	 * @param fr        requests sent to every range after the range-specific ones
+	 * @return the future responses, one per federated range
+	 */
+	public Future<FederatedResponse>[] execute(long tid, boolean wait, FederatedRequest[] frSlices1, FederatedRequest[] frSlices2, FederatedRequest... fr) {
+		// typed toArray avoids the untyped Object[] copy and the stream round trip
+		FederatedRange[] allRanges = _fedMap.keySet().toArray(new FederatedRange[0]);
+		return execute(tid, wait, allRanges, null, frSlices1, frSlices2, fr);
+	}
+
@SuppressWarnings("unchecked")
public Future<FederatedResponse>[] executeMultipleSlices(long tid,
boolean wait,
FederatedRequest[][] frSlices, FederatedRequest[] fr) {
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
index f56545a..52c6c74 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
@@ -436,8 +436,9 @@ public class VariableCPInstruction extends CPInstruction
implements LineageTrace
case CopyVariable:
// Value types are not given here
- in1 = new CPOperand(parts[1], ValueType.UNKNOWN,
DataType.UNKNOWN);
- in2 = new CPOperand(parts[2], ValueType.UNKNOWN,
DataType.UNKNOWN);
+ boolean withTypes =
parts[1].split(VALUETYPE_PREFIX).length > 2 &&
parts[2].split(VALUETYPE_PREFIX).length > 2;
+ in1 = withTypes ? new CPOperand(parts[1]) : new
CPOperand(parts[1], ValueType.UNKNOWN, DataType.UNKNOWN);
+ in2 = withTypes ? new CPOperand(parts[2]) : new
CPOperand(parts[2], ValueType.UNKNOWN, DataType.UNKNOWN);
break;
case MoveVariable:
@@ -905,7 +906,7 @@ public class VariableCPInstruction extends CPInstruction
implements LineageTrace
* @param ec execution context
*/
private void processCopyInstruction(ExecutionContext ec) {
- // get source variable
+ // get source variable
Data dd = ec.getVariable(getInput1().getName());
if ( dd == null )
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
index 3708d0a..ebd69d9 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
@@ -174,8 +174,7 @@ public class FEDInstructionUtils {
else if(inst instanceof IndexingCPInstruction) {
// matrix and frame indexing
IndexingCPInstruction minst = (IndexingCPInstruction)
inst;
- if(inst.getOpcode().equalsIgnoreCase("rightIndex")
- && (minst.input1.isMatrix() ||
minst.input1.isFrame())
+ if((minst.input1.isMatrix() || minst.input1.isFrame())
&&
ec.getCacheableData(minst.input1).isFederated()) {
fedinst =
IndexingFEDInstruction.parseInstruction(minst.getInstructionString());
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/fed/IndexingFEDInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/fed/IndexingFEDInstruction.java
index f8ab210..90f93de 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/fed/IndexingFEDInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/fed/IndexingFEDInstruction.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import org.apache.sysds.common.Types;
import org.apache.sysds.common.Types.ValueType;
@@ -40,6 +41,7 @@ import
org.apache.sysds.runtime.controlprogram.federated.FederationMap;
import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
import org.apache.sysds.runtime.util.IndexRange;
public final class IndexingFEDInstruction extends UnaryFEDInstruction {
@@ -95,7 +97,25 @@ public final class IndexingFEDInstruction extends
UnaryFEDInstruction {
}
}
else if(opcode.equalsIgnoreCase(LeftIndex.OPCODE)) {
- throw new DMLRuntimeException("Left indexing not
implemented for federated operations.");
+ if ( parts.length == 8 ) {
+ CPOperand lhsInput, rhsInput, rl, ru, cl, cu,
out;
+ lhsInput = new CPOperand(parts[1]);
+ rhsInput = new CPOperand(parts[2]);
+ rl = new CPOperand(parts[3]);
+ ru = new CPOperand(parts[4]);
+ cl = new CPOperand(parts[5]);
+ cu = new CPOperand(parts[6]);
+ out = new CPOperand(parts[7]);
+
+ if((lhsInput.getDataType() !=
Types.DataType.MATRIX && lhsInput.getDataType() != Types.DataType.FRAME) &&
+ (rhsInput.getDataType() !=
Types.DataType.MATRIX && rhsInput.getDataType() != Types.DataType.FRAME))
+ throw new DMLRuntimeException("Can
index only on matrices, frames, and lists.");
+
+ return new IndexingFEDInstruction(lhsInput,
rhsInput, rl, ru, cl, cu, out, opcode, str);
+ }
+ else {
+ throw new DMLRuntimeException("Invalid number
of operands in instruction: " + str);
+ }
}
else {
throw new DMLRuntimeException("Unknown opcode while
parsing a MatrixIndexingFEDInstruction: " + str);
@@ -104,7 +124,10 @@ public final class IndexingFEDInstruction extends
UnaryFEDInstruction {
@Override
public void processInstruction(ExecutionContext ec) {
- rightIndexing(ec);
+ if(getOpcode().equalsIgnoreCase(RightIndex.OPCODE))
+ rightIndexing(ec);
+ else
+ leftIndexing(ec);
}
private void rightIndexing(ExecutionContext ec)
@@ -140,11 +163,7 @@ public final class IndexingFEDInstruction extends
UnaryFEDInstruction {
long[] newIx = new long[]{rsn, ren, csn, cen};
// change 4 indices in instString
- instStrings[i] = instString;
- String[] instParts =
instString.split(Lop.OPERAND_DELIMITOR);
- for(int j = 3; j < 7; j++)
- instParts[j] =
InstructionUtils.createLiteralOperand(String.valueOf(newIx[j-3]+1),
ValueType.INT64);
- instStrings[i] = String.join(Lop.OPERAND_DELIMITOR,
instParts);
+ instStrings[i] = modifyIndices(newIx, 3, 7);
if(input1.isFrame()) {
//modify frame schema
@@ -171,4 +190,112 @@ public final class IndexingFEDInstruction extends
UnaryFEDInstruction {
out.setFedMapping(fedMap.copyWithNewID(fr1[0].getID()));
}
}
+
+	/**
+	 * Performs federated left indexing: writes input2 into the requested index range
+	 * of the federated input1 and registers the result as federated output.
+	 *
+	 * For every federated range the index bounds are translated into bounds relative
+	 * to that range. Ranges of a row- (or column-) partitioned input that overlap the
+	 * index range receive their slice of input2 plus a left-index instruction with
+	 * rewritten indices; all other ranges receive a copy instruction
+	 * (see createCopyInstString).
+	 *
+	 * @param ec execution context used to resolve the inputs, the index range and the output
+	 * @throws DMLRuntimeException if the index range lies outside input1, if its shape
+	 *         differs from input2, or if no row/column partition overlaps the range
+	 */
+	private void leftIndexing(ExecutionContext ec)
+	{
+		//get input and requested index range
+		CacheableData<?> in1 = ec.getCacheableData(input1);
+		CacheableData<?> in2 = ec.getCacheableData(input2);
+		IndexRange ixrange = getIndexRange(ec);
+
+		//check bounds
+		if( ixrange.rowStart < 0 || ixrange.rowStart >= in1.getNumRows() || ixrange.rowEnd >= in1.getNumRows()
+			|| ixrange.colStart < 0 || ixrange.colStart >= in1.getNumColumns() || ixrange.colEnd >= in1.getNumColumns() ) {
+			throw new DMLRuntimeException("Invalid values for matrix indexing: ["+(ixrange.rowStart+1)+":"+(ixrange.rowEnd+1)+","
+				+ (ixrange.colStart+1)+":"+(ixrange.colEnd+1)+"] " + "must be within matrix dimensions ["+in1.getNumRows()+","+in1.getNumColumns()+"].");
+		}
+		if( (ixrange.rowEnd-ixrange.rowStart+1) != in2.getNumRows() || (ixrange.colEnd-ixrange.colStart+1) != in2.getNumColumns()) {
+			throw new DMLRuntimeException("Invalid values for matrix indexing: " +
+				"dimensions of the source matrix ["+in2.getNumRows()+"x" + in2.getNumColumns() + "] " +
+				"do not match the shape of the matrix specified by indices [" +
+				(ixrange.rowStart+1) +":" + (ixrange.rowEnd+1) + ", " + (ixrange.colStart+1) + ":" + (ixrange.colEnd+1) + "].");
+		}
+
+		FederationMap fedMap = in1.getFedMapping();
+		final int numRanges = fedMap.getSize();
+
+		String[] instStrings = new String[numRanges];
+		int[][] sliceIxs = new int[numRanges][];
+		FederatedRange[] ranges = new FederatedRange[numRanges];
+
+		// i:    position of the current range in map iteration order
+		// prev: first row/column of input2 that has not been assigned to a range yet
+		// from: position of the first range that receives a left-index instruction
+		int i = 0, prev = 0, from = numRanges;
+		for(FederatedRange range : fedMap.getMap().keySet()) {
+			long rs = range.getBeginDims()[0], re = range.getEndDims()[0],
+				cs = range.getBeginDims()[1], ce = range.getEndDims()[1];
+			// index bounds relative to the origin of this range
+			long rsn = (ixrange.rowStart >= rs) ? (ixrange.rowStart - rs) : 0;
+			long ren = (ixrange.rowEnd >= rs && ixrange.rowEnd < re) ? (ixrange.rowEnd - rs) : (re - rs - 1);
+			long csn = (ixrange.colStart >= cs) ? (ixrange.colStart - cs) : 0;
+			long cen = (ixrange.colEnd >= cs && ixrange.colEnd < ce) ? (ixrange.colEnd - cs) : (ce - cs - 1);
+
+			// kept as long; narrowing to int before storing would only truncate silently
+			long[] newIx = new long[]{rsn, ren, csn, cen};
+
+			// find ranges where to apply leftIndex
+			long to;
+			if(in1.isFederated(FederationMap.FType.ROW) && (to = (prev + ren - rsn)) >= 0 &&
+				to < in2.getNumRows() && ixrange.rowStart <= re) {
+				// rows prev..to of input2, all of its columns
+				sliceIxs[i] = new int[] { prev, (int) to, 0, (int) in2.getNumColumns()-1};
+				prev = (int) (to + 1);
+
+				instStrings[i] = modifyIndices(newIx, 4, 8);
+				ranges[i] = range;
+				from = Math.min(i, from);
+			}
+			else if(in1.isFederated(FederationMap.FType.COL) && (to = (prev + cen - csn)) >= 0 &&
+				to < in2.getNumColumns() && ixrange.colStart <= ce) {
+				// columns prev..to of input2, all of its rows
+				sliceIxs[i] = new int[] {0, (int) in2.getNumRows() - 1, prev, (int) to};
+				prev = (int) (to + 1);
+
+				instStrings[i] = modifyIndices(newIx, 4, 8);
+				ranges[i] = range;
+				from = Math.min(i, from);
+			}
+			else
+				// TODO shallow copy, add more advanced update in place for federated
+				instStrings[i] = createCopyInstString();
+
+			i++;
+		}
+
+		// keep only the slices of ranges that take part in the left indexing
+		sliceIxs = Arrays.stream(sliceIxs).filter(Objects::nonNull).toArray(int[][] :: new);
+
+		// without any participating range the requests below would either fail on
+		// fr1[0] or silently execute only copy instructions, so fail explicitly
+		if( sliceIxs.length == 0 )
+			throw new DMLRuntimeException("Federated left indexing found no row- or column-partitioned "
+				+ "federated range overlapping the index range.");
+
+		FederatedRequest[] fr1 = fedMap.broadcastSliced(in2, input2.isFrame(), sliceIxs);
+		FederatedRequest[] fr2 = FederationUtils.callInstruction(instStrings, output, new CPOperand[]{input1, input2},
+			new long[]{fedMap.getID(), fr1[0].getID()});
+		FederatedRequest fr3 = fedMap.cleanup(getTID(), fr1[0].getID());
+
+		//execute federated instruction and cleanup intermediates
+		if(sliceIxs.length == numRanges)
+			fedMap.execute(getTID(), true, fr2, fr1, fr3);
+		else {
+			// position of one copy request: the first position outside [from, from + #slices)
+			int copyIx = (from > 0) ? 0 : from + sliceIxs.length;
+			fedMap.execute(getTID(), true, ranges, fr2[copyIx],
+				Arrays.copyOfRange(fr2, from, from + sliceIxs.length), fr1, fr3);
+		}
+
+		// the output keeps the characteristics (and schema) of input1 under the new data ID
+		if(input1.isFrame()) {
+			FrameObject out = ec.getFrameObject(output);
+			out.setSchema(((FrameObject) in1).getSchema());
+			out.getDataCharacteristics().set(in1.getDataCharacteristics());
+			out.setFedMapping(fedMap.copyWithNewID(fr2[0].getID()));
+		} else {
+			MatrixObject out = ec.getMatrixObject(output);
+			out.getDataCharacteristics().set(in1.getDataCharacteristics());
+			out.setFedMapping(fedMap.copyWithNewID(fr2[0].getID()));
+		}
+	}
+
+	/**
+	 * Builds a variant of this instruction's string in which the operands at
+	 * positions from (inclusive) to to (exclusive) are replaced by INT64 literals.
+	 * Each literal is the matching entry of newIx incremented by one; the callers
+	 * in this class pass offsets that start at zero.
+	 *
+	 * @param newIx replacement index values, newIx[0] belongs to position from
+	 * @param from  first operand position to replace
+	 * @param to    operand position after the last one to replace
+	 * @return the instruction string with the replaced index operands
+	 */
+	private String modifyIndices(long[] newIx, int from, int to) {
+		String[] operands = instString.split(Lop.OPERAND_DELIMITOR);
+		int pos = from;
+		while(pos < to) {
+			String shifted = String.valueOf(newIx[pos - from] + 1);
+			operands[pos] = InstructionUtils.createLiteralOperand(shifted, ValueType.INT64);
+			pos++;
+		}
+		return String.join(Lop.OPERAND_DELIMITOR, operands);
+	}
+
+	/**
+	 * Builds the string of a copy-variable instruction from the operands at
+	 * positions 2 and 8 of this instruction's string. The left-indexing path uses it
+	 * for federated ranges that are not touched by the index range.
+	 * NOTE(review): positions 2 and 8 are presumably the first input and the output
+	 * of a left-index instruction string - confirm against the instruction layout.
+	 *
+	 * @return the string form of the prepared copy instruction
+	 */
+	private String createCopyInstString() {
+		String[] operands = instString.split(Lop.OPERAND_DELIMITOR);
+		String source = operands[2];
+		String target = operands[8];
+		return VariableCPInstruction.prepareCopyInstruction(source, target).toString();
+	}
}
diff --git
a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedLeftIndexTest.java
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedLeftIndexTest.java
new file mode 100644
index 0000000..e6c3c85
--- /dev/null
+++
b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedLeftIndexTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.primitives;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Compares left indexing into a federated matrix/frame spread over four local
+ * workers against the same left indexing on the locally bound (rbind/cbind) data.
+ */
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedLeftIndexTest extends AutomatedTestBase {
+
+	private final static String TEST_NAME1 = "FederatedLeftIndexFullTest";
+	private final static String TEST_NAME2 = "FederatedLeftIndexFrameFullTest";
+
+	private final static String TEST_DIR = "functions/federated/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + FederatedLeftIndexTest.class.getSimpleName() + "/";
+
+	private final static int blocksize = 1024;
+
+	// dimensions of the complete federated input
+	@Parameterized.Parameter()
+	public int rows1;
+	@Parameterized.Parameter(1)
+	public int cols1;
+
+	// dimensions of the right-hand side that is written into the input
+	@Parameterized.Parameter(2)
+	public int rows2;
+	@Parameterized.Parameter(3)
+	public int cols2;
+
+	// row index range of the assignment
+	@Parameterized.Parameter(4)
+	public int from;
+	@Parameterized.Parameter(5)
+	public int to;
+
+	// column index range of the assignment
+	@Parameterized.Parameter(6)
+	public int from2;
+	@Parameterized.Parameter(7)
+	public int to2;
+
+	// true: the input is split by rows over the workers, false: by columns
+	@Parameterized.Parameter(8)
+	public boolean rowPartitioned;
+
+	/**
+	 * @return parameter sets in the order
+	 *         {rows1, cols1, rows2, cols2, from, to, from2, to2, rowPartitioned}
+	 */
+	@Parameterized.Parameters
+	public static Collection<Object[]> data() {
+		return Arrays.asList(new Object[][] {
+			{8, 2, 8, 1, 1, 8, 1, 1, true},
+			{24, 12, 20, 8, 3, 22, 1, 8, true},
+			{24, 12, 10, 8, 7, 16, 1, 8, true},
+			{24, 12, 20, 11, 3, 22, 1, 11, false},
+			{24, 12, 20, 8, 3, 22, 1, 8, false},
+			{24, 12, 20, 8, 3, 22, 5, 12, false},
+		});
+	}
+
+	private enum DataType {
+		MATRIX, FRAME
+	}
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"S"}));
+		addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"S"}));
+	}
+
+	@Test
+	public void testLeftIndexFullDenseMatrixCP() {
+		runLeftIndexTest(DataType.MATRIX, ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	public void testLeftIndexFullDenseFrameCP() {
+		runLeftIndexTest(DataType.FRAME, ExecMode.SINGLE_NODE);
+	}
+
+	/**
+	 * Writes the four partitions and the right-hand side, starts four local federated
+	 * workers, runs the reference script and the federated script, and compares their
+	 * outputs. The workers are shut down even if a run or an assertion fails.
+	 *
+	 * @param dataType selects the matrix or the frame variant of the scripts
+	 * @param execMode execution mode to run the scripts in
+	 */
+	private void runLeftIndexTest(DataType dataType, ExecMode execMode) {
+		setExecMode(execMode);
+
+		String TEST_NAME = (dataType == DataType.MATRIX) ? TEST_NAME1 : TEST_NAME2;
+
+		getAndLoadTestConfiguration(TEST_NAME);
+		String HOME = SCRIPT_DIR + TEST_DIR;
+
+		// write input matrices, each worker holds a quarter of the rows or of the columns
+		int r1 = rows1;
+		int c1 = cols1 / 4;
+		if(rowPartitioned) {
+			r1 = rows1 / 4;
+			c1 = cols1;
+		}
+
+		double[][] X1 = getRandomMatrix(r1, c1, 1, 5, 1, 3);
+		double[][] X2 = getRandomMatrix(r1, c1, 1, 5, 1, 7);
+		double[][] X3 = getRandomMatrix(r1, c1, 1, 5, 1, 8);
+		double[][] X4 = getRandomMatrix(r1, c1, 1, 5, 1, 9);
+
+		MatrixCharacteristics mc = new MatrixCharacteristics(r1, c1, blocksize, r1 * c1);
+		writeInputMatrixWithMTD("X1", X1, false, mc);
+		writeInputMatrixWithMTD("X2", X2, false, mc);
+		writeInputMatrixWithMTD("X3", X3, false, mc);
+		writeInputMatrixWithMTD("X4", X4, false, mc);
+
+		double[][] Y = getRandomMatrix(rows2, cols2, 1, 5, 1, 3);
+
+		MatrixCharacteristics mc2 = new MatrixCharacteristics(rows2, cols2, blocksize, rows2 * cols2);
+		writeInputMatrixWithMTD("Y", Y, false, mc2);
+
+		// empty script name because we don't execute any script, just start the worker
+		fullDMLScriptName = "";
+		int port1 = getRandomAvailablePort();
+		int port2 = getRandomAvailablePort();
+		int port3 = getRandomAvailablePort();
+		int port4 = getRandomAvailablePort();
+		Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
+		Thread t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S);
+		Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
+		Thread t4 = startLocalFedWorkerThread(port4);
+
+		try {
+			rtplatform = execMode;
+			if(rtplatform == ExecMode.SPARK) {
+				DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+			}
+			TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+			loadTestConfiguration(config);
+
+			// clamp inverted index ranges
+			if(from > to)
+				from = to;
+			if(from2 > to2)
+				from2 = to2;
+
+			// Run reference dml script with normal matrix
+			fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+			programArgs = new String[] {"-explain", "-args", input("X1"), input("X2"), input("X3"), input("X4"),
+				input("Y"), String.valueOf(from), String.valueOf(to),
+				String.valueOf(from2), String.valueOf(to2),
+				Boolean.toString(rowPartitioned).toUpperCase(), expected("S")};
+			runTest(null);
+
+			// Run actual dml script with federated matrix
+			fullDMLScriptName = HOME + TEST_NAME + ".dml";
+			programArgs = new String[] {"-stats", "100", "-nvargs",
+				"in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+				"in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
+				"in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
+				"in_X4=" + TestUtils.federatedAddress(port4, input("X4")),
+				"in_Y=" + input("Y"), "rows=" + rows1, "cols=" + cols1,
+				"rows2=" + rows2, "cols2=" + cols2,
+				"from=" + from, "to=" + to,"from2=" + from2, "to2=" + to2,
+				"rP=" + Boolean.toString(rowPartitioned).toUpperCase(), "out_S=" + output("S")};
+
+			runTest(null);
+
+			// compare via files
+			compareResults(1e-9);
+
+			Assert.assertTrue(heavyHittersContainsString("fed_leftIndex"));
+
+			// check that federated input files are still existing
+			Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+			Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
+			Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
+			Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
+		}
+		finally {
+			// stop the workers even when a run or an assertion above failed
+			TestUtils.shutdownThreads(t1, t2, t3, t4);
+		}
+	}
+}
diff --git
a/src/test/scripts/functions/federated/FederatedLeftIndexFrameFullTest.dml
b/src/test/scripts/functions/federated/FederatedLeftIndexFrameFullTest.dml
new file mode 100644
index 0000000..5604989
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedLeftIndexFrameFullTest.dml
@@ -0,0 +1,45 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# index range of the left-indexing assignment
+rowLo = $from;
+rowHi = $to;
+colLo = $from2;
+colHi = $to2;
+
+# federated input over four workers, partitioned by rows ($rP) or by columns
+if ($rP) {
+  A = federated(
+    addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+    ranges=list(
+      list(0, 0), list($rows/4, $cols),
+      list($rows/4, 0), list(2*$rows/4, $cols),
+      list(2*$rows/4, 0), list(3*$rows/4, $cols),
+      list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+  A = federated(
+    addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+    ranges=list(
+      list(0, 0), list($rows, $cols/4),
+      list(0,$cols/4), list($rows, $cols/2),
+      list(0,$cols/2), list($rows, 3*($cols/4)),
+      list(0, 3*($cols/4)), list($rows, $cols)));
+}
+
+# local right-hand side
+Y = read($in_Y)
+
+# run the assignment on frames
+Y = as.frame(Y)
+A = as.frame(A)
+
+A[rowLo:rowHi, colLo:colHi] = Y;
+write(A, $out_S);
+
+print(toString(A))
\ No newline at end of file
diff --git
a/src/test/scripts/functions/federated/FederatedLeftIndexFrameFullTestReference.dml
b/src/test/scripts/functions/federated/FederatedLeftIndexFrameFullTestReference.dml
new file mode 100644
index 0000000..4b5a852
--- /dev/null
+++
b/src/test/scripts/functions/federated/FederatedLeftIndexFrameFullTestReference.dml
@@ -0,0 +1,41 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# index range of the left-indexing assignment
+rowLo = $6;
+rowHi = $7;
+colLo = $8;
+colHi = $9;
+
+# local reference input: the four partitions bound by rows ($10) or by columns
+if($10) {
+  A = rbind(read($1), read($2), read($3), read($4));
+}
+else {
+  A = cbind(read($1), read($2), read($3), read($4));
+}
+
+# right-hand side
+Y = read($5)
+
+# run the assignment on frames
+Y = as.frame(Y)
+A = as.frame(A)
+
+A[rowLo:rowHi, colLo:colHi] = Y;
+write(A, $11);
+
+print(toString(A))
diff --git
a/src/test/scripts/functions/federated/FederatedLeftIndexFullTest.dml
b/src/test/scripts/functions/federated/FederatedLeftIndexFullTest.dml
new file mode 100644
index 0000000..a201f7b
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedLeftIndexFullTest.dml
@@ -0,0 +1,42 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# index range of the left-indexing assignment
+rowLo = $from;
+rowHi = $to;
+colLo = $from2;
+colHi = $to2;
+
+# federated input over four workers, partitioned by rows ($rP) or by columns
+if ($rP) {
+  A = federated(
+    addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+    ranges=list(
+      list(0, 0), list($rows/4, $cols),
+      list($rows/4, 0), list(2*$rows/4, $cols),
+      list(2*$rows/4, 0), list(3*$rows/4, $cols),
+      list(3*$rows/4, 0), list($rows, $cols)));
+} else {
+  A = federated(
+    addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+    ranges=list(
+      list(0, 0), list($rows, $cols/4),
+      list(0,$cols/4), list($rows, $cols/2),
+      list(0,$cols/2), list($rows, 3*($cols/4)),
+      list(0, 3*($cols/4)), list($rows, $cols)));
+}
+
+# local right-hand side
+Y = read($in_Y)
+
+A[rowLo:rowHi, colLo:colHi] = Y;
+write(A, $out_S);
+
+print(toString(A))
diff --git
a/src/test/scripts/functions/federated/FederatedLeftIndexFullTestReference.dml
b/src/test/scripts/functions/federated/FederatedLeftIndexFullTestReference.dml
new file mode 100644
index 0000000..2cc29f7
--- /dev/null
+++
b/src/test/scripts/functions/federated/FederatedLeftIndexFullTestReference.dml
@@ -0,0 +1,38 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# index range of the left-indexing assignment
+rowLo = $6;
+rowHi = $7;
+colLo = $8;
+colHi = $9;
+
+# local reference input: the four partitions bound by rows ($10) or by columns
+if($10) {
+  A = rbind(read($1), read($2), read($3), read($4));
+}
+else {
+  A = cbind(read($1), read($2), read($3), read($4));
+}
+
+# right-hand side
+Y = read($5)
+
+A[rowLo:rowHi, colLo:colHi] = Y;
+write(A, $11);
+
+print(toString(A))