Repository: tez Updated Branches: refs/heads/branch-0.7 32de2b079 -> fb8fbd594
TEZ-2300. TezClient.stop() takes a lot of time or does not work sometimes (jeagles) (cherry picked from commit 596c90a777beed4cdf7fb2fcf08e92a003d03c18) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fb8fbd59 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fb8fbd59 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fb8fbd59 Branch: refs/heads/branch-0.7 Commit: fb8fbd594975dee39cb375a36058907d4fa54eb5 Parents: 32de2b0 Author: Jonathan Eagles <[email protected]> Authored: Fri Aug 28 10:51:37 2015 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Fri Aug 28 11:06:51 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../java/org/apache/tez/client/TezClient.java | 38 ++++++++++++++++++++ .../apache/tez/dag/api/TezConfiguration.java | 20 +++++++++++ .../org/apache/tez/client/TestTezClient.java | 37 +++++++++++++++++++ .../org/apache/tez/dag/app/DAGAppMaster.java | 7 +--- 5 files changed, 97 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/fb8fbd59/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index de31984..4bc5e7c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2300. TezClient.stop() takes a lot of time or does not work sometimes TEZ-2734. Add a test to verify the filename generated by OnDiskMerge. TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers TEZ-2687. 
ATS History shutdown happens before the min-held containers are released http://git-wip-us.apache.org/repos/asf/tez/blob/fb8fbd59/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index 45837fe..3a5302c 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -506,11 +507,43 @@ public class TezClient { ShutdownSessionRequestProto.newBuilder().build(); proxy.shutdownSession(null, request); sessionShutdownSuccessful = true; + boolean asynchronousStop = amConfig.getTezConfiguration().getBoolean( + TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP, + TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP_DEFAULT); + if (!asynchronousStop) { + LOG.info("Waiting until application is in a final state"); + long currentTimeMillis = System.currentTimeMillis(); + long timeKillIssued = currentTimeMillis; + long killTimeOut = amConfig.getTezConfiguration().getLong( + TezConfiguration.TEZ_CLIENT_HARD_KILL_TIMEOUT_MS, + TezConfiguration.TEZ_CLIENT_HARD_KILL_TIMEOUT_MS_DEFAULT); + ApplicationReport appReport = frameworkClient + .getApplicationReport(sessionAppId); + while ((currentTimeMillis < timeKillIssued + killTimeOut) + && !isJobInTerminalState(appReport.getYarnApplicationState())) { + try { + 
Thread.sleep(1000L); + } catch (InterruptedException ie) { + /** interrupted, just break */ + break; + } + currentTimeMillis = System.currentTimeMillis(); + appReport = frameworkClient.getApplicationReport(sessionAppId); + } + + if (!isJobInTerminalState(appReport.getYarnApplicationState())) { + frameworkClient.killApplication(sessionAppId); + } + } } } catch (TezException e) { LOG.info("Failed to shutdown Tez Session via proxy", e); } catch (ServiceException e) { LOG.info("Failed to shutdown Tez Session via proxy", e); + } catch (ApplicationNotFoundException e) { + LOG.info("Failed to kill nonexistent application " + sessionAppId, e); + } catch (YarnException e) { + throw new TezException(e); } if (!sessionShutdownSuccessful) { LOG.info("Could not connect to AM, killing session via YARN" @@ -531,6 +564,11 @@ public class TezClient { } } } + private boolean isJobInTerminalState(YarnApplicationState yarnApplicationState) { + return (yarnApplicationState == YarnApplicationState.FINISHED + || yarnApplicationState == YarnApplicationState.FAILED + || yarnApplicationState == YarnApplicationState.KILLED); + } /** * Get the name of the client http://git-wip-us.apache.org/repos/asf/tez/blob/fb8fbd59/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 2114793..aa781bc 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1258,4 +1258,24 @@ public class TezConfiguration extends Configuration { TEZ_PREFIX + "client.diagnostics.wait.timeout-ms"; @Private public static final long TEZ_CLIENT_DIAGNOSTICS_WAIT_TIMEOUT_MS_DEFAULT = 3*1000; + + /** + * Long value. 
Time interval, in milliseconds, for client to wait during client-requested + * AM shutdown before issuing a hard kill to the RM for this application. + * Expert level setting. + */ + @ConfigurationScope(Scope.CLIENT) + public static final String TEZ_CLIENT_HARD_KILL_TIMEOUT_MS = TEZ_PREFIX + "client.timeout-ms"; + + public static final long TEZ_CLIENT_HARD_KILL_TIMEOUT_MS_DEFAULT = 30 * 1000L; + + /** + * Boolean value. Backwards compatibility setting. When set to false, TezClient stop becomes a + * synchronous call that waits until the AM is in a final state before returning to the user. + * Defaults to true (asynchronous stop). Expert level setting. + */ + @ConfigurationScope(Scope.CLIENT) + public static final String TEZ_CLIENT_ASYNCHRONOUS_STOP = TEZ_PREFIX + "client.asynchronous-stop"; + + public static final boolean TEZ_CLIENT_ASYNCHRONOUS_STOP_DEFAULT = true; } http://git-wip-us.apache.org/repos/asf/tez/blob/fb8fbd59/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java index 92d3cd2..f1f23e1 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java @@ -31,6 +31,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -74,6 +75,7 @@ import com.google.common.collect.Maps; import com.google.protobuf.RpcController; public class TestTezClient { + static final long HARD_KILL_TIMEOUT = 1500L; class TezClientForTest extends TezClient { TezYarnClient mockTezYarnClient; @@ -119,6 +121,7 @@ public class TestTezClient { } conf.setBoolean(TezConfiguration.TEZ_IGNORE_LIB_URIS, 
true); conf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession); + conf.setLong(TezConfiguration.TEZ_CLIENT_HARD_KILL_TIMEOUT_MS, HARD_KILL_TIMEOUT); TezClientForTest client = new TezClientForTest("test", conf, lrs, null); ApplicationId appId1 = ApplicationId.newInstance(0, 1); @@ -429,4 +432,38 @@ public class TestTezClient { } } + @Test(timeout = 5000) + public void testStopRetriesUntilTerminalState() throws Exception { + TezConfiguration conf = new TezConfiguration(); + conf.setBoolean(TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP, false); + final TezClientForTest client = configureAndCreateTezClient(conf); + client.start(); + when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState()) + .thenReturn(YarnApplicationState.NEW).thenReturn(YarnApplicationState.KILLED); + try { + client.stop(); + } catch (Exception e) { + Assert.fail("Expected ApplicationNotFoundException"); + } + verify(client.mockYarnClient, atLeast(2)).getApplicationReport(client.mockAppId); + } + + @Test(timeout = 5000) + public void testStopRetriesUntilTimeout() throws Exception { + TezConfiguration conf = new TezConfiguration(); + conf.setBoolean(TezConfiguration.TEZ_CLIENT_ASYNCHRONOUS_STOP, false); + final TezClientForTest client = configureAndCreateTezClient(conf); + client.start(); + when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState()) + .thenReturn(YarnApplicationState.NEW); + long start = System.currentTimeMillis(); + try { + client.stop(); + } catch (Exception e) { + Assert.fail("Stop should complete without exception: " + e); + } + long end = System.currentTimeMillis(); + verify(client.mockYarnClient, atLeast(2)).getApplicationReport(client.mockAppId); + Assert.assertTrue("Stop ended before timeout", end - start > HARD_KILL_TIMEOUT); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/fb8fbd59/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java 
---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 691ee02..9bf0819 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -1183,12 +1183,7 @@ public class DAGAppMaster extends AbstractService { //send a DAG_KILL message LOG.info("Sending a kill event to the current DAG" + ", dagId=" + currentDAG.getID()); - try { - logDAGKillRequestEvent(currentDAG.getID(), true); - } catch (IOException e) { - throw new TezException(e); - } - sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.DAG_KILL)); + tryKillDAG(currentDAG); } else { LOG.info("No current running DAG, shutting down the AM"); if (isSession && !state.equals(DAGAppMasterState.ERROR)) {
