This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b408587b TEZ-4451: ThreadLevel IO Stats Support for TEZ. (#331) 
(Ayush Saxena reviewed by Laszlo Bodor, Steve Loughran)
8b408587b is described below

commit 8b408587b39b2e84cb180effdf6b3a7e63ad5051
Author: Ayush Saxena <[email protected]>
AuthorDate: Tue Feb 6 14:19:22 2024 +0530

    TEZ-4451: ThreadLevel IO Stats Support for TEZ. (#331) (Ayush Saxena 
reviewed by Laszlo Bodor, Steve Loughran)
---
 .../java/org/apache/tez/runtime/task/TaskRunner2Callable.java    | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
index 810a80622..e6a74321f 100644
--- 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
+++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
@@ -18,6 +18,9 @@ import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
+import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.TezCommonUtils;
@@ -75,6 +78,7 @@ public class TaskRunner2Callable extends 
CallableWithNdc<TaskRunner2Callable.Tas
           LOG.info("Initializing task" + ", taskAttemptId={}", 
task.getTaskAttemptID());
           TezUtilsInternal.setHadoopCallerContext(task.getHadoopShim(), 
task.getTaskAttemptID());
           TezCommonUtils.logCredentials(LOG, ugi.getCredentials(), "taskInit");
+          IOStatisticsContext.getCurrentIOStatisticsContext().reset();
           task.initialize();
 
           if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) 
{
@@ -116,6 +120,11 @@ public class TaskRunner2Callable extends 
CallableWithNdc<TaskRunner2Callable.Tas
       // For a successful task, however, this should be almost no delay since 
close has already happened.
       maybeFixInterruptStatus();
       LOG.info("Cleaning up task {}, stopRequested={}", 
task.getTaskAttemptID(), stopRequested.get());
+      String ioStats = IOStatisticsLogging.ioStatisticsToPrettyString(
+          
IOStatisticsContext.getCurrentIOStatisticsContext().getIOStatistics());
+      if (StringUtils.isNotEmpty(ioStats)) {
+        LOG.info("TaskAttemptId={}, {}", task.getTaskAttemptID(), ioStats);
+      }
       task.getOutputContexts().forEach(outputContext
           -> outputContext.trapEvents(new TezTrapEventHandler(outputContext,
           this.tezUmbilical)));

Reply via email to