This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b97acb53 TEZ-1037: Replace multiple members in TaskAttemptImpl for container related stuff with a single reference to the container. (#305) (Ayush Saxena reviewed by Laszlo Bodor)
4b97acb53 is described below

commit 4b97acb530be9de430601fd9702bf548a401ad09
Author: Ayush Saxena <ayushsax...@apache.org>
AuthorDate: Thu Aug 24 12:30:59 2023 +0530

    TEZ-1037: Replace multiple members in TaskAttemptImpl for container related stuff with a single reference to the container. (#305) (Ayush Saxena reviewed by Laszlo Bodor)
---
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java      | 105 ++++++-------
 .../apache/tez/dag/app/dag/impl/TezContainer.java  | 164 +++++++++++++++++++++
 .../tez/dag/app/dag/impl/TestTaskAttempt.java      |   6 +-
 3 files changed, 209 insertions(+), 66 deletions(-)

diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 289f1a188..fb8aed267 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -120,6 +120,8 @@ import org.apache.tez.common.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import static org.apache.tez.dag.app.dag.impl.TezContainer.NULL_TEZ_CONTAINER;
+
 public class TaskAttemptImpl implements TaskAttempt,
     EventHandler<TaskAttemptEvent> {
 
@@ -187,14 +189,8 @@ public class TaskAttemptImpl implements TaskAttempt,
   private String trackerName;
   private int httpPort;
 
-  // TODO Can these be replaced by the container object TEZ-1037
-  private Container container;
+  TezContainer container = NULL_TEZ_CONTAINER;
   private long allocationTime;
-  private ContainerId containerId;
-  protected NodeId containerNodeId;
-  private String nodeHttpAddress;
-  private String nodeRackName;
-  
   private final Vertex vertex;
   private final Task task;
   private final TaskLocationHint locationHint;
@@ -614,8 +610,8 @@ public class TaskAttemptImpl implements TaskAttempt,
       result.setContainerId(this.getAssignedContainerID());
       result.setNodeManagerHost(trackerName);
       result.setNodeManagerHttpPort(httpPort);
-      if (this.containerNodeId != null) {
-        result.setNodeManagerPort(this.containerNodeId.getPort());
+      if (this.container.getNodeId() != null) {
+        result.setNodeManagerPort(this.container.getNodeId().getPort());
       }
       return result;
     } finally {
@@ -625,11 +621,9 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   @Override
   public List<String> getDiagnostics() {
-    List<String> result = new ArrayList<String>();
     readLock.lock();
     try {
-      result.addAll(diagnostics);
-      return result;
+      return new ArrayList<String>(diagnostics);
     } finally {
       readLock.unlock();
     }
@@ -714,7 +708,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   public ContainerId getAssignedContainerID() {
     readLock.lock();
     try {
-      return containerId;
+      return container.getId();
     } finally {
       readLock.unlock();
     }
@@ -724,7 +718,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   public Container getAssignedContainer() {
     readLock.lock();
     try {
-      return container;
+      return container == NULL_TEZ_CONTAINER ? null : container;
     } finally {
       readLock.unlock();
     }
@@ -734,7 +728,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   public String getAssignedContainerMgrAddress() {
     readLock.lock();
     try {
-      return containerNodeId.toString();
+      return container.getNodeId().toString();
     } finally {
       readLock.unlock();
     }
@@ -744,7 +738,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   public NodeId getNodeId() {
     readLock.lock();
     try {
-      return containerNodeId;
+      return container.getNodeId();
     } finally {
       readLock.unlock();
     }
@@ -756,7 +750,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   public String getNodeHttpAddress() {
     readLock.lock();
     try {
-      return nodeHttpAddress;
+      return container.getNodeHttpAddress();
     } finally {
       readLock.unlock();
     }
@@ -769,7 +763,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   public String getNodeRackName() {
     this.readLock.lock();
     try {
-      return this.nodeRackName;
+      return container.getRackName();
     } finally {
       this.readLock.unlock();
     }
@@ -1136,8 +1130,8 @@ public class TaskAttemptImpl implements TaskAttempt,
     String completedLogsUrl = getCompletedLogsUrl();
     TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent(
         attemptId, getVertex().getName(),
-        launchTime, containerId, containerNodeId,
-        inProgressLogsUrl, completedLogsUrl, nodeHttpAddress);
+        launchTime, container.getId(), container.getNodeId(),
+        inProgressLogsUrl, completedLogsUrl, container.getNodeHttpAddress());
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGID(), startEvt));
   }
@@ -1145,7 +1139,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   protected void logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal 
state) {
     Preconditions.checkArgument(recoveryData == null
         || recoveryData.getTaskAttemptFinishedEvent() == null,
-        "log TaskAttemptFinishedEvent again in recovery when there's already 
another TaskAtttemptFinishedEvent");
+        "log TaskAttemptFinishedEvent again in recovery when there's already 
another TaskAttemptFinishedEvent");
     if (getLaunchTime() == 0) return;
 
     TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
@@ -1163,7 +1157,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       TaskAttemptState state, TaskFailureType taskFailureType) {
     Preconditions.checkArgument(recoveryData == null
         || recoveryData.getTaskAttemptFinishedEvent() == null,
-        "log TaskAttemptFinishedEvent again in recovery when there's already 
another TaskAtttemptFinishedEvent");
+        "log TaskAttemptFinishedEvent again in recovery when there's already 
another TaskAttemptFinishedEvent");
     if (state == TaskAttemptState.FAILED && taskFailureType == null) {
       throw new IllegalStateException("FAILED state must be accompanied by a 
FailureType");
     }
@@ -1174,8 +1168,8 @@ public class TaskAttemptImpl implements TaskAttempt,
     String completedLogsUrl = null;
     if (finishTime <= 0) {
       finishTime = clock.getTime(); // comes here in case it was terminated 
before launch
-      unsuccessfulContainerId = containerId;
-      unsuccessfulContainerNodeId = containerNodeId;
+      unsuccessfulContainerId = container.getId();
+      unsuccessfulContainerNodeId = container.getNodeId();
       inProgressLogsUrl = getInProgressLogsUrl();
       completedLogsUrl = getCompletedLogsUrl();
     }
@@ -1186,8 +1180,8 @@ public class TaskAttemptImpl implements TaskAttempt,
         terminationCause,
         StringUtils.join(
             getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEvents,
-        taGeneratedEvents, creationTime, creationCausalTA, allocationTime,
-        unsuccessfulContainerId, unsuccessfulContainerNodeId, 
inProgressLogsUrl, completedLogsUrl, nodeHttpAddress);
+        taGeneratedEvents, creationTime, creationCausalTA, allocationTime, 
unsuccessfulContainerId,
+        unsuccessfulContainerNodeId, inProgressLogsUrl, completedLogsUrl, 
container.getNodeHttpAddress());
     // FIXME how do we store information regd completion events
     this.appContext.getHistoryHandler().handle(
         new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1199,9 +1193,9 @@ public class TaskAttemptImpl implements TaskAttempt,
           TezConstants.getTezYarnServicePluginName())
         || 
getVertex().getServicePluginInfo().getContainerLauncherName().equals(
           TezConstants.getTezUberServicePluginName())) {
-      if (containerId != null && nodeHttpAddress != null) {
-        final String containerIdStr = containerId.toString();
-        inProgressLogsUrl = nodeHttpAddress
+      if (container.getId() != null && container.getNodeHttpAddress() != null) 
{
+        final String containerIdStr = container.getId().toString();
+        inProgressLogsUrl = container.getNodeHttpAddress()
             + "/" + "node/containerlogs"
             + "/" + containerIdStr
             + "/" + this.appContext.getUser();
@@ -1209,7 +1203,7 @@ public class TaskAttemptImpl implements TaskAttempt,
     } else {
       inProgressLogsUrl = 
appContext.getTaskCommunicatorManager().getInProgressLogsUrl(
           getVertex().getTaskCommunicatorIdentifier(),
-          attemptId, containerNodeId);
+          attemptId, container.getNodeId());
     }
     return inProgressLogsUrl;
   }
@@ -1220,15 +1214,16 @@ public class TaskAttemptImpl implements TaskAttempt,
           TezConstants.getTezYarnServicePluginName())
         || 
getVertex().getServicePluginInfo().getContainerLauncherName().equals(
           TezConstants.getTezUberServicePluginName())) {
-      if (containerId != null && containerNodeId != null && nodeHttpAddress != 
null) {
-        final String containerIdStr = containerId.toString();
+      if (container.getId() != null && container.getNodeId() != null &&
+          container.getNodeHttpAddress() != null) {
+        final String containerIdStr = container.getId().toString();
         if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
             YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
             && conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) != null) {
           String contextStr = "v_" + getVertex().getName()
               + "_" + this.attemptId.toString();
           completedLogsUrl = conf.get(YarnConfiguration.YARN_LOG_SERVER_URL)
-              + "/" + containerNodeId.toString()
+              + "/" + container.getNodeId().toString()
               + "/" + containerIdStr
               + "/" + contextStr
               + "/" + this.appContext.getUser();
@@ -1237,7 +1232,7 @@ public class TaskAttemptImpl implements TaskAttempt,
     } else {
       completedLogsUrl = 
appContext.getTaskCommunicatorManager().getCompletedLogsUrl(
           getVertex().getTaskCommunicatorIdentifier(),
-          attemptId, containerNodeId);
+          attemptId, container.getNodeId());
     }
     return completedLogsUrl;
   }
@@ -1386,13 +1381,10 @@ public class TaskAttemptImpl implements TaskAttempt,
       if (event instanceof TaskAttemptEventContainerTerminated) {
         TaskAttemptEventContainerTerminated tEvent = 
(TaskAttemptEventContainerTerminated) event;
         AMContainer amContainer = 
ta.appContext.getAllContainers().get(tEvent.getContainerId());
-        Container container = amContainer.getContainer();
+        TezContainer container = new TezContainer(amContainer.getContainer());
 
         ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime();
         ta.container = container;
-        ta.containerId = tEvent.getContainerId();
-        ta.containerNodeId = container.getNodeId();
-        ta.nodeHttpAddress = 
StringInterner.intern(container.getNodeHttpAddress());
       }
 
       if (event instanceof TaskAttemptEventContainerTerminatedBySystem) {
@@ -1401,10 +1393,7 @@ public class TaskAttemptImpl implements TaskAttempt,
         Container container = amContainer.getContainer();
 
         ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime();
-        ta.container = container;
-        ta.containerId = tEvent.getContainerId();
-        ta.containerNodeId = container.getNodeId();
-        ta.nodeHttpAddress = 
StringInterner.intern(container.getNodeHttpAddress());
+        ta.container = new TezContainer(container);
       }
 
       if (ta.recoveryData == null ||
@@ -1440,29 +1429,23 @@ public class TaskAttemptImpl implements TaskAttempt,
       TaskAttemptEventSubmitted event = (TaskAttemptEventSubmitted) origEvent;
 
       AMContainer amContainer = 
ta.appContext.getAllContainers().get(event.getContainerId());
-      Container container = amContainer.getContainer();
+      TezContainer container = new TezContainer(amContainer.getContainer());
 
       ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime();
-      ta.container = container;
-      ta.containerId = event.getContainerId();
-      ta.containerNodeId = container.getNodeId();
-      ta.nodeHttpAddress = 
StringInterner.intern(container.getNodeHttpAddress());
-      ta.nodeRackName = 
StringInterner.intern(RackResolver.resolve(ta.containerNodeId.getHost())
-          .getNetworkLocation());
+      ta.container = new TezContainer(container);
       ta.lastNotifyProgressTimestamp = ta.clock.getTime();
 
       ta.setLaunchTime();
 
       // TODO Resolve to host / IP in case of a local address.
-      InetSocketAddress nodeHttpInetAddr = NetUtils
-          .createSocketAddr(ta.nodeHttpAddress); // TODO: Costly?
+      InetSocketAddress nodeHttpInetAddr = 
NetUtils.createSocketAddr(container.getNodeHttpAddress()); // TODO: Costly?
       ta.trackerName = StringInterner.intern(nodeHttpInetAddr.getHostName());
       ta.httpPort = nodeHttpInetAddr.getPort();
       ta.sendEvent(createDAGCounterUpdateEventTALaunched(ta));
 
       LOG.info("TaskAttempt: [" + ta.attemptId + "] submitted."
-          + " Is using containerId: [" + ta.containerId + "]" + " on NM: ["
-          + ta.containerNodeId + "]");
+          + " Is using containerId: [" + ta.container.getId() + "]" + " on NM: 
["
+          + ta.container.getNodeId() + "]");
 
       // JobHistoryEvent.
       // The started event represents when the attempt was submitted to the 
executor.
@@ -1470,9 +1453,9 @@ public class TaskAttemptImpl implements TaskAttempt,
 
       // TODO Remove after HDFS-5098
       // Compute LOCALITY counter for this task.
-      if (ta.taskHosts.contains(ta.containerNodeId.getHost())) {
+      if (ta.taskHosts.contains(ta.container.getNodeId().getHost())) {
         ta.localityCounter = DAGCounter.DATA_LOCAL_TASKS;
-      } else if (ta.taskRacks.contains(ta.nodeRackName)) {
+      } else if (ta.taskRacks.contains(container.getRackName())) {
         ta.localityCounter = DAGCounter.RACK_LOCAL_TASKS;
       } else {
         // Not computing this if the task does not have locality information.
@@ -1531,9 +1514,9 @@ public class TaskAttemptImpl implements TaskAttempt,
       super.transition(ta, event);
       // Inform the scheduler
       if (sendSchedulerEvent()) {
-        ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, helper
-            .getTaskAttemptState(), 
TezUtilsInternal.toTaskAttemptEndReason(ta.terminationCause),
-            ta instanceof DiagnosableEvent ? 
((DiagnosableEvent)ta).getDiagnosticInfo() : null,
+        ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.container.getId(), 
helper.getTaskAttemptState(),
+            TezUtilsInternal.toTaskAttemptEndReason(ta.terminationCause),
+            ta instanceof DiagnosableEvent ? ((DiagnosableEvent) 
ta).getDiagnosticInfo() : null,
             ta.getVertex().getTaskSchedulerIdentifier()));
       }
     }
@@ -1648,7 +1631,7 @@ public class TaskAttemptImpl implements TaskAttempt,
     public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
 
       // If TaskAttempt is recovered to SUCCEEDED, send events generated by 
this TaskAttempt to vertex
-      // for its downstream consumers. For normal dag execution, the events 
are sent by TaskAttmeptListener
+      // for its downstream consumers. For normal dag execution, the events 
are sent by TaskAttemptListener
       // for performance consideration.
       if (ta.recoveryData != null && ta.recoveryData.isTaskAttemptSucceeded()) 
{
         TaskAttemptFinishedEvent taFinishedEvent = ta.recoveryData
@@ -1671,8 +1654,8 @@ public class TaskAttemptImpl implements TaskAttempt,
           TaskAttemptState.SUCCEEDED));
 
       // Inform the Scheduler.
-      ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId,
-          TaskAttemptState.SUCCEEDED, null, null, 
ta.getVertex().getTaskSchedulerIdentifier()));
+      ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.container.getId(), 
TaskAttemptState.SUCCEEDED, null, null,
+          ta.getVertex().getTaskSchedulerIdentifier()));
 
       // Inform the task.
       ta.sendEvent(new TaskEventTASucceeded(ta.attemptId));
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java
new file mode 100644
index 000000000..ae58f80b7
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tez.util.StringInterner;
+
+/**
+ * Convenience wrapper around  {@link org.apache.hadoop.yarn.api.records.Container}
+ */
+public class TezContainer extends Container {
+
+  public final static TezContainer NULL_TEZ_CONTAINER = new TezContainer(null);
+  private final Container container;
+
+  public TezContainer(Container container) {
+    this.container = container;
+  }
+
+  @Override
+  public ContainerId getId() {
+    return container != null ? container.getId() : null;
+  }
+
+  @Override
+  public void setId(ContainerId id) {
+    container.setId(id);
+  }
+
+  @Override
+  public NodeId getNodeId() {
+    return container != null ? container.getNodeId() : null;
+  }
+
+  @Override
+  public void setNodeId(NodeId nodeId) {
+    container.setNodeId(nodeId);
+  }
+
+  @Override
+  public String getNodeHttpAddress() {
+    return container != null ? 
StringInterner.intern(container.getNodeHttpAddress()) : null;
+  }
+
+  @Override
+  public void setNodeHttpAddress(String nodeHttpAddress) {
+    container.setNodeHttpAddress(nodeHttpAddress);
+  }
+
+  @Override
+  public Map<String, List<Map<String, String>>> getExposedPorts() {
+    return container.getExposedPorts();
+  }
+
+  @Override
+  public void setExposedPorts(Map<String, List<Map<String, String>>> ports) {
+    container.setExposedPorts(ports);
+  }
+
+  @Override
+  public Resource getResource() {
+    return container.getResource();
+  }
+
+  @Override
+  public void setResource(Resource resource) {
+    container.setResource(resource);
+  }
+
+  @Override
+  public Priority getPriority() {
+    return container.getPriority();
+  }
+
+  @Override
+  public void setPriority(Priority priority) {
+    container.setPriority(priority);
+  }
+
+  @Override
+  public Token getContainerToken() {
+    return container.getContainerToken();
+  }
+
+  @Override
+  public void setContainerToken(Token containerToken) {
+    container.setContainerToken(containerToken);
+  }
+
+  @Override
+  public ExecutionType getExecutionType() {
+    return container.getExecutionType();
+  }
+
+  @Override
+  public void setExecutionType(ExecutionType executionType) {
+    container.setExecutionType(executionType);
+  }
+
+  @Override
+  public int compareTo(Container other) {
+    if (this.getId().compareTo(other.getId()) == 0) {
+      if (this.getNodeId().compareTo(other.getNodeId()) == 0) {
+        return this.getResource().compareTo(other.getResource());
+      } else {
+        return this.getNodeId().compareTo(other.getNodeId());
+      }
+    } else {
+      return this.getId().compareTo(other.getId());
+    }
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      Container otherContainer = ((TezContainer) other).container;
+      if (this.container == null && otherContainer == null) {
+        return true;
+      } else if (this.container == null) {
+        return false;
+      }
+      return this.container.equals((otherContainer));
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return container.hashCode();
+  }
+
+  public String getRackName() {
+    return 
StringInterner.intern(RackResolver.resolve(container.getNodeId().getHost()).getNetworkLocation());
+  }
+}
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index d0088bfc5..ee8ec67cf 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -130,16 +130,12 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Maps;
 
 @SuppressWarnings({ "unchecked", "rawtypes" })
 public class TestTaskAttempt {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(TestTaskAttempt.class);
-
   static public class StubbedFS extends RawLocalFileSystem {
     @Override
     public FileStatus getFileStatus(Path f) throws IOException {
@@ -2315,7 +2311,7 @@ public class TestTaskAttempt {
     boolean inputFailedReported = false;
 
     public MockTaskAttemptImpl setNodeId(NodeId nodeId) {
-      this.containerNodeId = nodeId;
+      this.container = new TezContainer(Container.newInstance(null, nodeId, 
null, null, null, null));
       return this;
     }
 

Reply via email to