TEZ-2264. Remove unused taskUmbilical reference in TezTaskRunner, register as running late. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/aec3f6b6 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/aec3f6b6 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/aec3f6b6 Branch: refs/heads/TEZ-2003 Commit: aec3f6b668a5d3f1adec58a09cef2d87c4e16df7 Parents: 3bfe003 Author: Siddharth Seth <[email protected]> Authored: Thu Apr 2 00:18:06 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Thu Apr 2 00:18:06 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../src/main/java/org/apache/tez/runtime/task/TezChild.java | 2 +- .../java/org/apache/tez/runtime/task/TezTaskRunner.java | 9 ++++----- .../java/org/apache/tez/runtime/task/TestTaskExecution.java | 2 +- 4 files changed, 8 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/aec3f6b6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 92abe79..f092bd0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -8,6 +8,8 @@ INCOMPATIBLE CHANGES TEZ-2176. Move all logging to slf4j. (commons-logging jar no longer part of Tez tar) ALL CHANGES: + TEZ-2264. Remove unused taskUmbilical reference in TezTaskRunner, register as running late. + TEZ-2149. Optimizations for the timed version of DAGClient.getStatus. TEZ-2213. For the ordered case, enabling pipelined shuffle should automatically disable final merge. TEZ-2204. TestAMRecovery increasingly flaky on jenkins builds. TEZ-2209. Fix pipelined shuffle to fetch data from any one attempt http://git-wip-us.apache.org/repos/asf/tez/blob/aec3f6b6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index bd2d025..bfec349 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -235,7 +235,7 @@ public class TezChild { // Execute the Actual Task TezTaskRunner taskRunner = new TezTaskRunner(defaultConf, childUGI, - localDirs, containerTask.getTaskSpec(), umbilical, appAttemptNumber, + localDirs, containerTask.getTaskSpec(), appAttemptNumber, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter, executor, objectRegistry, pid, executionContext, memAvailable); boolean shouldDie; http://git-wip-us.apache.org/repos/asf/tez/blob/aec3f6b6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java index fd920e4..766b5c3 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java @@ -24,7 +24,6 @@ import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.Collection; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; @@ -33,7 +32,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSError; import org.apache.hadoop.security.UserGroupInformation; import org.apache.tez.common.CallableWithNdc; -import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; @@ -70,7 +68,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); TezTaskRunner(Configuration tezConf, UserGroupInformation ugi, String[] localDirs, - TaskSpec taskSpec, TezTaskUmbilicalProtocol umbilical, int appAttemptNumber, + TaskSpec taskSpec, int appAttemptNumber, Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> serviceProviderEnvMap, Multimap<String, String> startedInputsMap, TaskReporter taskReporter, ListeningExecutorService executor, ObjectRegistry objectRegistry, String pid, @@ -83,8 +81,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, localDirs, this, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, objectRegistry, pid, executionContext, memAvailable); - taskReporter.registerTask(task, this); - taskRunning = new AtomicBoolean(true); + taskRunning = new AtomicBoolean(false); } /** @@ -94,6 +91,8 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { */ public boolean run() throws InterruptedException, IOException, TezException { waitingThread = Thread.currentThread(); + taskRunning.set(true); + taskReporter.registerTask(task, this); TaskRunnerCallable callable = new TaskRunnerCallable(); Throwable failureCause = null; taskFuture = executor.submit(callable); http://git-wip-us.apache.org/repos/asf/tez/blob/aec3f6b6/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java index 91b311f..1bcb337 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java @@ -716,7 +716,7 @@ public class TestTaskExecution { TaskSpec taskSpec = new TaskSpec(taskAttemptId, "dagName", "vertexName", -1, processorDescriptor, new ArrayList<InputSpec>(), new ArrayList<OutputSpec>(), null); - TezTaskRunner taskRunner = new TezTaskRunner(tezConf, ugi, localDirs, taskSpec, umbilical, 1, + TezTaskRunner taskRunner = new TezTaskRunner(tezConf, ugi, localDirs, taskSpec, 1, new HashMap<String, ByteBuffer>(), new HashMap<String, String>(), HashMultimap.<String, String> create(), taskReporter, executor, null, "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory()); return taskRunner;
