This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new f9e60f2 [MINOR] LM pipeline test with 4 workers
f9e60f2 is described below
commit f9e60f2cbd9bbced6f5cccf9d4ad960f9b18ea70
Author: Olga <[email protected]>
AuthorDate: Fri Nov 20 14:55:34 2020 +0100
[MINOR] LM pipeline test with 4 workers
Closes #1111
---
.../federated/algorithms/FederatedLmPipeline.java | 66 ++++++++++++++--------
.../functions/federated/FederatedLmPipeline.dml | 14 ++---
...ipeline.dml => FederatedLmPipeline4Workers.dml} | 20 ++++---
...ml => FederatedLmPipeline4WorkersReference.dml} | 23 ++++----
.../federated/FederatedLmPipelineReference.dml | 21 +++----
5 files changed, 85 insertions(+), 59 deletions(-)
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLmPipeline.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLmPipeline.java
index a5a3d02..8d4ac8d 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLmPipeline.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLmPipeline.java
@@ -19,7 +19,6 @@
package org.apache.sysds.test.functions.federated.algorithms;
-import org.junit.Test;
import org.apache.sysds.common.Types;
import org.apache.sysds.common.Types.ExecMode;
import org.apache.sysds.runtime.instructions.InstructionUtils;
@@ -29,13 +28,13 @@ import org.apache.sysds.runtime.transform.encode.EncoderRecode;
import org.apache.sysds.test.AutomatedTestBase;
import org.apache.sysds.test.TestConfiguration;
import org.apache.sysds.test.TestUtils;
-
+import org.junit.Test;
@net.jcip.annotations.NotThreadSafe
public class FederatedLmPipeline extends AutomatedTestBase {
private final static String TEST_DIR = "functions/federated/";
- private final static String TEST_NAME = "FederatedLmPipeline";
+ private final static String TEST_NAME1 = "FederatedLmPipeline";
+ private final static String TEST_NAME2 = "FederatedLmPipeline4Workers";
private final static String TEST_CLASS_DIR = TEST_DIR +
FederatedLmPipeline.class.getSimpleName() + "/";
public int rows = 10000;
@@ -44,24 +44,35 @@ public class FederatedLmPipeline extends AutomatedTestBase {
@Override
public void setUp() {
TestUtils.clearAssertionInformation();
- addTestConfiguration(TEST_NAME, new
TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"Z"}));
+ addTestConfiguration(TEST_NAME1, new
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"Z"}));
+ addTestConfiguration(TEST_NAME2, new
TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"Z"}));
}
@Test
public void federatedLmPipelineContinguous() {
- federatedLmPipeline(Types.ExecMode.SINGLE_NODE, true);
+ federatedLmPipeline(Types.ExecMode.SINGLE_NODE, true,
TEST_NAME1);
}
-
+
+ @Test
+ public void federatedLmPipelineContinguous4Workers() {
+ federatedLmPipeline(Types.ExecMode.SINGLE_NODE, true,
TEST_NAME2);
+ }
+
@Test
public void federatedLmPipelineSampled() {
- federatedLmPipeline(Types.ExecMode.SINGLE_NODE, false);
+ federatedLmPipeline(Types.ExecMode.SINGLE_NODE, false,
TEST_NAME1);
}
- public void federatedLmPipeline(ExecMode execMode, boolean contSplits) {
+ @Test
+ public void federatedLmPipelineSampled4Workers() {
+ federatedLmPipeline(Types.ExecMode.SINGLE_NODE, false,
TEST_NAME2);
+ }
+
+ public void federatedLmPipeline(ExecMode execMode, boolean contSplits,
String TEST_NAME) {
ExecMode oldExec = setExecMode(execMode);
boolean oldSort = EncoderRecode.SORT_RECODE_MAP;
EncoderRecode.SORT_RECODE_MAP = true;
-
+
getAndLoadTestConfiguration(TEST_NAME);
String HOME = SCRIPT_DIR + TEST_DIR;
@@ -74,39 +85,50 @@ public class FederatedLmPipeline extends AutomatedTestBase {
MatrixBlock c = MatrixBlock.randOperations(rows, 1,
1.0, 1, 50, "uniform", 23);
MatrixBlock rc =
c.unaryOperations(InstructionUtils.parseUnaryOperator("round"), new
MatrixBlock());
X = rc.append(X, new MatrixBlock(), true);
-
+
// We have two matrices handled by a single federated
worker
- int halfRows = rows / 2;
- writeInputMatrixWithMTD("X1", X.slice(0, halfRows-1),
false);
- writeInputMatrixWithMTD("X2", X.slice(halfRows,
rows-1), false);
+ int quarterRows = TEST_NAME.equals(TEST_NAME2) ? rows /
4 : rows / 2;
+ int[] k = TEST_NAME.equals(TEST_NAME2) ? new int[]
{quarterRows - 1, quarterRows, 2 * quarterRows - 1,
+ 2 * quarterRows, 3 * quarterRows - 1, 3 *
quarterRows,
+ rows - 1} : new int[] {quarterRows - 1,
quarterRows, rows - 1, 0, 0, 0, 0};
+ writeInputMatrixWithMTD("X1", X.slice(0, k[0]), false);
+ writeInputMatrixWithMTD("X2", X.slice(k[1], k[2]),
false);
+ writeInputMatrixWithMTD("X3", X.slice(k[3], k[4]),
false);
+ writeInputMatrixWithMTD("X4", X.slice(k[5], k[6]),
false);
writeInputMatrixWithMTD("Y", y, false);
-
+
// empty script name because we don't execute any
script, just start the worker
fullDMLScriptName = "";
int port1 = getRandomAvailablePort();
int port2 = getRandomAvailablePort();
+ int port3 = getRandomAvailablePort();
+ int port4 = getRandomAvailablePort();
Thread t1 = startLocalFedWorkerThread(port1,
FED_WORKER_WAIT_S);
- Thread t2 = startLocalFedWorkerThread(port2);
-
+ Thread t2 = startLocalFedWorkerThread(port2,
FED_WORKER_WAIT_S);
+ Thread t3 = startLocalFedWorkerThread(port3,
FED_WORKER_WAIT_S);
+ Thread t4 = startLocalFedWorkerThread(port4);
+
TestConfiguration config =
availableTestConfigurations.get(TEST_NAME);
loadTestConfiguration(config);
-
+
// Run reference dml script with normal matrix
fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
- programArgs = new String[] {"-args", input("X1"),
input("X2"), input("Y"),
+ programArgs = new String[] {"-args", input("X1"),
input("X2"), input("X3"), input("X4"), input("Y"),
String.valueOf(contSplits).toUpperCase(),
expected("Z")};
runTest(true, false, null, -1);
-
+
// Run actual dml script with federated matrix
fullDMLScriptName = HOME + TEST_NAME + ".dml";
programArgs = new String[] {"-nvargs", "in_X1=" +
TestUtils.federatedAddress(port1, input("X1")),
- "in_X2=" + TestUtils.federatedAddress(port2,
input("X2")), "rows=" + rows, "cols=" + (cols+1),
+ "in_X2=" + TestUtils.federatedAddress(port2,
input("X2")),
+ "in_X3=" + TestUtils.federatedAddress(port3,
input("X3")),
+ "in_X4=" + TestUtils.federatedAddress(port4,
input("X4")), "rows=" + rows, "cols=" + (cols + 1),
"in_Y=" + input("Y"), "cont=" +
String.valueOf(contSplits).toUpperCase(), "out=" + output("Z")};
runTest(true, false, null, -1);
-
+
// compare via files
compareResults(1e-2);
- TestUtils.shutdownThreads(t1, t2);
+ TestUtils.shutdownThreads(t1, t2, t3, t4);
}
finally {
resetExecMode(oldExec);
diff --git a/src/test/scripts/functions/federated/FederatedLmPipeline.dml b/src/test/scripts/functions/federated/FederatedLmPipeline.dml
index 323333d..a0862fa 100644
--- a/src/test/scripts/functions/federated/FederatedLmPipeline.dml
+++ b/src/test/scripts/functions/federated/FederatedLmPipeline.dml
@@ -35,7 +35,7 @@ colMean = (colMeans(X))
upperBound = colMean + 1.5 * colSD
lowerBound = colMean - 1.5 * colSD
outFilter = (X < lowerBound) | (X > upperBound)
-X = X - outFilter*X + outFilter*colMeans(X);
+X = X - outFilter*X + outFilter*colMeans(X);
# normalization
X = scale(X=X, center=TRUE, scale=TRUE);
@@ -43,7 +43,7 @@ X = scale(X=X, center=TRUE, scale=TRUE);
# split training and testing
[Xtrain , Xtest, ytrain, ytest] = split(X=X, Y=y, cont=$cont, seed=7)
-# train regression model
+# train regression model
B = lm(X=Xtrain, y=ytrain, icpt=1, reg=1e-3, tol=1e-9, verbose=TRUE)
# model evaluation on test split
@@ -55,11 +55,11 @@ ss_res = sum(y_residual^2);
ss_avg_res = ss_res - nrow(ytest) * avg_res^2;
R2 = 1 - ss_res / (sum(y^2) - nrow(ytest) * (sum(y)/nrow(ytest))^2);
print("\nAccuracy:" +
- "\n--sum(ytest) = " + sum(ytest) +
+ "\n--sum(ytest) = " + sum(ytest) +
"\n--sum(yhat) = " + sum(yhat) +
- "\n--AVG_RES_Y: " + avg_res +
- "\n--SS_AVG_RES_Y: " + ss_avg_res +
- "\n--R2: " + R2 );
-
+ "\n--AVG_RES_Y: " + avg_res +
+ "\n--SS_AVG_RES_Y: " + ss_avg_res +
+ "\n--R2: " + R2 );
+
# write trained model and meta data
write(B, $out)
diff --git a/src/test/scripts/functions/federated/FederatedLmPipeline.dml b/src/test/scripts/functions/federated/FederatedLmPipeline4Workers.dml
similarity index 80%
copy from src/test/scripts/functions/federated/FederatedLmPipeline.dml
copy to src/test/scripts/functions/federated/FederatedLmPipeline4Workers.dml
index 323333d..ebd96f7 100644
--- a/src/test/scripts/functions/federated/FederatedLmPipeline.dml
+++ b/src/test/scripts/functions/federated/FederatedLmPipeline4Workers.dml
@@ -19,8 +19,10 @@
#
#-------------------------------------------------------------
-Fin = federated(addresses=list($in_X1, $in_X2),
- ranges=list(list(0, 0), list($rows / 2, $cols), list($rows / 2, 0),
list($rows, $cols)))
+Fin = federated(addresses=list($in_X1, $in_X2, $in_X3, $in_X4),
+ ranges=list(list(0, 0), list($rows/4, $cols), list($rows/4, 0),
list(2*$rows/4, $cols),
+ list(2*$rows/4, 0), list(3*$rows/4, $cols), list(3*$rows/4, 0),
list($rows, $cols)))
+
y = read($in_Y)
# one hot encoding categorical, other passthrough
@@ -35,7 +37,7 @@ colMean = (colMeans(X))
upperBound = colMean + 1.5 * colSD
lowerBound = colMean - 1.5 * colSD
outFilter = (X < lowerBound) | (X > upperBound)
-X = X - outFilter*X + outFilter*colMeans(X);
+X = X - outFilter*X + outFilter*colMeans(X);
# normalization
X = scale(X=X, center=TRUE, scale=TRUE);
@@ -43,7 +45,7 @@ X = scale(X=X, center=TRUE, scale=TRUE);
# split training and testing
[Xtrain , Xtest, ytrain, ytest] = split(X=X, Y=y, cont=$cont, seed=7)
-# train regression model
+# train regression model
B = lm(X=Xtrain, y=ytrain, icpt=1, reg=1e-3, tol=1e-9, verbose=TRUE)
# model evaluation on test split
@@ -55,11 +57,11 @@ ss_res = sum(y_residual^2);
ss_avg_res = ss_res - nrow(ytest) * avg_res^2;
R2 = 1 - ss_res / (sum(y^2) - nrow(ytest) * (sum(y)/nrow(ytest))^2);
print("\nAccuracy:" +
- "\n--sum(ytest) = " + sum(ytest) +
+ "\n--sum(ytest) = " + sum(ytest) +
"\n--sum(yhat) = " + sum(yhat) +
- "\n--AVG_RES_Y: " + avg_res +
- "\n--SS_AVG_RES_Y: " + ss_avg_res +
- "\n--R2: " + R2 );
-
+ "\n--AVG_RES_Y: " + avg_res +
+ "\n--SS_AVG_RES_Y: " + ss_avg_res +
+ "\n--R2: " + R2 );
+
# write trained model and meta data
write(B, $out)
diff --git a/src/test/scripts/functions/federated/FederatedLmPipelineReference.dml b/src/test/scripts/functions/federated/FederatedLmPipeline4WorkersReference.dml
similarity index 83%
copy from src/test/scripts/functions/federated/FederatedLmPipelineReference.dml
copy to src/test/scripts/functions/federated/FederatedLmPipeline4WorkersReference.dml
index 72ca292..7888c0a 100644
--- a/src/test/scripts/functions/federated/FederatedLmPipelineReference.dml
+++ b/src/test/scripts/functions/federated/FederatedLmPipeline4WorkersReference.dml
@@ -19,8 +19,9 @@
#
#-------------------------------------------------------------
-Fin = rbind(read($1), read($2))
-y = read($3)
+Fin = rbind(read($1), read($2), read($3), read($4))
+
+y = read($5)
# one hot encoding categorical, other passthrough
Fall = as.frame(Fin)
@@ -34,15 +35,15 @@ colMean = (colMeans(X))
upperBound = colMean + 1.5 * colSD
lowerBound = colMean - 1.5 * colSD
outFilter = (X < lowerBound) | (X > upperBound)
-X = X - outFilter*X + outFilter*colMeans(X);
+X = X - outFilter*X + outFilter*colMeans(X);
# normalization
X = scale(X=X, center=TRUE, scale=TRUE);
# split training and testing
-[Xtrain , Xtest, ytrain, ytest] = split(X=X, Y=y, cont=$4, seed=7)
+[Xtrain , Xtest, ytrain, ytest] = split(X=X, Y=y, cont=$6, seed=7)
-# train regression model
+# train regression model
B = lm(X=Xtrain, y=ytrain, icpt=1, reg=1e-3, tol=1e-9, verbose=TRUE)
# model evaluation on test split
@@ -54,11 +55,11 @@ ss_res = sum(y_residual^2);
ss_avg_res = ss_res - nrow(ytest) * avg_res^2;
R2 = 1 - ss_res / (sum(y^2) - nrow(ytest) * (sum(y)/nrow(ytest))^2);
print("\nAccuracy:" +
- "\n--sum(ytest) = " + sum(ytest) +
+ "\n--sum(ytest) = " + sum(ytest) +
"\n--sum(yhat) = " + sum(yhat) +
- "\n--AVG_RES_Y: " + avg_res +
- "\n--SS_AVG_RES_Y: " + ss_avg_res +
- "\n--R2: " + R2 );
-
+ "\n--AVG_RES_Y: " + avg_res +
+ "\n--SS_AVG_RES_Y: " + ss_avg_res +
+ "\n--R2: " + R2 );
+
# write trained model and meta data
-write(B, $5)
+write(B, $7)
diff --git a/src/test/scripts/functions/federated/FederatedLmPipelineReference.dml b/src/test/scripts/functions/federated/FederatedLmPipelineReference.dml
index 72ca292..1934aae 100644
--- a/src/test/scripts/functions/federated/FederatedLmPipelineReference.dml
+++ b/src/test/scripts/functions/federated/FederatedLmPipelineReference.dml
@@ -20,7 +20,8 @@
#-------------------------------------------------------------
Fin = rbind(read($1), read($2))
-y = read($3)
+
+y = read($5)
# one hot encoding categorical, other passthrough
Fall = as.frame(Fin)
@@ -34,15 +35,15 @@ colMean = (colMeans(X))
upperBound = colMean + 1.5 * colSD
lowerBound = colMean - 1.5 * colSD
outFilter = (X < lowerBound) | (X > upperBound)
-X = X - outFilter*X + outFilter*colMeans(X);
+X = X - outFilter*X + outFilter*colMeans(X);
# normalization
X = scale(X=X, center=TRUE, scale=TRUE);
# split training and testing
-[Xtrain , Xtest, ytrain, ytest] = split(X=X, Y=y, cont=$4, seed=7)
+[Xtrain , Xtest, ytrain, ytest] = split(X=X, Y=y, cont=$6, seed=7)
-# train regression model
+# train regression model
B = lm(X=Xtrain, y=ytrain, icpt=1, reg=1e-3, tol=1e-9, verbose=TRUE)
# model evaluation on test split
@@ -54,11 +55,11 @@ ss_res = sum(y_residual^2);
ss_avg_res = ss_res - nrow(ytest) * avg_res^2;
R2 = 1 - ss_res / (sum(y^2) - nrow(ytest) * (sum(y)/nrow(ytest))^2);
print("\nAccuracy:" +
- "\n--sum(ytest) = " + sum(ytest) +
+ "\n--sum(ytest) = " + sum(ytest) +
"\n--sum(yhat) = " + sum(yhat) +
- "\n--AVG_RES_Y: " + avg_res +
- "\n--SS_AVG_RES_Y: " + ss_avg_res +
- "\n--R2: " + R2 );
-
+ "\n--AVG_RES_Y: " + avg_res +
+ "\n--SS_AVG_RES_Y: " + ss_avg_res +
+ "\n--R2: " + R2 );
+
# write trained model and meta data
-write(B, $5)
+write(B, $7)