Repository: incubator-apex-core Updated Branches: refs/heads/release-3.3 d272bfaa9 -> 21ad4cc9b
APEXCORE-358 - Make RPC timeout configurable. Introduced "com.datatorrent.stram.rpc.*" system properties that may be used to set RPC timeouts. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/21ad4cc9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/21ad4cc9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/21ad4cc9 Branch: refs/heads/release-3.3 Commit: 21ad4cc9b8e71789fc779d86f90cb0513fdf07bb Parents: d272bfa Author: Vlad Rozov <[email protected]> Authored: Sun Feb 28 13:43:00 2016 -0800 Committer: Thomas Weise <[email protected]> Committed: Sun Feb 28 15:48:15 2016 -0800 ---------------------------------------------------------------------- .../datatorrent/stram/RecoverableRpcProxy.java | 44 ++++++++++++++------ .../stram/StreamingAppMasterService.java | 2 +- .../datatorrent/stram/StramRecoveryTest.java | 44 ++++++++++++++++++++ 3 files changed, 77 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/21ad4cc9/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java b/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java index fe4a86d..97fce63 100644 --- a/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java +++ b/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java @@ -29,6 +29,7 @@ import java.nio.charset.Charset; import static java.lang.Thread.sleep; import org.apache.http.NameValuePair; +import org.apache.http.client.utils.URIBuilder; import org.apache.http.client.utils.URLEncodedUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,18 +52,28 @@ import java.util.List; */ public class RecoverableRpcProxy implements java.lang.reflect.InvocationHandler, Closeable { - private final static Logger LOG = LoggerFactory.getLogger(RecoverableRpcProxy.class); + private static final Logger LOG = LoggerFactory.getLogger(RecoverableRpcProxy.class); + + public static final String RPC_TIMEOUT = "com.datatorrent.stram.rpc.timeout"; + public static final String RETRY_TIMEOUT = "com.datatorrent.stram.rpc.retry.timeout"; + public static final String RETRY_DELAY = "com.datatorrent.stram.rpc.delay.timeout"; + public static final String QP_retryTimeoutMillis = "retryTimeoutMillis"; public static final String QP_retryDelayMillis = "retryDelayMillis"; public static final String QP_rpcTimeout = "rpcTimeout"; + + private static final int RETRY_TIMEOUT_DEFAULT = 30000; + private static final int RETRY_DELAY_DEFAULT = 10000; + private static final int RPC_TIMEOUT_DEFAULT = 5000; + private final Configuration conf; private final String appPath; private StreamingContainerUmbilicalProtocol umbilical; private String lastConnectURI; private long lastCompletedCallTms; - private long retryTimeoutMillis = 30000; - private long retryDelayMillis = 10000; - private int rpcTimeout = 5000; + private long retryTimeoutMillis = Long.getLong(RETRY_TIMEOUT, RETRY_TIMEOUT_DEFAULT); + private long retryDelayMillis = Long.getLong(RETRY_DELAY, RETRY_DELAY_DEFAULT); + private int rpcTimeout = Integer.getInteger(RPC_TIMEOUT, RPC_TIMEOUT_DEFAULT); public RecoverableRpcProxy(String appPath, Configuration conf) throws IOException { @@ -169,15 +180,24 @@ public class RecoverableRpcProxy implements java.lang.reflect.InvocationHandler, } } - public static URI toConnectURI(InetSocketAddress address, int rpcTimeoutMillis, int retryDelayMillis, int retryTimeoutMillis) throws Exception + public static URI toConnectURI(final InetSocketAddress address) throws Exception + { + int rpcTimeoutMillis = Integer.getInteger(RPC_TIMEOUT, RPC_TIMEOUT_DEFAULT); + long retryDelayMillis = Long.getLong(RETRY_DELAY, RETRY_DELAY_DEFAULT); + long retryTimeoutMillis = Long.getLong(RETRY_TIMEOUT, RETRY_TIMEOUT_DEFAULT); + return toConnectURI(address, rpcTimeoutMillis, retryDelayMillis, retryTimeoutMillis); + } + + public static URI toConnectURI(InetSocketAddress address, int rpcTimeoutMillis, long retryDelayMillis, long retryTimeoutMillis) throws Exception { - StringBuilder query = new StringBuilder(256); - query.append(RecoverableRpcProxy.QP_rpcTimeout + '=').append(rpcTimeoutMillis); - query.append('&'); - query.append(RecoverableRpcProxy.QP_retryDelayMillis + '=').append(retryDelayMillis); - query.append('&'); - query.append(RecoverableRpcProxy.QP_retryTimeoutMillis + '=').append(retryTimeoutMillis); - return new URI("stram", null, address.getHostName(), address.getPort(), null, query.toString(), null); + return new URIBuilder() + .setScheme("stram") + .setHost(address.getHostName()) + .setPort(address.getPort()) + .setParameter(RecoverableRpcProxy.QP_rpcTimeout, Integer.toString(rpcTimeoutMillis)) + .setParameter(RecoverableRpcProxy.QP_retryDelayMillis, Long.toString(retryDelayMillis)) + .setParameter(RecoverableRpcProxy.QP_retryTimeoutMillis, Long.toString(retryTimeoutMillis)) + .build(); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/21ad4cc9/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index db8c255..8565275 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -555,7 +555,7 @@ public class StreamingAppMasterService extends CompositeService // write the connect address for containers to DFS InetSocketAddress connectAddress = NetUtils.getConnectAddress(this.heartbeatListener.getAddress()); - URI connectUri = new URI("stram", null, connectAddress.getHostName(), connectAddress.getPort(), null, null, null); + URI connectUri = RecoverableRpcProxy.toConnectURI(connectAddress); FSRecoveryHandler recoveryHandler = new FSRecoveryHandler(dag.assertAppPath(), getConfig()); recoveryHandler.writeConnectUri(connectUri.toString()); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/21ad4cc9/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java index 6dbdcf0..75b4684 100644 --- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java @@ -518,7 +518,51 @@ public class StramRecoveryTest Assert.assertTrue("timedout", timedout.get()); rp.close(); + + String rpcTimeout = System.getProperty(RecoverableRpcProxy.RPC_TIMEOUT); + String rpcRetryDelay = System.getProperty(RecoverableRpcProxy.RETRY_DELAY); + String rpcRetryTimeout = System.getProperty(RecoverableRpcProxy.RETRY_TIMEOUT); + + System.setProperty(RecoverableRpcProxy.RPC_TIMEOUT, Integer.toString(500)); + System.setProperty(RecoverableRpcProxy.RETRY_DELAY, Long.toString(100)); + System.setProperty(RecoverableRpcProxy.RETRY_TIMEOUT, Long.toString(500)); + + timedout.set(false); + uri = RecoverableRpcProxy.toConnectURI(address); + recoveryHandler.writeConnectUri(uri.toString()); + + rp = new RecoverableRpcProxy(appPath, conf); + protocolProxy = rp.getProxy(); + protocolProxy.log("containerId", "msg"); + try { + protocolProxy.log("containerId", "timeout"); + Assert.fail("expected socket timeout"); + } catch (java.net.SocketTimeoutException e) { + // expected + } + Assert.assertTrue("timedout", timedout.get()); + rp.close(); + + timedout.set(false); + System.setProperty(RecoverableRpcProxy.RETRY_TIMEOUT, Long.toString(1500)); + + uri = RecoverableRpcProxy.toConnectURI(address); + recoveryHandler.writeConnectUri(uri.toString()); + + protocolProxy.log("containerId", "timeout"); + Assert.assertTrue("timedout", timedout.get()); + + + restoreSystemProperty(RecoverableRpcProxy.RPC_TIMEOUT, rpcTimeout); + restoreSystemProperty(RecoverableRpcProxy.RETRY_DELAY, rpcRetryDelay); + restoreSystemProperty(RecoverableRpcProxy.RETRY_TIMEOUT, rpcRetryTimeout); + server.stop(); } + private static String restoreSystemProperty(final String key, final String value) + { + return (value == null)? System.clearProperty(key) : System.setProperty(key, value); + } + }
