TEZ-2284. Separate TaskReporter into an interface. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bedadc7b Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bedadc7b Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bedadc7b Branch: refs/heads/master Commit: bedadc7b4074c8241faf5377515ad284417f15f9 Parents: 35aaa5a Author: Siddharth Seth <[email protected]> Authored: Tue Apr 7 13:21:35 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Fri Aug 21 18:13:54 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../internals/api/TaskReporterInterface.java | 46 ++++++++++++++++++++ .../apache/tez/runtime/task/TaskReporter.java | 12 ++++- .../org/apache/tez/runtime/task/TezChild.java | 3 +- .../apache/tez/runtime/task/TezTaskRunner.java | 5 ++- 5 files changed, 62 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/bedadc7b/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 6a4399c..e2c428d 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -13,5 +13,6 @@ ALL CHANGES: TEZ-2187. Allow TaskCommunicators to report failed / killed attempts. TEZ-2241. Miscellaneous fixes after last rebase. TEZ-2283. Fixes after rebase 04/07. + TEZ-2284. Separate TaskReporter into an interface. 
INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/bedadc7b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java new file mode 100644 index 0000000..47a61ab --- /dev/null +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.runtime.internals.api; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.RuntimeTask; +import org.apache.tez.runtime.api.impl.EventMetaData; +import org.apache.tez.runtime.api.impl.TezEvent; +import org.apache.tez.runtime.task.ErrorReporter; + +public interface TaskReporterInterface { + + // TODO TEZ-2003 Consolidate private API usage if making this public + + void registerTask(RuntimeTask task, ErrorReporter errorReporter); + + void unregisterTask(TezTaskAttemptID taskAttemptId); + + boolean taskSucceeded(TezTaskAttemptID taskAttemptId) throws IOException, TezException; + + boolean taskFailed(TezTaskAttemptID taskAttemptId, Throwable cause, String diagnostics, EventMetaData srcMeta) throws IOException, + TezException; + + void addEvents(TezTaskAttemptID taskAttemptId, Collection<TezEvent> events); + + boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException; + + void shutdown(); + +} http://git-wip-us.apache.org/repos/asf/tez/blob/bedadc7b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java index d9a7786..3579e3f 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java @@ -48,6 +48,7 @@ import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; +import 
org.apache.tez.runtime.internals.api.TaskReporterInterface; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +67,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; * retrieve events specific to this task. * */ -public class TaskReporter { +public class TaskReporter implements TaskReporterInterface { private static final Logger LOG = LoggerFactory.getLogger(TaskReporter.class); @@ -98,6 +99,7 @@ public class TaskReporter { /** * Register a task to be tracked. Heartbeats will be sent out for this task to fetch events, etc. */ + @Override public synchronized void registerTask(RuntimeTask task, ErrorReporter errorReporter) { currentCallable = new HeartbeatCallable(task, umbilical, pollInterval, sendCounterInterval, @@ -110,12 +112,14 @@ public class TaskReporter { * This method should always be invoked before setting up heartbeats for another task running in * the same container. */ + @Override public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) { currentCallable.markComplete(); currentCallable = null; // KKK Make sure the callable completes before proceeding } - + + @Override public void shutdown() { heartbeatExecutor.shutdownNow(); } @@ -413,19 +417,23 @@ public class TaskReporter { } } + @Override public synchronized boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException { return currentCallable.taskSucceeded(taskAttemptID); } + @Override public synchronized boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics, EventMetaData srcMeta) throws IOException, TezException { return currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta); } + @Override public synchronized void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) { currentCallable.addEvents(taskAttemptID, events); } + @Override public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException { return umbilical.canCommit(taskAttemptID); } 
http://git-wip-us.apache.org/repos/asf/tez/blob/bedadc7b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 32da8fb..4c8bebc 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -69,6 +69,7 @@ import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezUmbilical; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; +import org.apache.tez.runtime.internals.api.TaskReporterInterface; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,7 +115,7 @@ public class TezChild { private final boolean ownUmbilical; private final TezTaskUmbilicalProtocol umbilical; - private TaskReporter taskReporter; + private TaskReporterInterface taskReporter; private int taskCount = 0; private TezVertexID lastVertexID; http://git-wip-us.apache.org/repos/asf/tez/blob/bedadc7b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java index f54814b..33a7f4a 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java @@ -41,6 +41,7 @@ import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezEvent; 
import org.apache.tez.runtime.api.impl.TezUmbilical; +import org.apache.tez.runtime.internals.api.TaskReporterInterface; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +57,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { private final LogicalIOProcessorRuntimeTask task; private final UserGroupInformation ugi; - private final TaskReporter taskReporter; + private final TaskReporterInterface taskReporter; private final ListeningExecutorService executor; private volatile ListenableFuture<Void> taskFuture; private volatile Thread waitingThread; @@ -70,7 +71,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { public TezTaskRunner(Configuration tezConf, UserGroupInformation ugi, String[] localDirs, TaskSpec taskSpec, int appAttemptNumber, Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> serviceProviderEnvMap, - Multimap<String, String> startedInputsMap, TaskReporter taskReporter, + Multimap<String, String> startedInputsMap, TaskReporterInterface taskReporter, ListeningExecutorService executor, ObjectRegistry objectRegistry, String pid, ExecutionContext executionContext, long memAvailable) throws IOException {
