Repository: tez
Updated Branches:
  refs/heads/master 77efbc313 -> 596c90a77
TEZ-2300. TezClient.stop() takes a lot of time or does not work sometimes (jeagles)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/596c90a7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/596c90a7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/596c90a7

Branch: refs/heads/master
Commit: 596c90a777beed4cdf7fb2fcf08e92a003d03c18
Parents: 77efbc3
Author: Jonathan Eagles <[email protected]>
Authored: Fri Aug 28 10:51:37 2015 -0500
Committer: Jonathan Eagles <[email protected]>
Committed: Fri Aug 28 10:51:37 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                   |  1 +
 .../java/org/apache/tez/client/TezClient.java | 38 ++++++++++++++++++++
 .../apache/tez/dag/api/TezConfiguration.java  | 19 ++++++++++
 .../org/apache/tez/client/TestTezClient.java  | 38 ++++++++++++++++++++
 .../org/apache/tez/dag/app/DAGAppMaster.java  |  7 +---
 5 files changed, 97 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/596c90a7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d1eb7f6..09e51d8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -151,6 +151,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2300. TezClient.stop() takes a lot of time or does not work sometimes
   TEZ-2734. Add a test to verify the filename generated by OnDiskMerge.
   TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers
   TEZ-2687.
ATS History shutdown happens before the min-held containers are released http://git-wip-us.apache.org/repos/asf/tez/blob/596c90a7/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index 312ddcd..e39cf4f 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -565,11 +566,43 @@ public class TezClient { ShutdownSessionRequestProto.newBuilder().build(); proxy.shutdownSession(null, request); sessionShutdownSuccessful = true; + boolean asynchronousStop = amConfig.getTezConfiguration().getBoolean( + TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP, + TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP_DEFAULT); + if (!asynchronousStop) { + LOG.info("Waiting until application is in a final state"); + long currentTimeMillis = System.currentTimeMillis(); + long timeKillIssued = currentTimeMillis; + long killTimeOut = amConfig.getTezConfiguration().getLong( + TezConfiguration.TEZ_CLIENT_HARD_KILL_TIMEOUT_MS, + TezConfiguration.TEZ_CLIENT_HARD_KILL_TIMEOUT_MS_DEFAULT); + ApplicationReport appReport = frameworkClient + .getApplicationReport(sessionAppId); + while ((currentTimeMillis < timeKillIssued + killTimeOut) + && !isJobInTerminalState(appReport.getYarnApplicationState())) { + try { + 
Thread.sleep(1000L); + } catch (InterruptedException ie) { + /** interrupted, just break */ + break; + } + currentTimeMillis = System.currentTimeMillis(); + appReport = frameworkClient.getApplicationReport(sessionAppId); + } + + if (!isJobInTerminalState(appReport.getYarnApplicationState())) { + frameworkClient.killApplication(sessionAppId); + } + } } } catch (TezException e) { LOG.info("Failed to shutdown Tez Session via proxy", e); } catch (ServiceException e) { LOG.info("Failed to shutdown Tez Session via proxy", e); + } catch (ApplicationNotFoundException e) { + LOG.info("Failed to kill nonexistent application " + sessionAppId, e); + } catch (YarnException e) { + throw new TezException(e); } if (!sessionShutdownSuccessful) { LOG.info("Could not connect to AM, killing session via YARN" @@ -590,6 +623,11 @@ public class TezClient { } } } + private boolean isJobInTerminalState(YarnApplicationState yarnApplicationState) { + return (yarnApplicationState == YarnApplicationState.FINISHED + || yarnApplicationState == YarnApplicationState.FAILED + || yarnApplicationState == YarnApplicationState.KILLED); + } /** * Get the name of the client http://git-wip-us.apache.org/repos/asf/tez/blob/596c90a7/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index bb404ee..54d490b 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1286,4 +1286,23 @@ public class TezConfiguration extends Configuration { TEZ_PREFIX + "java.opts.checker.enabled"; public static final boolean TEZ_CLIENT_JAVA_OPTS_CHECKER_ENABLED_DEFAULT = true; + /** + * Long value. 
Time interval, in milliseconds, for client to wait during client-requested + * AM shutdown before issuing a hard kill to the RM for this application. + * Expert level setting. + */ + @ConfigurationScope(Scope.CLIENT) + public static final String TEZ_CLIENT_HARD_KILL_TIMEOUT_MS = TEZ_PREFIX + "client.timeout-ms"; + + public static final long TEZ_CLIENT_HARD_KILL_TIMEOUT_MS_DEFAULT = 30 * 1000L; + + /** + * Boolean value. Backwards compatibility setting. Changes TezClient stop to be a + * synchronous call waiting until AM is in a final state before returning to the user. + * Expert level setting. + */ + @ConfigurationScope(Scope.CLIENT) + public static final String TEZ_CLIENT_ASYNCHRONOUS_STOP = TEZ_PREFIX + "client.asynchronous-stop"; + + public static final boolean TEZ_CLIENT_ASYNCHRONOUS_STOP_DEFAULT = true; } http://git-wip-us.apache.org/repos/asf/tez/blob/596c90a7/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java index 2c3cb36..55c4c0f 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java @@ -36,6 +36,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -80,6 +81,7 @@ import com.google.common.collect.Maps; import com.google.protobuf.RpcController; public class TestTezClient { + static final long HARD_KILL_TIMEOUT = 1500L; class TezClientForTest extends TezClient { TezYarnClient mockTezYarnClient; @@ -125,6 +127,8 @@ public class TestTezClient { } conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, 
true); conf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession); + conf.setBoolean(TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP, false); + conf.setLong(TezConfiguration.TEZ_CLIENT_HARD_KILL_TIMEOUT_MS, HARD_KILL_TIMEOUT); TezClientForTest client = new TezClientForTest("test", conf, lrs, null); ApplicationId appId1 = ApplicationId.newInstance(0, 1); @@ -524,4 +528,38 @@ public class TestTezClient { client.start(); } + @Test(timeout = 5000) + public void testStopRetriesUntilTerminalState() throws Exception { + TezConfiguration conf = new TezConfiguration(); + conf.setBoolean(TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP, false); + final TezClientForTest client = configureAndCreateTezClient(conf); + client.start(); + when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState()) + .thenReturn(YarnApplicationState.NEW).thenReturn(YarnApplicationState.KILLED); + try { + client.stop(); + } catch (Exception e) { + Assert.fail("Expected ApplicationNotFoundException"); + } + verify(client.mockYarnClient, atLeast(2)).getApplicationReport(client.mockAppId); + } + + @Test(timeout = 5000) + public void testStopRetriesUntilTimeout() throws Exception { + TezConfiguration conf = new TezConfiguration(); + conf.setBoolean(TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP, false); + final TezClientForTest client = configureAndCreateTezClient(conf); + client.start(); + when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState()) + .thenReturn(YarnApplicationState.NEW); + long start = System.currentTimeMillis(); + try { + client.stop(); + } catch (Exception e) { + Assert.fail("Stop should complete without exception: " + e); + } + long end = System.currentTimeMillis(); + verify(client.mockYarnClient, atLeast(2)).getApplicationReport(client.mockAppId); + Assert.assertTrue("Stop ended before timeout", end - start > HARD_KILL_TIMEOUT); + } } 
http://git-wip-us.apache.org/repos/asf/tez/blob/596c90a7/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 34e7c2a..613046b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -1220,12 +1220,7 @@ public class DAGAppMaster extends AbstractService { //send a DAG_KILL message LOG.info("Sending a kill event to the current DAG" + ", dagId=" + currentDAG.getID()); - try { - logDAGKillRequestEvent(currentDAG.getID(), true); - } catch (IOException e) { - throw new TezException(e); - } - sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.DAG_KILL)); + tryKillDAG(currentDAG); } else { LOG.info("No current running DAG, shutting down the AM"); if (isSession && !state.equals(DAGAppMasterState.ERROR)) {
