TEZ-2670. Remove TaskAttempt holder used within TezTaskCommunicator. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9086d455
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9086d455
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9086d455

Branch: refs/heads/master
Commit: 9086d455955b711717a79d59e21119271e491a23
Parents: a689177
Author: Siddharth Seth <[email protected]>
Authored: Mon Aug 17 14:45:13 2015 -0700
Committer: Siddharth Seth <[email protected]>
Committed: Fri Aug 21 18:15:24 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |  1 +
 .../tez/dag/app/TezTaskCommunicatorImpl.java    | 62 ++++----------------
 2 files changed, 12 insertions(+), 51 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/9086d455/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index cd10a03..4ab083c 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -51,5 +51,6 @@ ALL CHANGES:
   TEZ-2721. rebase 08/14
   TEZ-2714. Fix comments from review - part 3.
   TEZ-2727. Fix findbugs warnings
+  TEZ-2670. Remove TaskAttempt holder used within TezTaskCommunicator.

INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/9086d455/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java index 6c8e1e0..ee1d553 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -74,9 +74,9 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { private final TezTaskUmbilicalProtocol taskUmbilical; protected final ConcurrentMap<ContainerId, ContainerInfo> registeredContainers = - new ConcurrentHashMap<ContainerId, ContainerInfo>(); - protected final ConcurrentMap<TaskAttempt, ContainerId> attemptToContainerMap = - new ConcurrentHashMap<TaskAttempt, ContainerId>(); + new ConcurrentHashMap<>(); + protected final ConcurrentMap<TezTaskAttemptID, ContainerId> attemptToContainerMap = + new ConcurrentHashMap<>(); protected final String tokenIdentifier; @@ -208,7 +208,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { if (containerInfo != null) { synchronized(containerInfo) { if (containerInfo.taskSpec != null && containerInfo.taskSpec.getTaskAttemptID() != null) { - attemptToContainerMap.remove(new TaskAttempt(containerInfo.taskSpec.getTaskAttemptID())); + attemptToContainerMap.remove(containerInfo.taskSpec.getTaskAttemptID()); } } } @@ -237,7 +237,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { containerInfo.credentialsChanged = credentialsChanged; containerInfo.taskPulled = false; - ContainerId oldId = attemptToContainerMap.putIfAbsent(new TaskAttempt(taskSpec.getTaskAttemptID()), containerId); + ContainerId oldId = attemptToContainerMap.putIfAbsent(taskSpec.getTaskAttemptID(), containerId); if (oldId != null) { throw new 
TezUncheckedException( "Attempting to register an already registered taskAttempt with id: " + @@ -250,21 +250,20 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { @Override public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason, String diagnostics) { - TaskAttempt taskAttempt = new TaskAttempt(taskAttemptID); - ContainerId containerId = attemptToContainerMap.remove(taskAttempt); + ContainerId containerId = attemptToContainerMap.remove(taskAttemptID); if(containerId == null) { - LOG.warn("Unregister task attempt: " + taskAttempt + " from unknown container"); + LOG.warn("Unregister task attempt: " + taskAttemptID + " from unknown container"); return; } ContainerInfo containerInfo = registeredContainers.get(containerId); if (containerInfo == null) { - LOG.warn("Unregister task attempt: " + taskAttempt + + LOG.warn("Unregister task attempt: " + taskAttemptID + " from non-registered container: " + containerId); return; } synchronized (containerInfo) { containerInfo.reset(); - attemptToContainerMap.remove(taskAttempt); + attemptToContainerMap.remove(taskAttemptID); } } @@ -366,7 +365,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { if (taskAttemptID != null) { TaskHeartbeatResponse tResponse; synchronized (containerInfo) { - ContainerId containerIdFromMap = attemptToContainerMap.get(new TaskAttempt(taskAttemptID)); + ContainerId containerIdFromMap = attemptToContainerMap.get(taskAttemptID); if (containerIdFromMap == null || !containerIdFromMap.equals(containerId)) { throw new TezException("Attempt " + taskAttemptID + " is not recognized for heartbeat"); @@ -468,50 +467,11 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { return tlrs; } - - // Holder for Task information, which eventually will likely be VertexImplm taskIndex, attemptIndex - // TODO TEZ-2003. TEZ-2670. Remove this class. 
- protected static class TaskAttempt { - private TezTaskAttemptID taskAttemptId; - - TaskAttempt(TezTaskAttemptID taskAttemptId) { - this.taskAttemptId = taskAttemptId; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof TaskAttempt)) { - return false; - } - - TaskAttempt that = (TaskAttempt) o; - - if (!taskAttemptId.equals(that.taskAttemptId)) { - return false; - } - - return true; - } - - @Override - public int hashCode() { - return taskAttemptId.hashCode(); - } - - @Override - public String toString() { - return "TaskAttempt{" + "taskAttemptId=" + taskAttemptId + '}'; - } - } - protected ContainerInfo getContainerInfo(ContainerId containerId) { return registeredContainers.get(containerId); } protected ContainerId getContainerForAttempt(TezTaskAttemptID taskAttemptId) { - return attemptToContainerMap.get(new TaskAttempt(taskAttemptId)); + return attemptToContainerMap.get(taskAttemptId); } } \ No newline at end of file
