Repository: tez
Updated Branches:
  refs/heads/branch-0.9 15a1f9424 -> a97a047c6
TEZ-3951. TezClient wait too long for the DAGClient for prewarm; tries to shut down the wrong DAG (Sergey Shelukhin via Harish Jaiprakash) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a97a047c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a97a047c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a97a047c Branch: refs/heads/branch-0.9 Commit: a97a047c61f7c0266d612a95eb4f8972b6f74be1 Parents: 15a1f94 Author: Harish JP <[email protected]> Authored: Mon Jun 11 18:57:41 2018 +0530 Committer: Harish JP <[email protected]> Committed: Mon Jun 11 18:57:41 2018 +0530 ---------------------------------------------------------------------- .../java/org/apache/tez/client/TezClient.java | 38 ++++++++++++++------ .../apache/tez/dag/api/client/DAGClient.java | 14 +++++++- .../tez/dag/api/client/DAGClientImpl.java | 33 +++++++++++++---- .../org/apache/tez/client/TestTezClient.java | 29 +++++++++++++++ .../apache/tez/dag/api/client/MRDAGClient.java | 5 +++ 5 files changed, 101 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a97a047c/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index 9dd4a69..ad00592 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -112,6 +112,7 @@ public class TezClient { private static final String appIdStrPrefix = "application"; private static final String APPLICATION_ID_PREFIX = appIdStrPrefix + '_'; + private static final long PREWARM_WAIT_MS = 500; @VisibleForTesting static final String NO_CLUSTER_DIAGNOSTICS_MSG = "No cluster diagnostics found."; @@ -584,23 +585,33 @@ public 
class TezClient { * if submission timed out */ public synchronized DAGClient submitDAG(DAG dag) throws TezException, IOException { - if (isSession) { - return submitDAGSession(dag); - } else { - return submitDAGApplication(dag); + DAGClient result = isSession ? submitDAGSession(dag) : submitDAGApplication(dag); + if (result != null) { + closePrewarmDagClient(); // Assume the current DAG replaced the prewarm one; no need to kill. } + return result; } - private void closePrewarmDagClient() { + private void killAndClosePrewarmDagClient(long waitTimeMs) { if (prewarmDagClient == null) { return; } try { - prewarmDagClient.tryKillDAG(); - LOG.info("Waiting for prewarm DAG to shut down"); - prewarmDagClient.waitForCompletion(); - } catch (Exception ex) { - LOG.warn("Failed to shut down the prewarm DAG " + prewarmDagClient, ex); + prewarmDagClient.tryKillDAG(); + if (waitTimeMs > 0) { + LOG.info("Waiting for prewarm DAG to shut down"); + prewarmDagClient.waitForCompletion(waitTimeMs); + } + } + catch (Exception ex) { + LOG.warn("Failed to shut down the prewarm DAG " + prewarmDagClient, ex); + } + closePrewarmDagClient(); + } + + private void closePrewarmDagClient() { + if (prewarmDagClient == null) { + return; } try { prewarmDagClient.close(); @@ -705,6 +716,11 @@ public class TezClient { frameworkClient); } + @VisibleForTesting + protected long getPrewarmWaitTimeMs() { + return PREWARM_WAIT_MS; + } + /** * Stop the client. This terminates the connection to the YARN cluster. 
* In session mode, this shuts down the session DAG App Master @@ -712,7 +728,7 @@ public class TezClient { * @throws IOException */ public synchronized void stop() throws TezException, IOException { - closePrewarmDagClient(); + killAndClosePrewarmDagClient(getPrewarmWaitTimeMs()); try { if (amKeepAliveService != null) { amKeepAliveService.shutdownNow(); http://git-wip-us.apache.org/repos/asf/tez/blob/a97a047c/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java index c70da75..6c0ebbd 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java @@ -103,7 +103,7 @@ public abstract class DAGClient implements Closeable { public abstract void tryKillDAG() throws IOException, TezException; /** - * Wait for DAG to complete without printing any vertex statuses + * Wait forever for DAG to complete without printing any vertex statuses * * @return Final DAG Status * @throws IOException @@ -113,6 +113,17 @@ public abstract class DAGClient implements Closeable { public abstract DAGStatus waitForCompletion() throws IOException, TezException, InterruptedException; /** + * Wait for DAG to complete without printing any vertex statuses + * + * @param timeMs Maximum wait duration + * @return Final DAG Status, or null on timeout or if DAG is no longer running + * @throws IOException + * @throws TezException + * @throws InterruptedException + */ + public abstract DAGStatus waitForCompletion(long timeMs) throws IOException, TezException, InterruptedException; + + /** * Wait for DAG to complete and periodically print *all* vertices' status. 
* * @param statusGetOpts @@ -125,4 +136,5 @@ public abstract class DAGClient implements Closeable { */ public abstract DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException, InterruptedException; + } http://git-wip-us.apache.org/repos/asf/tez/blob/a97a047c/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java index 1cf0bfc..9e17b9b 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java @@ -19,6 +19,7 @@ package org.apache.tez.dag.api.client; import javax.annotation.Nullable; + import java.io.IOException; import java.text.DecimalFormat; import java.util.Collections; @@ -28,7 +29,6 @@ import java.util.Map; import java.util.Set; import com.google.common.annotations.VisibleForTesting; - import com.google.common.base.Preconditions; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; @@ -338,15 +338,20 @@ public class DAGClientImpl extends DAGClient { } @Override + public DAGStatus waitForCompletion(long timeMs) throws IOException, TezException, InterruptedException { + return _waitForCompletionWithStatusUpdates(timeMs, false, EnumSet.noneOf(StatusGetOpts.class)); + } + + @Override public DAGStatus waitForCompletion() throws IOException, TezException, InterruptedException { - return _waitForCompletionWithStatusUpdates(false, EnumSet.noneOf(StatusGetOpts.class)); + return _waitForCompletionWithStatusUpdates(-1, false, EnumSet.noneOf(StatusGetOpts.class)); } @Override public DAGStatus waitForCompletionWithStatusUpdates( @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException, InterruptedException { - return 
_waitForCompletionWithStatusUpdates(true, statusGetOpts); + return _waitForCompletionWithStatusUpdates(-1, true, statusGetOpts); } @Override @@ -504,15 +509,21 @@ public class DAGClientImpl extends DAGClient { return dagStatus; } - private DAGStatus _waitForCompletionWithStatusUpdates(boolean vertexUpdates, + private DAGStatus _waitForCompletionWithStatusUpdates(long timeMs, + boolean vertexUpdates, @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException, InterruptedException { DAGStatus dagStatus; boolean initPrinted = false; boolean runningPrinted = false; double dagProgress = -1.0; // Print the first one // monitoring + Long maxNs = timeMs >= 0 ? (System.nanoTime() + (timeMs * 1000000L)) : null; while (true) { - dagStatus = getDAGStatus(statusGetOpts, SLEEP_FOR_COMPLETION); + try { + dagStatus = getDAGStatus(statusGetOpts, SLEEP_FOR_COMPLETION); + } catch (DAGNotRunningException ex) { + return null; + } if (!initPrinted && (dagStatus.getState() == DAGStatus.State.INITING || dagStatus.getState() == DAGStatus.State.SUBMITTED)) { initPrinted = true; // Print once @@ -525,6 +536,9 @@ public class DAGClientImpl extends DAGClient { || dagStatus.getState() == DAGStatus.State.ERROR) { break; } + if (maxNs != null && System.nanoTime() > maxNs) { + return null; + } }// End of while(true) Set<String> vertexNames = Collections.emptySet(); @@ -537,7 +551,14 @@ public class DAGClientImpl extends DAGClient { vertexNames = getDAGStatus(statusGetOpts).getVertexProgress().keySet(); } dagProgress = monitorProgress(vertexNames, dagProgress, null, dagStatus); - dagStatus = getDAGStatus(statusGetOpts, SLEEP_FOR_COMPLETION); + try { + dagStatus = getDAGStatus(statusGetOpts, SLEEP_FOR_COMPLETION); + } catch (DAGNotRunningException ex) { + return null; + } + if (maxNs != null && System.nanoTime() > maxNs) { + return null; + } }// end of while // Always print the last status irrespective of progress change monitorProgress(vertexNames, -1.0, statusGetOpts, dagStatus); 
http://git-wip-us.apache.org/repos/asf/tez/blob/a97a047c/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java index e959a55..0c297d3 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java @@ -115,6 +115,7 @@ public class TestTezClient { YarnClient mockYarnClient; ApplicationId mockAppId; boolean callRealGetSessionAMProxy; + Long prewarmTimeoutMs; public TezClientForTest(String name, TezConfiguration tezConf, @Nullable Map<String, LocalResource> localResources, @@ -135,6 +136,15 @@ public class TestTezClient { } return super.getAMProxy(appId); } + + public void setPrewarmTimeoutMs(Long prewarmTimeoutMs) { + this.prewarmTimeoutMs = prewarmTimeoutMs; + } + + @Override + protected long getPrewarmWaitTimeMs() { + return prewarmTimeoutMs == null ? super.getPrewarmWaitTimeMs() : prewarmTimeoutMs; + } } TezClientForTest configureAndCreateTezClient() throws YarnException, IOException, ServiceException { @@ -429,6 +439,25 @@ public class TestTezClient { client.stop(); } + + @Test (timeout=5000) + public void testPreWarmCloseStuck() throws Exception { + TezClientForTest client = configureAndCreateTezClient(); + client.setPrewarmTimeoutMs(10L); // Don't wait too long. 
+ client.start(); + + when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState()) + .thenReturn(YarnApplicationState.RUNNING); + when(client.sessionAmProxy.getAMStatus((RpcController) any(), (GetAMStatusRequestProto) any())) + .thenReturn(GetAMStatusResponseProto.newBuilder().setStatus(TezAppMasterStatusProto.READY).build()); + + PreWarmVertex vertex = PreWarmVertex.create("PreWarm", 1, Resource.newInstance(1, 1)); + client.preWarm(vertex); + // Keep prewarm in "running" state. Client should give up waiting; if it doesn't, the test will time out. + client.stop(); + } + + private void setClientToReportStoppedDags(TezClientForTest client) throws Exception { when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState()) .thenReturn(YarnApplicationState.FINISHED); http://git-wip-us.apache.org/repos/asf/tez/blob/a97a047c/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java b/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java index 42b52e0..16dc2f8 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java @@ -86,6 +86,11 @@ public class MRDAGClient extends DAGClient { } @Override + public DAGStatus waitForCompletion(long timeMs) throws IOException, TezException, InterruptedException { + return realClient.waitForCompletion(timeMs); + } + + @Override public DAGStatus waitForCompletionWithStatusUpdates( @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException, InterruptedException { return realClient.waitForCompletionWithStatusUpdates(statusGetOpts);
