This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new 3b5ea89 [SYSTEMDS-2731] Add read timeout federated
3b5ea89 is described below
commit 3b5ea894ecbba8c96361ea58e9f2ee9663c534c4
Author: baunsgaard <[email protected]>
AuthorDate: Sat Nov 14 22:14:36 2020 +0100
[SYSTEMDS-2731] Add read timeout federated
This commit adds a timeout on the federated initialization.
This is reasonable because initialization should never take more than a
couple of seconds: it only exchanges initialization information back and
forth and does not, in principle, read the data.
The default timeout is 10 seconds; it can be customized (in whole seconds)
via sysds.federated.initialization.timeout in the SystemDS-config.xml file.
---
src/main/java/org/apache/sysds/conf/DMLConfig.java | 4 +-
.../controlprogram/federated/FederatedData.java | 18 ++++-
.../instructions/fed/InitFEDInstruction.java | 13 +++-
src/test/config/SystemDS-config.xml | 7 +-
.../federated/FederatedTestObjectConstructor.java | 31 ++++----
.../federated/io/FederatedReaderTest.java | 10 +--
.../functions/federated/io/FederatedSSLTest.java | 10 +--
...edReaderTest.java => FederatedTimeoutTest.java} | 86 +++++++++++++---------
8 files changed, 110 insertions(+), 69 deletions(-)
diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java
b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index 49c83ee..4fb32e7 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -94,6 +94,7 @@ public class DMLConfig
public static final String EVICTION_SHADOW_BUFFERSIZE =
"sysds.gpu.eviction.shadow.bufferSize";
public static final String USE_SSL_FEDERATED_COMMUNICATION =
"sysds.federated.ssl"; // boolean
+ public static final String DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT =
"sysds.federated.initialization.timeout"; // int seconds
public static final int DEFAULT_FEDERATED_PORT = 4040; // borrowed
default Spark Port
public static final int DEFAULT_NUMBER_OF_FEDERATED_WORKER_THREADS = 2;
@@ -140,6 +141,7 @@ public class DMLConfig
_defaultVals.put(EAGER_CUDA_FREE, "false" );
_defaultVals.put(FLOATING_POINT_PRECISION, "double" );
_defaultVals.put(USE_SSL_FEDERATED_COMMUNICATION, "false");
+ _defaultVals.put(DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT,
"10");
}
public DMLConfig() {
@@ -388,7 +390,7 @@ public class DMLConfig
STATS_MAX_WRAP_LEN, PRINT_GPU_MEMORY_INFO,
AVAILABLE_GPUS, SYNCHRONIZE_GPU, EAGER_CUDA_FREE,
FLOATING_POINT_PRECISION, GPU_EVICTION_POLICY,
LOCAL_SPARK_NUM_THREADS, EVICTION_SHADOW_BUFFERSIZE,
GPU_MEMORY_ALLOCATOR, GPU_MEMORY_UTILIZATION_FACTOR,
- USE_SSL_FEDERATED_COMMUNICATION
+ USE_SSL_FEDERATED_COMMUNICATION,
DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT
};
StringBuilder sb = new StringBuilder();
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
index d19d132..67f16e2 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
@@ -58,9 +58,21 @@ public class FederatedData {
private static final Log LOG =
LogFactory.getLog(FederatedData.class.getName());
private static final Set<InetSocketAddress> _allFedSites = new
HashSet<>();
+ private static SslContext sslCtx;
+
+ static {
+ try {
+ sslCtx =
SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
+ }
+ catch(SSLException e) {
+ LOG.error("Static SSL setup failed for client side");
+ }
+ }
+
private final Types.DataType _dataType;
private final InetSocketAddress _address;
private final String _filepath;
+
/**
* The ID of default matrix/tensor on which operations get executed if
no other ID is given.
*/
@@ -156,8 +168,7 @@ public class FederatedData {
// Careful with the number of threads. Each thread opens
connections to multiple files making resulting in
// java.io.IOException: Too many open files
EventLoopGroup workerGroup = new
NioEventLoopGroup(DMLConfig.DEFAULT_NUMBER_OF_FEDERATED_WORKER_THREADS);
- final SslContext sslCtx =
SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
- .build();
+
try {
Bootstrap b = new Bootstrap();
final DataRequestHandler handler = new
DataRequestHandler(workerGroup);
@@ -179,9 +190,10 @@ public class FederatedData {
}
});
-
+
ChannelFuture f = b.connect(address).sync();
Promise<FederatedResponse> promise =
f.channel().eventLoop().newPromise();
+
handler.setPromise(promise);
f.channel().writeAndFlush(request);
return promise;
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
index c3789d6..8821a71 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
@@ -30,9 +30,13 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.sysds.common.Types;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.conf.DMLConfig;
@@ -55,7 +59,7 @@ import org.apache.sysds.runtime.instructions.cp.StringObject;
public class InitFEDInstruction extends FEDInstruction {
- // private static final Log LOG =
LogFactory.getLog(InitFEDInstruction.class.getName());
+ private static final Log LOG =
LogFactory.getLog(InitFEDInstruction.class.getName());
public static final String FED_MATRIX_IDENTIFIER = "matrix";
public static final String FED_FRAME_IDENTIFIER = "frame";
@@ -229,8 +233,13 @@ public class InitFEDInstruction extends FEDInstruction {
colPartitioned &= (range.getSize(0) ==
output.getNumRows());
}
try {
+ int timeout =
ConfigurationManager.getDMLConfig().getIntValue(DMLConfig.DEFAULT_FEDERATED_INITIALIZATION_TIMEOUT);
+ LOG.error("Federated Initialization with timeout: " +
timeout);
for (Pair<FederatedData, Future<FederatedResponse>>
idResponse : idResponses)
- idResponse.getRight().get(); //wait for
initialization
+
idResponse.getRight().get(timeout,TimeUnit.SECONDS); //wait for initialization
+ }
+ catch (TimeoutException e){
+ throw new DMLRuntimeException("Federated Initialization
timeout exceeded", e);
}
catch (Exception e) {
throw new DMLRuntimeException("Federation
initialization failed", e);
diff --git a/src/test/config/SystemDS-config.xml
b/src/test/config/SystemDS-config.xml
index 98ea336..2519a38 100644
--- a/src/test/config/SystemDS-config.xml
+++ b/src/test/config/SystemDS-config.xml
@@ -21,7 +21,7 @@
<!-- local fs tmp working directory-->
<sysds.localtmpdir>/tmp/systemds</sysds.localtmpdir>
- <!-- hdfs tmp working directory-->
+ <!-- hdfs tmp working directory-->
<sysds.scratch>scratch_space</sysds.scratch>
<!-- compiler optimization level, valid values: 0 | 1 | 2 | 3 | 4, default:
2 -->
@@ -32,7 +32,7 @@
<!-- enables multi-threaded matrix operations in singlenode control program
-->
<sysds.cp.parallel.ops>true</sysds.cp.parallel.ops>
-
+
<!-- enables multi-threaded read/write in singlenode control program -->
<sysds.cp.parallel.io>true</sysds.cp.parallel.io>
@@ -44,4 +44,7 @@
<!-- The number of theads for the spark instance artificially selected-->
<sysds.local.spark.number.threads>16</sysds.local.spark.number.threads>
+
+ <!-- The timeout of the federated tests to initialize the federated
matrixes -->
+
<sysds.federated.initialization.timeout>2</sysds.federated.initialization.timeout>
</root>
diff --git
a/src/test/java/org/apache/sysds/test/functions/federated/FederatedTestObjectConstructor.java
b/src/test/java/org/apache/sysds/test/functions/federated/FederatedTestObjectConstructor.java
index a970479..1edfb1f 100644
---
a/src/test/java/org/apache/sysds/test/functions/federated/FederatedTestObjectConstructor.java
+++
b/src/test/java/org/apache/sysds/test/functions/federated/FederatedTestObjectConstructor.java
@@ -21,6 +21,7 @@ package org.apache.sysds.test.functions.federated;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
@@ -34,28 +35,24 @@ import
org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
import org.apache.sysds.runtime.instructions.fed.InitFEDInstruction;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.meta.MetaData;
-import org.junit.Assert;
+
public class FederatedTestObjectConstructor {
public static MatrixObject constructFederatedInput(int rows, int cols,
int blocksize, String host, long[][] begin,
- long[][] end, int[] ports, String[] inputs, String file) {
+ long[][] end, int[] ports, String[] inputs, String file) throws
UnknownHostException {
MatrixObject fed = new MatrixObject(ValueType.FP64, file);
- try {
- fed.setMetaData(new MetaData(new
MatrixCharacteristics(rows, cols, blocksize, rows * cols)));
- List<Pair<FederatedRange, FederatedData>> d = new
ArrayList<>();
- for(int i = 0; i < ports.length; i++) {
- FederatedRange X1r = new
FederatedRange(begin[i], end[i]);
- FederatedData X1d = new
FederatedData(Types.DataType.MATRIX,
- new
InetSocketAddress(InetAddress.getByName(host), ports[i]), inputs[i]);
- d.add(new ImmutablePair<>(X1r, X1d));
- }
-
- InitFEDInstruction.federateMatrix(fed, d);
- }
- catch(Exception e) {
- e.printStackTrace();
- Assert.assertTrue(false);
+
+ fed.setMetaData(new MetaData(new MatrixCharacteristics(rows,
cols, blocksize, rows * cols)));
+ List<Pair<FederatedRange, FederatedData>> d = new ArrayList<>();
+ for(int i = 0; i < ports.length; i++) {
+ FederatedRange X1r = new FederatedRange(begin[i],
end[i]);
+ FederatedData X1d = new
FederatedData(Types.DataType.MATRIX,
+ new
InetSocketAddress(InetAddress.getByName(host), ports[i]), inputs[i]);
+ d.add(new ImmutablePair<>(X1r, X1d));
}
+
+ InitFEDInstruction.federateMatrix(fed, d);
+
return fed;
}
diff --git
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
index 2587fe9..a5630e0 100644
---
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
@@ -91,12 +91,12 @@ public class FederatedReaderTest extends AutomatedTestBase {
Thread t2 = startLocalFedWorkerThread(port2);
String host = "localhost";
- MatrixObject fed =
FederatedTestObjectConstructor.constructFederatedInput(
- rows, cols, blocksize, host, begins, ends, new int[]
{port1, port2},
- new String[] {input("X1"), input("X2")},
input("X.json"));
- writeInputFederatedWithMTD("X.json", fed, null);
-
+
try {
+ MatrixObject fed =
FederatedTestObjectConstructor.constructFederatedInput(
+ rows, cols, blocksize, host, begins, ends, new
int[] {port1, port2},
+ new String[] {input("X1"), input("X2")},
input("X.json"));
+ writeInputFederatedWithMTD("X.json", fed, null);
// Run reference dml script with normal matrix
fullDMLScriptName = SCRIPT_DIR +
"functions/federated/io/" + TEST_NAME + (rowPartitioned ? "Row" : "Col")
+ "Reference.dml";
diff --git
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedSSLTest.java
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedSSLTest.java
index d086174..261daf6 100644
---
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedSSLTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedSSLTest.java
@@ -97,12 +97,12 @@ public class FederatedSSLTest extends AutomatedTestBase {
Thread t2 = startLocalFedWorkerThread(port2);
String host = "localhost";
- MatrixObject fed =
FederatedTestObjectConstructor.constructFederatedInput(
- rows, cols, blocksize, host, begins, ends, new int[]
{port1, port2},
- new String[] {input("X1"), input("X2")},
input("X.json"));
- writeInputFederatedWithMTD("X.json", fed, null);
-
+
try {
+ MatrixObject fed =
FederatedTestObjectConstructor.constructFederatedInput(
+ rows, cols, blocksize, host, begins, ends, new
int[] {port1, port2},
+ new String[] {input("X1"), input("X2")},
input("X.json"));
+ writeInputFederatedWithMTD("X.json", fed, null);
// Run reference dml script with normal matrix
fullDMLScriptName = SCRIPT_DIR +
"functions/federated/io/" + TEST_NAME + (rowPartitioned ? "Row" : "Col")
+ "Reference.dml";
diff --git
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedTimeoutTest.java
similarity index 65%
copy from
src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
copy to
src/test/java/org/apache/sysds/test/functions/federated/io/FederatedTimeoutTest.java
index 2587fe9..e8c1ed7 100644
---
a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedTimeoutTest.java
@@ -18,11 +18,18 @@
*/
package org.apache.sysds.test.functions.federated.io;
-
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
import java.util.Arrays;
import java.util.Collection;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.sysds.common.Types;
+import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.conf.DMLConfig;
+import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.test.AutomatedTestBase;
@@ -36,13 +43,16 @@ import org.junit.runners.Parameterized;
@RunWith(value = Parameterized.class)
@net.jcip.annotations.NotThreadSafe
-public class FederatedReaderTest extends AutomatedTestBase {
+public class FederatedTimeoutTest extends AutomatedTestBase {
+ private static final Log LOG =
LogFactory.getLog(FederatedTimeoutTest.class.getName());
- // private static final Log LOG =
LogFactory.getLog(FederatedReaderTest.class.getName());
- private final static String TEST_DIR = "functions/federated/ioR/";
+ // This test use the same scripts as the Federated Reader tests, just
with intended to fail connecting
+ private final static String TEST_DIR = "functions/federated/io/";
private final static String TEST_NAME = "FederatedReaderTest";
- private final static String TEST_CLASS_DIR = TEST_DIR +
FederatedReaderTest.class.getSimpleName() + "/";
+ private final static String TEST_CLASS_DIR = TEST_DIR +
FederatedTimeoutTest.class.getSimpleName() + "/";
private final static int blocksize = 1024;
+ private final static File TEST_CONF_FILE = new
File("src/test/config/SystemDS-config.xml");
+
@Parameterized.Parameter()
public int rows;
@Parameterized.Parameter(1)
@@ -66,14 +76,9 @@ public class FederatedReaderTest extends AutomatedTestBase {
@Test
public void federatedSinglenodeRead() {
- federatedRead(Types.ExecMode.SINGLE_NODE);
- }
-
- public void federatedRead(Types.ExecMode execMode) {
+ Types.ExecMode execMode = Types.ExecMode.SINGLE_NODE;
Types.ExecMode oldPlatform = setExecMode(execMode);
- getAndLoadTestConfiguration(TEST_NAME);
- setOutputBuffering(true);
-
+
// write input matrices
int halfRows = rows / 2;
long[][] begins = new long[][] {new long[] {0, 0}, new long[]
{halfRows, 0}};
@@ -87,31 +92,30 @@ public class FederatedReaderTest extends AutomatedTestBase {
fullDMLScriptName = "";
int port1 = getRandomAvailablePort();
int port2 = getRandomAvailablePort();
- Thread t1 = startLocalFedWorkerThread(port1, 10);
- Thread t2 = startLocalFedWorkerThread(port2);
String host = "localhost";
- MatrixObject fed =
FederatedTestObjectConstructor.constructFederatedInput(
- rows, cols, blocksize, host, begins, ends, new int[]
{port1, port2},
- new String[] {input("X1"), input("X2")},
input("X.json"));
- writeInputFederatedWithMTD("X.json", fed, null);
-
+ ServerSocket clientSocket = null;
+ ServerSocket clientSocket2 = null;
try {
- // Run reference dml script with normal matrix
- fullDMLScriptName = SCRIPT_DIR +
"functions/federated/io/" + TEST_NAME + (rowPartitioned ? "Row" : "Col")
- + "Reference.dml";
- programArgs = new String[] {"-stats", "-args",
input("X1"), input("X2")};
- String refOut = runTest(null).toString();
-
- // Run federated
- fullDMLScriptName = SCRIPT_DIR +
"functions/federated/io/" + TEST_NAME + ".dml";
- programArgs = new String[] {"-stats", "-args",
input("X.json")};
- String out = runTest(null).toString();
+ DMLConfig dmlconf =
DMLConfig.readConfigurationFile(TEST_CONF_FILE.getPath());
+ ConfigurationManager.setGlobalConfig(dmlconf);
+ clientSocket = new ServerSocket(port1);
+ clientSocket2 = new ServerSocket(port2);
+ MatrixObject fed =
FederatedTestObjectConstructor.constructFederatedInput(rows,
+ cols,
+ blocksize,
+ host,
+ begins,
+ ends,
+ new int[] {port1, port2},
+ new String[] {input("X1"), input("X2")},
+ input("X.json"));
+ writeInputFederatedWithMTD("X.json", fed, null);
-
Assert.assertTrue(heavyHittersContainsString("fed_uak+"));
- // Verify output
-
Assert.assertEquals(Double.parseDouble(refOut.split("\n")[0]),
- Double.parseDouble(out.split("\n")[0]),
0.00001);
+ }
+ catch(DMLRuntimeException e) {
+ LOG.info("Correctly timeout");
+ // Great the test parsed.
}
catch(Exception e) {
e.printStackTrace();
@@ -119,8 +123,22 @@ public class FederatedReaderTest extends AutomatedTestBase
{
}
finally {
resetExecMode(oldPlatform);
+ if(clientSocket != null)
+ try {
+ clientSocket.close();
+ }
+ catch(IOException e) {
+ e.printStackTrace();
+ }
+ if(clientSocket2 != null)
+ try {
+ clientSocket2.close();
+ }
+ catch(IOException e) {
+ e.printStackTrace();
+ }
}
- TestUtils.shutdownThreads(t1, t2);
}
+
}