This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f05aa9  [SYSTEMDS-2636] Built-in function for univariate statistics
3f05aa9 is described below

commit 3f05aa9f59f663df105307da3483d0de8f046c08
Author: Olga <[email protected]>
AuthorDate: Mon Aug 24 10:59:37 2020 +0200

    [SYSTEMDS-2636] Built-in function for univariate statistics
    
    Added univar and bivar builtins, federated tests, fixed fed matrix max
    
    Closes #1035.
---
 scripts/builtin/univar.dml                         |  94 +++++++++++
 .../java/org/apache/sysds/common/Builtins.java     |   1 +
 .../controlprogram/federated/FederationUtils.java  |  52 +++---
 .../functions/federated/FederatedBivarTest.java    | 176 +++++++++++++++++++++
 .../functions/federated/FederatedUnivarTest.java   | 146 +++++++++++++++++
 .../functions/federated/FederatedBivarTest.dml     |  31 ++++
 .../federated/FederatedBivarTestReference.dml      |  28 ++++
 .../functions/federated/FederatedUnivarTest.dml    |  31 ++++
 .../federated/FederatedUnivarTestReference.dml     |  26 +++
 9 files changed, 566 insertions(+), 19 deletions(-)

diff --git a/scripts/builtin/univar.dml b/scripts/builtin/univar.dml
new file mode 100644
index 0000000..80a4606
--- /dev/null
+++ b/scripts/builtin/univar.dml
@@ -0,0 +1,94 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+#
+# Computes univariate statistics for all attributes in a given data set
+#
+# INPUT PARAMETERS:
+# -------------------------------------------------------------------------------------------------
+# NAME           TYPE               DEFAULT  MEANING
+# -------------------------------------------------------------------------------------------------
+# X              Matrix[Double]     ---      Input matrix of the shape (N, D)
+# types          Matrix[Double]     ---      Matrix of the shape (1, D) with feature types:
+#                                            1 for scale, 2 for nominal, 3 for ordinal
+# -------------------------------------------------------------------------------------------------
+# OUTPUT: Matrix of summary statistics of the shape (17, D); rows 1-14 scale, rows 15-17 categorical
+
+m_univar = function(Matrix[Double] X, Matrix[Double] types)
+return(Matrix[Double] univarStats)
+{
+  max_kind = max(types);
+  N = nrow(X);
+  D = ncol(X);
+
+  # Number of statistics (14 scale, 3 categorical)
+  numBaseStats = 17;
+  univarStats = matrix(0, rows=numBaseStats, cols=D);
+
+  # Compute max domain size among all categorical attributes
+  maxDomain = as.integer(max((types > 1) * colMaxs(X)));
+
+  parfor(i in 1:D, check=0) {
+    F = X[,i];
+
+    type = as.scalar(types[1,i]);
+    minF = min(F);
+    maxF = max(F);
+
+    if (type == 1) {
+      # compute SCALE statistics on the projected column
+      rng = maxF - minF;
+
+      mu = mean(F);
+      m2 = moment(F, 2);
+      m3 = moment(F, 3);
+      m4 = moment(F, 4);
+
+      var = N/(N-1.0)*m2;
+      std_dev = sqrt(var);
+      se = std_dev/sqrt(N);
+      cv = std_dev/mu;
+
+      g1 = m3/(std_dev^3);
+      g2 = m4/(std_dev^4) - 3;
+      se_g1=sqrt( (6/(N-2.0)) * (N/(N+1.0)) * ((N-1.0)/(N+3.0)) );
+      se_g2=sqrt( (4/(N+5.0)) * ((N^2-1)/(N-3.0)) * se_g1^2 );
+
+      md = median(F);
+      iqm = interQuartileMean(F);
+
+      univarStats[1:14,i] = as.matrix(list(minF, maxF, rng,
+        mu, var, std_dev, se, cv, g1, g2, se_g1, se_g2, md, iqm));
+    }
+
+    if (type == 2 | type == 3) {
+      # check if the categorical column has valid values
+      if( minF <= 0 ) {
+        print("ERROR: Categorical attributes can only take values starting 
from 1. Encountered a value " + minF + " in attribute " + i);
+      }
+
+      # compute CATEGORICAL statistics on the projected column
+      cat_counts = table(F, 1, maxDomain, 1);
+      mode = as.scalar(rowIndexMax(t(cat_counts)));
+      numModes = sum(cat_counts == max(cat_counts));
+      univarStats[15:17,i] = as.matrix(list(maxF, mode, numModes));
+    }
+  }
+}
diff --git a/src/main/java/org/apache/sysds/common/Builtins.java 
b/src/main/java/org/apache/sysds/common/Builtins.java
index 0a05d15..9e14f50 100644
--- a/src/main/java/org/apache/sysds/common/Builtins.java
+++ b/src/main/java/org/apache/sysds/common/Builtins.java
@@ -191,6 +191,7 @@ public enum Builtins {
        COUNT_DISTINCT_APPROX("countDistinctApprox",false),
        VAR("var", false),
        XOR("xor", false),
+       UNIVAR("univar", true),
        WINSORIZE("winsorize", true, false), //TODO parameterize w/ prob, 
min/max val
 
        //parameterized builtin functions
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
index 7df7c51..bdec97f 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederationUtils.java
@@ -47,15 +47,15 @@ import 
org.apache.sysds.runtime.matrix.operators.SimpleOperator;
 public class FederationUtils {
        protected static Logger log = Logger.getLogger(FederationUtils.class);
        private static final IDSequence _idSeq = new IDSequence();
-       
+
        public static void resetFedDataID() {
                _idSeq.reset();
        }
-       
+
        public static long getNextFedDataID() {
                return _idSeq.getNextID();
        }
-       
+
        public static FederatedRequest callInstruction(String inst, CPOperand 
varOldOut, CPOperand[] varOldIn, long[] varNewIn) {
                //TODO better and safe replacement of operand names --> 
instruction utils
                long id = getNextFedDataID();
@@ -85,7 +85,7 @@ public class FederationUtils {
                        throw new DMLRuntimeException(ex);
                }
        }
-       
+
        public static MatrixBlock aggMean(Future<FederatedResponse>[] ffr, 
FederationMap map) {
                try {
                        FederatedRange[] ranges = map.getFederatedRanges();
@@ -108,7 +108,22 @@ public class FederationUtils {
                        throw new DMLRuntimeException(ex);
                }
        }
-       
+
+       public static DoubleObject aggMinMax(Future<FederatedResponse>[] ffr, 
boolean isMin, boolean isScalar) {
+               try {
+                       double res = isMin ? Double.MAX_VALUE : - 
Double.MAX_VALUE;
+                       for (Future<FederatedResponse> fr: ffr){
+                               double v = isScalar ? 
((ScalarObject)fr.get().getData()[0]).getDoubleValue() :
+                                       isMin ? ((MatrixBlock) 
fr.get().getData()[0]).min() : ((MatrixBlock) fr.get().getData()[0]).max();
+                               res = isMin ? Math.min(res, v) : Math.max(res, 
v);
+                       }
+                       return new DoubleObject(res);
+               }
+               catch (Exception ex) {
+                       throw new DMLRuntimeException(ex);
+               }
+       }
+
        public static MatrixBlock[] getResults(Future<FederatedResponse>[] ffr) 
{
                try {
                        MatrixBlock[] ret = new MatrixBlock[ffr.length];
@@ -136,25 +151,19 @@ public class FederationUtils {
 
        public static ScalarObject aggScalar(AggregateUnaryOperator aop, 
Future<FederatedResponse>[] ffr) {
                if(!(aop.aggOp.increOp.fn instanceof KahanFunction || 
(aop.aggOp.increOp.fn instanceof Builtin &&
-                       (((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == 
BuiltinCode.MIN ||
-                               ((Builtin) 
aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)))) {
+                               (((Builtin) 
aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
+                                               ((Builtin) 
aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)))) {
                        throw new DMLRuntimeException("Unsupported aggregation 
operator: "
-                               + aop.aggOp.increOp.getClass().getSimpleName());
+                                       + 
aop.aggOp.increOp.getClass().getSimpleName());
                }
 
                try {
                        if(aop.aggOp.increOp.fn instanceof Builtin){
                                // then we know it is a Min or Max based on the 
previous check.
                                boolean isMin = ((Builtin) 
aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN;
-                               double res = isMin ? Double.MAX_VALUE: - 
Double.MAX_VALUE;
-                               double v;
-                               for (Future<FederatedResponse> fr: ffr){
-                                       v = 
((ScalarObject)fr.get().getData()[0]).getDoubleValue();
-                                       res = isMin ? Math.min(res, v) : 
Math.max(res, v);
-                               }
-                               return new DoubleObject(res);
-                       } 
-                       else {          
+                               return aggMinMax(ffr, isMin, true);
+                       }
+                       else {
                                double sum = 0; //uak+
                                for( Future<FederatedResponse> fr : ffr )
                                        sum += 
((ScalarObject)fr.get().getData()[0]).getDoubleValue();
@@ -172,17 +181,22 @@ public class FederationUtils {
                        //independent of aggregation function for 
row-partitioned federated matrices
                        return rbind(ffr);
                }
-               
                // handle col aggregate
                if( aop.aggOp.increOp.fn instanceof KahanFunction )
                        return aggAdd(ffr);
                else if( aop.aggOp.increOp.fn instanceof Mean )
                        return aggMean(ffr, map);
+               else if (aop.aggOp.increOp.fn instanceof Builtin &&
+                       (((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == 
BuiltinCode.MIN ||
+                       ((Builtin) aop.aggOp.increOp.fn).getBuiltinCode() == 
BuiltinCode.MAX)) {
+                       boolean isMin = ((Builtin) 
aop.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN;
+                       return new MatrixBlock(1,1,aggMinMax(ffr, isMin, 
false).getDoubleValue());
+               }
                else
                        throw new DMLRuntimeException("Unsupported aggregation 
operator: "
                                + 
aop.aggOp.increOp.fn.getClass().getSimpleName());
        }
-       
+
        public static void waitFor(List<Future<FederatedResponse>> responses) {
                try {
                        for(Future<FederatedResponse> fr : responses)
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/FederatedBivarTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/FederatedBivarTest.java
new file mode 100644
index 0000000..906beec
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/FederatedBivarTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+@RunWith(value = Parameterized.class)
[email protected]
+public class FederatedBivarTest extends AutomatedTestBase {
+       private final static String TEST_DIR = "functions/federated/";
+       private final static String TEST_NAME = "FederatedBivarTest";
+       private final static String TEST_CLASS_DIR = TEST_DIR + 
FederatedUnivarTest.class.getSimpleName() + "/";
+       private final static int blocksize = 1024;
+       @Parameterized.Parameter()
+       public int rows;
+       @Parameterized.Parameter(1)
+       public int cols;
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"}));
+       }
+
+       @Parameterized.Parameters
+       public static Collection<Object[]> data() {
+               return Arrays.asList(new Object[][] {
+                               {10000, 10},
+//                             {2000, 50}, {1000, 10},
+//                             {10000, 10}, {2000, 50}, {1000, 100}
+               });
+       }
+
+       @Test
+       @Ignore
+       public void federatedBivarSinglenode() {
+               federatedL2SVM(Types.ExecMode.SINGLE_NODE);
+       }
+
+       @Test
+       @Ignore
+       public void federatedBivarHybrid() {
+               federatedL2SVM(Types.ExecMode.HYBRID);
+       }
+
+       public void federatedL2SVM(Types.ExecMode execMode) {
+               Types.ExecMode platformOld = setExecMode(execMode);
+
+               getAndLoadTestConfiguration(TEST_NAME);
+               String HOME = SCRIPT_DIR + TEST_DIR;
+
+               // write input matrices
+               int quarterRows = rows / 4;
+               // We have four matrices, each handled by its own federated worker
+               double[][] X1 = getRandomMatrix(quarterRows, cols, 1, 5, 1, 3);
+               double[][] X2 = getRandomMatrix(quarterRows, cols, 1, 5, 1, 7);
+               double[][] X3 = getRandomMatrix(quarterRows, cols, 1, 5, 1, 8);
+               double[][] X4 = getRandomMatrix(quarterRows, cols, 1, 5, 1, 9);
+
+               // write types matrix shape of (1, D)
+               double [][] T1 = getRandomMatrix(1, cols, 0, 2, 1, 9);
+               Arrays.stream(T1[0]).forEach(n -> Math.ceil(n));
+
+               double [][] T2 = getRandomMatrix(1, cols, 0, 2, 1, 9);
+               Arrays.stream(T2[0]).forEach(n -> Math.ceil(n));
+
+               double [][] S1 = getRandomMatrix(1, (int) cols/5, 1, cols, 1, 
3);
+               Arrays.stream(S1[0]).forEach(n -> Math.ceil(n));
+
+               double [][] S2 = getRandomMatrix(1, (int) cols/4, 1, cols, 1, 
9);
+               Arrays.stream(S2[0]).forEach(n -> Math.ceil(n));
+
+               MatrixCharacteristics mc= new 
MatrixCharacteristics(quarterRows, cols, blocksize, quarterRows * cols);
+               writeInputMatrixWithMTD("X1", X1, false, mc);
+               writeInputMatrixWithMTD("X2", X2, false, mc);
+               writeInputMatrixWithMTD("X3", X3, false, mc);
+               writeInputMatrixWithMTD("X4", X4, false, mc);
+               writeInputMatrixWithMTD("S1", S1, false);
+               writeInputMatrixWithMTD("S2", S2, false);
+               writeInputMatrixWithMTD("T1", T1, false);
+               writeInputMatrixWithMTD("T2", T2, false);
+
+               // empty script name because we don't execute any script, just 
start the worker
+               fullDMLScriptName = "";
+               int port1 = getRandomAvailablePort();
+               int port2 = getRandomAvailablePort();
+               int port3 = getRandomAvailablePort();
+               int port4 = getRandomAvailablePort();
+               Thread t1 = startLocalFedWorker(port1);
+               Thread t2 = startLocalFedWorker(port2);
+               Thread t3 = startLocalFedWorker(port3);
+               Thread t4 = startLocalFedWorker(port4);
+
+               TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
+               loadTestConfiguration(config);
+               setOutputBuffering(false);
+
+               // Run reference dml script with normal matrix
+               fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+               programArgs = new String[] {"-stats", "-args", input("X1"), 
input("X2"), input("X3"), input("X4"), input("S1"), input("S2"), input("T1"), 
input("T2"), expected("B")};
+               runTest(true, false, null, -1);
+
+               // Run actual dml script with federated matrix
+               fullDMLScriptName = HOME + TEST_NAME + ".dml";
+               programArgs = new String[] {"-stats", "-nvargs",
+                       "in_X1=" + TestUtils.federatedAddress(port1, 
input("X1")),
+                       "in_X2=" + TestUtils.federatedAddress(port2, 
input("X2")),
+                       "in_X3=" + TestUtils.federatedAddress(port3, 
input("X3")),
+                       "in_X4=" + TestUtils.federatedAddress(port4, 
input("X4")),
+                       "in_S1=" + input("S1"),
+                       "in_S2=" + input("S2"),
+                       "in_T1=" + input("T1"),
+                       "in_T2=" + input("T2"),
+                       "rows=" + rows, "cols=" + cols,
+                       "out=" + output("B")};
+               runTest(true, false, null, -1);
+
+               // compare via files
+//             compareResults(1e-9);
+               TestUtils.shutdownThreads(t1, t2, t3, t4);
+
+               // check for federated operations
+//             Assert.assertTrue(heavyHittersContainsString("fed_ba+*"));
+//             Assert.assertTrue(heavyHittersContainsString("fed_uack+"));
+//             Assert.assertTrue(heavyHittersContainsString("fed_tsmm"));
+//             if( scaleAndShift ) {
+//                     
Assert.assertTrue(heavyHittersContainsString("fed_uacsqk+"));
+//                     
Assert.assertTrue(heavyHittersContainsString("fed_uacmean"));
+//                     Assert.assertTrue(heavyHittersContainsString("fed_-"));
+//                     Assert.assertTrue(heavyHittersContainsString("fed_/"));
+//                     
Assert.assertTrue(heavyHittersContainsString("fed_replace"));
+//             }
+
+               //check that federated input files are still existing
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("S1")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("S2")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("T1")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("T2")));
+
+               resetExecMode(platformOld);
+       }
+}
diff --git 
a/src/test/java/org/apache/sysds/test/functions/federated/FederatedUnivarTest.java
 
b/src/test/java/org/apache/sysds/test/functions/federated/FederatedUnivarTest.java
new file mode 100644
index 0000000..a704d3a
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/functions/federated/FederatedUnivarTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+@RunWith(value = Parameterized.class)
[email protected]
+public class FederatedUnivarTest extends AutomatedTestBase {
+       private final static String TEST_DIR = "functions/federated/";
+       private final static String TEST_NAME = "FederatedUnivarTest";
+       private final static String TEST_CLASS_DIR = TEST_DIR + 
FederatedUnivarTest.class.getSimpleName() + "/";
+
+       private final static int blocksize = 1024;
+       @Parameterized.Parameter()
+       public int rows;
+       @Parameterized.Parameter(1)
+       public int cols;
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"}));
+       }
+
+       @Parameterized.Parameters
+       public static Collection<Object[]> data() {
+               return Arrays.asList(new Object[][] {
+                               {10000, 16},
+                               {2000, 32}, {1000, 64}, {10000, 128}
+               });
+       }
+
+       @Test
+       public void federatedUnivarSinglenode() {
+               federatedL2SVM(Types.ExecMode.SINGLE_NODE);
+       }
+
+       @Test
+       public void federatedUnivarHybrid() {
+               federatedL2SVM(Types.ExecMode.HYBRID);
+       }
+
+       public void federatedL2SVM(Types.ExecMode execMode) {
+               Types.ExecMode platformOld = setExecMode(execMode);
+
+               getAndLoadTestConfiguration(TEST_NAME);
+               String HOME = SCRIPT_DIR + TEST_DIR;
+
+               // write input matrices
+               int quarterCols = cols / 4;
+               // We have four matrices, each handled by its own federated worker
+               double[][] X1 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 3);
+               double[][] X2 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 7);
+               double[][] X3 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 8);
+               double[][] X4 = getRandomMatrix(rows, quarterCols, 1, 5, 1, 9);
+
+               // write types matrix shape of (1, D)
+               double [][] Y = getRandomMatrix(1, cols, 0, 3, 1, 9);
+               Arrays.stream(Y[0]).forEach(Math::ceil);
+
+               MatrixCharacteristics mc= new MatrixCharacteristics(rows, 
quarterCols, blocksize, rows * quarterCols);
+               writeInputMatrixWithMTD("X1", X1, false, mc);
+               writeInputMatrixWithMTD("X2", X2, false, mc);
+               writeInputMatrixWithMTD("X3", X3, false, mc);
+               writeInputMatrixWithMTD("X4", X4, false, mc);
+               writeInputMatrixWithMTD("Y", Y, false);
+
+               // empty script name because we don't execute any script, just 
start the worker
+               fullDMLScriptName = "";
+               int port1 = getRandomAvailablePort();
+               int port2 = getRandomAvailablePort();
+               int port3 = getRandomAvailablePort();
+               int port4 = getRandomAvailablePort();
+               Thread t1 = startLocalFedWorker(port1);
+               Thread t2 = startLocalFedWorker(port2);
+               Thread t3 = startLocalFedWorker(port3);
+               Thread t4 = startLocalFedWorker(port4);
+
+               TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
+               loadTestConfiguration(config);
+               setOutputBuffering(false);
+
+               // Run reference dml script with normal matrix
+               fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+               programArgs = new String[] {"-stats", "100", "-args", 
input("X1"), input("X2"), input("X3"), input("X4"), input("Y"), expected("B")};
+               runTest(true, false, null, -1);
+
+               // Run actual dml script with federated matrix
+               fullDMLScriptName = HOME + TEST_NAME + ".dml";
+               programArgs = new String[] {"-stats",  "100", "-nvargs",
+                       "in_X1=" + TestUtils.federatedAddress(port1, 
input("X1")),
+                       "in_X2=" + TestUtils.federatedAddress(port2, 
input("X2")),
+                       "in_X3=" + TestUtils.federatedAddress(port3, 
input("X3")),
+                       "in_X4=" + TestUtils.federatedAddress(port4, 
input("X4")),
+                       "in_Y=" + input("Y"), // types
+                       "rows=" + rows, "cols=" + cols,
+                       "out=" + output("B")};
+               runTest(true, false, null, -1);
+
+               // compare via files
+               compareResults(1e-9);
+               TestUtils.shutdownThreads(t1, t2, t3, t4);
+
+               // check for federated operations
+               Assert.assertTrue(heavyHittersContainsString("fed_uacmax"));
+
+               //check that federated input files are still existing
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X3")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X4")));
+               Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("Y")));
+
+               resetExecMode(platformOld);
+       }
+}
diff --git a/src/test/scripts/functions/federated/FederatedBivarTest.dml 
b/src/test/scripts/functions/federated/FederatedBivarTest.dml
new file mode 100644
index 0000000..94b197a
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedBivarTest.dml
@@ -0,0 +1,31 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+    ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), 
list(2*$rows/4, $cols),
+               list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), 
list($rows, $cols)));
+S1 = read($in_S1);
+S2 = read($in_S2);
+T1 = read($in_T1);
+T2 = read($in_T2);
+B = bivar(X=X, S1=S1, S2=S2, T1=T1, T2=T2);
+write(B, $out);
+
diff --git 
a/src/test/scripts/functions/federated/FederatedBivarTestReference.dml 
b/src/test/scripts/functions/federated/FederatedBivarTestReference.dml
new file mode 100644
index 0000000..b28347c
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedBivarTestReference.dml
@@ -0,0 +1,28 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = rbind(read($1), read($2), read($3), read($4));
+S1 = read($5);
+S2 = read($6);
+T1 = read($7);
+T2 = read($8);
+B = bivar(X=X, S1=S1, S2=S2, T1=T1, T2=T2);
+write(B, $9);
\ No newline at end of file
diff --git a/src/test/scripts/functions/federated/FederatedUnivarTest.dml 
b/src/test/scripts/functions/federated/FederatedUnivarTest.dml
new file mode 100644
index 0000000..443c0c1
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedUnivarTest.dml
@@ -0,0 +1,31 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+#X = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+#    ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), 
list(2*$rows/4, $cols),
+#              list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), 
list($rows, $cols)));
+
+X = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+    ranges=list(list(0, 0), list($rows, $cols/4), list(0,$cols/4), list($rows, 
$cols/2), list(0,$cols/2), list($rows, 3*($cols/4)), list(0, 3*($cols/4)), 
list($rows, $cols)));
+Y = read($in_Y);
+B = univar(X=X, types=Y);
+write(B, $out);
+
diff --git 
a/src/test/scripts/functions/federated/FederatedUnivarTestReference.dml 
b/src/test/scripts/functions/federated/FederatedUnivarTestReference.dml
new file mode 100644
index 0000000..46072e7
--- /dev/null
+++ b/src/test/scripts/functions/federated/FederatedUnivarTestReference.dml
@@ -0,0 +1,26 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+#X = rbind(read($1), read($2), read($3), read($4));
+X = cbind(read($1), read($2), read($3), read($4));
+types = read($5);
+B = univar(X=X, types=types);
+write(B, $6);

Reply via email to