Repository: tez Updated Branches: refs/heads/branch-0.7 714d8c432 -> 33351895e
TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/33351895 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/33351895 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/33351895 Branch: refs/heads/branch-0.7 Commit: 33351895ea57e50396a53575bec0b400cc801721 Parents: 714d8c4 Author: Jonathan Eagles <[email protected]> Authored: Thu Apr 14 15:51:47 2016 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Thu Apr 14 15:51:47 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/api/TezConfiguration.java | 20 +++++++++++++++++++ .../runtime/LogicalIOProcessorRuntimeTask.java | 21 +++++++++++++++++--- 3 files changed, 39 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/33351895/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4033249..d5f35d8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-2972. Avoid task rescheduling when a node turns unhealthy ALL CHANGES: + TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs TEZ-3202. Reduce the memory need for jobs with high number of segments TEZ-3188. Move tez.submit.hosts out of TezConfiguration to TezConfigurationConstants. TEZ-3196. java.lang.InternalError from decompression codec is fatal to a task during shuffle http://git-wip-us.apache.org/repos/asf/tez/blob/33351895/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 896dfb6..96adff3 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -679,6 +679,26 @@ public class TezConfiguration extends Configuration { public static final int TEZ_TASK_MAX_EVENT_BACKLOG_DEFAULT = 10000; /** + * Boolean value. Backwards compatibility setting for initializing IO processor before + * inputs and outputs. + * Expert level setting. + */ + @ConfigurationScope(Scope.AM) + public static final String TEZ_TASK_INITIALIZE_PROCESSOR_FIRST = TEZ_TASK_PREFIX + + "initialize-processor-first"; + public static final boolean TEZ_TASK_INITIALIZE_PROCESSOR_FIRST_DEFAULT = false; + + /** + * Boolean value. Backwards compatibility setting for initializing inputs and outputs + * serially instead of the parallel default. + * Expert level setting. + */ + @ConfigurationScope(Scope.AM) + public static final String TEZ_TASK_INITIALIZE_PROCESSOR_IO_SERIALLY = TEZ_TASK_PREFIX + + "initialize-processor-io-serially"; + public static final boolean TEZ_TASK_INITIALIZE_PROCESSOR_IO_SERIALLY_DEFAULT = false; + + /** * Long value. Interval, in milliseconds, within which any of the tasks Input/Processor/Output * components need to make successive progress notifications. If the progress is not notified * for this interval then the task will be considered hung and terminated. http://git-wip-us.apache.org/repos/asf/tez/blob/33351895/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 9b8daf6..11ef31d 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -150,6 +150,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { private final long memAvailable; private final int maxEventBacklog; + private final boolean initializeProcessorFirst; + private final boolean initializeProcessorIOSerially; + public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber, Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> envMap, @@ -182,8 +185,15 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { this.eventsToBeProcessed = new LinkedBlockingQueue<TezEvent>(); this.state.set(State.NEW); this.appAttemptNumber = appAttemptNumber; + this.initializeProcessorFirst = tezConf.getBoolean(TezConfiguration.TEZ_TASK_INITIALIZE_PROCESSOR_FIRST, + TezConfiguration.TEZ_TASK_INITIALIZE_PROCESSOR_FIRST_DEFAULT); + this.initializeProcessorIOSerially = tezConf.getBoolean(TezConfiguration.TEZ_TASK_INITIALIZE_PROCESSOR_IO_SERIALLY, + TezConfiguration.TEZ_TASK_INITIALIZE_PROCESSOR_IO_SERIALLY_DEFAULT); int numInitializers = numInputs + numOutputs; // Processor is initialized in the main thread. numInitializers = (numInitializers == 0 ? 1 : numInitializers); + if (initializeProcessorIOSerially) { + numInitializers = 1; + } this.initializerExecutor = Executors.newFixedThreadPool( numInitializers, new ThreadFactoryBuilder().setDaemon(true) @@ -211,6 +221,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { this.processorContext = createProcessorContext(); this.processor = createProcessor(processorDescriptor.getClassName(), processorContext); + if (initializeProcessorFirst || initializeProcessorIOSerially) { + // Initialize processor in the current thread. + initializeLogicalIOProcessor(); + } int numTasks = 0; int inputIndex = 0; @@ -227,9 +241,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { numTasks++; } - // Initialize processor in the current thread. - initializeLogicalIOProcessor(); - + if (!(initializeProcessorFirst || initializeProcessorIOSerially)) { + // Initialize processor in the current thread. + initializeLogicalIOProcessor(); + } int completedTasks = 0; while (completedTasks < numTasks) { LOG.info("Waiting for " + (numTasks-completedTasks) + " initializers to finish");
