TEZ-2285. Allow TaskCommunicators to indicate task/container liveness. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b4ed5612 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b4ed5612 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b4ed5612 Branch: refs/heads/TEZ-2003 Commit: b4ed56123eb25d7007708cfbcd8406efd6f3f966 Parents: 08a196a Author: Siddharth Seth <[email protected]> Authored: Tue Apr 7 13:22:09 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Thu Aug 6 01:25:10 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../apache/tez/dag/api/TaskCommunicatorContext.java | 4 ++++ .../tez/dag/app/TaskAttemptListenerImpTezDag.java | 10 ++++++++++ .../apache/tez/dag/app/TezTaskCommunicatorImpl.java | 16 +++++++++------- 4 files changed, 24 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/b4ed5612/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index e2c428d..9d6b220 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -14,5 +14,6 @@ ALL CHANGES: TEZ-2241. Miscellaneous fixes after last reabse. TEZ-2283. Fixes after rebase 04/07. TEZ-2284. Separate TaskReporter into an interface. + TEZ-2285. Allow TaskCommunicators to indicate task/container liveness. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/b4ed5612/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java index a85fb7f..0c3bac3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java @@ -43,6 +43,10 @@ public interface TaskCommunicatorContext { boolean isKnownContainer(ContainerId containerId); + void taskAlive(TezTaskAttemptID taskAttemptId); + + void containerAlive(ContainerId containerId); + // TODO TEZ-2003 Move to vertex, taskIndex, version. Rename to taskAttempt* void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId); http://git-wip-us.apache.org/repos/asf/tez/blob/b4ed5612/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index 3798b6f..a6994d2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -258,6 +258,16 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } @Override + public void taskAlive(TezTaskAttemptID taskAttemptId) { + taskHeartbeatHandler.pinged(taskAttemptId); + } + + @Override + public void containerAlive(ContainerId containerId) { + pingContainerHeartbeatHandler(containerId); + } + + @Override public void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId) { context.getEventHandler() .handle(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null)); http://git-wip-us.apache.org/repos/asf/tez/blob/b4ed5612/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java index bba06fd..a4a707b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -65,17 +65,19 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { null, true, null, null, false); private final TaskCommunicatorContext taskCommunicatorContext; + private final TezTaskUmbilicalProtocol taskUmbilical; - private final ConcurrentMap<ContainerId, ContainerInfo> registeredContainers = + protected final ConcurrentMap<ContainerId, ContainerInfo> registeredContainers = new ConcurrentHashMap<ContainerId, ContainerInfo>(); - private final ConcurrentMap<TaskAttempt, ContainerId> attemptToContainerMap = + protected final ConcurrentMap<TaskAttempt, ContainerId> attemptToContainerMap = new ConcurrentHashMap<TaskAttempt, ContainerId>(); - private final TezTaskUmbilicalProtocol taskUmbilical; - private final String tokenIdentifier; - private final Token<JobTokenIdentifier> sessionToken; + + protected final String tokenIdentifier; + protected final Token<JobTokenIdentifier> sessionToken; protected InetSocketAddress address; - private Server server; + + protected volatile Server server; public static final class ContainerInfo { @@ -440,7 +442,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { // Holder for Task information, which eventually will likely be VertexImplm taskIndex, attemptIndex - private static class TaskAttempt { + protected static class TaskAttempt { // TODO TEZ-2003 Change this to work with VertexName, int id, int version // TODO TEZ-2003 Avoid constructing this unit all over the place private TezTaskAttemptID taskAttemptId;
