Repository: tez Updated Branches: refs/heads/master a99867786 -> 4675a651f
TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4675a651 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4675a651 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4675a651 Branch: refs/heads/master Commit: 4675a651f874c223cb5968ec39704bc9ba973507 Parents: a998677 Author: Jonathan Eagles <[email protected]> Authored: Thu Apr 14 15:43:53 2016 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Thu Apr 14 15:44:05 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../apache/tez/dag/api/TezConfiguration.java | 22 ++++++++++++++++++++ .../runtime/LogicalIOProcessorRuntimeTask.java | 21 ++++++++++++++++--- 3 files changed, 42 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/4675a651/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 46ad9b6..2ed4091 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: TEZ-3202. Reduce the memory need for jobs with high number of segments + TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs Release 0.8.3: 2016-04-14 @@ -434,6 +435,7 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES: + TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs TEZ-3202. Reduce the memory need for jobs with high number of segments TEZ-3188. Move tez.submit.hosts out of TezConfiguration to TezConfigurationConstants. TEZ-3196. java.lang.InternalError from decompression codec is fatal to a task during shuffle http://git-wip-us.apache.org/repos/asf/tez/blob/4675a651/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 0bbe1df..6785405 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -731,6 +731,28 @@ public class TezConfiguration extends Configuration { public static final int TEZ_TASK_MAX_EVENT_BACKLOG_DEFAULT = 10000; /** + * Boolean value. Backwards compatibility setting for initializing IO processor before + * inputs and outputs. + * Expert level setting. + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type="boolean") + public static final String TEZ_TASK_INITIALIZE_PROCESSOR_FIRST = TEZ_TASK_PREFIX + + "initialize-processor-first"; + public static final boolean TEZ_TASK_INITIALIZE_PROCESSOR_FIRST_DEFAULT = false; + + /** + * Boolean value. Backwards compatibility setting for initializing inputs and outputs + * serially instead of the parallel default. + * Expert level setting. + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type="boolean") + public static final String TEZ_TASK_INITIALIZE_PROCESSOR_IO_SERIALLY = TEZ_TASK_PREFIX + + "initialize-processor-io-serially"; + public static final boolean TEZ_TASK_INITIALIZE_PROCESSOR_IO_SERIALLY_DEFAULT = false; + + /** * Long value. Interval, in milliseconds, within which any of the tasks Input/Processor/Output * components need to make successive progress notifications. If the progress is not notified * for this interval then the task will be considered hung and terminated. http://git-wip-us.apache.org/repos/asf/tez/blob/4675a651/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index a31136b..0863e65 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -156,6 +156,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { private final HadoopShim hadoopShim; private final int maxEventBacklog; + private final boolean initializeProcessorFirst; + private final boolean initializeProcessorIOSerially; + public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber, Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> envMap, @@ -189,8 +192,15 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { this.eventsToBeProcessed = new LinkedBlockingQueue<TezEvent>(); this.state.set(State.NEW); this.appAttemptNumber = appAttemptNumber; + this.initializeProcessorFirst = tezConf.getBoolean(TezConfiguration.TEZ_TASK_INITIALIZE_PROCESSOR_FIRST, + TezConfiguration.TEZ_TASK_INITIALIZE_PROCESSOR_FIRST_DEFAULT); + this.initializeProcessorIOSerially = tezConf.getBoolean(TezConfiguration.TEZ_TASK_INITIALIZE_PROCESSOR_IO_SERIALLY, + TezConfiguration.TEZ_TASK_INITIALIZE_PROCESSOR_IO_SERIALLY_DEFAULT); int numInitializers = numInputs + numOutputs; // Processor is initialized in the main thread. numInitializers = (numInitializers == 0 ? 1 : numInitializers); + if (initializeProcessorIOSerially) { + numInitializers = 1; + } this.initializerExecutor = Executors.newFixedThreadPool( numInitializers, new ThreadFactoryBuilder().setDaemon(true) @@ -219,6 +229,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { this.processorContext = createProcessorContext(); this.processor = createProcessor(processorDescriptor.getClassName(), processorContext); + if (initializeProcessorFirst || initializeProcessorIOSerially) { + // Initialize processor in the current thread. + initializeLogicalIOProcessor(); + } int numTasks = 0; int inputIndex = 0; @@ -235,9 +249,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { numTasks++; } - // Initialize processor in the current thread. - initializeLogicalIOProcessor(); - + if (!(initializeProcessorFirst || initializeProcessorIOSerially)) { + // Initialize processor in the current thread. + initializeLogicalIOProcessor(); + } int completedTasks = 0; while (completedTasks < numTasks) { LOG.info("Waiting for " + (numTasks-completedTasks) + " initializers to finish");
