Repository: hive
Updated Branches:
  refs/heads/master f5ec4b556 -> 139d8d0b1


HIVE-17576: Improve progress-reporting in TezProcessor (Mithun Radhakrishnan, 
reviewed by Owen O'Malley)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/139d8d0b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/139d8d0b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/139d8d0b

Branch: refs/heads/master
Commit: 139d8d0b1e3496b9ed53300790c408c472b0d76a
Parents: f5ec4b5
Author: Mithun RK <[email protected]>
Authored: Mon Oct 9 10:38:49 2017 -0700
Committer: Mithun RK <[email protected]>
Committed: Mon Oct 9 10:38:49 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/tez/TezProcessor.java   | 67 +++++++++++++++++++-
 1 file changed, 66 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/139d8d0b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index 4242262..00b3486 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,7 +69,97 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
   private static final String CLASS_NAME = TezProcessor.class.getName();
   private final PerfLogger perfLogger = SessionState.getPerfLogger();
 
+  // TODO: Replace with direct call to ProgressHelper, when reliably available.
+  /**
+   * Drives Tez's {@code org.apache.tez.common.ProgressHelper} (TEZ-3437) through reflection,
+   * because that class is not reliably available on every Tez classpath.
+   * If the class cannot be loaded or instantiated, the failure is logged and the
+   * schedule/shutdown methods turn into logged no-ops.
+   */
+  private static class ReflectiveProgressHelper {
+
+    private final Configuration conf;
+    private Class<?> progressHelperClass = null;
+    private Object progressHelper = null;
+
+    /**
+     * Resolves ProgressHelper through {@code conf} and instantiates it. Failures are logged
+     * and leave this wrapper invalid, so the other methods do nothing.
+     *
+     * @param conf configuration used to resolve the ProgressHelper class
+     * @param inputs processor inputs, passed to the ProgressHelper constructor
+     * @param processorContext processor context, passed to the ProgressHelper constructor
+     * @param processorName processor name, passed to the ProgressHelper constructor
+     */
+    ReflectiveProgressHelper(Configuration conf,
+                             Map<String, LogicalInput> inputs,
+                             ProcessorContext processorContext,
+                             String processorName) {
+      this.conf = conf;
+      try {
+        progressHelperClass = this.conf.getClassByName("org.apache.tez.common.ProgressHelper");
+        progressHelper = progressHelperClass
+            .getDeclaredConstructor(Map.class, ProcessorContext.class, String.class)
+            .newInstance(inputs, processorContext, processorName);
+        LOG.debug("ProgressHelper initialized!");
+      } catch (ClassNotFoundException ex) {
+        // Expected when the Tez on the classpath predates ProgressHelper: one line suffices.
+        LOG.warn("Could not find ProgressHelper. " + ex);
+      } catch (Exception ex) {
+        // The class exists but construction failed. Pass the throwable so the real cause
+        // (e.g. the exception wrapped in an InvocationTargetException) is logged.
+        LOG.warn("Could not instantiate ProgressHelper.", ex);
+      }
+    }
+
+    /** Returns true only if ProgressHelper was both resolved and instantiated. */
+    private boolean isValid() {
+      return progressHelperClass != null && progressHelper != null;
+    }
+
+    /**
+     * Reflectively calls {@code scheduleProgressTaskService(delay, period)} on ProgressHelper.
+     * Logs a warning and returns if ProgressHelper is unavailable; reflection failures are
+     * logged, not thrown.
+     *
+     * @param delay forwarded unchanged to ProgressHelper
+     * @param period forwarded unchanged to ProgressHelper
+     */
+    void scheduleProgressTaskService(long delay, long period) {
+      if (!isValid()) {
+        LOG.warn("ProgressHelper uninitialized. Bailing on scheduleProgressTaskService()");
+        return;
+      }
+      try {
+        progressHelperClass.getDeclaredMethod("scheduleProgressTaskService", long.class, long.class)
+            .invoke(progressHelper, delay, period);
+        LOG.debug("scheduleProgressTaskService() called!");
+      } catch (Exception exception) {
+        LOG.warn("Could not scheduleProgressTaskService.", exception);
+      }
+    }
+
+    /**
+     * Reflectively calls {@code shutDownProgressTaskService()} on ProgressHelper.
+     * Logs a warning and returns if ProgressHelper is unavailable; reflection failures are
+     * logged, not thrown.
+     */
+    void shutDownProgressTaskService() {
+      if (!isValid()) {
+        LOG.warn("ProgressHelper uninitialized. Bailing on shutDownProgressTaskService()");
+        return;
+      }
+      try {
+        progressHelperClass.getDeclaredMethod("shutDownProgressTaskService").invoke(progressHelper);
+        LOG.debug("shutDownProgressTaskService() called!");
+      } catch (Exception exception) {
+        LOG.warn("Could not shutDownProgressTaskService.", exception);
+      }
+    }
+  }
+
   protected ProcessorContext processorContext;
+  private ReflectiveProgressHelper progressHelper;
 
   protected static final NumberFormat taskIdFormat = NumberFormat.getInstance();
   protected static final NumberFormat jobIdFormat = NumberFormat.getInstance();
@@ -91,6 +147,9 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
     // we have to close in the processor's run method, because tez closes 
inputs
     // before calling close (TEZ-955) and we might need to read inputs
     // when we flush the pipeline.
+      if (progressHelper != null) {
+        progressHelper.shutDownProgressTaskService();
+      }
   }
 
   @Override
@@ -158,6 +217,11 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
       if (aborted.get()) {
         return;
       }
+
+      // leverage TEZ-3437: Improve synchronization and the progress report 
behavior.
+      progressHelper = new ReflectiveProgressHelper(jobConf, inputs, 
getContext(), this.getClass().getSimpleName());
+
+
       // There should be no blocking operation in RecordProcessor creation,
       // otherwise the abort operation will not register since they are 
synchronized on the same
       // lock.
@@ -168,6 +232,7 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
       }
     }
 
+    progressHelper.scheduleProgressTaskService(0, 100);
     if (!aborted.get()) {
       initializeAndRunProcessor(inputs, outputs);
     }

Reply via email to