This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new c07cd15  [SYSTEMDS-2925] Federated left indexing
c07cd15 is described below

commit c07cd15ba100d4133548336e91082922bf160337
Author: Olga <[email protected]>
AuthorDate: Wed Apr 7 00:51:10 2021 +0200

    [SYSTEMDS-2925] Federated left indexing
    
    Closes #1219
---
 .../controlprogram/federated/FederationMap.java    |  34 +++-
 .../instructions/cp/VariableCPInstruction.java     |   7 +-
 .../instructions/fed/FEDInstructionUtils.java      |   3 +-
 .../instructions/fed/IndexingFEDInstruction.java   | 141 ++++++++++++++-
 .../primitives/FederatedLeftIndexTest.java         | 201 +++++++++++++++++++++
 .../federated/FederatedLeftIndexFrameFullTest.dml  |  45 +++++
 .../FederatedLeftIndexFrameFullTestReference.dml   |  41 +++++
 .../federated/FederatedLeftIndexFullTest.dml       |  42 +++++
 .../FederatedLeftIndexFullTestReference.dml        |  38 ++++
 9 files changed, 535 insertions(+), 17 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
index 13e9fb7..9afa295 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationMap.java
@@ -40,6 +40,7 @@ import 
org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.Reques
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
 import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.CommonThreadPool;
 import org.apache.sysds.runtime.util.IndexRange;
@@ -172,6 +173,22 @@ public class FederationMap {
                return ret;
        }
 
+       public FederatedRequest[] broadcastSliced(CacheableData<?> data, 
boolean isFrame, int[][] ix) {
+               if( _type == FType.FULL )
+                       return new FederatedRequest[]{broadcast(data)};
+
+               // prepare broadcast id and pin input
+               long id = FederationUtils.getNextFedDataID();
+               CacheBlock cb = data.acquireReadAndRelease();
+
+               // multi-threaded block slicing and federation request creation
+               FederatedRequest[] ret = new FederatedRequest[ix.length];
+               Arrays.setAll(ret,
+                       i -> new FederatedRequest(RequestType.PUT_VAR, id,
+                               cb.slice(ix[i][0], ix[i][1], ix[i][2], 
ix[i][3], isFrame ? new FrameBlock() : new MatrixBlock())));
+               return ret;
+       }
+
        public boolean isAligned(FederationMap that, boolean transposed) {
                // determines if the two federated data are aligned row/column 
partitions
                // at the same federated site (which allows for purely 
federated operation)
@@ -214,16 +231,19 @@ public class FederationMap {
        }
 
        @SuppressWarnings("unchecked")
-       public Future<FederatedResponse>[] execute(long tid, boolean wait, 
FederatedRequest[] frSlices1, FederatedRequest[] frSlices2, FederatedRequest... 
fr) {
+       public Future<FederatedResponse>[] execute(long tid, boolean wait, 
FederatedRange[] fedRange1, FederatedRequest elseFr, FederatedRequest[] 
frSlices1, FederatedRequest[] frSlices2, FederatedRequest... fr) {
                // executes step1[] - step 2 - ... step4 (only first step 
federated-data-specific)
                setThreadID(tid, frSlices1, fr);
                setThreadID(tid, frSlices2, fr);
                List<Future<FederatedResponse>> ret = new ArrayList<>();
                int pos = 0;
                for(Entry<FederatedRange, FederatedData> e : 
_fedMap.entrySet()) {
-                       FederatedRequest[] newFr = (frSlices1!=null) ?
-                               ((frSlices2!=null)? (addAll(frSlices2[pos], 
addAll(frSlices1[pos++], fr))) : addAll(frSlices1[pos++], fr)) : fr;
-                       ret.add(e.getValue().executeFederatedOperation(newFr));
+                       if(Arrays.asList(fedRange1).contains(e.getKey())) {
+                               FederatedRequest[] newFr = (frSlices1 != null) 
? ((frSlices2 != null) ? (addAll(frSlices2[pos],
+                                       addAll(frSlices1[pos++], fr))) : 
addAll(frSlices1[pos++], fr)) : fr;
+                               
ret.add(e.getValue().executeFederatedOperation(newFr));
+                       }
+                       else 
ret.add(e.getValue().executeFederatedOperation(elseFr));
                }
 
                // prepare results (future federated responses), with optional 
wait to ensure the
@@ -232,7 +252,11 @@ public class FederationMap {
                        FederationUtils.waitFor(ret);
                return ret.toArray(new Future[0]);
        }
-       
+
+       public Future<FederatedResponse>[] execute(long tid, boolean wait, 
FederatedRequest[] frSlices1, FederatedRequest[] frSlices2, FederatedRequest... 
fr) {
+               return execute(tid, wait, 
Arrays.stream(_fedMap.keySet().toArray()).toArray(FederatedRange[]::new), null, 
frSlices1, frSlices2, fr);
+       }
+
        @SuppressWarnings("unchecked")
        public Future<FederatedResponse>[] executeMultipleSlices(long tid, 
boolean wait,
                FederatedRequest[][] frSlices, FederatedRequest[] fr) {
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
index f56545a..52c6c74 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
@@ -436,8 +436,9 @@ public class VariableCPInstruction extends CPInstruction 
implements LineageTrace
                        
                case CopyVariable:
                        // Value types are not given here
-                       in1 = new CPOperand(parts[1], ValueType.UNKNOWN, 
DataType.UNKNOWN);
-                       in2 = new CPOperand(parts[2], ValueType.UNKNOWN, 
DataType.UNKNOWN);
+                       boolean withTypes = 
parts[1].split(VALUETYPE_PREFIX).length > 2 && 
parts[2].split(VALUETYPE_PREFIX).length > 2;
+                       in1 = withTypes ? new CPOperand(parts[1]) : new 
CPOperand(parts[1], ValueType.UNKNOWN, DataType.UNKNOWN);
+                       in2 = withTypes ? new CPOperand(parts[2]) : new 
CPOperand(parts[2], ValueType.UNKNOWN, DataType.UNKNOWN);
                        break;
                        
                case MoveVariable:
@@ -905,7 +906,7 @@ public class VariableCPInstruction extends CPInstruction 
implements LineageTrace
         * @param ec execution context
         */
        private void processCopyInstruction(ExecutionContext ec) {
-               // get source variable 
+               // get source variable
                Data dd = ec.getVariable(getInput1().getName());
                
                if ( dd == null )
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
index 3708d0a..ebd69d9 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
@@ -174,8 +174,7 @@ public class FEDInstructionUtils {
                else if(inst instanceof IndexingCPInstruction) {
                        // matrix and frame indexing
                        IndexingCPInstruction minst = (IndexingCPInstruction) 
inst;
-                       if(inst.getOpcode().equalsIgnoreCase("rightIndex")
-                               && (minst.input1.isMatrix() || 
minst.input1.isFrame())
+                       if((minst.input1.isMatrix() || minst.input1.isFrame())
                                && 
ec.getCacheableData(minst.input1).isFederated()) {
                                fedinst = 
IndexingFEDInstruction.parseInstruction(minst.getInstructionString());
                        }
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/IndexingFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/IndexingFEDInstruction.java
index f8ab210..90f93de 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/IndexingFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/IndexingFEDInstruction.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.sysds.common.Types;
 import org.apache.sysds.common.Types.ValueType;
@@ -40,6 +41,7 @@ import 
org.apache.sysds.runtime.controlprogram.federated.FederationMap;
 import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
 import org.apache.sysds.runtime.util.IndexRange;
 
 public final class IndexingFEDInstruction extends UnaryFEDInstruction {
@@ -95,7 +97,25 @@ public final class IndexingFEDInstruction extends 
UnaryFEDInstruction {
                        }
                }
                else if(opcode.equalsIgnoreCase(LeftIndex.OPCODE)) {
-                       throw new DMLRuntimeException("Left indexing not 
implemented for federated operations.");
+                       if ( parts.length == 8 ) {
+                               CPOperand lhsInput, rhsInput, rl, ru, cl, cu, 
out;
+                               lhsInput = new CPOperand(parts[1]);
+                               rhsInput = new CPOperand(parts[2]);
+                               rl = new CPOperand(parts[3]);
+                               ru = new CPOperand(parts[4]);
+                               cl = new CPOperand(parts[5]);
+                               cu = new CPOperand(parts[6]);
+                               out = new CPOperand(parts[7]);
+
+                               if((lhsInput.getDataType() != 
Types.DataType.MATRIX && lhsInput.getDataType() != Types.DataType.FRAME) &&
+                                       (rhsInput.getDataType() != 
Types.DataType.MATRIX && rhsInput.getDataType() != Types.DataType.FRAME))
+                                       throw new DMLRuntimeException("Can 
index only on matrices, frames, and lists.");
+
+                               return new IndexingFEDInstruction(lhsInput, 
rhsInput, rl, ru, cl, cu, out, opcode, str);
+                       }
+                       else {
+                               throw new DMLRuntimeException("Invalid number 
of operands in instruction: " + str);
+                       }
                }
                else {
                        throw new DMLRuntimeException("Unknown opcode while 
parsing a MatrixIndexingFEDInstruction: " + str);
@@ -104,7 +124,10 @@ public final class IndexingFEDInstruction extends 
UnaryFEDInstruction {
 
        @Override
        public void processInstruction(ExecutionContext ec) {
-               rightIndexing(ec);
+               if(getOpcode().equalsIgnoreCase(RightIndex.OPCODE))
+                       rightIndexing(ec);
+               else
+                       leftIndexing(ec);
        }
 
        private void rightIndexing(ExecutionContext ec)
@@ -140,11 +163,7 @@ public final class IndexingFEDInstruction extends 
UnaryFEDInstruction {
                        long[] newIx = new long[]{rsn, ren, csn, cen};
 
                        // change 4 indices in instString
-                       instStrings[i] = instString;
-                       String[] instParts = 
instString.split(Lop.OPERAND_DELIMITOR);
-                       for(int j = 3; j < 7; j++)
-                               instParts[j] = 
InstructionUtils.createLiteralOperand(String.valueOf(newIx[j-3]+1), 
ValueType.INT64);
-                       instStrings[i] = String.join(Lop.OPERAND_DELIMITOR, 
instParts);
+                       instStrings[i] = modifyIndices(newIx, 3, 7);
                        
                        if(input1.isFrame()) {
                                //modify frame schema
@@ -171,4 +190,112 @@ public final class IndexingFEDInstruction extends 
UnaryFEDInstruction {
                        out.setFedMapping(fedMap.copyWithNewID(fr1[0].getID()));
                }
        }
+
+       private void leftIndexing(ExecutionContext ec)
+       {
+               //get input and requested index range
+               CacheableData<?> in1 = ec.getCacheableData(input1);
+               CacheableData<?> in2 = ec.getCacheableData(input2);
+               IndexRange ixrange = getIndexRange(ec);
+
+               //check bounds
+               if( ixrange.rowStart < 0 || ixrange.rowStart >= 
in1.getNumRows() || ixrange.rowEnd >= in1.getNumRows()
+                       || ixrange.colStart < 0 || ixrange.colStart >= 
in1.getNumColumns() || ixrange.colEnd >= in1.getNumColumns() ) {
+                       throw new DMLRuntimeException("Invalid values for 
matrix indexing: ["+(ixrange.rowStart+1)+":"+(ixrange.rowEnd+1)+","
+                               + 
(ixrange.colStart+1)+":"+(ixrange.colEnd+1)+"] " + "must be within matrix 
dimensions ["+in1.getNumRows()+","+in1.getNumColumns()+"].");
+               }
+               if( (ixrange.rowEnd-ixrange.rowStart+1) != in2.getNumRows() || 
(ixrange.colEnd-ixrange.colStart+1) != in2.getNumColumns()) {
+                       throw new DMLRuntimeException("Invalid values for 
matrix indexing: " +
+                               "dimensions of the source matrix 
["+in2.getNumRows()+"x" + in2.getNumColumns() + "] " +
+                               "do not match the shape of the matrix specified 
by indices [" +
+                               (ixrange.rowStart+1) +":" + (ixrange.rowEnd+1) 
+ ", " + (ixrange.colStart+1) + ":" + (ixrange.colEnd+1) + "].");
+               }
+
+               FederationMap fedMap = in1.getFedMapping();
+
+               String[] instStrings = new String[fedMap.getSize()];
+               int[][] sliceIxs = new int[fedMap.getSize()][];
+               FederatedRange[] ranges = new FederatedRange[fedMap.getSize()];
+
+               // replace old reshape values for each worker
+               int i = 0, prev = 0, from = fedMap.getSize();
+               for(FederatedRange range : fedMap.getMap().keySet()) {
+                       long rs = range.getBeginDims()[0], re = 
range.getEndDims()[0],
+                               cs = range.getBeginDims()[1], ce = 
range.getEndDims()[1];
+                       long rsn = (ixrange.rowStart >= rs) ? (ixrange.rowStart 
- rs) : 0;
+                       long ren = (ixrange.rowEnd >= rs && ixrange.rowEnd < 
re) ? (ixrange.rowEnd - rs) : (re - rs - 1);
+                       long csn = (ixrange.colStart >= cs) ? (ixrange.colStart 
- cs) : 0;
+                       long cen = (ixrange.colEnd >= cs && ixrange.colEnd < 
ce) ? (ixrange.colEnd - cs) : (ce - cs - 1);
+
+                       long[] newIx = new long[]{(int) rsn, (int) ren, (int) 
csn, (int) cen};
+
+                       // find ranges where to apply  leftIndex
+                       long to;
+                       if(in1.isFederated(FederationMap.FType.ROW) && (to = 
(prev + ren - rsn)) >= 0 &&
+                               to < in2.getNumRows() && ixrange.rowStart <= 
re) {
+                               sliceIxs[i] = new int[] { prev, (int) to, 0, 
(int) in2.getNumColumns()-1};
+                               prev = (int) (to + 1);
+
+                               instStrings[i] = modifyIndices(newIx, 4, 8);
+                               ranges[i] = range;
+                               from = Math.min(i, from);
+                       }
+                       else if(in1.isFederated(FederationMap.FType.COL) && (to 
= (prev + cen - csn)) >= 0 &&
+                               to < in2.getNumColumns() && ixrange.colStart <= 
ce) {
+                               sliceIxs[i] = new int[] {0, (int) 
in2.getNumRows() - 1, prev, (int) to};
+                               prev = (int) (to + 1);
+
+                               instStrings[i] = modifyIndices(newIx, 4, 8);
+                               ranges[i] = range;
+                               from = Math.min(i, from);
+                       }
+                       else
+                               // TODO shallow copy, add more advanced update 
in place for federated
+                               instStrings[i] = createCopyInstString();
+
+                       i++;
+               }
+
+               sliceIxs = 
Arrays.stream(sliceIxs).filter(Objects::nonNull).toArray(int[][] :: new);
+
+               FederatedRequest[] fr1 = fedMap.broadcastSliced(in2, 
input2.isFrame(), sliceIxs);
+               FederatedRequest[] fr2 = 
FederationUtils.callInstruction(instStrings, output, new CPOperand[]{input1, 
input2},
+                       new long[]{fedMap.getID(), fr1[0].getID()});
+               FederatedRequest fr3 = fedMap.cleanup(getTID(), fr1[0].getID());
+
+               //execute federated instruction and cleanup intermediates
+               if(sliceIxs.length == fedMap.getSize())
+                       fedMap.execute(getTID(), true, fr2, fr1, fr3);
+               else {
+                       // get index of cpvar request
+                       for(i = 0; i < fr2.length; i++)
+                               if(i < from || i >= from + sliceIxs.length)
+                                       break;
+                       fedMap.execute(getTID(), true, ranges, (fr2[i]), 
Arrays.copyOfRange(fr2, from, from + sliceIxs.length), fr1, fr3);
+               }
+
+               if(input1.isFrame()) {
+                       FrameObject out = ec.getFrameObject(output);
+                       out.setSchema(((FrameObject) in1).getSchema());
+                       
out.getDataCharacteristics().set(in1.getDataCharacteristics());
+                       out.setFedMapping(fedMap.copyWithNewID(fr2[0].getID()));
+               } else {
+                       MatrixObject out = ec.getMatrixObject(output);
+                       
out.getDataCharacteristics().set(in1.getDataCharacteristics());;
+                       out.setFedMapping(fedMap.copyWithNewID(fr2[0].getID()));
+               }
+       }
+
+       private String modifyIndices(long[] newIx, int from, int to) {
+               // change 4 indices in instString
+               String[] instParts = instString.split(Lop.OPERAND_DELIMITOR);
+               for(int j = from; j < to; j++)
+                       instParts[j] = 
InstructionUtils.createLiteralOperand(String.valueOf(newIx[j-from]+1), 
ValueType.INT64);
+               return String.join(Lop.OPERAND_DELIMITOR, instParts);
+       }
+
+       private String createCopyInstString() {
+               String[] instParts = instString.split(Lop.OPERAND_DELIMITOR);
+               return 
VariableCPInstruction.prepareCopyInstruction(instParts[2], 
instParts[8]).toString();
+       }
 }
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedLeftIndexTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedLeftIndexTest.java
new file mode 100644
index 0000000..e6c3c85
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedLeftIndexTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.primitives;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
[email protected]
+public class FederatedLeftIndexTest extends AutomatedTestBase {
+
+       private final static String TEST_NAME1 = "FederatedLeftIndexFullTest";
+       private final static String TEST_NAME2 = 
"FederatedLeftIndexFrameFullTest";
+
+       private final static String TEST_DIR = "functions/federated/";
+       private static final String TEST_CLASS_DIR = TEST_DIR + 
FederatedLeftIndexTest.class.getSimpleName() + "/";
+
+       private final static int blocksize = 1024;
+       @Parameterized.Parameter()
+       public int rows1;
+       @Parameterized.Parameter(1)
+       public int cols1;
+
+       @Parameterized.Parameter(2)
+       public int rows2;
+       @Parameterized.Parameter(3)
+       public int cols2;
+
+       @Parameterized.Parameter(4)
+       public int from;
+       @Parameterized.Parameter(5)
+       public int to;
+
+       @Parameterized.Parameter(6)
+       public int from2;
+       @Parameterized.Parameter(7)
+       public int to2;
+
+       @Parameterized.Parameter(8)
+       public boolean rowPartitioned;
+
+       @Parameterized.Parameters
+       public static Collection<Object[]> data() {
+               return Arrays.asList(new Object[][] {
+                       {8, 2, 8, 1, 1, 8, 1, 1, true},
+                       {24, 12, 20, 8, 3, 22, 1, 8, true},
+                       {24, 12, 10, 8, 7, 16, 1, 8, true},
+                       {24, 12, 20, 11, 3, 22, 1, 11, false},
+                       {24, 12, 20, 8, 3, 22, 1, 8, false},
+                       {24, 12, 20, 8, 3, 22, 5, 12, false},
+               });
+       }
+
+       private enum DataType {
+               MATRIX, FRAME
+       }
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME1, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"S"}));
+               addTestConfiguration(TEST_NAME2, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"S"}));
+       }
+
+       @Test
+       public void testLeftIndexFullDenseMatrixCP() {
+               runAggregateOperationTest(DataType.MATRIX, 
ExecMode.SINGLE_NODE);
+       }
+
+       @Test
+       public void testLeftIndexFullDenseFrameCP() {
+               runAggregateOperationTest(DataType.FRAME, ExecMode.SINGLE_NODE);
+       }
+
+       private void runAggregateOperationTest(DataType dataType, ExecMode 
execMode) {
+               setExecMode(execMode);
+
+               String TEST_NAME = null;
+
+               if(dataType == DataType.MATRIX)
+                       TEST_NAME = TEST_NAME1;
+               else
+                       TEST_NAME = TEST_NAME2;
+
+
+               getAndLoadTestConfiguration(TEST_NAME);
+               String HOME = SCRIPT_DIR + TEST_DIR;
+
+               // write input matrices
+               int r1 = rows1;
+               int c1 = cols1 / 4;
+               if(rowPartitioned) {
+                       r1 = rows1 / 4;
+                       c1 = cols1;
+               }
+
+               double[][] X1 = getRandomMatrix(r1, c1, 1, 5, 1, 3);
+               double[][] X2 = getRandomMatrix(r1, c1, 1, 5, 1, 7);
+               double[][] X3 = getRandomMatrix(r1, c1,  1, 5, 1, 8);
+               double[][] X4 = getRandomMatrix(r1, c1, 1, 5, 1, 9);
+
+               MatrixCharacteristics mc = new MatrixCharacteristics(r1, c1,  
blocksize, r1 * c1);
+               writeInputMatrixWithMTD("X1", X1, false, mc);
+               writeInputMatrixWithMTD("X2", X2, false, mc);
+               writeInputMatrixWithMTD("X3", X3, false, mc);
+               writeInputMatrixWithMTD("X4", X4, false, mc);
+
+               double[][] Y = getRandomMatrix(rows2, cols2, 1, 5, 1, 3);
+
+               MatrixCharacteristics mc2 = new MatrixCharacteristics(rows2, 
cols2, blocksize, rows2 * cols2);
+               writeInputMatrixWithMTD("Y", Y, false, mc2);
+
+               // empty script name because we don't execute any script, just 
start the worker
+               fullDMLScriptName = "";
+               int port1 = getRandomAvailablePort();
+               int port2 = getRandomAvailablePort();
+               int port3 = getRandomAvailablePort();
+               int port4 = getRandomAvailablePort();
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
+               Thread t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S);
+               Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
+               Thread t4 = startLocalFedWorkerThread(port4);
+
+               rtplatform = execMode;
+               if(rtplatform == ExecMode.SPARK) {
+                       System.out.println(7);
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+               }
+               TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
+               loadTestConfiguration(config);
+
+               if(from > to)
+                       from = to;
+               if(from2 > to2)
+                       from2 = to2;
+
+               // Run reference dml script with normal matrix
+               fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+               programArgs = new String[] {"-explain", "-args", input("X1"), 
input("X2"), input("X3"), input("X4"),
+                       input("Y"), String.valueOf(from), String.valueOf(to),
+                       String.valueOf(from2), String.valueOf(to2),
+                       Boolean.toString(rowPartitioned).toUpperCase(), 
expected("S")};
+               runTest(null);
+               // Run actual dml script with federated matrix
+
+               fullDMLScriptName = HOME + TEST_NAME + ".dml";
+               programArgs = new String[] {"-stats", "100", "-nvargs",
+                       "in_X1=" + TestUtils.federatedAddress(port1, 
input("X1")),
+                       "in_X2=" + TestUtils.federatedAddress(port2, 
input("X2")),
+                       "in_X3=" + TestUtils.federatedAddress(port3, 
input("X3")),
+                       "in_X4=" + TestUtils.federatedAddress(port4, 
input("X4")),
+                       "in_Y=" + input("Y"), "rows=" + rows1, "cols=" + cols1,
+                       "rows2=" + rows2, "cols2=" + cols2,
+                       "from=" + from, "to=" + to,"from2=" + from2, "to2=" + 
to2,
+                       "rP=" + Boolean.toString(rowPartitioned).toUpperCase(), 
"out_S=" + output("S")};
+
+               runTest(null);
+
+               // compare via files
+               compareResults(1e-9);
+
+               Assert.assertTrue(heavyHittersContainsString("fed_leftIndex"));
+
+               // check that federated input files are still existing
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
+
+               TestUtils.shutdownThreads(t1, t2, t3, t4);
+       }
+}
diff --git a/src/test/scripts/functions/federated/FederatedLeftIndexFrameFullTest.dml b/src/test/scripts/functions/federated/FederatedLeftIndexFrameFullTest.dml
new file mode 100644
index 0000000..5604989
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedLeftIndexFrameFullTest.dml
@@ -0,0 +1,45 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+from = $from;
+to = $to;
+from2 = $from2;
+to2 = $to2;
+
+if ($rP) {
+  A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+    ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), 
list(2*$rows/4, $cols),
+    list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), 
list($rows, $cols)));
+} else {
+  A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+    ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, 
$cols/2),
+    list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), 
list($rows, $cols)));
+}
+
+B = read($in_Y)
+
+B = as.frame(B)
+A = as.frame(A)
+
+A[from:to, from2:to2] = B;
+write(A, $out_S);
+
+print(toString(A))
\ No newline at end of file
diff --git a/src/test/scripts/functions/federated/FederatedLeftIndexFrameFullTestReference.dml b/src/test/scripts/functions/federated/FederatedLeftIndexFrameFullTestReference.dml
new file mode 100644
index 0000000..4b5a852
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedLeftIndexFrameFullTestReference.dml
@@ -0,0 +1,41 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+from = $6;
+to = $7;
+from2 = $8;
+to2 = $9;
+if($10) {
+  A = rbind(read($1), read($2), read($3), read($4));
+}
+else {
+  A = cbind(read($1), read($2), read($3), read($4));
+}
+
+B = read($5)
+
+B = as.frame(B)
+A = as.frame(A)
+
+A[from:to, from2:to2] = B;
+write(A, $11);
+
+print(toString(A))
diff --git 
a/src/test/scripts/functions/federated/FederatedLeftIndexFullTest.dml 
b/src/test/scripts/functions/federated/FederatedLeftIndexFullTest.dml
new file mode 100644
index 0000000..a201f7b
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedLeftIndexFullTest.dml
@@ -0,0 +1,42 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+from = $from;
+to = $to;
+from2 = $from2;
+to2 = $to2;
+
+if ($rP) {
+  A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+    ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), 
list(2*$rows/4, $cols),
+    list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), 
list($rows, $cols)));
+} else {
+  A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+    ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, 
$cols/2),
+    list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), 
list($rows, $cols)));
+}
+
+B = read($in_Y)
+
+A[from:to, from2:to2] = B;
+write(A, $out_S);
+
+print(toString(A))
diff --git a/src/test/scripts/functions/federated/FederatedLeftIndexFullTestReference.dml b/src/test/scripts/functions/federated/FederatedLeftIndexFullTestReference.dml
new file mode 100644
index 0000000..2cc29f7
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedLeftIndexFullTestReference.dml
@@ -0,0 +1,38 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+from = $6;
+to = $7;
+from2 = $8;
+to2 = $9;
+if($10) {
+  A = rbind(read($1), read($2), read($3), read($4));
+}
+else {
+  A = cbind(read($1), read($2), read($3), read($4));
+}
+
+B = read($5)
+
+A[from:to, from2:to2] = B;
+write(A, $11);
+
+print(toString(A))

Reply via email to