This is an automated email from the ASF dual-hosted git repository. abstractdog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push: new 4b97acb53 TEZ-1037: Replace multiple members in TaskAttemptImpl for container related stuff with a single reference to the container. (#305) (Ayush Saxena reviewed by Laszlo Bodor) 4b97acb53 is described below commit 4b97acb530be9de430601fd9702bf548a401ad09 Author: Ayush Saxena <ayushsax...@apache.org> AuthorDate: Thu Aug 24 12:30:59 2023 +0530 TEZ-1037: Replace multiple members in TaskAttemptImpl for container related stuff with a single reference to the container. (#305) (Ayush Saxena reviewed by Laszlo Bodor) --- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 105 ++++++------- .../apache/tez/dag/app/dag/impl/TezContainer.java | 164 +++++++++++++++++++++ .../tez/dag/app/dag/impl/TestTaskAttempt.java | 6 +- 3 files changed, 209 insertions(+), 66 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 289f1a188..fb8aed267 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -120,6 +120,8 @@ import org.apache.tez.common.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import static org.apache.tez.dag.app.dag.impl.TezContainer.NULL_TEZ_CONTAINER; + public class TaskAttemptImpl implements TaskAttempt, EventHandler<TaskAttemptEvent> { @@ -187,14 +189,8 @@ public class TaskAttemptImpl implements TaskAttempt, private String trackerName; private int httpPort; - // TODO Can these be replaced by the container object TEZ-1037 - private Container container; + TezContainer container = NULL_TEZ_CONTAINER; private long allocationTime; - private ContainerId containerId; - protected NodeId containerNodeId; - private String nodeHttpAddress; - private String nodeRackName; - private final Vertex vertex; private final Task 
task; private final TaskLocationHint locationHint; @@ -614,8 +610,8 @@ public class TaskAttemptImpl implements TaskAttempt, result.setContainerId(this.getAssignedContainerID()); result.setNodeManagerHost(trackerName); result.setNodeManagerHttpPort(httpPort); - if (this.containerNodeId != null) { - result.setNodeManagerPort(this.containerNodeId.getPort()); + if (this.container.getNodeId() != null) { + result.setNodeManagerPort(this.container.getNodeId().getPort()); } return result; } finally { @@ -625,11 +621,9 @@ public class TaskAttemptImpl implements TaskAttempt, @Override public List<String> getDiagnostics() { - List<String> result = new ArrayList<String>(); readLock.lock(); try { - result.addAll(diagnostics); - return result; + return new ArrayList<String>(diagnostics); } finally { readLock.unlock(); } @@ -714,7 +708,7 @@ public class TaskAttemptImpl implements TaskAttempt, public ContainerId getAssignedContainerID() { readLock.lock(); try { - return containerId; + return container.getId(); } finally { readLock.unlock(); } @@ -724,7 +718,7 @@ public class TaskAttemptImpl implements TaskAttempt, public Container getAssignedContainer() { readLock.lock(); try { - return container; + return container == NULL_TEZ_CONTAINER ? 
null : container; } finally { readLock.unlock(); } @@ -734,7 +728,7 @@ public class TaskAttemptImpl implements TaskAttempt, public String getAssignedContainerMgrAddress() { readLock.lock(); try { - return containerNodeId.toString(); + return container.getNodeId().toString(); } finally { readLock.unlock(); } @@ -744,7 +738,7 @@ public class TaskAttemptImpl implements TaskAttempt, public NodeId getNodeId() { readLock.lock(); try { - return containerNodeId; + return container.getNodeId(); } finally { readLock.unlock(); } @@ -756,7 +750,7 @@ public class TaskAttemptImpl implements TaskAttempt, public String getNodeHttpAddress() { readLock.lock(); try { - return nodeHttpAddress; + return container.getNodeHttpAddress(); } finally { readLock.unlock(); } @@ -769,7 +763,7 @@ public class TaskAttemptImpl implements TaskAttempt, public String getNodeRackName() { this.readLock.lock(); try { - return this.nodeRackName; + return container.getRackName(); } finally { this.readLock.unlock(); } @@ -1136,8 +1130,8 @@ public class TaskAttemptImpl implements TaskAttempt, String completedLogsUrl = getCompletedLogsUrl(); TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent( attemptId, getVertex().getName(), - launchTime, containerId, containerNodeId, - inProgressLogsUrl, completedLogsUrl, nodeHttpAddress); + launchTime, container.getId(), container.getNodeId(), + inProgressLogsUrl, completedLogsUrl, container.getNodeHttpAddress()); this.appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGID(), startEvt)); } @@ -1145,7 +1139,7 @@ public class TaskAttemptImpl implements TaskAttempt, protected void logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal state) { Preconditions.checkArgument(recoveryData == null || recoveryData.getTaskAttemptFinishedEvent() == null, - "log TaskAttemptFinishedEvent again in recovery when there's already another TaskAtttemptFinishedEvent"); + "log TaskAttemptFinishedEvent again in recovery when there's already another 
TaskAttemptFinishedEvent"); if (getLaunchTime() == 0) return; TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent( @@ -1163,7 +1157,7 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptState state, TaskFailureType taskFailureType) { Preconditions.checkArgument(recoveryData == null || recoveryData.getTaskAttemptFinishedEvent() == null, - "log TaskAttemptFinishedEvent again in recovery when there's already another TaskAtttemptFinishedEvent"); + "log TaskAttemptFinishedEvent again in recovery when there's already another TaskAttemptFinishedEvent"); if (state == TaskAttemptState.FAILED && taskFailureType == null) { throw new IllegalStateException("FAILED state must be accompanied by a FailureType"); } @@ -1174,8 +1168,8 @@ public class TaskAttemptImpl implements TaskAttempt, String completedLogsUrl = null; if (finishTime <= 0) { finishTime = clock.getTime(); // comes here in case it was terminated before launch - unsuccessfulContainerId = containerId; - unsuccessfulContainerNodeId = containerNodeId; + unsuccessfulContainerId = container.getId(); + unsuccessfulContainerNodeId = container.getNodeId(); inProgressLogsUrl = getInProgressLogsUrl(); completedLogsUrl = getCompletedLogsUrl(); } @@ -1186,8 +1180,8 @@ public class TaskAttemptImpl implements TaskAttempt, terminationCause, StringUtils.join( getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEvents, - taGeneratedEvents, creationTime, creationCausalTA, allocationTime, - unsuccessfulContainerId, unsuccessfulContainerNodeId, inProgressLogsUrl, completedLogsUrl, nodeHttpAddress); + taGeneratedEvents, creationTime, creationCausalTA, allocationTime, unsuccessfulContainerId, + unsuccessfulContainerNodeId, inProgressLogsUrl, completedLogsUrl, container.getNodeHttpAddress()); // FIXME how do we store information regd completion events this.appContext.getHistoryHandler().handle( new DAGHistoryEvent(getDAGID(), finishEvt)); @@ -1199,9 +1193,9 @@ public class TaskAttemptImpl implements 
TaskAttempt, TezConstants.getTezYarnServicePluginName()) || getVertex().getServicePluginInfo().getContainerLauncherName().equals( TezConstants.getTezUberServicePluginName())) { - if (containerId != null && nodeHttpAddress != null) { - final String containerIdStr = containerId.toString(); - inProgressLogsUrl = nodeHttpAddress + if (container.getId() != null && container.getNodeHttpAddress() != null) { + final String containerIdStr = container.getId().toString(); + inProgressLogsUrl = container.getNodeHttpAddress() + "/" + "node/containerlogs" + "/" + containerIdStr + "/" + this.appContext.getUser(); @@ -1209,7 +1203,7 @@ public class TaskAttemptImpl implements TaskAttempt, } else { inProgressLogsUrl = appContext.getTaskCommunicatorManager().getInProgressLogsUrl( getVertex().getTaskCommunicatorIdentifier(), - attemptId, containerNodeId); + attemptId, container.getNodeId()); } return inProgressLogsUrl; } @@ -1220,15 +1214,16 @@ public class TaskAttemptImpl implements TaskAttempt, TezConstants.getTezYarnServicePluginName()) || getVertex().getServicePluginInfo().getContainerLauncherName().equals( TezConstants.getTezUberServicePluginName())) { - if (containerId != null && containerNodeId != null && nodeHttpAddress != null) { - final String containerIdStr = containerId.toString(); + if (container.getId() != null && container.getNodeId() != null && + container.getNodeHttpAddress() != null) { + final String containerIdStr = container.getId().toString(); if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED) && conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) != null) { String contextStr = "v_" + getVertex().getName() + "_" + this.attemptId.toString(); completedLogsUrl = conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) - + "/" + containerNodeId.toString() + + "/" + container.getNodeId().toString() + "/" + containerIdStr + "/" + contextStr + "/" + this.appContext.getUser(); @@ -1237,7 +1232,7 @@ public class 
TaskAttemptImpl implements TaskAttempt, } else { completedLogsUrl = appContext.getTaskCommunicatorManager().getCompletedLogsUrl( getVertex().getTaskCommunicatorIdentifier(), - attemptId, containerNodeId); + attemptId, container.getNodeId()); } return completedLogsUrl; } @@ -1386,13 +1381,10 @@ public class TaskAttemptImpl implements TaskAttempt, if (event instanceof TaskAttemptEventContainerTerminated) { TaskAttemptEventContainerTerminated tEvent = (TaskAttemptEventContainerTerminated) event; AMContainer amContainer = ta.appContext.getAllContainers().get(tEvent.getContainerId()); - Container container = amContainer.getContainer(); + TezContainer container = new TezContainer(amContainer.getContainer()); ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime(); ta.container = container; - ta.containerId = tEvent.getContainerId(); - ta.containerNodeId = container.getNodeId(); - ta.nodeHttpAddress = StringInterner.intern(container.getNodeHttpAddress()); } if (event instanceof TaskAttemptEventContainerTerminatedBySystem) { @@ -1401,10 +1393,7 @@ public class TaskAttemptImpl implements TaskAttempt, Container container = amContainer.getContainer(); ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime(); - ta.container = container; - ta.containerId = tEvent.getContainerId(); - ta.containerNodeId = container.getNodeId(); - ta.nodeHttpAddress = StringInterner.intern(container.getNodeHttpAddress()); + ta.container = new TezContainer(container); } if (ta.recoveryData == null || @@ -1440,29 +1429,23 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptEventSubmitted event = (TaskAttemptEventSubmitted) origEvent; AMContainer amContainer = ta.appContext.getAllContainers().get(event.getContainerId()); - Container container = amContainer.getContainer(); + TezContainer container = new TezContainer(amContainer.getContainer()); ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime(); - ta.container = container; - ta.containerId = 
event.getContainerId(); - ta.containerNodeId = container.getNodeId(); - ta.nodeHttpAddress = StringInterner.intern(container.getNodeHttpAddress()); - ta.nodeRackName = StringInterner.intern(RackResolver.resolve(ta.containerNodeId.getHost()) - .getNetworkLocation()); + ta.container = new TezContainer(container); ta.lastNotifyProgressTimestamp = ta.clock.getTime(); ta.setLaunchTime(); // TODO Resolve to host / IP in case of a local address. - InetSocketAddress nodeHttpInetAddr = NetUtils - .createSocketAddr(ta.nodeHttpAddress); // TODO: Costly? + InetSocketAddress nodeHttpInetAddr = NetUtils.createSocketAddr(container.getNodeHttpAddress()); // TODO: Costly? ta.trackerName = StringInterner.intern(nodeHttpInetAddr.getHostName()); ta.httpPort = nodeHttpInetAddr.getPort(); ta.sendEvent(createDAGCounterUpdateEventTALaunched(ta)); LOG.info("TaskAttempt: [" + ta.attemptId + "] submitted." - + " Is using containerId: [" + ta.containerId + "]" + " on NM: [" - + ta.containerNodeId + "]"); + + " Is using containerId: [" + ta.container.getId() + "]" + " on NM: [" + + ta.container.getNodeId() + "]"); // JobHistoryEvent. // The started event represents when the attempt was submitted to the executor. @@ -1470,9 +1453,9 @@ public class TaskAttemptImpl implements TaskAttempt, // TODO Remove after HDFS-5098 // Compute LOCALITY counter for this task. - if (ta.taskHosts.contains(ta.containerNodeId.getHost())) { + if (ta.taskHosts.contains(ta.container.getNodeId().getHost())) { ta.localityCounter = DAGCounter.DATA_LOCAL_TASKS; - } else if (ta.taskRacks.contains(ta.nodeRackName)) { + } else if (ta.taskRacks.contains(container.getRackName())) { ta.localityCounter = DAGCounter.RACK_LOCAL_TASKS; } else { // Not computing this if the task does not have locality information. 
@@ -1531,9 +1514,9 @@ public class TaskAttemptImpl implements TaskAttempt, super.transition(ta, event); // Inform the scheduler if (sendSchedulerEvent()) { - ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, helper - .getTaskAttemptState(), TezUtilsInternal.toTaskAttemptEndReason(ta.terminationCause), - ta instanceof DiagnosableEvent ? ((DiagnosableEvent)ta).getDiagnosticInfo() : null, + ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.container.getId(), helper.getTaskAttemptState(), + TezUtilsInternal.toTaskAttemptEndReason(ta.terminationCause), + ta instanceof DiagnosableEvent ? ((DiagnosableEvent) ta).getDiagnosticInfo() : null, ta.getVertex().getTaskSchedulerIdentifier())); } } @@ -1648,7 +1631,7 @@ public class TaskAttemptImpl implements TaskAttempt, public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) { // If TaskAttempt is recovered to SUCCEEDED, send events generated by this TaskAttempt to vertex - // for its downstream consumers. For normal dag execution, the events are sent by TaskAttmeptListener + // for its downstream consumers. For normal dag execution, the events are sent by TaskAttemptListener // for performance consideration. if (ta.recoveryData != null && ta.recoveryData.isTaskAttemptSucceeded()) { TaskAttemptFinishedEvent taFinishedEvent = ta.recoveryData @@ -1671,8 +1654,8 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptState.SUCCEEDED)); // Inform the Scheduler. - ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, - TaskAttemptState.SUCCEEDED, null, null, ta.getVertex().getTaskSchedulerIdentifier())); + ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.container.getId(), TaskAttemptState.SUCCEEDED, null, null, + ta.getVertex().getTaskSchedulerIdentifier())); // Inform the task. 
ta.sendEvent(new TaskEventTASucceeded(ta.attemptId)); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java new file mode 100644 index 000000000..ae58f80b7 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezContainer.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tez.util.StringInterner;
+
+/**
+ * Convenience wrapper around {@link org.apache.hadoop.yarn.api.records.Container}.
+ * <p>
+ * Getters are null-safe: when no container is wrapped (see {@link #NULL_TEZ_CONTAINER}) they
+ * return {@code null} instead of throwing, so holders can keep a non-null reference before a
+ * container has been assigned. Setters delegate to the wrapped container and fail with a
+ * {@link NullPointerException} when there is none (this also keeps the shared placeholder
+ * from being mutated).
+ */
+public class TezContainer extends Container {
+
+  /** Shared placeholder meaning "no container assigned yet"; it wraps {@code null}. */
+  public static final TezContainer NULL_TEZ_CONTAINER = new TezContainer(null);
+
+  private final Container container;
+
+  /**
+   * @param container the container to wrap, may be {@code null}. An argument that is itself a
+   *                  {@code TezContainer} is unwrapped so that wrappers never nest (nested
+   *                  wrappers would otherwise never compare equal to single ones).
+   */
+  public TezContainer(Container container) {
+    this.container = container instanceof TezContainer
+        ? ((TezContainer) container).container
+        : container;
+  }
+
+  /** Returns the wrapped container, failing fast with a clear message if there is none. */
+  private Container delegate() {
+    return Objects.requireNonNull(container, "No container is wrapped by this TezContainer");
+  }
+
+  @Override
+  public ContainerId getId() {
+    return container != null ? container.getId() : null;
+  }
+
+  @Override
+  public void setId(ContainerId id) {
+    delegate().setId(id);
+  }
+
+  @Override
+  public NodeId getNodeId() {
+    return container != null ? container.getNodeId() : null;
+  }
+
+  @Override
+  public void setNodeId(NodeId nodeId) {
+    delegate().setNodeId(nodeId);
+  }
+
+  /** Returns the interned node HTTP address, or {@code null} when no container is wrapped. */
+  @Override
+  public String getNodeHttpAddress() {
+    return container != null ? StringInterner.intern(container.getNodeHttpAddress()) : null;
+  }
+
+  @Override
+  public void setNodeHttpAddress(String nodeHttpAddress) {
+    delegate().setNodeHttpAddress(nodeHttpAddress);
+  }
+
+  @Override
+  public Map<String, List<Map<String, String>>> getExposedPorts() {
+    return container != null ? container.getExposedPorts() : null;
+  }
+
+  @Override
+  public void setExposedPorts(Map<String, List<Map<String, String>>> ports) {
+    delegate().setExposedPorts(ports);
+  }
+
+  @Override
+  public Resource getResource() {
+    return container != null ? container.getResource() : null;
+  }
+
+  @Override
+  public void setResource(Resource resource) {
+    delegate().setResource(resource);
+  }
+
+  @Override
+  public Priority getPriority() {
+    return container != null ? container.getPriority() : null;
+  }
+
+  @Override
+  public void setPriority(Priority priority) {
+    delegate().setPriority(priority);
+  }
+
+  @Override
+  public Token getContainerToken() {
+    return container != null ? container.getContainerToken() : null;
+  }
+
+  @Override
+  public void setContainerToken(Token containerToken) {
+    delegate().setContainerToken(containerToken);
+  }
+
+  @Override
+  public ExecutionType getExecutionType() {
+    return container != null ? container.getExecutionType() : null;
+  }
+
+  @Override
+  public void setExecutionType(ExecutionType executionType) {
+    delegate().setExecutionType(executionType);
+  }
+
+  /**
+   * Orders by container id, then node id, then resource. Requires those values to be non-null
+   * on both sides.
+   */
+  @Override
+  public int compareTo(Container other) {
+    int cmp = this.getId().compareTo(other.getId());
+    if (cmp != 0) {
+      return cmp;
+    }
+    cmp = this.getNodeId().compareTo(other.getNodeId());
+    if (cmp != 0) {
+      return cmp;
+    }
+    return this.getResource().compareTo(other.getResource());
+  }
+
+  /**
+   * Two wrappers are equal when their wrapped containers are equal (both {@code null} counts as
+   * equal). Non-{@code TezContainer} arguments are never equal; the previous
+   * {@code isAssignableFrom} check let e.g. a plain {@code Object} through to a failing cast.
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (!(other instanceof TezContainer)) {
+      return false;
+    }
+    return Objects.equals(this.container, ((TezContainer) other).container);
+  }
+
+  /** Consistent with {@link #equals(Object)}; safe for {@link #NULL_TEZ_CONTAINER}. */
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(container);
+  }
+
+  @Override
+  public String toString() {
+    return "TezContainer[" + container + "]";
+  }
+
+  /**
+   * Resolves the rack of the container's node via {@link RackResolver}.
+   *
+   * @return the interned network location, or {@code null} when no container (or node id) is
+   *         present, matching the value callers saw before a container was assigned
+   */
+  public String getRackName() {
+    NodeId nodeId = getNodeId();
+    if (nodeId == null) {
+      return null;
+    }
+    return StringInterner.intern(RackResolver.resolve(nodeId.getHost()).getNetworkLocation());
+  }
+}
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index d0088bfc5..ee8ec67cf 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -130,16 +130,12 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Maps;
 
 @SuppressWarnings({ "unchecked", "rawtypes" })
 public class TestTaskAttempt {
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestTaskAttempt.class);
-
   static public class StubbedFS extends RawLocalFileSystem {
     @Override
     public FileStatus getFileStatus(Path f) throws IOException {
@@ -2315,7 +2311,7 @@ public class TestTaskAttempt {
     boolean inputFailedReported = false;
 
     public MockTaskAttemptImpl setNodeId(NodeId nodeId) {
-      this.containerNodeId = nodeId;
+      this.container = new TezContainer(Container.newInstance(null, nodeId, null, null, null, null));
       return this;
     }