This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new b5bf8dc2e TEZ-4589: Counter for the overall duration of 
succeeded/failed/killed task attempts (#382) (Laszlo Bodor reviewed by Ayush 
Saxena)
b5bf8dc2e is described below

commit b5bf8dc2e02aae224a6b13b9f491e988655a0719
Author: Bodor Laszlo <bodorlaszlo0...@gmail.com>
AuthorDate: Sat Nov 23 15:24:45 2024 +0100

    TEZ-4589: Counter for the overall duration of succeeded/failed/killed task 
attempts (#382) (Laszlo Bodor reviewed by Ayush Saxena)
---
 .../org/apache/tez/common/counters/DAGCounter.java | 22 ++++++
 .../dag/app/dag/event/DAGEventCounterUpdate.java   |  7 +-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java      | 13 ++--
 .../tez/dag/app/dag/impl/TestTaskAttempt.java      | 86 ++++++++++++++++++++++
 4 files changed, 122 insertions(+), 6 deletions(-)

diff --git 
a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java 
b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java
index 23c197843..ca575d4df 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java
@@ -30,6 +30,28 @@ public enum DAGCounter {
   NUM_KILLED_TASKS,
   NUM_SUCCEEDED_TASKS,
   TOTAL_LAUNCHED_TASKS,
+
+  /* The durations of task attempts are categorized based on their final 
states. The duration of successful tasks
+  can serve as a reference when analyzing the durations of failed or killed 
tasks. This is because solely examining
+  failed or killed task durations may be misleading, as these durations are 
measured from the submission time,
+  which does not always correspond to the actual start time of the task 
attempt on executor nodes
+  (e.g., in scenarios involving Hive LLAP).
+  These counters are computed from the same per-attempt duration that is added to WALL_CLOCK_MILLIS.
+  Therefore, provided every finished attempt ends up FAILED, KILLED or SUCCEEDED, the following relationship applies:
+  WALL_CLOCK_MILLIS = DURATION_FAILED_TASKS_MILLIS + DURATION_KILLED_TASKS_MILLIS + DURATION_SUCCEEDED_TASKS_MILLIS
+  */
+
+  // Total time spent running FAILED task attempts. Failed attempts can account for performance degradation,
+  // because a DAG can still finish successfully in the presence of failed attempts.
+  DURATION_FAILED_TASKS_MILLIS,
+
+  // Total time spent running KILLED task attempts.
+  DURATION_KILLED_TASKS_MILLIS,
+
+  // Total time spent running SUCCEEDED task attempts; serves as a baseline when interpreting the FAILED and
+  // KILLED durations.
+  DURATION_SUCCEEDED_TASKS_MILLIS,
+
   OTHER_LOCAL_TASKS,
   DATA_LOCAL_TASKS,
   RACK_LOCAL_TASKS,
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java
index da0724dd2..3683a4951 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java
@@ -29,7 +29,7 @@ public class DAGEventCounterUpdate extends DAGEvent {
   
   public DAGEventCounterUpdate(TezDAGID dagId) {
     super(dagId, DAGEventType.DAG_COUNTER_UPDATE);
-    counterUpdates = new 
ArrayList<DAGEventCounterUpdate.CounterIncrementalUpdate>();
+    // Start with an empty list; the element type is inferred from the field's declared type (diamond operator).
+    counterUpdates = new ArrayList<>();
   }
 
   public void addCounterUpdate(Enum<?> key, long incrValue) {
@@ -56,5 +56,10 @@ public class DAGEventCounterUpdate extends DAGEvent {
+    // Returns the stored increment (incrValue) of this counter update.
     public long getIncrementValue() {
       return incrValue;
     }
+
+    // Human-readable form of the update; used e.g. when a list of updates is formatted with %s
+    // in test failure messages.
+    @Override
+    public String toString(){
+      return 
String.format("DAGEventCounterUpdate.CounterIncrementalUpdate(key=%s, 
incrValue=%d)", key, incrValue);
+    }
   }
 }
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index fb8aed267..13769db83 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -967,23 +967,26 @@ public class TaskAttemptImpl implements TaskAttempt,
     return dagCounterEvent;
   }
 
-  private static DAGEventCounterUpdate createDAGCounterUpdateEventTAFinished(
+  /**
+   * Builds the DAG counter update emitted when a task attempt finishes.
+   * Every call adds the attempt's AM-side duration (getDurationNs(), converted to ms) to WALL_CLOCK_MILLIS.
+   * For FAILED, KILLED and SUCCEEDED it also increments the matching NUM_*_TASKS counter by 1 and adds
+   * the same duration to the matching DURATION_*_TASKS_MILLIS counter; any other state contributes
+   * only to WALL_CLOCK_MILLIS.
+   */
+  @VisibleForTesting
+  static DAGEventCounterUpdate createDAGCounterUpdateEventTAFinished(
       TaskAttemptImpl taskAttempt, TaskAttemptState taState) {
     DAGEventCounterUpdate jce =
         new DAGEventCounterUpdate(taskAttempt.getDAGID());
 
+    // Computed once and shared by WALL_CLOCK_MILLIS and the per-state DURATION_* counters below,
+    // so they always receive the same value.
+    long amSideWallClockTimeMs = 
TimeUnit.NANOSECONDS.toMillis(taskAttempt.getDurationNs());
+    jce.addCounterUpdate(DAGCounter.WALL_CLOCK_MILLIS, amSideWallClockTimeMs);
+
     if (taState == TaskAttemptState.FAILED) {
       jce.addCounterUpdate(DAGCounter.NUM_FAILED_TASKS, 1);
+      jce.addCounterUpdate(DAGCounter.DURATION_FAILED_TASKS_MILLIS, 
amSideWallClockTimeMs);
     } else if (taState == TaskAttemptState.KILLED) {
       jce.addCounterUpdate(DAGCounter.NUM_KILLED_TASKS, 1);
+      jce.addCounterUpdate(DAGCounter.DURATION_KILLED_TASKS_MILLIS, 
amSideWallClockTimeMs);
     } else if (taState == TaskAttemptState.SUCCEEDED ) {
       jce.addCounterUpdate(DAGCounter.NUM_SUCCEEDED_TASKS, 1);
+      jce.addCounterUpdate(DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS, 
amSideWallClockTimeMs);
     }
 
-    long amSideWallClockTimeMs = TimeUnit.NANOSECONDS.toMillis(
-        taskAttempt.getDurationNs());
-    jce.addCounterUpdate(DAGCounter.WALL_CLOCK_MILLIS, amSideWallClockTimeMs);
-
     return jce;
   }
 
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 82accae43..34a57a5f6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
+import org.apache.hadoop.yarn.util.MonotonicClock;
+import org.apache.tez.common.counters.DAGCounter;
 import org.apache.tez.dag.app.MockClock;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -2261,6 +2263,85 @@ public class TestTaskAttempt {
     Assert.assertEquals(TaskAttemptStateInternal.FAILED, resultState2);
   }
 
+  // Checks that each terminal state (SUCCEEDED/FAILED/KILLED) updates its own NUM_* and DURATION_* counters
+  // plus WALL_CLOCK_MILLIS, and leaves the counters of the other two states untouched.
+  @Test
+  public void testDAGCounterUpdateEvent(){
+    TaskAttemptImpl taImpl = getMockTaskAttempt();
+
+    DAGEventCounterUpdate counterUpdateSucceeded = 
TaskAttemptImpl.createDAGCounterUpdateEventTAFinished(taImpl,
+        TaskAttemptState.SUCCEEDED);
+    List<DAGEventCounterUpdate.CounterIncrementalUpdate> succeededUpdates = 
counterUpdateSucceeded.getCounterUpdates();
+    // SUCCEEDED task related counters are updated (+ WALL_CLOCK_MILLIS)
+    assertCounterIncrementalUpdate(succeededUpdates, 
DAGCounter.NUM_SUCCEEDED_TASKS, 1);
+    assertCounterIncrementalUpdate(succeededUpdates, 
DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS, 1000);
+    assertCounterIncrementalUpdate(succeededUpdates, 
DAGCounter.WALL_CLOCK_MILLIS, 1000);
+    // other counters are not updated (no FAILED, no KILLED)
+    assertCounterIncrementalUpdateNotFound(succeededUpdates, 
DAGCounter.NUM_FAILED_TASKS);
+    assertCounterIncrementalUpdateNotFound(succeededUpdates, 
DAGCounter.NUM_KILLED_TASKS);
+    assertCounterIncrementalUpdateNotFound(succeededUpdates, 
DAGCounter.DURATION_FAILED_TASKS_MILLIS);
+    assertCounterIncrementalUpdateNotFound(succeededUpdates, 
DAGCounter.DURATION_KILLED_TASKS_MILLIS);
+
+    DAGEventCounterUpdate counterUpdateFailed = 
TaskAttemptImpl.createDAGCounterUpdateEventTAFinished(taImpl,
+        TaskAttemptState.FAILED);
+    List<DAGEventCounterUpdate.CounterIncrementalUpdate> failedUpdates = 
counterUpdateFailed.getCounterUpdates();
+    // FAILED task related counters are updated (+ WALL_CLOCK_MILLIS)
+    assertCounterIncrementalUpdate(failedUpdates, DAGCounter.NUM_FAILED_TASKS, 
1);
+    assertCounterIncrementalUpdate(failedUpdates, 
DAGCounter.DURATION_FAILED_TASKS_MILLIS, 1000);
+    assertCounterIncrementalUpdate(failedUpdates, 
DAGCounter.WALL_CLOCK_MILLIS, 1000);
+    // other counters are not updated (no SUCCEEDED, no KILLED)
+    assertCounterIncrementalUpdateNotFound(failedUpdates, 
DAGCounter.NUM_SUCCEEDED_TASKS);
+    assertCounterIncrementalUpdateNotFound(failedUpdates, 
DAGCounter.NUM_KILLED_TASKS);
+    assertCounterIncrementalUpdateNotFound(failedUpdates, 
DAGCounter.DURATION_KILLED_TASKS_MILLIS);
+    assertCounterIncrementalUpdateNotFound(failedUpdates, 
DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS);
+
+    DAGEventCounterUpdate counterUpdateKilled = 
TaskAttemptImpl.createDAGCounterUpdateEventTAFinished(taImpl,
+        TaskAttemptState.KILLED);
+    List<DAGEventCounterUpdate.CounterIncrementalUpdate> killedUpdates = 
counterUpdateKilled.getCounterUpdates();
+    // KILLED task related counters are updated (+ WALL_CLOCK_MILLIS)
+    assertCounterIncrementalUpdate(killedUpdates, DAGCounter.NUM_KILLED_TASKS, 
1);
+    assertCounterIncrementalUpdate(killedUpdates, 
DAGCounter.DURATION_KILLED_TASKS_MILLIS, 1000);
+    assertCounterIncrementalUpdate(killedUpdates, 
DAGCounter.WALL_CLOCK_MILLIS, 1000);
+    // other counters are not updated (no SUCCEEDED, no FAILED)
+    assertCounterIncrementalUpdateNotFound(killedUpdates, 
DAGCounter.NUM_SUCCEEDED_TASKS);
+    assertCounterIncrementalUpdateNotFound(killedUpdates, 
DAGCounter.NUM_FAILED_TASKS);
+    assertCounterIncrementalUpdateNotFound(killedUpdates, 
DAGCounter.DURATION_FAILED_TASKS_MILLIS);
+    assertCounterIncrementalUpdateNotFound(killedUpdates, 
DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS);
+  }
+
+  // Creates a MockTaskAttemptImpl wired to mocked collaborators. MockTaskAttemptImpl overrides
+  // getDurationNs() to a fixed 1s, which the DURATION_* / WALL_CLOCK_MILLIS assertions rely on.
+  private TaskAttemptImpl getMockTaskAttempt() {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+
+    return new MockTaskAttemptImpl(taskID, 1, mock(EventHandler.class),
+        mock(TaskCommunicatorManagerInterface.class), new Configuration(), new 
MonotonicClock(),
+        mock(TaskHeartbeatHandler.class), mock(AppContext.class), false,
+        mock(Resource.class), mock(ContainerContext.class), false);
+  }
+
+  // Passes if some update in counterUpdates targets counter with exactly expectedValue;
+  // otherwise fails, listing every update that was seen.
+  private void 
assertCounterIncrementalUpdate(List<DAGEventCounterUpdate.CounterIncrementalUpdate>
 counterUpdates,
+      DAGCounter counter, int expectedValue) {
+    for (DAGEventCounterUpdate.CounterIncrementalUpdate update : 
counterUpdates) {
+      if (update.getCounterKey().equals(counter) && update.getIncrementValue() 
== expectedValue) {
+        return;
+      }
+    }
+    Assert.fail(
+        String.format("Haven't found counter update %s=%d, instead seen: %s", 
counter, expectedValue, counterUpdates));
+  }
+
+  // Fails on the first update in counterUpdates that targets counter, whatever its increment value.
+  private void assertCounterIncrementalUpdateNotFound(
+      List<DAGEventCounterUpdate.CounterIncrementalUpdate> counterUpdates, 
DAGCounter counter) {
+    for (DAGEventCounterUpdate.CounterIncrementalUpdate update : 
counterUpdates) {
+      if (update.getCounterKey().equals(counter)) {
+        Assert.fail(
+            String.format("Found counter update %s=%d, which is not expected", 
counter, update.getIncrementValue()));
+      }
+    }
+  }
+
   private Event verifyEventType(List<Event> events,
       Class<? extends Event> eventClass, int expectedOccurences) {
     int count = 0;
@@ -2344,6 +2425,11 @@ public class TestTaskAttempt {
     protected void sendInputFailedToConsumers() {
       inputFailedReported = true;
     }
+
+    // Fixed 1s duration so tests can assert exact *_MILLIS counter values (1000).
+    @Override
+    public long getDurationNs(){
+      return 1000000000L; // 1000000000ns = 1000ms
+    }
   }
 
   private static ContainerContext createFakeContainerContext() {

Reply via email to