Repository: hive Updated Branches: refs/heads/master f5ec4b556 -> 139d8d0b1
HIVE-17576: Improve progress-reporting in TezProcessor (Mithun Radhakrishnan, reviewed by Owen O'Malley) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/139d8d0b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/139d8d0b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/139d8d0b Branch: refs/heads/master Commit: 139d8d0b1e3496b9ed53300790c408c472b0d76a Parents: f5ec4b5 Author: Mithun RK <[email protected]> Authored: Mon Oct 9 10:38:49 2017 -0700 Committer: Mithun RK <[email protected]> Committed: Mon Oct 9 10:38:49 2017 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/exec/tez/TezProcessor.java | 67 +++++++++++++++++++- 1 file changed, 66 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/139d8d0b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index 4242262..00b3486 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError; import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,7 +69,64 @@ public class TezProcessor extends AbstractLogicalIOProcessor { private static final String CLASS_NAME = TezProcessor.class.getName(); private final PerfLogger perfLogger = SessionState.getPerfLogger(); + // TODO: Replace with direct call to ProgressHelper, when reliably available. 
+ private static class ReflectiveProgressHelper { // Drives org.apache.tez.common.ProgressHelper via reflection; if it cannot be loaded/constructed, each call below logs a warning and does nothing. + + Configuration conf; + Class<?> progressHelperClass = null; + Object progressHelper = null; + + ReflectiveProgressHelper(Configuration conf, + Map<String, LogicalInput> inputs, + ProcessorContext processorContext, + String processorName) { + this.conf = conf; + try { + progressHelperClass = this.conf.getClassByName("org.apache.tez.common.ProgressHelper"); + progressHelper = progressHelperClass.getDeclaredConstructor(Map.class, ProcessorContext.class, String.class) + .newInstance(inputs, processorContext, processorName); + LOG.debug("ProgressHelper initialized!"); + } + catch(Exception ex) { + LOG.warn("Could not find ProgressHelper. " + ex); + } + } + + private boolean isValid() { + return progressHelperClass != null && progressHelper != null; + } + + void scheduleProgressTaskService(long delay, long period) { + if (!isValid()) { + LOG.warn("ProgressHelper uninitialized. Bailing on scheduleProgressTaskService()"); + return; + } + try { + progressHelperClass.getDeclaredMethod("scheduleProgressTaskService", long.class, long.class) + .invoke(progressHelper, delay, period); + LOG.debug("scheduleProgressTaskService() called!"); + } catch (Exception exception) { + LOG.warn("Could not scheduleProgressTaskService.", exception); + } + } + + void shutDownProgressTaskService() { + if (!isValid()) { + LOG.warn("ProgressHelper uninitialized. Bailing on shutDownProgressTaskService()");
+ return; + } + try { + progressHelperClass.getDeclaredMethod("shutDownProgressTaskService").invoke(progressHelper); + LOG.debug("shutDownProgressTaskService() called!"); + } + catch (Exception exception) { + LOG.warn("Could not shutDownProgressTaskService.", exception); + } + } + } + protected ProcessorContext processorContext; + private ReflectiveProgressHelper progressHelper; protected static final NumberFormat taskIdFormat = NumberFormat.getInstance(); protected static final NumberFormat jobIdFormat = NumberFormat.getInstance(); @@ -91,6 +147,9 @@ public class TezProcessor extends AbstractLogicalIOProcessor { // we have to close in the processor's run method, because tez closes inputs // before calling close (TEZ-955) and we might need to read inputs // when we flush the pipeline. + if (progressHelper != null) { + progressHelper.shutDownProgressTaskService(); + } } @Override @@ -158,6 +217,11 @@ public class TezProcessor extends AbstractLogicalIOProcessor { if (aborted.get()) { return; } + + // leverage TEZ-3437: Improve synchronization and the progress report behavior. + progressHelper = new ReflectiveProgressHelper(jobConf, inputs, getContext(), this.getClass().getSimpleName()); + + // There should be no blocking operation in RecordProcessor creation, // otherwise the abort operation will not register since they are synchronized on the same // lock. @@ -168,6 +232,7 @@ public class TezProcessor extends AbstractLogicalIOProcessor { } } + progressHelper.scheduleProgressTaskService(0, 100); if (!aborted.get()) { initializeAndRunProcessor(inputs, outputs); }
