Repository: tez Updated Branches: refs/heads/master 6d431469b -> b31cf3351
TEZ-3584. amKeepAliveService in TezClient should shutdown in case of AM failure. Contributed by Zhiyuan Yang. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b31cf335 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b31cf335 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b31cf335 Branch: refs/heads/master Commit: b31cf33518ca02a3e49743a0d349db293a1144e3 Parents: 6d43146 Author: Siddharth Seth <[email protected]> Authored: Mon Jan 23 14:48:26 2017 -0800 Committer: Siddharth Seth <[email protected]> Committed: Mon Jan 23 14:48:26 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../java/org/apache/tez/client/TezClient.java | 16 ++++++++-- .../org/apache/tez/client/TestTezClient.java | 33 ++++++++++++++++++++ .../java/org/apache/tez/test/TestTezJobs.java | 2 +- 4 files changed, 49 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/b31cf335/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 303f7f3..d6c4d49 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3584. amKeepAliveService in TezClient should shutdown in case of AM failure. TEZ-3462. Task attempt failure during container shutdown loses useful container diagnostics TEZ-3579. Wrong configuration key for max slow start fraction in CartesianProductVertexManager. TEZ-3458. Auto grouping for cartesian product edge(unpartitioned case). http://git-wip-us.apache.org/repos/asf/tez/blob/b31cf335/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index f4e9f10..65ce0fb 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -484,6 +484,9 @@ public class TezClient { proxy = waitForProxy(); } catch (InterruptedException e) { LOG.debug("Interrupted while trying to create a connection to the AM", e); + } catch (SessionNotRunning e) { + LOG.error("Cannot create a connection to the AM, stopping heartbeat to AM", e); + cancelAMKeepAlive(false); } } if (proxy != null) { @@ -1104,12 +1107,21 @@ public class TezClient { @VisibleForTesting @Private - public synchronized void cancelAMKeepAlive() { + public synchronized void cancelAMKeepAlive(boolean shutdownNow) { if (amKeepAliveService != null) { - amKeepAliveService.shutdownNow(); + if (shutdownNow) { + amKeepAliveService.shutdownNow(); + } else { + amKeepAliveService.shutdown(); + } } } + @VisibleForTesting + protected synchronized ScheduledExecutorService getAMKeepAliveService() { + return amKeepAliveService; + } + /** * A builder for setting up an instance of {@link org.apache.tez.client.TezClient} */ http://git-wip-us.apache.org/repos/asf/tez/blob/b31cf335/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java index dbbd619..c1f7fd1 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java @@ -830,4 +830,37 @@ public class TestTezClient { } + @Test(timeout = 10000) + public void testAMHeartbeatFailOnGetAMProxy() throws Exception { + int amHeartBeatTimeoutSecs = 3; + TezConfiguration conf = new TezConfiguration(); + conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS, amHeartBeatTimeoutSecs); + + final TezClientForTest client = configureAndCreateTezClient(conf); + client.callRealGetSessionAMProxy = true; + client.start(); + + when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState()) + .thenReturn(YarnApplicationState.FAILED); + Thread.sleep(2 * amHeartBeatTimeoutSecs * 1000); + assertTrue(client.getAMKeepAliveService().isTerminated()); + } + + @Test(timeout = 12000) + public void testAMHeartbeatFailOnGetAMStatus() throws Exception { + int amHeartBeatTimeoutSecs = 3; + TezConfiguration conf = new TezConfiguration(); + conf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS, amHeartBeatTimeoutSecs); + + final TezClientForTest client = configureAndCreateTezClient(conf); + client.start(); + + when(client.sessionAmProxy.getAMStatus(any(RpcController.class), + any(GetAMStatusRequestProto.class))).thenThrow(new ServiceException("error")); + client.callRealGetSessionAMProxy = true; + when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState()) + .thenReturn(YarnApplicationState.FAILED); + Thread.sleep(3 * amHeartBeatTimeoutSecs * 1000); + assertTrue(client.getAMKeepAliveService().isTerminated()); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/b31cf335/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java index 479509d..5c50a34 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java @@ -1210,7 +1210,7 @@ public class TestTezJobs { tezConf.setInt(TezConfiguration.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS, 5); TezClient tezClient = TezClient.create("testAMClientHeartbeatTimeout", tezConf, true); tezClient.start(); - tezClient.cancelAMKeepAlive(); + tezClient.cancelAMKeepAlive(true); ApplicationId appId = tezClient.getAppMasterApplicationId();
