This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 19b2351a9 TEZ-4554: Counter for used nodes within a DAG (#362) (Laszlo
19b2351a9 is described below

commit 19b2351a909589500397cc69e12d22da1f74949a
Author: Bodor Laszlo <[email protected]>
AuthorDate: Tue Jun 25 07:51:05 2024 +0200

    TEZ-4554: Counter for used nodes within a DAG (#362) (Laszlo
---
 .../org/apache/tez/common/counters/DAGCounter.java | 15 ++++-
 .../main/java/org/apache/tez/dag/app/dag/DAG.java  |  4 +-
 .../org/apache/tez/dag/app/dag/impl/DAGImpl.java   | 14 ++--
 .../tez/dag/app/rm/TaskSchedulerManager.java       | 44 ++++++++-----
 .../apache/tez/dag/app/dag/impl/TestDAGImpl.java   | 75 +++++++++++++++++++---
 .../tez/dag/app/rm/TestTaskSchedulerManager.java   |  2 +-
 6 files changed, 121 insertions(+), 33 deletions(-)

diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java
index 149bacdf5..23c197843 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java
@@ -87,5 +87,18 @@ public enum DAGCounter {
    * Number of container reuses during a DAG. This is incremented every time
    * the containerReused callback is called in the TaskSchedulerContext.
    */
-  TOTAL_CONTAINER_REUSE_COUNT
+  TOTAL_CONTAINER_REUSE_COUNT,
+
+  /*
+   * Number of nodes to which task attempts were assigned in this DAG.
+   * Nodes are distinguished by the Yarn NodeId.getHost().
+   */
+  NODE_USED_COUNT,
+
+  /*
+   * Total number of nodes visible to the task scheduler (regardless of
+   * task assignments). This is typically exposed by a resource manager
+   * client.
+   */
+  NODE_TOTAL_COUNT
 }
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index ff5afb409..c828d81b2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -23,7 +23,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.counters.DAGCounter;
@@ -106,7 +106,7 @@ public interface DAG extends DagInfo {
 
   void incrementDagCounter(DAGCounter counter, int incrValue);
   void setDagCounter(DAGCounter counter, int setValue);
-  void addUsedContainer(ContainerId containerId);
+  void addUsedContainer(Container container);
 
   /**
    * Called by the DAGAppMaster when the DAG is started normally or in the 
event of recovery.
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index b00cea8b2..307580741 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -250,6 +251,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
   private final MemoryMXBean memoryMXBean = 
ManagementFactory.getMemoryMXBean();
   private final Set<ContainerId> containersUsedByCurrentDAG = new HashSet<>();
+  @VisibleForTesting
+  final Set<String> nodesUsedByCurrentDAG = new HashSet<>();
 
   protected static final
     StateMachineFactory<DAGImpl, DAGState, DAGEventType, DAGEvent>
@@ -2563,7 +2566,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   @Override
   public void onFinish() {
     stopVertexServices();
-    handleUsedContainersOnDagFinish();
+    updateCounters();
   }
 
   private void startVertexServices() {
@@ -2579,11 +2582,14 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
 
   @Override
-  public void addUsedContainer(ContainerId containerId) {
-    containersUsedByCurrentDAG.add(containerId);
+  public void addUsedContainer(Container container) {
+    containersUsedByCurrentDAG.add(container.getId());
+    nodesUsedByCurrentDAG.add(container.getNodeId().getHost());
   }
 
-  private void handleUsedContainersOnDagFinish() {
+  private void updateCounters() {
     setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED, 
containersUsedByCurrentDAG.size());
+    setDagCounter(DAGCounter.NODE_USED_COUNT, nodesUsedByCurrentDAG.size());
+    setDagCounter(DAGCounter.NODE_TOTAL_COUNT, 
appContext.getTaskScheduler().getNumClusterNodes(true));
   }
 }
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
index 0bf62afbc..e311c23e8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -224,9 +224,24 @@ public class TaskSchedulerManager extends AbstractService implements
   }
 
   public int getNumClusterNodes() {
+    return getNumClusterNodes(false);
+  }
+
+  public int getNumClusterNodes(boolean tryUpdate){
+    if (cachedNodeCount == -1 && tryUpdate){
+      cachedNodeCount = countAllNodes();
+    }
     return cachedNodeCount;
   }
-  
+
+  private int countAllNodes() {
+    try {
+      return taskSchedulers[0].getClusterNodeCount();
+    } catch (Exception e) {
+      return handleTaskSchedulerExceptionWhileGettingNodeCount(e);
+    }
+  }
+
   public Resource getAvailableResources(int schedulerId) {
     try {
       return taskSchedulers[schedulerId].getAvailableResources();
@@ -746,7 +761,7 @@ public class TaskSchedulerManager extends AbstractService implements
       sendEvent(new AMNodeEventContainerAllocated(container
           .getNodeId(), schedulerId, container.getId()));
     }
-    appContext.getCurrentDAG().addUsedContainer(containerId);
+    appContext.getCurrentDAG().addUsedContainer(container);
 
     TaskAttempt taskAttempt = event.getTaskAttempt();
     // TODO - perhaps check if the task still needs this container
@@ -883,19 +898,7 @@ public class TaskSchedulerManager extends AbstractService implements
     // Doubles as a mechanism to update node counts periodically. Hence 
schedulerId required.
 
     // TODO Handle this in TEZ-2124. Need a way to know which scheduler is 
calling in.
-    int nodeCount = 0;
-    try {
-      nodeCount = taskSchedulers[0].getClusterNodeCount();
-    } catch (Exception e) {
-      String msg = "Error in TaskScheduler while getting node count"
-          + ", scheduler=" + 
Utils.getTaskSchedulerIdentifierString(schedulerId, appContext);
-      LOG.error(msg, e);
-      sendEvent(
-          new DAGAppMasterEventUserServiceFatalError(
-              DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
-              msg, e));
-      throw new RuntimeException(e);
-    }
+    int nodeCount = countAllNodes();
     if (nodeCount != cachedNodeCount) {
       cachedNodeCount = nodeCount;
       sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount, schedulerId));
@@ -903,6 +906,17 @@ public class TaskSchedulerManager extends AbstractService implements
     return dagAppMaster.getProgress();
   }
 
+  private int handleTaskSchedulerExceptionWhileGettingNodeCount(Exception e) {
+    String msg = "Error in TaskScheduler while getting node count"
+        + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(0, 
appContext);
+    LOG.error(msg, e);
+    sendEvent(
+        new DAGAppMasterEventUserServiceFatalError(
+            DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
+            msg, e));
+    throw new RuntimeException(e);
+  }
+
   public void reportError(int taskSchedulerIndex, ServicePluginError 
servicePluginError,
                           String diagnostics,
                           DagInfo dagInfo) {
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 1e003accc..46c4fe1cf 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -49,6 +50,7 @@ import org.apache.tez.common.counters.Limits;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
+import org.apache.tez.dag.app.rm.TaskSchedulerManager;
 import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.hadoop.shim.HadoopShim;
 import org.junit.Rule;
@@ -60,6 +62,8 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
@@ -181,6 +185,7 @@ public class TestDAGImpl {
   private ACLManager aclManager;
   private ApplicationAttemptId appAttemptId;
   private DAGImpl dag;
+  private TaskSchedulerManager taskSchedulerManager;
   private TaskEventDispatcher taskEventDispatcher;
   private VertexEventDispatcher vertexEventDispatcher;
   private DagEventDispatcher dagEventDispatcher;
@@ -861,11 +866,12 @@ public class TestDAGImpl {
     dispatcher = new DrainDispatcher();
     fsTokens = new Credentials();
     appContext = mock(AppContext.class);
+    taskSchedulerManager = mock(TaskSchedulerManager.class);
     execService = mock(ListeningExecutorService.class);
     final ListenableFuture<Void> mockFuture = mock(ListenableFuture.class);
     when(appContext.getHadoopShim()).thenReturn(defaultShim);
     
when(appContext.getApplicationID()).thenReturn(appAttemptId.getApplicationId());
-    
+
     doAnswer(new Answer() {
       public ListenableFuture<Void> answer(InvocationOnMock invocation) {
           Object[] args = invocation.getArguments();
@@ -2358,22 +2364,71 @@ public class TestDAGImpl {
 
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testTotalContainersUsedCounter() {
+    DAGImpl spy = getDagSpy();
+
+    
spy.addUsedContainer(Container.newInstance(ContainerId.fromString("container_e16_1504924099862_7571_01_000005"),
+        mock(NodeId.class), null, null, null, null));
+    
spy.addUsedContainer(Container.newInstance(ContainerId.fromString("container_e16_1504924099862_7571_01_000006"),
+        mock(NodeId.class), null, null, null, null));
+
+    spy.onFinish();
+    // 2 calls to addUsedContainer
+    verify(spy, times(2)).addUsedContainer(any(Container.class));
+    // 2 containers were used
+    Assert.assertEquals(2,
+        
spy.getAllCounters().getGroup(DAGCounter.class.getName()).findCounter(DAGCounter.TOTAL_CONTAINERS_USED.name())
+            .getValue());
+  }
+
+  @Test(timeout = 5000)
+  public void testNodesUsedCounter() {
+    DAGImpl spy = getDagSpy();
+
+    Container containerOnHost = mock(Container.class);
+    
when(containerOnHost.getNodeId()).thenReturn(NodeId.fromString("localhost:0"));
+    Container containerOnSameHost = mock(Container.class);
+    
when(containerOnSameHost.getNodeId()).thenReturn(NodeId.fromString("localhost:0"));
+    Container containerOnDifferentHost = mock(Container.class);
+    
when(containerOnDifferentHost.getNodeId()).thenReturn(NodeId.fromString("otherhost:0"));
+    Container containerOnSameHostWithDifferentPort = mock(Container.class);
+    
when(containerOnSameHostWithDifferentPort.getNodeId()).thenReturn(NodeId.fromString("localhost:1"));
+
+    spy.addUsedContainer(containerOnHost);
+    spy.addUsedContainer(containerOnSameHost);
+    spy.addUsedContainer(containerOnDifferentHost);
+    spy.addUsedContainer(containerOnSameHostWithDifferentPort);
+
+    when(taskSchedulerManager.getNumClusterNodes(anyBoolean())).thenReturn(10);
+
+    spy.onFinish();
+    // 4 calls to addUsedContainer
+    verify(spy, times(4)).addUsedContainer(any(Container.class));
+
+    // 2 distinct node hosts were seen: localhost, otherhost
+    Assert.assertEquals(2,
+        
spy.getAllCounters().getGroup(DAGCounter.class.getName()).findCounter(DAGCounter.NODE_USED_COUNT.name())
+            .getValue());
+
+    Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains("localhost"));
+    Assert.assertTrue(spy.nodesUsedByCurrentDAG.contains("otherhost"));
+
+    Assert.assertEquals(10,
+        spy.getAllCounters().getGroup(DAGCounter.class.getName())
+            .findCounter(DAGCounter.NODE_TOTAL_COUNT.name())
+            .getValue());
+  }
+
+  private DAGImpl getDagSpy() {
     initDAG(mrrDag);
     dispatcher.await();
     startDAG(mrrDag);
     dispatcher.await();
 
-    DAGImpl spy = spy(mrrDag);
-    spy.addUsedContainer(mock(ContainerId.class));
-    spy.addUsedContainer(mock(ContainerId.class));
+    // needed when onFinish() method is called on a DAGImpl
+    when(mrrAppContext.getTaskScheduler()).thenReturn(taskSchedulerManager);
 
-    spy.onFinish();
-    // 2 calls to addUsedContainer, obviously, we did it here
-    verify(spy, times(2)).addUsedContainer(any(ContainerId.class));
-    // 1 call to setDagCounter, which happened at dag.onFinish
-    verify(spy).setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED, 2);
+    return spy(mrrDag);
   }
 }
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
index e3dd1ac92..901a9df71 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
@@ -234,7 +234,7 @@ public class TestTaskSchedulerManager {
     assertEquals(priority, assignEvent.getPriority());
     assertEquals(mockAttemptId, assignEvent.getTaskAttemptId());
 
-    
verify(mockAppContext.getCurrentDAG()).addUsedContainer(any(ContainerId.class));
 // called on taskAllocated
+    
verify(mockAppContext.getCurrentDAG()).addUsedContainer(any(Container.class)); 
// called on taskAllocated
   }
 
   @Test(timeout = 5000)

Reply via email to