Repository: tez Updated Branches: refs/heads/master 282bb0a3f -> f49d66539
TEZ-3957: Report TASK_DURATION_MILLIS as a Counter for completed tasks (Sergey Shelukhin, reviewed by Gopal V) Signed-off-by: Gopal V <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f49d6653 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f49d6653 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f49d6653 Branch: refs/heads/master Commit: f49d665393a884295f529ffc7b9493cfa7bb3853 Parents: 282bb0a Author: Sergey Shelukhin <[email protected]> Authored: Tue Dec 11 11:45:58 2018 -0800 Committer: Gopal V <[email protected]> Committed: Tue Dec 11 11:45:58 2018 -0800 ---------------------------------------------------------------------- .../apache/tez/common/counters/DAGCounter.java | 2 + .../apache/tez/common/counters/TaskCounter.java | 2 + .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 45 +++++++++++++++++++- .../runtime/LogicalIOProcessorRuntimeTask.java | 18 ++++++++ .../org/apache/tez/runtime/RuntimeTask.java | 4 ++ .../tez/runtime/task/TestTaskExecution2.java | 4 +- 6 files changed, 72 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/f49d6653/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java index 5064c35..0a32d38 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java @@ -38,5 +38,7 @@ public enum DAGCounter { NUM_UBER_SUBTASKS, NUM_FAILED_UBERTASKS, AM_CPU_MILLISECONDS, + /** Wall clock time taken by all the tasks. */ + WALL_CLOCK_MILLIS, AM_GC_TIME_MILLIS } http://git-wip-us.apache.org/repos/asf/tez/blob/f49d6653/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java index 2f18bc6..80424c7 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java @@ -74,6 +74,8 @@ public enum TaskCounter { MERGED_MAP_OUTPUTS, GC_TIME_MILLIS, CPU_MILLISECONDS, + /** Wall clock time taken by the task initialization and execution. */ + WALL_CLOCK_MILLISECONDS, PHYSICAL_MEMORY_BYTES, VIRTUAL_MEMORY_BYTES, COMMITTED_HEAP_BYTES, http://git-wip-us.apache.org/repos/asf/tez/blob/f49d6653/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 7399979..3107330 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -179,6 +180,12 @@ public class TaskAttemptImpl implements TaskAttempt, private TaskAttemptRecoveryData recoveryData; private long launchTime = 0; private long finishTime = 0; + /** System.nanoTime for task launch time, if recorded in this JVM. */ + private Long launchTimeNs; + /** System.nanoTime for task finish time, if recorded in this JVM. */ + private Long finishTimeNs; + /** Whether the task was recovered from a prior AM; see getDurationNs. */ + private boolean isRecoveredDuration; private String trackerName; private int httpPort; @@ -782,6 +789,25 @@ public class TaskAttemptImpl implements TaskAttempt, } } + + /** @return task runtime duration in NS. */ + public long getDurationNs() { + readLock.lock(); + try { + if (isRecoveredDuration) { + // NS values are not mappable between JVMs (per documentation, at + // least), so just use the clock after recovery. + return TimeUnit.MILLISECONDS.toNanos(launchTime == 0 ? 0 + : (finishTime == 0 ? clock.getTime() : finishTime) - launchTime); + } else { + long ft = (finishTimeNs == null ? System.nanoTime() : finishTimeNs); + return (launchTimeNs == null) ? 0 : (ft - launchTimeNs); + } + } finally { + readLock.unlock(); + } + } + public long getCreationTime() { readLock.lock(); try { @@ -930,6 +956,8 @@ public class TaskAttemptImpl implements TaskAttempt, // set the finish time only if launch time is set if (launchTime != 0 && finishTime == 0) { finishTime = clock.getTime(); + // The default clock is not safe for measuring durations. + finishTimeNs = System.nanoTime(); } } @@ -957,6 +985,10 @@ public class TaskAttemptImpl implements TaskAttempt, jce.addCounterUpdate(DAGCounter.NUM_SUCCEEDED_TASKS, 1); } + long amSideWallClockTimeMs = TimeUnit.NANOSECONDS.toMillis( + taskAttempt.getDurationNs()); + jce.addCounterUpdate(DAGCounter.WALL_CLOCK_MILLIS, amSideWallClockTimeMs); + return jce; } @@ -1032,6 +1064,14 @@ public class TaskAttemptImpl implements TaskAttempt, // */ // } + /** + * Records the launch time of the task. + */ + private void setLaunchTime() { + launchTime = clock.getTime(); + launchTimeNs = System.nanoTime(); + } + private void updateProgressSplits() { // double newProgress = reportedStatus.progress; // newProgress = Math.max(Math.min(newProgress, 1.0D), 0.0D); @@ -1215,6 +1255,7 @@ public class TaskAttemptImpl implements TaskAttempt, ta.recoveryData.getTaskAttemptStartedEvent(); if (taStartedEvent != null) { ta.launchTime = taStartedEvent.getStartTime(); + ta.isRecoveredDuration = true; TaskAttemptFinishedEvent taFinishedEvent = ta.recoveryData.getTaskAttemptFinishedEvent(); if (taFinishedEvent == null) { @@ -1383,6 +1424,7 @@ public class TaskAttemptImpl implements TaskAttempt, .getTaskAttemptState(), helper.getFailureType(event)); } else { ta.finishTime = ta.recoveryData.getTaskAttemptFinishedEvent().getFinishTime(); + ta.isRecoveredDuration = true; } if (event instanceof RecoveryEvent) { @@ -1419,7 +1461,7 @@ public class TaskAttemptImpl implements TaskAttempt, .getNetworkLocation()); ta.lastNotifyProgressTimestamp = ta.clock.getTime(); - ta.launchTime = ta.clock.getTime(); + ta.setLaunchTime(); // TODO Resolve to host / IP in case of a local address. InetSocketAddress nodeHttpInetAddr = NetUtils @@ -1630,6 +1672,7 @@ public class TaskAttemptImpl implements TaskAttempt, ta.sendEvent(new VertexEventRouteEvent(ta.getVertexID(), tezEvents)); } ta.finishTime = taFinishedEvent.getFinishTime(); + ta.isRecoveredDuration = true; } else { ta.setFinishTime(); // Send out history event. http://git-wip-us.apache.org/repos/asf/tez/blob/f49d6653/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 0ac916f..87ebb7b 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -43,6 +43,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.tez.hadoop.shim.HadoopShim; @@ -57,6 +58,7 @@ import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.RunnableWithNdc; import org.apache.tez.common.TezExecutors; +import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; @@ -160,6 +162,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { private final boolean initializeProcessorFirst; private final boolean initializeProcessorIOSerially; private final TezExecutors sharedExecutor; + /** nanoTime of the task initialization start. */ + private Long initStartTimeNs = null; public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber, Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical, @@ -229,6 +233,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { public void initialize() throws Exception { Preconditions.checkState(this.state.get() == State.NEW, "Already initialized"); this.state.set(State.INITED); + if (this.tezCounters != null) { + this.initStartTimeNs = System.nanoTime(); + } this.processorContext = createProcessorContext(); this.processor = createProcessor(processorDescriptor.getClassName(), processorContext); @@ -1077,4 +1084,15 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { public Configuration getTaskConf() { return tezConf; } + + @Override + public void setFrameworkCounters() { + super.setFrameworkCounters(); + if (tezCounters != null && isUpdatingSystemCounters()) { + long timeNs = initStartTimeNs == null ? 0 + : (System.nanoTime() - initStartTimeNs); + tezCounters.findCounter(TaskCounter.WALL_CLOCK_MILLISECONDS) + .setValue(TimeUnit.NANOSECONDS.toMillis(timeNs)); + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/f49d6653/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java index 7b86d4b..a53d0d2 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java @@ -178,4 +178,8 @@ public abstract class RuntimeTask { } public abstract void abortTask(); + + protected final boolean isUpdatingSystemCounters() { + return counterUpdater != null; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/f49d6653/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java index 07b9d33..6c25f0a 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java @@ -675,8 +675,8 @@ public class TestTaskExecution2 { // If Target <=0, assert counter count is exactly 0 if (minTaskCounterCount <= 0) { - assertEquals(0, numTaskCounters); - assertEquals(0, numFsCounters); + assertEquals(tezCounters.toString(), 0, numTaskCounters); + assertEquals(tezCounters.toString(), 0, numFsCounters); } else { assertTrue(numTaskCounters >= minTaskCounterCount); assertTrue(numFsCounters >= minFsCounterCount);
