Repository: hive Updated Branches: refs/heads/branch-2.2 2e310cf39 -> 26c175a1c
HIVE-17576: Improve progress-reporting in TezProcessor (Mithun Radhakrishnan, reviewed by Owen O'Malley) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/26c175a1 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/26c175a1 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/26c175a1 Branch: refs/heads/branch-2.2 Commit: 26c175a1c5b6bac12d3e92549febf06252a0f1c5 Parents: 2e310cf Author: Mithun Radhakrishnan <[email protected]> Authored: Mon Oct 9 10:39:24 2017 -0700 Committer: Mithun Radhakrishnan <[email protected]> Committed: Mon Oct 9 10:40:30 2017 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/exec/tez/TezProcessor.java | 66 ++++++++++++++++++++ 1 file changed, 66 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/26c175a1/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index 486d43a..d59d484 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -66,7 +66,64 @@ public class TezProcessor extends AbstractLogicalIOProcessor { private static final String CLASS_NAME = TezProcessor.class.getName(); private final PerfLogger perfLogger = SessionState.getPerfLogger(); + // TODO: Replace with direct call to ProgressHelper, when reliably available. 
+  /**
+   * Reflection-based wrapper around {@code org.apache.tez.common.ProgressHelper}.
+   *
+   * The Tez class is looked up by name at runtime instead of being referenced
+   * directly, so this file has no compile-time dependency on it. If the class
+   * cannot be loaded or instantiated, the wrapper stays "invalid" and every
+   * operation degrades to a logged no-op rather than failing the task.
+   * NOTE(review): ProgressHelper is presumably only present in Tez builds that
+   * include TEZ-3437 -- confirm against the Tez versions Hive supports.
+   */
+  private static class ReflectiveProgressHelper {
+
+    /** Configuration whose class loader is used to resolve ProgressHelper. */
+    private final Configuration conf;
+    /** Resolved ProgressHelper class, or null if the lookup failed. */
+    private Class<?> progressHelperClass = null;
+    /** ProgressHelper instance, or null if lookup or construction failed. */
+    private Object progressHelper = null;
+
+    /**
+     * Attempts to load and instantiate ProgressHelper. Never throws: any
+     * failure is logged and leaves this wrapper in the invalid state.
+     *
+     * @param conf             configuration used to load the class by name
+     * @param inputs           the processor's logical inputs, forwarded to ProgressHelper
+     * @param processorContext the Tez processor context, forwarded to ProgressHelper
+     * @param processorName    name forwarded to ProgressHelper
+     */
+    ReflectiveProgressHelper(Configuration conf,
+                             Map<String, LogicalInput> inputs,
+                             ProcessorContext processorContext,
+                             String processorName) {
+      this.conf = conf;
+      try {
+        progressHelperClass = this.conf.getClassByName("org.apache.tez.common.ProgressHelper");
+        progressHelper = progressHelperClass.getDeclaredConstructor(Map.class, ProcessorContext.class, String.class)
+                            .newInstance(inputs, processorContext, processorName);
+        LOG.debug("ProgressHelper initialized!");
+      } catch (Exception ex) {
+        // Deliberately terse (no stack trace): a missing class is an expected condition.
+        LOG.warn("Could not find ProgressHelper. " + ex);
+      }
+    }
+
+    /** @return true only if both the class lookup and the instantiation succeeded. */
+    private boolean isValid() {
+      return progressHelperClass != null && progressHelper != null;
+    }
+
+    /**
+     * Reflectively invokes {@code ProgressHelper.scheduleProgressTaskService(long, long)}.
+     * Logs a warning and returns if the helper is uninitialized or the call fails.
+     *
+     * @param delay  forwarded as the first argument (presumably an initial delay
+     *               in milliseconds -- confirm against Tez)
+     * @param period forwarded as the second argument (presumably the reporting
+     *               period in milliseconds -- confirm against Tez)
+     */
+    void scheduleProgressTaskService(long delay, long period) {
+      if (!isValid()) {
+        LOG.warn("ProgressHelper uninitialized. Bailing on scheduleProgressTaskService()");
+        return;
+      }
+      try {
+        progressHelperClass.getDeclaredMethod("scheduleProgressTaskService", long.class, long.class)
+            .invoke(progressHelper, delay, period);
+        LOG.debug("scheduleProgressTaskService() called!");
+      } catch (Exception exception) {
+        LOG.warn("Could not scheduleProgressTaskService.", exception);
+      }
+    }
+
+    /**
+     * Reflectively invokes {@code ProgressHelper.shutDownProgressTaskService()}.
+     * Logs a warning and returns if the helper is uninitialized or the call fails.
+     */
+    void shutDownProgressTaskService() {
+      if (!isValid()) {
+        // Names this method; the message previously named scheduleProgressTaskService().
+        LOG.warn("ProgressHelper uninitialized. Bailing on shutDownProgressTaskService()");
+        return;
+      }
+      try {
+        progressHelperClass.getDeclaredMethod("shutDownProgressTaskService").invoke(progressHelper);
+        LOG.debug("shutDownProgressTaskService() called!");
+      } catch (Exception exception) {
+        LOG.warn("Could not shutDownProgressTaskService.", exception);
+      }
+    }
+  }
+
   protected ProcessorContext processorContext;
+  private ReflectiveProgressHelper progressHelper;
 
   protected static final NumberFormat taskIdFormat = NumberFormat.getInstance();
   protected static final NumberFormat jobIdFormat = NumberFormat.getInstance();
@@ -87,6 +144,9 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
     // we have to close in the processor's run method, because tez closes inputs
     // before calling close (TEZ-955) and we might need to read inputs
     // when we flush the pipeline.
+    if (progressHelper != null) {
+      progressHelper.shutDownProgressTaskService();
+    }
   }
 
   @Override
@@ -154,6 +214,11 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
       if (aborted.get()) {
         return;
       }
+
+      // leverage TEZ-3437: Improve synchronization and the progress report behavior.
+      progressHelper = new ReflectiveProgressHelper(jobConf, inputs, getContext(), this.getClass().getSimpleName());
+
+
       // There should be no blocking operation in RecordProcessor creation,
       // otherwise the abort operation will not register since they are synchronized on the same
       // lock.
@@ -164,6 +229,7 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
       }
     }
 
+    progressHelper.scheduleProgressTaskService(0, 100);
     if (!aborted.get()) {
       initializeAndRunProcessor(inputs, outputs);
     }
