Repository: apex-core Updated Branches: refs/heads/master d80501bdc -> 32f229f21
APEXCORE-608 Streaming Containers use stale RPC proxy after connection is closed Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/c687bb5e Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/c687bb5e Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/c687bb5e Branch: refs/heads/master Commit: c687bb5e69e1ab9f3510d68dd726a1de20430908 Parents: 4302929 Author: Vlad Rozov <[email protected]> Authored: Thu Jan 19 17:55:28 2017 -0800 Committer: Vlad Rozov <[email protected]> Committed: Sat Feb 11 22:26:11 2017 -0800 ---------------------------------------------------------------------- .../datatorrent/stram/RecoverableRpcProxy.java | 147 ++++++++++--------- .../datatorrent/stram/StramLocalCluster.java | 10 +- .../stram/StreamingContainerParent.java | 44 +++--- .../StreamingContainerUmbilicalProtocol.java | 4 +- .../stram/engine/StreamingContainer.java | 60 +++++--- .../datatorrent/stram/StramRecoveryTest.java | 12 +- 6 files changed, 150 insertions(+), 127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/c687bb5e/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java b/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java index 1de581e..e454d49 100644 --- a/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java +++ b/engine/src/main/java/com/datatorrent/stram/RecoverableRpcProxy.java @@ -22,13 +22,15 @@ import java.io.Closeable; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.net.ConnectException; +import java.lang.reflect.Proxy; +import java.lang.reflect.UndeclaredThrowableException; import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; import java.net.URI; import java.nio.charset.Charset; import java.util.List; +import javax.net.SocketFactory; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,8 +42,6 @@ import org.apache.http.NameValuePair; import org.apache.http.client.utils.URIBuilder; import org.apache.http.client.utils.URLEncodedUtils; -import com.google.common.base.Throwables; - import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol; import static java.lang.Thread.sleep; @@ -68,98 +68,111 @@ public class RecoverableRpcProxy implements java.lang.reflect.InvocationHandler, private static final int RPC_TIMEOUT_DEFAULT = 5000; private final Configuration conf; - private final String appPath; private StreamingContainerUmbilicalProtocol umbilical; private String lastConnectURI; - private long lastCompletedCallTms; - private long retryTimeoutMillis = Long.getLong(RETRY_TIMEOUT, RETRY_TIMEOUT_DEFAULT); - private long retryDelayMillis = Long.getLong(RETRY_DELAY, RETRY_DELAY_DEFAULT); - private int rpcTimeout = Integer.getInteger(RPC_TIMEOUT, RPC_TIMEOUT_DEFAULT); - - public RecoverableRpcProxy(String appPath, Configuration conf) throws IOException + private long retryTimeoutMillis; + private long retryDelayMillis; + private int rpcTimeout; + private final UserGroupInformation currentUser; + private final SocketFactory defaultSocketFactory; + private final FSRecoveryHandler fsRecoveryHandler; + + public RecoverableRpcProxy(String appPath, Configuration conf) { this.conf = conf; - this.appPath = appPath; - connect(); + try { + currentUser = UserGroupInformation.getCurrentUser(); + defaultSocketFactory = NetUtils.getDefaultSocketFactory(conf); + fsRecoveryHandler = new FSRecoveryHandler(appPath, conf); + connect(0); + } catch (IOException e) { + LOG.error("Fail to create RecoverableRpcProxy", e); + throw new RuntimeException(e); + } } - private void connect() throws IOException + private long connect(long timeMillis) throws IOException { - FSRecoveryHandler fsrh = new FSRecoveryHandler(appPath, conf); - String uriStr = fsrh.readConnectUri(); + String uriStr = fsRecoveryHandler.readConnectUri(); if (!uriStr.equals(lastConnectURI)) { - // reset timeout LOG.debug("Got new RPC connect address {}", uriStr); - lastCompletedCallTms = System.currentTimeMillis(); lastConnectURI = uriStr; - } - URI heartbeatUri = URI.create(uriStr); + if (umbilical != null) { + RPC.stopProxy(umbilical); + } - String queryStr = heartbeatUri.getQuery(); - List<NameValuePair> queryList = null; - if (queryStr != null) { - queryList = URLEncodedUtils.parse(queryStr, Charset.defaultCharset()); - } - if (queryList != null) { - for (NameValuePair pair : queryList) { - String value = pair.getValue(); - String key = pair.getName(); - if (QP_rpcTimeout.equals(key)) { - this.rpcTimeout = Integer.parseInt(value); - } else if (QP_retryTimeoutMillis.equals(key)) { - this.retryTimeoutMillis = Long.parseLong(value); - } else if (QP_retryDelayMillis.equals(key)) { - this.retryDelayMillis = Long.parseLong(value); + retryTimeoutMillis = Long.getLong(RETRY_TIMEOUT, RETRY_TIMEOUT_DEFAULT); + retryDelayMillis = Long.getLong(RETRY_DELAY, RETRY_DELAY_DEFAULT); + rpcTimeout = Integer.getInteger(RPC_TIMEOUT, RPC_TIMEOUT_DEFAULT); + + URI heartbeatUri = URI.create(uriStr); + + String queryStr = heartbeatUri.getQuery(); + if (queryStr != null) { + List<NameValuePair> queryList = URLEncodedUtils.parse(queryStr, Charset.defaultCharset()); + if (queryList != null) { + for (NameValuePair pair : queryList) { + String value = pair.getValue(); + String key = pair.getName(); + if (QP_rpcTimeout.equals(key)) { + this.rpcTimeout = Integer.parseInt(value); + } else if (QP_retryTimeoutMillis.equals(key)) { + this.retryTimeoutMillis = Long.parseLong(value); + } else if (QP_retryDelayMillis.equals(key)) { + this.retryDelayMillis = Long.parseLong(value); + } + } } } + InetSocketAddress address = NetUtils.createSocketAddrForHost(heartbeatUri.getHost(), heartbeatUri.getPort()); + umbilical = RPC.getProxy(StreamingContainerUmbilicalProtocol.class, StreamingContainerUmbilicalProtocol.versionID, address, currentUser, conf, defaultSocketFactory, rpcTimeout); + // reset timeout + return System.currentTimeMillis() + retryTimeoutMillis; } - InetSocketAddress address = NetUtils.createSocketAddrForHost(heartbeatUri.getHost(), heartbeatUri.getPort()); - umbilical = RPC.getProxy(StreamingContainerUmbilicalProtocol.class, StreamingContainerUmbilicalProtocol.versionID, address, UserGroupInformation.getCurrentUser(), conf, NetUtils.getDefaultSocketFactory(conf), rpcTimeout); + return timeMillis; } - public StreamingContainerUmbilicalProtocol getProxy() + public StreamingContainerUmbilicalProtocol getProxy() throws IOException { - StreamingContainerUmbilicalProtocol recoverableProxy = (StreamingContainerUmbilicalProtocol)java.lang.reflect.Proxy.newProxyInstance(umbilical.getClass().getClassLoader(), umbilical.getClass().getInterfaces(), this); + if (umbilical == null) { + throw new IOException("RecoverableRpcProxy is closed."); + } + StreamingContainerUmbilicalProtocol recoverableProxy = (StreamingContainerUmbilicalProtocol)Proxy.newProxyInstance(umbilical.getClass().getClassLoader(), umbilical.getClass().getInterfaces(), this); return recoverableProxy; } @Override @SuppressWarnings("SleepWhileInLoop") - public Object invoke(Object proxy, Method method, Object[] args) throws ConnectException, SocketTimeoutException, InterruptedException, IllegalAccessException + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { - Object result; + long endTimeMillis = System.currentTimeMillis() + retryTimeoutMillis; + if (umbilical == null) { + endTimeMillis = connect(endTimeMillis); + } while (true) { + if (umbilical == null) { + throw new IOException("RecoverableRpcProxy is closed."); + } try { - if (umbilical == null) { - connect(); - } - //long start = System.nanoTime(); - result = method.invoke(umbilical, args); - lastCompletedCallTms = System.currentTimeMillis(); - //long end = System.nanoTime(); - //LOG.info(String.format("%s took %d ns", method.getName(), (end - start))); - return result; - } catch (InvocationTargetException e) { + return method.invoke(umbilical, args); + } catch (Throwable t) { // handle RPC failure - Throwable targetException = e.getTargetException(); - long connectMillis = System.currentTimeMillis() - lastCompletedCallTms; - if (connectMillis < retryTimeoutMillis) { - LOG.warn("RPC failure, attempting reconnect after {} ms (remaining {} ms)", retryDelayMillis, retryTimeoutMillis - connectMillis, targetException); - close(); + while (t instanceof InvocationTargetException || t instanceof UndeclaredThrowableException) { + Throwable cause = t.getCause(); + if (cause != null) { + t = cause; + } + } + final long currentTimeMillis = System.currentTimeMillis(); + if (currentTimeMillis < endTimeMillis) { + LOG.warn("RPC failure, will retry after {} ms (remaining {} ms)", retryDelayMillis, endTimeMillis - currentTimeMillis, t); sleep(retryDelayMillis); + endTimeMillis = connect(endTimeMillis); } else { - LOG.error("Giving up RPC connection recovery after {} ms", connectMillis, targetException); - if (targetException instanceof java.net.ConnectException) { - throw (java.net.ConnectException)targetException; - } else if (targetException instanceof java.net.SocketTimeoutException) { - throw (java.net.SocketTimeoutException)targetException; - } else { - throw Throwables.propagate(targetException); - } + LOG.error("Giving up RPC connection recovery after {} ms", currentTimeMillis - endTimeMillis + retryTimeoutMillis, t); + close(); + throw t; } - } catch (IOException ex) { - close(); - throw new RuntimeException(ex); } } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/c687bb5e/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java index 2ffbabd..7e4b78a 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java +++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java @@ -105,13 +105,9 @@ public class StramLocalCluster implements Runnable, Controller } @Override - public void reportError(String containerId, int[] operators, String msg) + public void reportError(String containerId, int[] operators, String msg) throws IOException { - try { - log(containerId, msg); - } catch (IOException ex) { - // ignore - } + log(containerId, msg); } @Override @@ -131,7 +127,7 @@ public class StramLocalCluster implements Runnable, Controller } @Override - public ContainerHeartbeatResponse processHeartbeat(ContainerHeartbeat msg) + public ContainerHeartbeatResponse processHeartbeat(ContainerHeartbeat msg) throws IOException { if (injectShutdown.containsKey(msg.getContainerId())) { ContainerHeartbeatResponse r = new ContainerHeartbeatResponse(); http://git-wip-us.apache.org/repos/asf/apex-core/blob/c687bb5e/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java index b36529a..e5d8a97 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java @@ -163,11 +163,16 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit @Override public void log(String containerId, String msg) throws IOException { - LOG.info("child msg: {} context: {}", msg, dagManager.getContainerAgent(containerId).container); + final StreamingContainerAgent sca = dagManager.getContainerAgent(containerId); + if (sca != null) { + LOG.info("child msg: {} context: {}", msg, sca.container); + } else { + LOG.info("unknown container {} msg: {}", containerId, msg); + } } @Override - public void reportError(String containerId, int[] operators, String msg) + public void reportError(String containerId, int[] operators, String msg) throws IOException { if (operators == null || operators.length == 0) { dagManager.recordEventAsync(new ContainerErrorEvent(containerId, msg)); @@ -179,24 +184,23 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit } } } - try { - log(containerId, msg); - } catch (IOException ex) { - // ignore - } + log(containerId, msg); } @Override public StreamingContainerContext getInitContext(String containerId) throws IOException { + StreamingContainerContext scc = null; StreamingContainerAgent sca = dagManager.getContainerAgent(containerId); - - return sca.getInitContext(); + if (sca != null) { + scc = sca.getInitContext(); + } + return scc; } @Override - public ContainerHeartbeatResponse processHeartbeat(ContainerHeartbeat msg) + public ContainerHeartbeatResponse processHeartbeat(final ContainerHeartbeat msg) throws IOException { // -- TODO // Change to use some sort of a annotation that developers can use to specify secure code @@ -208,20 +212,14 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit //LOG.debug("RPC latency from child container {} is {} ms (according to system clocks)", msg.getContainerId(), // now - msg.sentTms); dagManager.updateRPCLatency(msg.getContainerId(), now - msg.sentTms); - try { - final ContainerHeartbeat fmsg = msg; - return SecureExecutor.execute(new SecureExecutor.WorkLoad<ContainerHeartbeatResponse>() + return SecureExecutor.execute(new SecureExecutor.WorkLoad<ContainerHeartbeatResponse>() + { + @Override + public ContainerHeartbeatResponse run() { - @Override - public ContainerHeartbeatResponse run() - { - return dagManager.processHeartbeat(fmsg); - } - }); - } catch (IOException ex) { - LOG.error("Error processing heartbeat", ex); - return null; - } + return dagManager.processHeartbeat(msg); + } + }); } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/c687bb5e/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java b/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java index 77a33e6..13832ba 100644 --- a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java +++ b/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java @@ -431,7 +431,7 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol * @param operators * @param msg */ - void reportError(String containerId, int[] operators, String msg); + void reportError(String containerId, int[] operators, String msg) throws IOException; /** * To be called periodically by child for heartbeat protocol. @@ -439,6 +439,6 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol * @param msg * @return */ - ContainerHeartbeatResponse processHeartbeat(ContainerHeartbeat msg); + ContainerHeartbeatResponse processHeartbeat(ContainerHeartbeat msg) throws IOException; } http://git-wip-us.apache.org/repos/asf/apex-core/blob/c687bb5e/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java index c3886b4..38963c4 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java @@ -297,10 +297,12 @@ public class StreamingContainer extends YarnContainerMain int exitStatus = 1; // interpreted as unrecoverable container failure - RecoverableRpcProxy rpcProxy = new RecoverableRpcProxy(appPath, new Configuration()); - final StreamingContainerUmbilicalProtocol umbilical = rpcProxy.getProxy(); + RecoverableRpcProxy rpcProxy = null; + StreamingContainerUmbilicalProtocol umbilical = null; final String childId = System.getProperty(StreamingApplication.DT_PREFIX + "cid"); try { + rpcProxy = new RecoverableRpcProxy(appPath, new Configuration()); + umbilical = rpcProxy.getProxy(); StreamingContainerContext ctx = umbilical.getInitContext(childId); StreamingContainer stramChild = new StreamingContainer(childId, umbilical); logger.debug("Container Context = {}", ctx); @@ -312,25 +314,24 @@ public class StreamingContainer extends YarnContainerMain } finally { stramChild.teardown(); } - } catch (Error error) { - logger.error("Fatal error in container!", error); + } catch (Error | Exception e) { + logger.error("Fatal {} in container!", (e instanceof Error) ? "Error" : "Exception", e); /* Report back any failures, for diagnostic purposes */ - String msg = ExceptionUtils.getStackTrace(error); - umbilical.reportError(childId, null, "FATAL: " + msg); - } catch (Exception exception) { - logger.error("Fatal exception in container!", exception); - /* Report back any failures, for diagnostic purposes */ - String msg = ExceptionUtils.getStackTrace(exception); - umbilical.reportError(childId, null, msg); + try { + umbilical.reportError(childId, null, ExceptionUtils.getStackTrace(e)); + } catch (Exception ex) { + logger.debug("Fail to log", ex); + } } finally { - rpcProxy.close(); + if (rpcProxy != null) { + rpcProxy.close(); + } DefaultMetricsSystem.shutdown(); logger.info("Exit status for container: {}", exitStatus); LogManager.shutdown(); - } - - if (exitStatus != 0) { - System.exit(exitStatus); + if (exitStatus != 0) { + System.exit(exitStatus); + } } } @@ -601,8 +602,8 @@ public class StreamingContainer extends YarnContainerMain public void heartbeatLoop() throws Exception { - umbilical.log(containerId, "[" + containerId + "] Entering heartbeat loop.."); logger.debug("Entering heartbeat loop (interval is {} ms)", this.heartbeatIntervalMillis); + umbilical.log(containerId, "[" + containerId + "] Entering heartbeat loop.."); final YarnConfiguration conf = new YarnConfiguration(); long tokenLifeTime = (long)(containerContext.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * containerContext.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME)); long expiryTime = System.currentTimeMillis(); @@ -718,7 +719,7 @@ public class StreamingContainer extends YarnContainerMain } while (rsp.hasPendingRequests); } - logger.debug("Exiting hearbeat loop"); + logger.debug("[{}] Exiting heartbeat loop", containerId); umbilical.log(containerId, "[" + containerId + "] Exiting heartbeat loop.."); } @@ -827,7 +828,7 @@ public class StreamingContainer extends YarnContainerMain try { umbilical.log(this.containerId, "deploy request failed: " + rsp.deployRequest + " " + ExceptionUtils.getStackTrace(e)); } catch (IOException ioe) { - // ignore + logger.debug("Fail to log", ioe); } this.exitHeartbeatLoop = true; throw new IllegalStateException("Deploy request failed: " + rsp.deployRequest, e); @@ -1436,19 +1437,32 @@ public class StreamingContainer extends YarnContainerMain logger.error("Voluntary container termination due to an error in operator {}.", currentdi, error); operators = new int[]{currentdi.id}; } - umbilical.reportError(containerId, operators, "Voluntary container termination due to an error. " + ExceptionUtils.getStackTrace(error)); - System.exit(1); + try { + umbilical.reportError(containerId, operators, "Voluntary container termination due to an error. " + ExceptionUtils.getStackTrace(error)); + } catch (Exception e) { + logger.debug("Fail to log", e); + } finally { + System.exit(1); + } } catch (Exception ex) { if (currentdi == null) { failedNodes.add(ndi.id); logger.error("Operator set {} stopped running due to an exception.", setOperators, ex); int[] operators = new int[]{ndi.id}; - umbilical.reportError(containerId, operators, "Stopped running due to an exception. " + ExceptionUtils.getStackTrace(ex)); + try { + umbilical.reportError(containerId, operators, "Stopped running due to an exception. " + ExceptionUtils.getStackTrace(ex)); + } catch (Exception e) { + logger.debug("Fail to log", e); + } } else { failedNodes.add(currentdi.id); logger.error("Abandoning deployment of operator {} due to setup failure.", currentdi, ex); int[] operators = new int[]{currentdi.id}; - umbilical.reportError(containerId, operators, "Abandoning deployment due to setup failure. " + ExceptionUtils.getStackTrace(ex)); + try { + umbilical.reportError(containerId, operators, "Abandoning deployment due to setup failure. " + ExceptionUtils.getStackTrace(ex)); + } catch (Exception e) { + logger.debug("Fail to log", e); + } } } finally { if (setOperators.contains(ndi)) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/c687bb5e/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java index 74e18ee..e8ec26c 100644 --- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java @@ -40,6 +40,7 @@ import org.junit.Rule; import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -474,7 +475,7 @@ public class StramRecoveryTest StreamingContainerUmbilicalProtocol impl = Mockito.mock(StreamingContainerUmbilicalProtocol.class, Mockito.withSettings().extraInterfaces(Closeable.class)); - Mockito.doAnswer(new org.mockito.stubbing.Answer<Void>() + final Answer<Void> answer = new Answer<Void>() { @Override public Void answer(InvocationOnMock invocation) @@ -491,8 +492,9 @@ public class StramRecoveryTest } return null; } - }) - .when(impl).log("containerId", "timeout"); + }; + Mockito.doAnswer(answer).when(impl).log("containerId", "timeout"); + Mockito.doAnswer(answer).when(impl).reportError("containerId", null, "timeout"); Server server = new RPC.Builder(conf).setProtocol(StreamingContainerUmbilicalProtocol.class).setInstance(impl) .setBindAddress("0.0.0.0").setPort(0).setNumHandlers(1).setVerbose(false).build(); @@ -546,7 +548,7 @@ public class StramRecoveryTest rp = new RecoverableRpcProxy(appPath, conf); protocolProxy = rp.getProxy(); - protocolProxy.log("containerId", "msg"); + protocolProxy.reportError("containerId", null, "msg"); try { protocolProxy.log("containerId", "timeout"); Assert.fail("expected socket timeout"); @@ -562,7 +564,7 @@ public class StramRecoveryTest uri = RecoverableRpcProxy.toConnectURI(address); recoveryHandler.writeConnectUri(uri.toString()); - protocolProxy.log("containerId", "timeout"); + protocolProxy.reportError("containerId", null, "timeout"); Assert.assertTrue("timedout", timedout.get()); restoreSystemProperty(RecoverableRpcProxy.RPC_TIMEOUT, rpcTimeout);
