This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new db30d5d  [SYSTEMDS-2978] Federated frame element-wise map-lambda operations
db30d5d is described below

commit db30d5d319426fa3d0695670214515bef897b869
Author: Olga <[email protected]>
AuthorDate: Sat May 22 22:13:36 2021 +0200

    [SYSTEMDS-2978] Federated frame element-wise map-lambda operations
    
    Closes #1280.
---
 .../runtime/instructions/InstructionUtils.java     |   2 +-
 .../instructions/fed/BinaryFEDInstruction.java     |   2 +
 .../fed/BinaryFrameScalarFEDInstruction.java       |  53 ++++++++
 .../instructions/fed/FEDInstructionUtils.java      |   4 +
 .../primitives/FederatedFrameMapTest.java          | 150 +++++++++++++++++++++
 ...FrameFullTest.dml => FederatedFrameMapTest.dml} |  16 +--
 ...Test.dml => FederatedFrameMapTestReference.dml} |  28 ++--
 .../federated/FederatedLeftIndexFrameFullTest.dml  |   2 +-
 8 files changed, 224 insertions(+), 33 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
index 0d4abf0..fa7a3c6 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
@@ -1128,7 +1128,7 @@ public class InstructionUtils
                return linst;
        }
 
-       private static String removeFEDOutputFlag(String linst){
+       public static String removeFEDOutputFlag(String linst){
                return linst.substring(0, linst.lastIndexOf(Lop.OPERAND_DELIMITOR));
        }
 
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryFEDInstruction.java
index 9c74ea3..0c37a89 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryFEDInstruction.java
@@ -72,6 +72,8 @@ public abstract class BinaryFEDInstruction extends ComputationFEDInstruction {
                        throw new DMLRuntimeException("Federated binary tensor tensor operations not yet supported");
                else if( in1.isMatrix() && in2.isScalar() || in2.isMatrix() && in1.isScalar() )
                        return new BinaryMatrixScalarFEDInstruction(operator, in1, in2, out, opcode, str, fedOut);
+               else if( in1.isFrame() && in2.isScalar() || in2.isFrame() && in1.isScalar() )
+                       return new BinaryFrameScalarFEDInstruction(operator, in1, in2, out, opcode, InstructionUtils.removeFEDOutputFlag(str));
                else
                        throw new DMLRuntimeException("Federated binary operations not yet supported:" + opcode);
        }
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryFrameScalarFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryFrameScalarFEDInstruction.java
new file mode 100644
index 0000000..ecb3a45
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/BinaryFrameScalarFEDInstruction.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
+import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+
+public class BinaryFrameScalarFEDInstruction extends BinaryFEDInstruction
+{
+       protected BinaryFrameScalarFEDInstruction(Operator op, CPOperand in1,
+                       CPOperand in2, CPOperand out, String opcode, String istr) {
+               super(FEDInstruction.FEDType.Binary, op, in1, in2, out, opcode, istr);
+       }
+
+       @Override
+       public void processInstruction(ExecutionContext ec)  {
+               // get input frames
+               FrameObject fo = ec.getFrameObject(input1);
+               FederationMap fedMap = fo.getFedMapping();
+
+               //compute results
+               FederatedRequest fr1 = FederationUtils.callInstruction(instString,
+                       output, new CPOperand[] {input1}, new long[] {fedMap.getID()});
+               fedMap.execute(getTID(), true, fr1);
+
+               FrameObject out = ec.getFrameObject(output);
+               out.setSchema(fo.getSchema());
+               out.getDataCharacteristics().set(fo.getDataCharacteristics());
+               out.setFedMapping(fedMap.copyWithNewID(fr1.getID()));
+       }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
index 721143f..15409d9 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
@@ -35,6 +35,7 @@ import org.apache.sysds.runtime.instructions.cp.AggregateBinaryCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.AggregateTernaryCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.AggregateUnaryCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.BinaryCPInstruction;
+import org.apache.sysds.runtime.instructions.cp.BinaryFrameScalarCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.CtableCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.Data;
 import org.apache.sysds.runtime.instructions.cp.IndexingCPInstruction;
@@ -156,6 +157,9 @@ public class FEDInstructionUtils {
                                else
                                        fedinst = BinaryFEDInstruction.parseInstruction(
                                                InstructionUtils.concatOperands(inst.getInstructionString(),FederatedOutput.NONE.name()));
+                       } else if(inst.getOpcode().equals("_map") && inst instanceof BinaryFrameScalarCPInstruction && !inst.getInstructionString().contains("UtilFunctions")
+                               && instruction.input1.isFrame() && ec.getFrameObject(instruction.input1).isFederated()) {
+                               fedinst = BinaryFrameScalarFEDInstruction.parseInstruction(InstructionUtils.concatOperands(inst.getInstructionString(),FederatedOutput.NONE.name()));
                        }
                }
                else if( inst instanceof ParameterizedBuiltinCPInstruction ) {
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFrameMapTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFrameMapTest.java
new file mode 100644
index 0000000..06cb8c0
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedFrameMapTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.primitives;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedFrameMapTest extends AutomatedTestBase {
+
+       private final static String TEST_NAME1 = "FederatedFrameMapTest";
+
+       private final static String TEST_DIR = "functions/federated/";
+       private static final String TEST_CLASS_DIR = TEST_DIR + FederatedFrameMapTest.class.getSimpleName() + "/";
+
+       private final static int blocksize = 1024;
+       @Parameterized.Parameter()
+       public int rows;
+       @Parameterized.Parameter(1)
+       public int cols;
+
+       @Parameterized.Parameter(2)
+       public boolean rowPartitioned;
+
+       @Parameterized.Parameters
+       public static Collection<Object[]> data() {
+               return Arrays.asList(new Object[][] {
+                       {100, 12, true},
+                       {100, 12, false},
+               });
+       }
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"S"}));
+       }
+
+       @Test
+       public void testLeftIndexFullDenseFrameCP() {
+               runAggregateOperationTest(ExecMode.SINGLE_NODE);
+       }
+
+       private void runAggregateOperationTest(ExecMode execMode) {
+               setExecMode(execMode);
+
+               String TEST_NAME = TEST_NAME1;
+
+               getAndLoadTestConfiguration(TEST_NAME);
+               String HOME = SCRIPT_DIR + TEST_DIR;
+
+               // write input matrices
+               int r1 = rows;
+               int c1 = cols / 4;
+               if(rowPartitioned) {
+                       r1 = rows / 4;
+                       c1 = cols;
+               }
+
+               double[][] X1 = getRandomMatrix(r1, c1, 1, 5, 1, 3);
+               double[][] X2 = getRandomMatrix(r1, c1, 1, 5, 1, 7);
+               double[][] X3 = getRandomMatrix(r1, c1,  1, 5, 1, 8);
+               double[][] X4 = getRandomMatrix(r1, c1, 1, 5, 1, 9);
+
+               MatrixCharacteristics mc = new MatrixCharacteristics(r1, c1,  blocksize, r1 * c1);
+               writeInputMatrixWithMTD("X1", X1, false, mc);
+               writeInputMatrixWithMTD("X2", X2, false, mc);
+               writeInputMatrixWithMTD("X3", X3, false, mc);
+               writeInputMatrixWithMTD("X4", X4, false, mc);
+
+               // empty script name because we don't execute any script, just start the worker
+               fullDMLScriptName = "";
+               int port1 = getRandomAvailablePort();
+               int port2 = getRandomAvailablePort();
+               int port3 = getRandomAvailablePort();
+               int port4 = getRandomAvailablePort();
+               Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
+               Thread t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S);
+               Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
+               Thread t4 = startLocalFedWorkerThread(port4);
+
+               rtplatform = execMode;
+               if(rtplatform == ExecMode.SPARK) {
+                       System.out.println(7);
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+               }
+               TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+               loadTestConfiguration(config);
+
+               // Run reference dml script with normal matrix
+               fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+               programArgs = new String[] {"-explain", "-args", input("X1"), input("X2"), input("X3"), input("X4"),
+                       Boolean.toString(rowPartitioned).toUpperCase(), expected("S")};
+               runTest(null);
+               
+               // Run actual dml script with federated matrix
+               fullDMLScriptName = HOME + TEST_NAME + ".dml";
+               programArgs = new String[] {"-stats", "100", "-nvargs",
+                       "in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+                       "in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
+                       "in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
+                       "in_X4=" + TestUtils.federatedAddress(port4, input("X4")),
+                       "in_Y=" + input("Y"), "rows=" + rows, "cols=" + cols,
+                       "rP=" + Boolean.toString(rowPartitioned).toUpperCase(), "out_S=" + output("S")};
+               runTest(null);
+
+               // compare via files
+               compareResults(1e-9);
+
+               Assert.assertTrue(heavyHittersContainsString("fed__map"));
+
+               // check that federated input files are still existing
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
+
+               TestUtils.shutdownThreads(t1, t2, t3, t4);
+       }
+}
diff --git a/src/test/scripts/functions/federated/FederatedLeftIndexFrameFullTest.dml b/src/test/scripts/functions/federated/FederatedFrameMapTest.dml
similarity index 90%
copy from src/test/scripts/functions/federated/FederatedLeftIndexFrameFullTest.dml
copy to src/test/scripts/functions/federated/FederatedFrameMapTest.dml
index 5604989..b879b2f 100644
--- a/src/test/scripts/functions/federated/FederatedLeftIndexFrameFullTest.dml
+++ b/src/test/scripts/functions/federated/FederatedFrameMapTest.dml
@@ -19,11 +19,6 @@
 #
 #-------------------------------------------------------------
 
-from = $from;
-to = $to;
-from2 = $from2;
-to2 = $to2;
-
 if ($rP) {
   A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
     ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
@@ -34,12 +29,9 @@ if ($rP) {
     list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
 }
 
-B = read($in_Y)
-
-B = as.frame(B)
 A = as.frame(A)
 
-A[from:to, from2:to2] = B;
-write(A, $out_S);
-
-print(toString(A))
\ No newline at end of file
+S = map(A, "x -> x.replace(\"1\", \"2\")");
+write(S, $out_S);
+print(toString(A[1,1]))
+print(toString(S[1,1]))
diff --git a/src/test/scripts/functions/federated/FederatedLeftIndexFrameFullTest.dml b/src/test/scripts/functions/federated/FederatedFrameMapTestReference.dml
similarity index 58%
copy from src/test/scripts/functions/federated/FederatedLeftIndexFrameFullTest.dml
copy to src/test/scripts/functions/federated/FederatedFrameMapTestReference.dml
index 5604989..8b55291 100644
--- a/src/test/scripts/functions/federated/FederatedLeftIndexFrameFullTest.dml
+++ b/src/test/scripts/functions/federated/FederatedFrameMapTestReference.dml
@@ -19,27 +19,17 @@
 #
 #-------------------------------------------------------------
 
-from = $from;
-to = $to;
-from2 = $from2;
-to2 = $to2;
-
-if ($rP) {
-  A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
-    ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), list(2*$rows/4, $cols),
-    list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), list($rows, $cols)));
-} else {
-  A = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
-    ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, $cols/2),
-    list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), list($rows, $cols)));
+if($5) {
+  A = rbind(read($1), read($2), read($3), read($4));
+}
+else {
+  A = cbind(read($1), read($2), read($3), read($4));
 }
 
-B = read($in_Y)
-
-B = as.frame(B)
 A = as.frame(A)
 
-A[from:to, from2:to2] = B;
-write(A, $out_S);
+S = map(A, "x -> x.replace(\"1\", \"2\")");
+write(S, $6);
 
-print(toString(A))
\ No newline at end of file
+print(toString(A[1,1]))
+print(toString(S[1,1]))
diff --git a/src/test/scripts/functions/federated/FederatedLeftIndexFrameFullTest.dml b/src/test/scripts/functions/federated/FederatedLeftIndexFrameFullTest.dml
index 5604989..ca9fe81 100644
--- a/src/test/scripts/functions/federated/FederatedLeftIndexFrameFullTest.dml
+++ b/src/test/scripts/functions/federated/FederatedLeftIndexFrameFullTest.dml
@@ -42,4 +42,4 @@ A = as.frame(A)
 A[from:to, from2:to2] = B;
 write(A, $out_S);
 
-print(toString(A))
\ No newline at end of file
+print(toString(A))

Reply via email to