sebwrede commented on a change in pull request #1184:
URL: https://github.com/apache/systemds/pull/1184#discussion_r611678575



##########
File path: 
src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedCtableTest.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.primitives;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedCtableTest extends AutomatedTestBase {
+       private final static String TEST_DIR = "functions/federated/";
+       private final static String TEST_NAME1 = "FederatedCtableTest";
+       private final static String TEST_NAME2 = "FederatedCtableFedOutput";
+       private final static String TEST_CLASS_DIR = TEST_DIR + 
FederatedCtableTest.class.getSimpleName() + "/";
+
+       private final static int blocksize = 1024;
+       @Parameterized.Parameter()
+       public int rows;
+       @Parameterized.Parameter(1)
+       public int cols;
+       @Parameterized.Parameter(2)
+       public int maxVal1;
+       @Parameterized.Parameter(3)
+       public int maxVal2;
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME1, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"F"}));
+               addTestConfiguration(TEST_NAME2, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"F"}));
+       }
+
+       @Parameterized.Parameters
+       public static Collection<Object[]> data() {
+               return Arrays.asList(new Object[][] {
+                       {12, 4, 4, 7},
+//                     {100, 14, 4, 7}, {1000, 14, 4, 7}
+               });
+       }
+
+       @Test
+       public void federatedCtableSinglenode() {
+               runCtable(Types.ExecMode.SINGLE_NODE, false);
+       }
+
+       @Test
+       public void federatedCtableFedOutputSinglenode() {
+               runCtable(Types.ExecMode.SINGLE_NODE, true);
+       }

Review comment:
       I think we could use more test cases. 
   How about a test case where Y is federated and X is not? 
   What about a third input (especially one that triggers the 
reversedWeights case in the processInstruction method)? 
   It would also be worth testing some invalid input (for instance, inputs 
with invalid dimensionality) to see whether it actually throws an exception 
or silently computes meaningless output. 
   I think it is also very relevant to test with different federated range 
overlaps, since the slicing is an important component of the CTable 
instruction. 

##########
File path: 
src/main/java/org/apache/sysds/runtime/instructions/fed/CtableFEDInstruction.java
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.Future;
+import java.util.stream.IntStream;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types.DataType;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.lops.Lop;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedUDF;
+import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
+import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
+import org.apache.sysds.runtime.functionobjects.And;
+import org.apache.sysds.runtime.instructions.Instruction;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.instructions.cp.ScalarObject;
+import org.apache.sysds.runtime.lineage.LineageItem;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
+
+public class CtableFEDInstruction extends ComputationFEDInstruction {
+       private final CPOperand _outDim1;
+       private final CPOperand _outDim2;
+       private final boolean _isExpand;
+       private final boolean _ignoreZeros;
+
+       private CtableFEDInstruction(CPOperand in1, CPOperand in2, CPOperand 
in3, CPOperand out, String outputDim1, boolean dim1Literal, String outputDim2, 
boolean dim2Literal, boolean isExpand,
+               boolean ignoreZeros, String opcode, String istr) {
+               super(FEDType.Ctable, null, in1, in2, in3, out, opcode, istr);
+               _outDim1 = new CPOperand(outputDim1, ValueType.FP64, 
DataType.SCALAR, dim1Literal);
+               _outDim2 = new CPOperand(outputDim2, ValueType.FP64, 
DataType.SCALAR, dim2Literal);
+               _isExpand = isExpand;
+               _ignoreZeros = ignoreZeros;
+       }
+
+       public static CtableFEDInstruction parseInstruction(String inst) {
+               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(inst);
+               InstructionUtils.checkNumFields(parts, 7);
+
+               String opcode = parts[0];
+
+               //handle opcode
+               if(!(opcode.equalsIgnoreCase("ctable"))) {
+                       throw new DMLRuntimeException("Unexpected opcode in 
CtableFEDInstruction: " + inst);
+               }
+
+               //handle operands
+               CPOperand in1 = new CPOperand(parts[1]);
+               CPOperand in2 = new CPOperand(parts[2]);
+               CPOperand in3 = new CPOperand(parts[3]);
+
+               //handle known dimension information
+               String[] dim1Fields = 
parts[4].split(Instruction.LITERAL_PREFIX);
+               String[] dim2Fields = 
parts[5].split(Instruction.LITERAL_PREFIX);
+
+               CPOperand out = new CPOperand(parts[6]);
+               boolean ignoreZeros = Boolean.parseBoolean(parts[7]);
+
+               // ctable does not require any operator, so we simply pass-in a 
dummy operator with null functionobject
+               return new CtableFEDInstruction(in1,
+                       in2, in3, out, dim1Fields[0], 
Boolean.parseBoolean(dim1Fields[1]),
+                       dim2Fields[0], Boolean.parseBoolean(dim2Fields[1]), 
false, ignoreZeros, opcode, inst);
+       }
+
+       @Override
+       public void processInstruction(ExecutionContext ec) {
+               MatrixObject mo1 = ec.getMatrixObject(input1);
+               MatrixObject mo2 = ec.getMatrixObject(input2);
+
+               boolean reversed = false;
+               if(!mo1.isFederated() && mo2.isFederated()) {
+                       mo1 = ec.getMatrixObject(input2);
+                       mo2 = ec.getMatrixObject(input1);
+                       reversed = true;
+               }
+
+               if(mo2.getNumColumns() != 1 && mo1.getNumColumns() != 1)
+                       throw new DMLRuntimeException("Federated ctable: Input 
vectors should be nx1.");
+
+               // get new output dims
+               Long[] dims1 = getOutputDimension(mo1, input1, _outDim1, 
mo1.getFedMapping().getFederatedRanges());
+               Long[] dims2 = getOutputDimension(mo2, input2, _outDim2, 
mo1.getFedMapping().getFederatedRanges());
+
+               MatrixObject mo3 = input3 != null && input3.isMatrix() ? 
ec.getMatrixObject(input3) : null;
+
+               boolean reversedWeights = mo3 != null && mo3.isFederated() && 
!(mo1.isFederated() || mo2.isFederated());
+               if(reversedWeights) {
+                       mo3 = mo1;
+                       mo1 = ec.getMatrixObject(input3);
+               }
+
+               long dim1 = Collections.max(Arrays.asList(dims1), 
Long::compare);
+               boolean fedOutput = dim1 % mo1.getFedMapping().getSize() == 0 
&& dims1.length == Arrays.stream(dims1).distinct().count();
+
+               processRequest(ec, mo1, mo2, mo3, reversed, reversedWeights, 
fedOutput, dims1, dims2);
+       }
+
+       private void processRequest(ExecutionContext ec, MatrixObject mo1, 
MatrixObject mo2, MatrixObject mo3,
+               boolean reversed, boolean reversedWeights, boolean fedOutput, 
Long[] dims1, Long[] dims2) {
+               Future<FederatedResponse>[] ffr;
+
+               FederatedRequest[] fr1 = 
mo1.getFedMapping().broadcastSliced(mo2, false);
+               FederatedRequest fr2, fr3;
+               if(mo3 == null) {
+                       if(!reversed)
+                               fr2 = 
FederationUtils.callInstruction(instString, output, new CPOperand[] {input1, 
input2}, new long[] {mo1.getFedMapping().getID(), fr1[0].getID()});
+                       else
+                               fr2 = 
FederationUtils.callInstruction(instString, output, new CPOperand[] {input1, 
input2}, new long[] {fr1[0].getID(), mo1.getFedMapping().getID()});
+
+                       fr3 = new 
FederatedRequest(FederatedRequest.RequestType.GET_VAR, fr2.getID());
+                       FederatedRequest fr4 = 
mo1.getFedMapping().cleanup(getTID(), fr1[0].getID());
+                       ffr = mo1.getFedMapping().execute(getTID(), true, fr1, 
fr2, fr3, fr4);
+
+               } else {
+                       FederatedRequest[] fr4 = 
mo1.getFedMapping().broadcastSliced(mo3, false);
+                       if(!reversed && !reversedWeights)
+                               fr2 = 
FederationUtils.callInstruction(instString, output, new CPOperand[] {input1, 
input2, input3}, new long[] {mo1.getFedMapping().getID(), fr1[0].getID(), 
fr4[0].getID()});
+                       else if(reversed && !reversedWeights)
+                               fr2 = 
FederationUtils.callInstruction(instString, output, new CPOperand[] {input1, 
input2, input3}, new long[] {fr1[0].getID(), mo1.getFedMapping().getID(), 
fr4[0].getID()});
+                       else
+                               fr2 = 
FederationUtils.callInstruction(instString, output, new CPOperand[] {input1, 
input2, input3}, new long[] {fr1[0].getID(), fr4[0].getID(), 
mo1.getFedMapping().getID()});

Review comment:
       Check line lengths. 

##########
File path: 
src/main/java/org/apache/sysds/runtime/instructions/fed/CtableFEDInstruction.java
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.Future;
+import java.util.stream.IntStream;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sysds.common.Types.DataType;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.lops.Lop;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedResponse;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedUDF;
+import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
+import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
+import org.apache.sysds.runtime.functionobjects.And;
+import org.apache.sysds.runtime.instructions.Instruction;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.instructions.cp.ScalarObject;
+import org.apache.sysds.runtime.lineage.LineageItem;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
+
+public class CtableFEDInstruction extends ComputationFEDInstruction {
+       private final CPOperand _outDim1;
+       private final CPOperand _outDim2;
+       private final boolean _isExpand;
+       private final boolean _ignoreZeros;
+
+       private CtableFEDInstruction(CPOperand in1, CPOperand in2, CPOperand 
in3, CPOperand out, String outputDim1, boolean dim1Literal, String outputDim2, 
boolean dim2Literal, boolean isExpand,
+               boolean ignoreZeros, String opcode, String istr) {
+               super(FEDType.Ctable, null, in1, in2, in3, out, opcode, istr);
+               _outDim1 = new CPOperand(outputDim1, ValueType.FP64, 
DataType.SCALAR, dim1Literal);
+               _outDim2 = new CPOperand(outputDim2, ValueType.FP64, 
DataType.SCALAR, dim2Literal);
+               _isExpand = isExpand;
+               _ignoreZeros = ignoreZeros;
+       }
+
+       public static CtableFEDInstruction parseInstruction(String inst) {
+               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(inst);
+               InstructionUtils.checkNumFields(parts, 7);
+
+               String opcode = parts[0];
+
+               //handle opcode
+               if(!(opcode.equalsIgnoreCase("ctable"))) {
+                       throw new DMLRuntimeException("Unexpected opcode in 
CtableFEDInstruction: " + inst);
+               }
+
+               //handle operands
+               CPOperand in1 = new CPOperand(parts[1]);
+               CPOperand in2 = new CPOperand(parts[2]);
+               CPOperand in3 = new CPOperand(parts[3]);
+
+               //handle known dimension information
+               String[] dim1Fields = 
parts[4].split(Instruction.LITERAL_PREFIX);
+               String[] dim2Fields = 
parts[5].split(Instruction.LITERAL_PREFIX);
+
+               CPOperand out = new CPOperand(parts[6]);
+               boolean ignoreZeros = Boolean.parseBoolean(parts[7]);
+
+               // ctable does not require any operator, so we simply pass-in a 
dummy operator with null functionobject
+               return new CtableFEDInstruction(in1,
+                       in2, in3, out, dim1Fields[0], 
Boolean.parseBoolean(dim1Fields[1]),
+                       dim2Fields[0], Boolean.parseBoolean(dim2Fields[1]), 
false, ignoreZeros, opcode, inst);
+       }
+
+       @Override
+       public void processInstruction(ExecutionContext ec) {
+               MatrixObject mo1 = ec.getMatrixObject(input1);
+               MatrixObject mo2 = ec.getMatrixObject(input2);
+
+               boolean reversed = false;
+               if(!mo1.isFederated() && mo2.isFederated()) {
+                       mo1 = ec.getMatrixObject(input2);
+                       mo2 = ec.getMatrixObject(input1);
+                       reversed = true;
+               }
+
+               if(mo2.getNumColumns() != 1 && mo1.getNumColumns() != 1)
+                       throw new DMLRuntimeException("Federated ctable: Input 
vectors should be nx1.");

Review comment:
       Minor comment: You could print the dimensions in the exception message 
so that the user immediately sees what the dimensions are. For instance: "Input 
vectors should be nx1, but are 100x5". 

##########
File path: src/test/scripts/functions/federated/FederatedCtableFedOutput.dml
##########
@@ -0,0 +1,38 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+        ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), 
list(2*$rows/4, $cols),
+               list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), 
list($rows, $cols)));
+
+m = nrow(X);
+n = ncol(X);
+
+# prepare offset vectors and one-hot encoded X
+fdom = colMaxs(X);
+
+foffb = t(cumsum(t(fdom))) - fdom;
+foffe = t(cumsum(t(fdom)))

Review comment:
       Out of curiosity: what do _fdom_, _foffb_, and _foffe_ stand for?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to