This is an automated email from the ASF dual-hosted git repository. abstractdog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push: new b5bf8dc2e TEZ-4589: Counter for the overall duration of succeeded/failed/killed task attempts (#382) (Laszlo Bodor reviewed by Ayush Saxena) b5bf8dc2e is described below commit b5bf8dc2e02aae224a6b13b9f491e988655a0719 Author: Bodor Laszlo <bodorlaszlo0...@gmail.com> AuthorDate: Sat Nov 23 15:24:45 2024 +0100 TEZ-4589: Counter for the overall duration of succeeded/failed/killed task attempts (#382) (Laszlo Bodor reviewed by Ayush Saxena) --- .../org/apache/tez/common/counters/DAGCounter.java | 22 ++++++ .../dag/app/dag/event/DAGEventCounterUpdate.java | 7 +- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 13 ++-- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 86 ++++++++++++++++++++++ 4 files changed, 122 insertions(+), 6 deletions(-) diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java index 23c197843..ca575d4df 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java @@ -30,6 +30,28 @@ public enum DAGCounter { NUM_KILLED_TASKS, NUM_SUCCEEDED_TASKS, TOTAL_LAUNCHED_TASKS, + + /* The durations of task attempts are categorized based on their final states. The duration of successful tasks + can serve as a reference when analyzing the durations of failed or killed tasks. This is because solely examining + failed or killed task durations may be misleading, as these durations are measured from the submission time, + which does not always correspond to the actual start time of the task attempt on executor nodes + (e.g., in scenarios involving Hive LLAP). + These counters align with the duration metrics used for WALL_CLOCK_MILLIS. 
+ As such, the following relationship applies: + WALL_CLOCK_MILLIS = DURATION_FAILED_TASKS_MILLIS + DURATION_KILLED_TASKS_MILLIS + DURATION_SUCCEEDED_TASKS_MILLIS + */ + + // Total amount of time spent on running FAILED task attempts. This can be blamed for performance degradation, as a + // DAG can still finish successfully in the presence of failed attempts. + DURATION_FAILED_TASKS_MILLIS, + + // Total amount of time spent on running KILLED task attempts. + DURATION_KILLED_TASKS_MILLIS, + + // Total amount of time spent on running SUCCEEDED task attempts, which can be a reference together with the same for + // FAILED and KILLED attempts. + DURATION_SUCCEEDED_TASKS_MILLIS, + OTHER_LOCAL_TASKS, DATA_LOCAL_TASKS, RACK_LOCAL_TASKS, diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java index da0724dd2..3683a4951 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java @@ -29,7 +29,7 @@ public class DAGEventCounterUpdate extends DAGEvent { public DAGEventCounterUpdate(TezDAGID dagId) { super(dagId, DAGEventType.DAG_COUNTER_UPDATE); - counterUpdates = new ArrayList<DAGEventCounterUpdate.CounterIncrementalUpdate>(); + counterUpdates = new ArrayList<>(); } public void addCounterUpdate(Enum<?> key, long incrValue) { @@ -56,5 +56,10 @@ public class DAGEventCounterUpdate extends DAGEvent { public long getIncrementValue() { return incrValue; } + + @Override + public String toString(){ + return String.format("DAGEventCounterUpdate.CounterIncrementalUpdate(key=%s, incrValue=%d)", key, incrValue); + } } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index fb8aed267..13769db83 100644 --- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -967,23 +967,26 @@ public class TaskAttemptImpl implements TaskAttempt, return dagCounterEvent; } - private static DAGEventCounterUpdate createDAGCounterUpdateEventTAFinished( + @VisibleForTesting + static DAGEventCounterUpdate createDAGCounterUpdateEventTAFinished( TaskAttemptImpl taskAttempt, TaskAttemptState taState) { DAGEventCounterUpdate jce = new DAGEventCounterUpdate(taskAttempt.getDAGID()); + long amSideWallClockTimeMs = TimeUnit.NANOSECONDS.toMillis(taskAttempt.getDurationNs()); + jce.addCounterUpdate(DAGCounter.WALL_CLOCK_MILLIS, amSideWallClockTimeMs); + if (taState == TaskAttemptState.FAILED) { jce.addCounterUpdate(DAGCounter.NUM_FAILED_TASKS, 1); + jce.addCounterUpdate(DAGCounter.DURATION_FAILED_TASKS_MILLIS, amSideWallClockTimeMs); } else if (taState == TaskAttemptState.KILLED) { jce.addCounterUpdate(DAGCounter.NUM_KILLED_TASKS, 1); + jce.addCounterUpdate(DAGCounter.DURATION_KILLED_TASKS_MILLIS, amSideWallClockTimeMs); } else if (taState == TaskAttemptState.SUCCEEDED ) { jce.addCounterUpdate(DAGCounter.NUM_SUCCEEDED_TASKS, 1); + jce.addCounterUpdate(DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS, amSideWallClockTimeMs); } - long amSideWallClockTimeMs = TimeUnit.NANOSECONDS.toMillis( - taskAttempt.getDurationNs()); - jce.addCounterUpdate(DAGCounter.WALL_CLOCK_MILLIS, amSideWallClockTimeMs); - return jce; } diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 82accae43..34a57a5f6 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -18,6 +18,8 @@ package org.apache.tez.dag.app.dag.impl; +import org.apache.hadoop.yarn.util.MonotonicClock; +import 
org.apache.tez.common.counters.DAGCounter; import org.apache.tez.dag.app.MockClock; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -2261,6 +2263,85 @@ public class TestTaskAttempt { Assert.assertEquals(TaskAttemptStateInternal.FAILED, resultState2); } + @Test + public void testDAGCounterUpdateEvent(){ + TaskAttemptImpl taImpl = getMockTaskAttempt(); + + DAGEventCounterUpdate counterUpdateSucceeded = TaskAttemptImpl.createDAGCounterUpdateEventTAFinished(taImpl, + TaskAttemptState.SUCCEEDED); + List<DAGEventCounterUpdate.CounterIncrementalUpdate> succeededUpdates = counterUpdateSucceeded.getCounterUpdates(); + // SUCCEEDED task related counters are updated (+ WALL_CLOCK_MILLIS) + assertCounterIncrementalUpdate(succeededUpdates, DAGCounter.NUM_SUCCEEDED_TASKS, 1); + assertCounterIncrementalUpdate(succeededUpdates, DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS, 1000); + assertCounterIncrementalUpdate(succeededUpdates, DAGCounter.WALL_CLOCK_MILLIS, 1000); + // other counters are not updated (no FAILED, no KILLED) + assertCounterIncrementalUpdateNotFound(succeededUpdates, DAGCounter.NUM_FAILED_TASKS); + assertCounterIncrementalUpdateNotFound(succeededUpdates, DAGCounter.NUM_KILLED_TASKS); + assertCounterIncrementalUpdateNotFound(succeededUpdates, DAGCounter.DURATION_FAILED_TASKS_MILLIS); + assertCounterIncrementalUpdateNotFound(succeededUpdates, DAGCounter.DURATION_KILLED_TASKS_MILLIS); + + DAGEventCounterUpdate counterUpdateFailed = TaskAttemptImpl.createDAGCounterUpdateEventTAFinished(taImpl, + TaskAttemptState.FAILED); + List<DAGEventCounterUpdate.CounterIncrementalUpdate> failedUpdates = counterUpdateFailed.getCounterUpdates(); + // FAILED task related counters are updated (+ WALL_CLOCK_MILLIS) + assertCounterIncrementalUpdate(failedUpdates, DAGCounter.NUM_FAILED_TASKS, 1); + assertCounterIncrementalUpdate(failedUpdates, DAGCounter.DURATION_FAILED_TASKS_MILLIS, 1000); + assertCounterIncrementalUpdate(failedUpdates, 
DAGCounter.WALL_CLOCK_MILLIS, 1000); + // other counters are not updated (no SUCCEEDED, no KILLED) + assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.NUM_SUCCEEDED_TASKS); + assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.NUM_KILLED_TASKS); + assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.DURATION_KILLED_TASKS_MILLIS); + assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS); + + DAGEventCounterUpdate counterUpdateKilled = TaskAttemptImpl.createDAGCounterUpdateEventTAFinished(taImpl, + TaskAttemptState.KILLED); + List<DAGEventCounterUpdate.CounterIncrementalUpdate> killedUpdates = counterUpdateKilled.getCounterUpdates(); + // KILLED task related counters are updated (+ WALL_CLOCK_MILLIS) + assertCounterIncrementalUpdate(killedUpdates, DAGCounter.NUM_KILLED_TASKS, 1); + assertCounterIncrementalUpdate(killedUpdates, DAGCounter.DURATION_KILLED_TASKS_MILLIS, 1000); + assertCounterIncrementalUpdate(killedUpdates, DAGCounter.WALL_CLOCK_MILLIS, 1000); + // other counters are not updated (no SUCCEEDED, no FAILED) + assertCounterIncrementalUpdateNotFound(killedUpdates, DAGCounter.NUM_SUCCEEDED_TASKS); + assertCounterIncrementalUpdateNotFound(killedUpdates, DAGCounter.NUM_FAILED_TASKS); + assertCounterIncrementalUpdateNotFound(killedUpdates, DAGCounter.DURATION_FAILED_TASKS_MILLIS); + assertCounterIncrementalUpdateNotFound(killedUpdates, DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS); + } + + private TaskAttemptImpl getMockTaskAttempt() { + ApplicationId appId = ApplicationId.newInstance(1, 2); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( + appId, 0); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); + TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); + + return new MockTaskAttemptImpl(taskID, 1, mock(EventHandler.class), + mock(TaskCommunicatorManagerInterface.class), new Configuration(), new 
MonotonicClock(), + mock(TaskHeartbeatHandler.class), mock(AppContext.class), false, + mock(Resource.class), mock(ContainerContext.class), false); + } + + private void assertCounterIncrementalUpdate(List<DAGEventCounterUpdate.CounterIncrementalUpdate> counterUpdates, + DAGCounter counter, int expectedValue) { + for (DAGEventCounterUpdate.CounterIncrementalUpdate update : counterUpdates) { + if (update.getCounterKey().equals(counter) && update.getIncrementValue() == expectedValue) { + return; + } + } + Assert.fail( + String.format("Haven't found counter update %s=%d, instead seen: %s", counter, expectedValue, counterUpdates)); + } + + private void assertCounterIncrementalUpdateNotFound( + List<DAGEventCounterUpdate.CounterIncrementalUpdate> counterUpdates, DAGCounter counter) { + for (DAGEventCounterUpdate.CounterIncrementalUpdate update : counterUpdates) { + if (update.getCounterKey().equals(counter)) { + Assert.fail( + String.format("Found counter update %s=%d, which is not expected", counter, update.getIncrementValue())); + } + } + } + private Event verifyEventType(List<Event> events, Class<? extends Event> eventClass, int expectedOccurences) { int count = 0; @@ -2344,6 +2425,11 @@ public class TestTaskAttempt { protected void sendInputFailedToConsumers() { inputFailedReported = true; } + + @Override + public long getDurationNs(){ + return 1000000000L; // 1000000000ns = 1000ms + } } private static ContainerContext createFakeContainerContext() {