Repository: tez
Updated Branches:
  refs/heads/branch-0.7 714d8c432 -> 33351895e


TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/33351895
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/33351895
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/33351895

Branch: refs/heads/branch-0.7
Commit: 33351895ea57e50396a53575bec0b400cc801721
Parents: 714d8c4
Author: Jonathan Eagles <[email protected]>
Authored: Thu Apr 14 15:51:47 2016 -0500
Committer: Jonathan Eagles <[email protected]>
Committed: Thu Apr 14 15:51:47 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/api/TezConfiguration.java    | 20 +++++++++++++++++++
 .../runtime/LogicalIOProcessorRuntimeTask.java  | 21 +++++++++++++++++---
 3 files changed, 39 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/33351895/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4033249..d5f35d8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 ALL CHANGES:
+  TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs
   TEZ-3202. Reduce the memory need for jobs with high number of segments
   TEZ-3188. Move tez.submit.hosts out of TezConfiguration to TezConfigurationConstants.
   TEZ-3196. java.lang.InternalError from decompression codec is fatal to a task during shuffle

http://git-wip-us.apache.org/repos/asf/tez/blob/33351895/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 896dfb6..96adff3 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -679,6 +679,26 @@ public class TezConfiguration extends Configuration {
   public static final int TEZ_TASK_MAX_EVENT_BACKLOG_DEFAULT = 10000;
 
   /**
+   * Boolean value. Backwards compatibility setting for initializing IO processor before
+   * inputs and outputs.
+   * Expert level setting.
+   */
+  @ConfigurationScope(Scope.AM)
+  public static final String TEZ_TASK_INITIALIZE_PROCESSOR_FIRST = 
TEZ_TASK_PREFIX +
+      "initialize-processor-first";
+  public static final boolean TEZ_TASK_INITIALIZE_PROCESSOR_FIRST_DEFAULT = 
false;
+
+  /**
+   * Boolean value. Backwards compatibility setting for initializing inputs and outputs
+   * serially instead of the parallel default.
+   * Expert level setting.
+   */
+  @ConfigurationScope(Scope.AM)
+  public static final String TEZ_TASK_INITIALIZE_PROCESSOR_IO_SERIALLY = 
TEZ_TASK_PREFIX +
+      "initialize-processor-io-serially";
+  public static final boolean 
TEZ_TASK_INITIALIZE_PROCESSOR_IO_SERIALLY_DEFAULT = false;
+
+  /**
    * Long value. Interval, in milliseconds, within which any of the tasks Input/Processor/Output 
    * components need to make successive progress notifications. If the progress is not notified 
    * for this interval then the task will be considered hung and terminated.

http://git-wip-us.apache.org/repos/asf/tez/blob/33351895/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 9b8daf6..11ef31d 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -150,6 +150,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private final long memAvailable;
   private final int maxEventBacklog;
 
+  private final boolean initializeProcessorFirst;
+  private final boolean initializeProcessorIOSerially;
+
   public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
       Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical,
       Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> 
envMap,
@@ -182,8 +185,15 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.eventsToBeProcessed = new LinkedBlockingQueue<TezEvent>();
     this.state.set(State.NEW);
     this.appAttemptNumber = appAttemptNumber;
+    this.initializeProcessorFirst = 
tezConf.getBoolean(TezConfiguration.TEZ_TASK_INITIALIZE_PROCESSOR_FIRST,
+        TezConfiguration.TEZ_TASK_INITIALIZE_PROCESSOR_FIRST_DEFAULT);
+    this.initializeProcessorIOSerially = 
tezConf.getBoolean(TezConfiguration.TEZ_TASK_INITIALIZE_PROCESSOR_IO_SERIALLY,
+        TezConfiguration.TEZ_TASK_INITIALIZE_PROCESSOR_IO_SERIALLY_DEFAULT);
     int numInitializers = numInputs + numOutputs; // Processor is initialized 
in the main thread.
     numInitializers = (numInitializers == 0 ? 1 : numInitializers);
+    if (initializeProcessorIOSerially) {
+      numInitializers = 1;
+    }
     this.initializerExecutor = Executors.newFixedThreadPool(
         numInitializers,
         new ThreadFactoryBuilder().setDaemon(true)
@@ -211,6 +221,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.processorContext = createProcessorContext();
     this.processor = createProcessor(processorDescriptor.getClassName(), 
processorContext);
 
+    if (initializeProcessorFirst || initializeProcessorIOSerially) {
+      // Initialize processor in the current thread.
+      initializeLogicalIOProcessor();
+    }
     int numTasks = 0;
 
     int inputIndex = 0;
@@ -227,9 +241,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       numTasks++;
     }
 
-    // Initialize processor in the current thread.
-    initializeLogicalIOProcessor();
-
+    if (!(initializeProcessorFirst || initializeProcessorIOSerially)) {
+      // Initialize processor in the current thread.
+      initializeLogicalIOProcessor();
+    }
     int completedTasks = 0;
     while (completedTasks < numTasks) {
       LOG.info("Waiting for " + (numTasks-completedTasks) + " initializers to 
finish");

Reply via email to