This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 19b2351a9 TEZ-4554: Counter for used nodes within a DAG (#362) (Laszlo Bodor)
19b2351a9 is described below
commit 19b2351a909589500397cc69e12d22da1f74949a
Author: Bodor Laszlo <[email protected]>
AuthorDate: Tue Jun 25 07:51:05 2024 +0200
TEZ-4554: Counter for used nodes within a DAG (#362) (Laszlo Bodor)
---
.../org/apache/tez/common/counters/DAGCounter.java | 15 ++++-
.../main/java/org/apache/tez/dag/app/dag/DAG.java | 4 +-
.../org/apache/tez/dag/app/dag/impl/DAGImpl.java | 14 ++--
.../tez/dag/app/rm/TaskSchedulerManager.java | 44 ++++++++-----
.../apache/tez/dag/app/dag/impl/TestDAGImpl.java | 75 +++++++++++++++++++---
.../tez/dag/app/rm/TestTaskSchedulerManager.java | 2 +-
6 files changed, 121 insertions(+), 33 deletions(-)
diff --git
a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java
b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java
index 149bacdf5..23c197843 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java
@@ -87,5 +87,18 @@ public enum DAGCounter {
* Number of container reuses during a DAG. This is incremented every time
* the containerReused callback is called in the TaskSchedulerContext.
*/
- TOTAL_CONTAINER_REUSE_COUNT
+ TOTAL_CONTAINER_REUSE_COUNT,
+
+ /*
+ * Number of nodes to which task attempts were assigned in this DAG.
+ * Nodes are distinguished by the Yarn NodeId.getHost().
+ */
+ NODE_USED_COUNT,
+
+ /*
+ * Total number of nodes visible to the task scheduler (regardless of
+ * task assignments). This is typically exposed by a resource manager
+ * client.
+ */
+ NODE_TOTAL_COUNT
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index ff5afb409..c828d81b2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -23,7 +23,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.counters.DAGCounter;
@@ -106,7 +106,7 @@ public interface DAG extends DagInfo {
void incrementDagCounter(DAGCounter counter, int incrValue);
void setDagCounter(DAGCounter counter, int setValue);
- void addUsedContainer(ContainerId containerId);
+ void addUsedContainer(Container container);
/**
* Called by the DAGAppMaster when the DAG is started normally or in the
event of recovery.
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index b00cea8b2..307580741 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -250,6 +251,8 @@ public class DAGImpl implements
org.apache.tez.dag.app.dag.DAG,
private final MemoryMXBean memoryMXBean =
ManagementFactory.getMemoryMXBean();
private final Set<ContainerId> containersUsedByCurrentDAG = new HashSet<>();
+ @VisibleForTesting
+ final Set<String> nodesUsedByCurrentDAG = new HashSet<>();
protected static final
StateMachineFactory<DAGImpl, DAGState, DAGEventType, DAGEvent>
@@ -2563,7 +2566,7 @@ public class DAGImpl implements
org.apache.tez.dag.app.dag.DAG,
@Override
public void onFinish() {
stopVertexServices();
- handleUsedContainersOnDagFinish();
+ updateCounters();
}
private void startVertexServices() {
@@ -2579,11 +2582,14 @@ public class DAGImpl implements
org.apache.tez.dag.app.dag.DAG,
}
@Override
- public void addUsedContainer(ContainerId containerId) {
- containersUsedByCurrentDAG.add(containerId);
+ public void addUsedContainer(Container container) {
+ containersUsedByCurrentDAG.add(container.getId());
+ nodesUsedByCurrentDAG.add(container.getNodeId().getHost());
}
- private void handleUsedContainersOnDagFinish() {
+ private void updateCounters() {
setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED,
containersUsedByCurrentDAG.size());
+ setDagCounter(DAGCounter.NODE_USED_COUNT, nodesUsedByCurrentDAG.size());
+ setDagCounter(DAGCounter.NODE_TOTAL_COUNT,
appContext.getTaskScheduler().getNumClusterNodes(true));
}
}
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
index 0bf62afbc..e311c23e8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -224,9 +224,24 @@ public class TaskSchedulerManager extends AbstractService
implements
}
public int getNumClusterNodes() {
+ return getNumClusterNodes(false);
+ }
+
+ public int getNumClusterNodes(boolean tryUpdate){
+ if (cachedNodeCount == -1 && tryUpdate){
+ cachedNodeCount = countAllNodes();
+ }
return cachedNodeCount;
}
-
+
+ private int countAllNodes() {
+ try {
+ return taskSchedulers[0].getClusterNodeCount();
+ } catch (Exception e) {
+ return handleTaskSchedulerExceptionWhileGettingNodeCount(e);
+ }
+ }
+
public Resource getAvailableResources(int schedulerId) {
try {
return taskSchedulers[schedulerId].getAvailableResources();
@@ -746,7 +761,7 @@ public class TaskSchedulerManager extends AbstractService
implements
sendEvent(new AMNodeEventContainerAllocated(container
.getNodeId(), schedulerId, container.getId()));
}
- appContext.getCurrentDAG().addUsedContainer(containerId);
+ appContext.getCurrentDAG().addUsedContainer(container);
TaskAttempt taskAttempt = event.getTaskAttempt();
// TODO - perhaps check if the task still needs this container
@@ -883,19 +898,7 @@ public class TaskSchedulerManager extends AbstractService
implements
// Doubles as a mechanism to update node counts periodically. Hence
schedulerId required.
// TODO Handle this in TEZ-2124. Need a way to know which scheduler is
calling in.
- int nodeCount = 0;
- try {
- nodeCount = taskSchedulers[0].getClusterNodeCount();
- } catch (Exception e) {
- String msg = "Error in TaskScheduler while getting node count"
- + ", scheduler=" +
Utils.getTaskSchedulerIdentifierString(schedulerId, appContext);
- LOG.error(msg, e);
- sendEvent(
- new DAGAppMasterEventUserServiceFatalError(
- DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
- msg, e));
- throw new RuntimeException(e);
- }
+ int nodeCount = countAllNodes();
if (nodeCount != cachedNodeCount) {
cachedNodeCount = nodeCount;
sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount, schedulerId));
@@ -903,6 +906,17 @@ public class TaskSchedulerManager extends AbstractService
implements
return dagAppMaster.getProgress();
}
+ private int handleTaskSchedulerExceptionWhileGettingNodeCount(Exception e) {
+ String msg = "Error in TaskScheduler while getting node count"
+ + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(0,
appContext);
+ LOG.error(msg, e);
+ sendEvent(
+ new DAGAppMasterEventUserServiceFatalError(
+ DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
+ msg, e));
+ throw new RuntimeException(e);
+ }
+
public void reportError(int taskSchedulerIndex, ServicePluginError
servicePluginError,
String diagnostics,
DagInfo dagInfo) {
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 1e003accc..46c4fe1cf 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -19,6 +19,7 @@
package org.apache.tez.dag.app.dag.impl;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@@ -49,6 +50,7 @@ import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
+import org.apache.tez.dag.app.rm.TaskSchedulerManager;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.junit.Rule;
@@ -60,6 +62,8 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
@@ -181,6 +185,7 @@ public class TestDAGImpl {
private ACLManager aclManager;
private ApplicationAttemptId appAttemptId;
private DAGImpl dag;
+ private TaskSchedulerManager taskSchedulerManager;
private TaskEventDispatcher taskEventDispatcher;
private VertexEventDispatcher vertexEventDispatcher;
private DagEventDispatcher dagEventDispatcher;
@@ -861,11 +866,12 @@ public class TestDAGImpl {
dispatcher = new DrainDispatcher();
fsTokens = new Credentials();
appContext = mock(AppContext.class);
+ taskSchedulerManager = mock(TaskSchedulerManager.class);
execService = mock(ListeningExecutorService.class);
final ListenableFuture<Void> mockFuture = mock(ListenableFuture.class);
when(appContext.getHadoopShim()).thenReturn(defaultShim);
when(appContext.getApplicationID()).thenReturn(appAttemptId.getApplicationId());
-
+
doAnswer(new Answer() {
public ListenableFuture<Void> answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
@@ -2358,22 +2364,71 @@ public class TestDAGImpl {
}
- @SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testTotalContainersUsedCounter() {
+ DAGImpl spy = getDagSpy();
+
+
spy.addUsedContainer(Container.newInstance(ContainerId.fromString("container_e16_1504924099862_7571_01_000005"),
+ mock(NodeId.class), null, null, null, null));
+
spy.addUsedContainer(Container.newInstance(ContainerId.fromString("container_e16_1504924099862_7571_01_000006"),
+ mock(NodeId.class), null, null, null, null));
+
+ spy.onFinish();
+ // 2 calls to addUsedContainer
+ verify(spy, times(2)).addUsedContainer(any(Container.class));
+ // 2 containers were used
+ Assert.assertEquals(2,
+
spy.getAllCounters().getGroup(DAGCounter.class.getName()).findCounter(DAGCounter.TOTAL_CONTAINERS_USED.name())
+ .getValue());
+ }
+
+ @Test(timeout = 5000)
+ public void testNodesUsedCounter() {
+ DAGImpl spy = getDagSpy();
+
+ Container containerOnHost = mock(Container.class);
+
when(containerOnHost.getNodeId()).thenReturn(NodeId.fromString("localhost:0"));
+ Container containerOnSameHost = mock(Container.class);
+
when(containerOnSameHost.getNodeId()).thenReturn(NodeId.fromString("localhost:0"));
+ Container containerOnDifferentHost = mock(Container.class);
+
when(containerOnDifferentHost.getNodeId()).thenReturn(NodeId.fromString("otherhost:0"));
+ Container containerOnSameHostWithDifferentPort = mock(Container.class);
+
when(containerOnSameHostWithDifferentPort.getNodeId()).thenReturn(NodeId.fromString("localhost:1"));
+
+ spy.addUsedContainer(containerOnHost);
+ spy.addUsedContainer(containerOnSameHost);
+ spy.addUsedContainer(containerOnDifferentHost);
+ spy.addUsedContainer(containerOnSameHostWithDifferentPort);
+
+ when(taskSchedulerManager.getNumClusterNodes(anyBoolean())).thenReturn(10);
+
+ spy.onFinish();
+ // 4 calls to addUsedContainer
+ verify(spy, times(4)).addUsedContainer(any(Container.class));
+
+ // 2 distinct node hosts were seen: localhost, otherhost
+ Assert.assertEquals(2,
+
spy.getAllCounters().getGroup(DAGCounter.class.getName()).findCounter(DAGCounter.NODE_USED_COUNT.name())
+ .getValue());
+
+ Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains("localhost"));
+ Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains("otherhost"));
+
+ Assert.assertEquals(10,
+ spy.getAllCounters().getGroup(DAGCounter.class.getName())
+ .findCounter(DAGCounter.NODE_TOTAL_COUNT.name())
+ .getValue());
+ }
+
+ private DAGImpl getDagSpy() {
initDAG(mrrDag);
dispatcher.await();
startDAG(mrrDag);
dispatcher.await();
- DAGImpl spy = spy(mrrDag);
- spy.addUsedContainer(mock(ContainerId.class));
- spy.addUsedContainer(mock(ContainerId.class));
+ // needed when onFinish() method is called on a DAGImpl
+ when(mrrAppContext.getTaskScheduler()).thenReturn(taskSchedulerManager);
- spy.onFinish();
- // 2 calls to addUsedContainer, obviously, we did it here
- verify(spy, times(2)).addUsedContainer(any(ContainerId.class));
- // 1 call to setDagCounter, which happened at dag.onFinish
- verify(spy).setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED, 2);
+ return spy(mrrDag);
}
}
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
index e3dd1ac92..901a9df71 100644
---
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
+++
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
@@ -234,7 +234,7 @@ public class TestTaskSchedulerManager {
assertEquals(priority, assignEvent.getPriority());
assertEquals(mockAttemptId, assignEvent.getTaskAttemptId());
-
verify(mockAppContext.getCurrentDAG()).addUsedContainer(any(ContainerId.class));
// called on taskAllocated
+
verify(mockAppContext.getCurrentDAG()).addUsedContainer(any(Container.class));
// called on taskAllocated
}
@Test(timeout = 5000)