This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new f9e60f2  [MINOR] LM pipeline test with 4 workers
f9e60f2 is described below

commit f9e60f2cbd9bbced6f5cccf9d4ad960f9b18ea70
Author: Olga <[email protected]>
AuthorDate: Fri Nov 20 14:55:34 2020 +0100

    [MINOR] LM pipeline test with 4 workers
    
    Closes #1111
---
 .../federated/algorithms/FederatedLmPipeline.java  | 66 ++++++++++++++--------
 .../functions/federated/FederatedLmPipeline.dml    | 14 ++---
 ...ipeline.dml => FederatedLmPipeline4Workers.dml} | 20 ++++---
 ...ml => FederatedLmPipeline4WorkersReference.dml} | 23 ++++----
 .../federated/FederatedLmPipelineReference.dml     | 21 +++----
 5 files changed, 85 insertions(+), 59 deletions(-)

diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLmPipeline.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLmPipeline.java
index a5a3d02..8d4ac8d 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLmPipeline.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLmPipeline.java
@@ -19,7 +19,6 @@
 
 package org.apache.sysds.test.functions.federated.algorithms;
 
-import org.junit.Test;
 import org.apache.sysds.common.Types;
 import org.apache.sysds.common.Types.ExecMode;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
@@ -29,13 +28,14 @@ import org.apache.sysds.runtime.transform.encode.EncoderRecode
 import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
-
+import org.junit.Test;
 
 @net.jcip.annotations.NotThreadSafe
 public class FederatedLmPipeline extends AutomatedTestBase {
 
        private final static String TEST_DIR = "functions/federated/";
-       private final static String TEST_NAME = "FederatedLmPipeline";
+       private final static String TEST_NAME1 = "FederatedLmPipeline";
+       private final static String TEST_NAME2 = "FederatedLmPipeline4Workers";
        private final static String TEST_CLASS_DIR = TEST_DIR + 
FederatedLmPipeline.class.getSimpleName() + "/";
 
        public int rows = 10000;
@@ -44,24 +44,35 @@ public class FederatedLmPipeline extends AutomatedTestBase {
        @Override
        public void setUp() {
                TestUtils.clearAssertionInformation();
-               addTestConfiguration(TEST_NAME, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"Z"}));
+               addTestConfiguration(TEST_NAME1, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"Z"}));
+               addTestConfiguration(TEST_NAME2, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"Z"}));
        }
 
        @Test
        public void federatedLmPipelineContinguous() {
-               federatedLmPipeline(Types.ExecMode.SINGLE_NODE, true);
+               federatedLmPipeline(Types.ExecMode.SINGLE_NODE, true, 
TEST_NAME1);
        }
-       
+
+       @Test
+       public void federatedLmPipelineContinguous4Workers() {
+               federatedLmPipeline(Types.ExecMode.SINGLE_NODE, true, 
TEST_NAME2);
+       }
+
        @Test
        public void federatedLmPipelineSampled() {
-               federatedLmPipeline(Types.ExecMode.SINGLE_NODE, false);
+               federatedLmPipeline(Types.ExecMode.SINGLE_NODE, false, 
TEST_NAME1);
        }
 
-       public void federatedLmPipeline(ExecMode execMode, boolean contSplits) {
+       @Test
+       public void federatedLmPipelineSampled4Workers() {
+               federatedLmPipeline(Types.ExecMode.SINGLE_NODE, false, 
TEST_NAME2);
+       }
+
+       public void federatedLmPipeline(ExecMode execMode, boolean contSplits, 
String TEST_NAME) {
                ExecMode oldExec = setExecMode(execMode);
                boolean oldSort = EncoderRecode.SORT_RECODE_MAP;
                EncoderRecode.SORT_RECODE_MAP = true;
-               
+
                getAndLoadTestConfiguration(TEST_NAME);
                String HOME = SCRIPT_DIR + TEST_DIR;
 
@@ -74,39 +85,50 @@ public class FederatedLmPipeline extends AutomatedTestBase {
                        MatrixBlock c = MatrixBlock.randOperations(rows, 1, 
1.0, 1, 50, "uniform", 23);
                        MatrixBlock rc = 
c.unaryOperations(InstructionUtils.parseUnaryOperator("round"), new 
MatrixBlock());
                        X = rc.append(X, new MatrixBlock(), true);
-                       
+
                        // We have two matrices handled by a single federated 
worker
-                       int halfRows = rows / 2;
-                       writeInputMatrixWithMTD("X1", X.slice(0, halfRows-1), 
false);
-                       writeInputMatrixWithMTD("X2", X.slice(halfRows, 
rows-1), false);
+                       int quarterRows = TEST_NAME.equals(TEST_NAME2) ? rows / 4 : rows / 2;
+                       int[] k = TEST_NAME.equals(TEST_NAME2) ? new int[] {quarterRows - 1, quarterRows, 2 * quarterRows - 1,
+                               2 * quarterRows, 3 * quarterRows - 1, 3 * quarterRows,
+                               rows - 1} : new int[] {quarterRows - 1, quarterRows, rows - 1, 0, 0, 0, 0};
+                       writeInputMatrixWithMTD("X1", X.slice(0, k[0]), false);
+                       writeInputMatrixWithMTD("X2", X.slice(k[1], k[2]), false);
+                       writeInputMatrixWithMTD("X3", X.slice(k[3], k[4]), false);
+                       writeInputMatrixWithMTD("X4", X.slice(k[5], k[6]), false);
                        writeInputMatrixWithMTD("Y", y, false);
-                       
+
                        // empty script name because we don't execute any 
script, just start the worker
                        fullDMLScriptName = "";
                        int port1 = getRandomAvailablePort();
                        int port2 = getRandomAvailablePort();
+                       int port3 = getRandomAvailablePort();
+                       int port4 = getRandomAvailablePort();
                        Thread t1 = startLocalFedWorkerThread(port1, 
FED_WORKER_WAIT_S);
-                       Thread t2 = startLocalFedWorkerThread(port2);
-       
+                       Thread t2 = startLocalFedWorkerThread(port2, 
FED_WORKER_WAIT_S);
+                       Thread t3 = startLocalFedWorkerThread(port3, 
FED_WORKER_WAIT_S);
+                       Thread t4 = startLocalFedWorkerThread(port4);
+
                        TestConfiguration config = 
availableTestConfigurations.get(TEST_NAME);
                        loadTestConfiguration(config);
-       
+
                        // Run reference dml script with normal matrix
                        fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
-                       programArgs = new String[] {"-args", input("X1"), input("X2"), input("Y"),
+                       programArgs = new String[] {"-args", input("X1"), input("X2"), input("X3"), input("X4"), input("Y"),
                                String.valueOf(contSplits).toUpperCase(), expected("Z")};
                        runTest(true, false, null, -1);
-       
+
                        // Run actual dml script with federated matrix
                        fullDMLScriptName = HOME + TEST_NAME + ".dml";
                        programArgs = new String[] {"-nvargs", "in_X1=" + 
TestUtils.federatedAddress(port1, input("X1")),
-                               "in_X2=" + TestUtils.federatedAddress(port2, 
input("X2")), "rows=" + rows, "cols=" + (cols+1),
+                               "in_X2=" + TestUtils.federatedAddress(port2, 
input("X2")),
+                               "in_X3=" + TestUtils.federatedAddress(port3, 
input("X3")),
+                               "in_X4=" + TestUtils.federatedAddress(port4, 
input("X4")), "rows=" + rows, "cols=" + (cols + 1),
                                "in_Y=" + input("Y"), "cont=" + 
String.valueOf(contSplits).toUpperCase(), "out=" + output("Z")};
                        runTest(true, false, null, -1);
-       
+
                        // compare via files
                        compareResults(1e-2);
-                       TestUtils.shutdownThreads(t1, t2);
+                       TestUtils.shutdownThreads(t1, t2, t3, t4);
                }
                finally {
                        resetExecMode(oldExec);
diff --git a/src/test/scripts/functions/federated/FederatedLmPipeline.dml b/src/test/scripts/functions/federated/FederatedLmPipeline.dml
index 323333d..a0862fa 100644
--- a/src/test/scripts/functions/federated/FederatedLmPipeline.dml
+++ b/src/test/scripts/functions/federated/FederatedLmPipeline.dml
@@ -35,7 +35,7 @@ colMean = (colMeans(X))
 upperBound = colMean + 1.5 * colSD
 lowerBound = colMean - 1.5 * colSD
 outFilter = (X < lowerBound) | (X > upperBound)
-X = X - outFilter*X + outFilter*colMeans(X); 
+X = X - outFilter*X + outFilter*colMeans(X);
 
 # normalization
 X = scale(X=X, center=TRUE, scale=TRUE);
@@ -43,7 +43,7 @@ X = scale(X=X, center=TRUE, scale=TRUE);
 # split training and testing
 [Xtrain , Xtest, ytrain, ytest] = split(X=X, Y=y, cont=$cont, seed=7)
 
-# train regression model 
+# train regression model
 B = lm(X=Xtrain, y=ytrain, icpt=1, reg=1e-3, tol=1e-9, verbose=TRUE)
 
 # model evaluation on test split
@@ -55,11 +55,11 @@ ss_res = sum(y_residual^2);
 ss_avg_res = ss_res - nrow(ytest) * avg_res^2;
 R2 = 1 - ss_res / (sum(y^2) - nrow(ytest) * (sum(y)/nrow(ytest))^2);
 print("\nAccuracy:" +
-      "\n--sum(ytest) = " + sum(ytest) + 
+      "\n--sum(ytest) = " + sum(ytest) +
       "\n--sum(yhat) = " + sum(yhat) +
-      "\n--AVG_RES_Y: " + avg_res + 
-      "\n--SS_AVG_RES_Y: " + ss_avg_res + 
-      "\n--R2: " + R2 );  
- 
+      "\n--AVG_RES_Y: " + avg_res +
+      "\n--SS_AVG_RES_Y: " + ss_avg_res +
+      "\n--R2: " + R2 );
+
 # write trained model and meta data
 write(B, $out)
diff --git a/src/test/scripts/functions/federated/FederatedLmPipeline.dml b/src/test/scripts/functions/federated/FederatedLmPipeline4Workers.dml
similarity index 80%
copy from src/test/scripts/functions/federated/FederatedLmPipeline.dml
copy to src/test/scripts/functions/federated/FederatedLmPipeline4Workers.dml
index 323333d..ebd96f7 100644
--- a/src/test/scripts/functions/federated/FederatedLmPipeline.dml
+++ b/src/test/scripts/functions/federated/FederatedLmPipeline4Workers.dml
@@ -19,8 +19,10 @@
 #
 #-------------------------------------------------------------
 
-Fin = federated(addresses=list($in_X1, $in_X2),
-    ranges=list(list(0, 0), list($rows / 2, $cols), list($rows / 2, 0), 
list($rows, $cols)))
+Fin = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+    ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0), 
list(2*$rows/4, $cols),
+        list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0), 
list($rows, $cols)))
+
 y = read($in_Y)
 
 # one hot encoding categorical, other passthrough
@@ -35,7 +37,7 @@ colMean = (colMeans(X))
 upperBound = colMean + 1.5 * colSD
 lowerBound = colMean - 1.5 * colSD
 outFilter = (X < lowerBound) | (X > upperBound)
-X = X - outFilter*X + outFilter*colMeans(X); 
+X = X - outFilter*X + outFilter*colMeans(X);
 
 # normalization
 X = scale(X=X, center=TRUE, scale=TRUE);
@@ -43,7 +45,7 @@ X = scale(X=X, center=TRUE, scale=TRUE);
 # split training and testing
 [Xtrain , Xtest, ytrain, ytest] = split(X=X, Y=y, cont=$cont, seed=7)
 
-# train regression model 
+# train regression model
 B = lm(X=Xtrain, y=ytrain, icpt=1, reg=1e-3, tol=1e-9, verbose=TRUE)
 
 # model evaluation on test split
@@ -55,11 +57,11 @@ ss_res = sum(y_residual^2);
 ss_avg_res = ss_res - nrow(ytest) * avg_res^2;
 R2 = 1 - ss_res / (sum(y^2) - nrow(ytest) * (sum(y)/nrow(ytest))^2);
 print("\nAccuracy:" +
-      "\n--sum(ytest) = " + sum(ytest) + 
+      "\n--sum(ytest) = " + sum(ytest) +
       "\n--sum(yhat) = " + sum(yhat) +
-      "\n--AVG_RES_Y: " + avg_res + 
-      "\n--SS_AVG_RES_Y: " + ss_avg_res + 
-      "\n--R2: " + R2 );  
- 
+      "\n--AVG_RES_Y: " + avg_res +
+      "\n--SS_AVG_RES_Y: " + ss_avg_res +
+      "\n--R2: " + R2 );
+
 # write trained model and meta data
 write(B, $out)
diff --git a/src/test/scripts/functions/federated/FederatedLmPipelineReference.dml b/src/test/scripts/functions/federated/FederatedLmPipeline4WorkersReference.dml
similarity index 83%
copy from src/test/scripts/functions/federated/FederatedLmPipelineReference.dml
copy to src/test/scripts/functions/federated/FederatedLmPipeline4WorkersReference.dml
index 72ca292..7888c0a 100644
--- a/src/test/scripts/functions/federated/FederatedLmPipelineReference.dml
+++ b/src/test/scripts/functions/federated/FederatedLmPipeline4WorkersReference.dml
@@ -19,8 +19,9 @@
 #
 #-------------------------------------------------------------
 
-Fin = rbind(read($1), read($2))
-y = read($3)
+Fin = rbind(read($1), read($2), read($3), read($4))
+
+y = read($5)
 
 # one hot encoding categorical, other passthrough
 Fall = as.frame(Fin)
@@ -34,15 +35,15 @@ colMean = (colMeans(X))
 upperBound = colMean + 1.5 * colSD
 lowerBound = colMean - 1.5 * colSD
 outFilter = (X < lowerBound) | (X > upperBound)
-X = X - outFilter*X + outFilter*colMeans(X); 
+X = X - outFilter*X + outFilter*colMeans(X);
 
 # normalization
 X = scale(X=X, center=TRUE, scale=TRUE);
 
 # split training and testing
-[Xtrain , Xtest, ytrain, ytest] = split(X=X, Y=y, cont=$4, seed=7)
+[Xtrain , Xtest, ytrain, ytest] = split(X=X, Y=y, cont=$6, seed=7)
 
-# train regression model 
+# train regression model
 B = lm(X=Xtrain, y=ytrain, icpt=1, reg=1e-3, tol=1e-9, verbose=TRUE)
 
 # model evaluation on test split
@@ -54,11 +55,11 @@ ss_res = sum(y_residual^2);
 ss_avg_res = ss_res - nrow(ytest) * avg_res^2;
 R2 = 1 - ss_res / (sum(y^2) - nrow(ytest) * (sum(y)/nrow(ytest))^2);
 print("\nAccuracy:" +
-      "\n--sum(ytest) = " + sum(ytest) + 
+      "\n--sum(ytest) = " + sum(ytest) +
       "\n--sum(yhat) = " + sum(yhat) +
-      "\n--AVG_RES_Y: " + avg_res + 
-      "\n--SS_AVG_RES_Y: " + ss_avg_res + 
-      "\n--R2: " + R2 );  
- 
+      "\n--AVG_RES_Y: " + avg_res +
+      "\n--SS_AVG_RES_Y: " + ss_avg_res +
+      "\n--R2: " + R2 );
+
 # write trained model and meta data
-write(B, $5)
+write(B, $7)
diff --git a/src/test/scripts/functions/federated/FederatedLmPipelineReference.dml b/src/test/scripts/functions/federated/FederatedLmPipelineReference.dml
index 72ca292..1934aae 100644
--- a/src/test/scripts/functions/federated/FederatedLmPipelineReference.dml
+++ b/src/test/scripts/functions/federated/FederatedLmPipelineReference.dml
@@ -20,7 +20,8 @@
 #-------------------------------------------------------------
 
 Fin = rbind(read($1), read($2))
-y = read($3)
+
+y = read($5)
 
 # one hot encoding categorical, other passthrough
 Fall = as.frame(Fin)
@@ -34,15 +35,15 @@ colMean = (colMeans(X))
 upperBound = colMean + 1.5 * colSD
 lowerBound = colMean - 1.5 * colSD
 outFilter = (X < lowerBound) | (X > upperBound)
-X = X - outFilter*X + outFilter*colMeans(X); 
+X = X - outFilter*X + outFilter*colMeans(X);
 
 # normalization
 X = scale(X=X, center=TRUE, scale=TRUE);
 
 # split training and testing
-[Xtrain , Xtest, ytrain, ytest] = split(X=X, Y=y, cont=$4, seed=7)
+[Xtrain , Xtest, ytrain, ytest] = split(X=X, Y=y, cont=$6, seed=7)
 
-# train regression model 
+# train regression model
 B = lm(X=Xtrain, y=ytrain, icpt=1, reg=1e-3, tol=1e-9, verbose=TRUE)
 
 # model evaluation on test split
@@ -54,11 +55,11 @@ ss_res = sum(y_residual^2);
 ss_avg_res = ss_res - nrow(ytest) * avg_res^2;
 R2 = 1 - ss_res / (sum(y^2) - nrow(ytest) * (sum(y)/nrow(ytest))^2);
 print("\nAccuracy:" +
-      "\n--sum(ytest) = " + sum(ytest) + 
+      "\n--sum(ytest) = " + sum(ytest) +
       "\n--sum(yhat) = " + sum(yhat) +
-      "\n--AVG_RES_Y: " + avg_res + 
-      "\n--SS_AVG_RES_Y: " + ss_avg_res + 
-      "\n--R2: " + R2 );  
- 
+      "\n--AVG_RES_Y: " + avg_res +
+      "\n--SS_AVG_RES_Y: " + ss_avg_res +
+      "\n--R2: " + R2 );
+
 # write trained model and meta data
-write(B, $5)
+write(B, $7)

Reply via email to