Repository: hive
Updated Branches:
  refs/heads/master 6bfa2491b -> 538c0088a
HIVE-16635: Progressbar: Use different timeouts for running queries (Gopal V, reviewed by Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/40fe0d7e Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/40fe0d7e Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/40fe0d7e Branch: refs/heads/master Commit: 40fe0d7e03a03bf6082ff00f17322756e6f00ea9 Parents: 6bfa249 Author: Gopal V <gop...@apache.org> Authored: Fri May 12 00:07:22 2017 -0700 Committer: Gopal V <gop...@apache.org> Committed: Fri May 12 00:07:22 2017 -0700 ---------------------------------------------------------------------- .../ql/exec/tez/monitoring/TezJobMonitor.java | 30 +++++++++++++++++--- 1 file changed, 26 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/40fe0d7e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index f2f97f3..049d7fd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.tez.monitoring; import com.google.common.base.Preconditions; + import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; @@ -39,6 +40,7 @@ import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.Progress; import org.apache.tez.dag.api.client.StatusGetOpts; +import org.apache.tez.util.StopWatch; import java.io.IOException; import 
java.io.InterruptedIOException; @@ -47,6 +49,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING; @@ -58,8 +61,10 @@ import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING; public class TezJobMonitor { static final String CLASS_NAME = TezJobMonitor.class.getName(); - private static final int CHECK_INTERVAL = 200; + private static final int MIN_CHECK_INTERVAL = 200; + private static final int MAX_CHECK_INTERVAL = 1000; private static final int MAX_RETRY_INTERVAL = 2500; + private static final int MAX_RETRY_FAILURES = (MAX_RETRY_INTERVAL / MAX_CHECK_INTERVAL) + 1; private final PerfLogger perfLogger = SessionState.getPerfLogger(); private static final List<DAGClient> shutdownList; @@ -124,6 +129,7 @@ public class TezJobMonitor { boolean done = false; boolean success = false; int failedCounter = 0; + final StopWatch failureTimer = new StopWatch(); int rc = 0; DAGStatus status = null; Map<String, Progress> vertexProgressMap = null; @@ -138,6 +144,7 @@ public class TezJobMonitor { DAGStatus.State lastState = null; boolean running = false; + int checkInterval = MIN_CHECK_INTERVAL; while (true) { try { @@ -145,10 +152,13 @@ public class TezJobMonitor { context.checkHeartbeaterLockException(); } - status = dagClient.getDAGStatus(new HashSet<StatusGetOpts>(), CHECK_INTERVAL); + status = dagClient.getDAGStatus(new HashSet<StatusGetOpts>(), checkInterval); vertexProgressMap = status.getVertexProgress(); DAGStatus.State state = status.getState(); + failedCounter = 0; // AM is responsive again (recovery?) 
+ failureTimer.reset(); + if (state != lastState || state == RUNNING) { lastState = state; @@ -166,6 +176,8 @@ public class TezJobMonitor { console.printInfo("Status: Running (" + dagClient.getExecutionContext() + ")\n"); this.executionStartTime = System.currentTimeMillis(); running = true; + // from running -> failed/succeeded, the AM breaks out of timeouts + checkInterval = MAX_CHECK_INTERVAL; } updateFunction.update(status, vertexProgressMap); break; @@ -204,9 +216,19 @@ public class TezJobMonitor { } catch (Exception e) { console.printInfo("Exception: " + e.getMessage()); boolean isInterrupted = hasInterruptedException(e); - if (isInterrupted || (++failedCounter % MAX_RETRY_INTERVAL / CHECK_INTERVAL == 0)) { + if (failedCounter == 0) { + failureTimer.reset(); + failureTimer.start(); + } + if (isInterrupted + || (++failedCounter >= MAX_RETRY_FAILURES && failureTimer.now(TimeUnit.MILLISECONDS) > MAX_RETRY_INTERVAL)) { try { - console.printInfo("Killing DAG..."); + if (isInterrupted) { + console.printInfo("Killing DAG..."); + } else { + console.printInfo(String.format("Killing DAG... after %d seconds", + failureTimer.now(TimeUnit.SECONDS))); + } dagClient.tryKillDAG(); } catch (IOException | TezException tezException) { // best effort