This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 8b408587b TEZ-4451: ThreadLevel IO Stats Support for TEZ. (#331)
(Ayush Saxena reviewed by Laszlo Bodor, Steve Loughran)
8b408587b is described below
commit 8b408587b39b2e84cb180effdf6b3a7e63ad5051
Author: Ayush Saxena <[email protected]>
AuthorDate: Tue Feb 6 14:19:22 2024 +0530
TEZ-4451: ThreadLevel IO Stats Support for TEZ. (#331) (Ayush Saxena
reviewed by Laszlo Bodor, Steve Loughran)
---
.../java/org/apache/tez/runtime/task/TaskRunner2Callable.java | 9 +++++++++
1 file changed, 9 insertions(+)
diff --git
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
index 810a80622..e6a74321f 100644
---
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
+++
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
@@ -18,6 +18,9 @@ import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
+import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.TezCommonUtils;
@@ -75,6 +78,7 @@ public class TaskRunner2Callable extends
CallableWithNdc<TaskRunner2Callable.Tas
LOG.info("Initializing task" + ", taskAttemptId={}",
task.getTaskAttemptID());
TezUtilsInternal.setHadoopCallerContext(task.getHadoopShim(),
task.getTaskAttemptID());
TezCommonUtils.logCredentials(LOG, ugi.getCredentials(), "taskInit");
+ IOStatisticsContext.getCurrentIOStatisticsContext().reset();
task.initialize();
if (!stopRequested.get() && !Thread.currentThread().isInterrupted())
{
@@ -116,6 +120,11 @@ public class TaskRunner2Callable extends
CallableWithNdc<TaskRunner2Callable.Tas
// For a successful task, however, this should be almost no delay since
close has already happened.
maybeFixInterruptStatus();
LOG.info("Cleaning up task {}, stopRequested={}",
task.getTaskAttemptID(), stopRequested.get());
+ String ioStats = IOStatisticsLogging.ioStatisticsToPrettyString(
+
IOStatisticsContext.getCurrentIOStatisticsContext().getIOStatistics());
+ if (StringUtils.isNotEmpty(ioStats)) {
+ LOG.info("TaskAttemptId={}, {}", task.getTaskAttemptID(), ioStats);
+ }
task.getOutputContexts().forEach(outputContext
-> outputContext.trapEvents(new TezTrapEventHandler(outputContext,
this.tezUmbilical)));