Repository: hive
Updated Branches:
  refs/heads/master 6bfa2491b -> 538c0088a


HIVE-16635: Progressbar: Use different timeouts for running queries (Gopal V, reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/40fe0d7e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/40fe0d7e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/40fe0d7e

Branch: refs/heads/master
Commit: 40fe0d7e03a03bf6082ff00f17322756e6f00ea9
Parents: 6bfa249
Author: Gopal V <gop...@apache.org>
Authored: Fri May 12 00:07:22 2017 -0700
Committer: Gopal V <gop...@apache.org>
Committed: Fri May 12 00:07:22 2017 -0700

----------------------------------------------------------------------
 .../ql/exec/tez/monitoring/TezJobMonitor.java   | 30 +++++++++++++++++---
 1 file changed, 26 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/40fe0d7e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
index f2f97f3..049d7fd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.tez.monitoring;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
@@ -39,6 +40,7 @@ import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.Progress;
 import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.util.StopWatch;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -47,6 +49,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
 
@@ -58,8 +61,10 @@ import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
 public class TezJobMonitor {
 
   static final String CLASS_NAME = TezJobMonitor.class.getName();
-  private static final int CHECK_INTERVAL = 200;
+  private static final int MIN_CHECK_INTERVAL = 200;
+  private static final int MAX_CHECK_INTERVAL = 1000;
   private static final int MAX_RETRY_INTERVAL = 2500;
+  private static final int MAX_RETRY_FAILURES = (MAX_RETRY_INTERVAL / 
MAX_CHECK_INTERVAL) + 1;
 
   private final PerfLogger perfLogger = SessionState.getPerfLogger();
   private static final List<DAGClient> shutdownList;
@@ -124,6 +129,7 @@ public class TezJobMonitor {
     boolean done = false;
     boolean success = false;
     int failedCounter = 0;
+    final StopWatch failureTimer = new StopWatch();
     int rc = 0;
     DAGStatus status = null;
     Map<String, Progress> vertexProgressMap = null;
@@ -138,6 +144,7 @@ public class TezJobMonitor {
     DAGStatus.State lastState = null;
     boolean running = false;
 
+    int checkInterval = MIN_CHECK_INTERVAL;
     while (true) {
 
       try {
@@ -145,10 +152,13 @@ public class TezJobMonitor {
           context.checkHeartbeaterLockException();
         }
 
-        status = dagClient.getDAGStatus(new HashSet<StatusGetOpts>(), 
CHECK_INTERVAL);
+        status = dagClient.getDAGStatus(new HashSet<StatusGetOpts>(), 
checkInterval);
         vertexProgressMap = status.getVertexProgress();
         DAGStatus.State state = status.getState();
 
+        failedCounter = 0; // AM is responsive again (recovery?)
+        failureTimer.reset();
+
         if (state != lastState || state == RUNNING) {
           lastState = state;
 
@@ -166,6 +176,8 @@ public class TezJobMonitor {
                 console.printInfo("Status: Running (" + 
dagClient.getExecutionContext() + ")\n");
                 this.executionStartTime = System.currentTimeMillis();
                 running = true;
+                // from running -> failed/succeeded, the AM breaks out of timeouts
+                checkInterval = MAX_CHECK_INTERVAL;
               }
               updateFunction.update(status, vertexProgressMap);
               break;
@@ -204,9 +216,19 @@ public class TezJobMonitor {
       } catch (Exception e) {
         console.printInfo("Exception: " + e.getMessage());
         boolean isInterrupted = hasInterruptedException(e);
-        if (isInterrupted || (++failedCounter % MAX_RETRY_INTERVAL / 
CHECK_INTERVAL == 0)) {
+        if (failedCounter == 0) {
+          failureTimer.reset();
+          failureTimer.start();
+        }
+        if (isInterrupted
+            || (++failedCounter >= MAX_RETRY_FAILURES && 
failureTimer.now(TimeUnit.MILLISECONDS) > MAX_RETRY_INTERVAL)) {
           try {
-            console.printInfo("Killing DAG...");
+            if (isInterrupted) {
+              console.printInfo("Killing DAG...");
+            } else {
+              console.printInfo(String.format("Killing DAG... after %d 
seconds",
+                  failureTimer.now(TimeUnit.SECONDS)));
+            }
             dagClient.tryKillDAG();
           } catch (IOException | TezException tezException) {
             // best effort

Reply via email to