This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 1084699dd TEZ-4527: Add generic and pluggable hooks for DAGs and task 
attempts (#324) (Shohei Okumiya reviewed by Laszlo Bodor)
1084699dd is described below

commit 1084699ddb1baf6e4a6df97d988ac696d33f41e8
Author: Shohei Okumiya <g...@okumin.com>
AuthorDate: Sun Dec 22 20:10:24 2024 +0900

    TEZ-4527: Add generic and pluggable hooks for DAGs and task attempts (#324) 
(Shohei Okumiya reviewed by Laszlo Bodor)
---
 .../org/apache/tez/dag/api/TezConfiguration.java   | 21 +++++++++-
 .../org/apache/tez/runtime/hook/TezDAGHook.java    | 44 +++++++++++++++++++++
 .../tez/runtime/hook/TezTaskAttemptHook.java       | 44 +++++++++++++++++++++
 .../org/apache/tez/runtime/hook/package-info.java  | 22 +++++++++++
 .../java/org/apache/tez/dag/app/DAGAppMaster.java  | 23 ++++++++---
 .../org/apache/tez/dag/app/ThreadDumpDAGHook.java  | 41 ++++++++++++++++++++
 .../apache/tez/runtime/TezThreadDumpHelper.java    | 45 +++++++---------------
 .../java/org/apache/tez/runtime/task/TezChild.java | 16 ++++++--
 .../runtime/task/ThreadDumpTaskAttemptHook.java    | 41 ++++++++++++++++++++
 .../test/java/org/apache/tez/test/TestTezJobs.java |  6 +++
 10 files changed, 260 insertions(+), 43 deletions(-)

diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java 
b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 7e8685363..8862f4b7d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -2297,12 +2297,14 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES = 
"tez.mrreader.config.update.properties";
 
   /**
-   *  Frequency at which thread dump should be captured. Supports TimeUnits.
+   *  Frequency at which thread dump should be captured. Supports TimeUnits. 
This is effective only
+   *  when org.apache.tez.dag.app.ThreadDumpDAGHook is configured to 
tez.am.hooks or
+   *  org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook is configured to 
tez.task.attempt.hooks.
    */
   @ConfigurationScope(Scope.DAG)
   @ConfigurationProperty
   public static final String TEZ_THREAD_DUMP_INTERVAL = 
"tez.thread.dump.interval";
-  public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "0ms";
+  public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "100ms";
 
   /**
    * Limits the amount of data that can be written to LocalFileSystem by a 
Task.
@@ -2312,4 +2314,19 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES = 
"tez.task.local-fs.write-limit.bytes";
   public static final long TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT = -1;
 
+  /**
+   * Comma-separated list of hook classes implementing 
org.apache.tez.runtime.hook.TezDAGHook.
+   * e.g. org.apache.tez.dag.app.ThreadDumpDAGHook
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty
+  public static final String TEZ_AM_HOOKS = TEZ_AM_PREFIX + "hooks";
+
+  /**
+   * Comma-separated list of hook classes implementing 
org.apache.tez.runtime.hook.TezTaskAttemptHook.
+   * e.g. org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook
+   */
+  @ConfigurationScope(Scope.DAG)
+  @ConfigurationProperty
+  public static final String TEZ_TASK_ATTEMPT_HOOKS = TEZ_TASK_PREFIX + 
"attempt.hooks";
 }
diff --git 
a/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java 
b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java
new file mode 100644
index 000000000..7fb015bdb
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.hook;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.records.TezDAGID;
+
+/**
+ * A hook which is instantiated and triggered before and after a DAG is 
executed.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface TezDAGHook {
+  /**
+   * Invoked before the DAG starts.
+   *
+   * @param id the DAG id
+   * @param conf the conf
+   */
+  void start(TezDAGID id, Configuration conf);
+
+  /**
+   * Invoked after the DAG finishes.
+   */
+  void stop();
+}
diff --git 
a/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java 
b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java
new file mode 100644
index 000000000..54931b64d
--- /dev/null
+++ 
b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.hook;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+/**
+ * A hook which is instantiated and triggered before and after a task attempt 
is executed.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface TezTaskAttemptHook {
+  /**
+   * Invoked before the task attempt starts.
+   *
+   * @param id the task attempt id
+   * @param conf the conf
+   */
+  void start(TezTaskAttemptID id, Configuration conf);
+
+  /**
+   * Invoked after the task attempt finishes.
+   */
+  void stop();
+}
diff --git 
a/tez-common/src/main/java/org/apache/tez/runtime/hook/package-info.java 
b/tez-common/src/main/java/org/apache/tez/runtime/hook/package-info.java
new file mode 100644
index 000000000..d977897d8
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/runtime/hook/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@Private
+package org.apache.tez.runtime.hook;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
\ No newline at end of file
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 9c7cc18b6..4172a5a36 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.Utils;
 import org.apache.tez.client.CallerContext;
 import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.SessionNotRunning;
@@ -187,7 +188,7 @@ import org.apache.tez.dag.utils.RelocalizationUtils;
 import org.apache.tez.dag.utils.Simple2LevelVersionComparator;
 import org.apache.tez.hadoop.shim.HadoopShim;
 import org.apache.tez.hadoop.shim.HadoopShimsLoader;
-import org.apache.tez.runtime.TezThreadDumpHelper;
+import org.apache.tez.runtime.hook.TezDAGHook;
 import org.apache.tez.util.LoggingUtils;
 import org.apache.tez.util.TezMxBeanResourceCalculator;
 import org.codehaus.jettison.json.JSONException;
@@ -343,7 +344,7 @@ public class DAGAppMaster extends AbstractService {
   Map<Service, ServiceWithDependency> services =
       new LinkedHashMap<Service, ServiceWithDependency>();
   private ThreadLocalMap mdcContext;
-  private TezThreadDumpHelper tezThreadDumpHelper = 
TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER;
+  private TezDAGHook[] hooks = {};
 
   public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
@@ -770,7 +771,9 @@ public class DAGAppMaster extends AbstractService {
           "DAGAppMaster Internal Error occurred");
       break;
     case DAG_FINISHED:
-      tezThreadDumpHelper.stop();
+      for (TezDAGHook hook : hooks) {
+        hook.stop();
+      }
       DAGAppMasterEventDAGFinished finishEvt =
           (DAGAppMasterEventDAGFinished) event;
       String timeStamp = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss").format(Calendar.getInstance().getTime());
@@ -2226,8 +2229,10 @@ public class DAGAppMaster extends AbstractService {
       execService.shutdownNow();
     }
 
-    // Check if the thread dump service is up in any case, if yes attempt a 
shutdown
-    tezThreadDumpHelper.stop();
+    // Try to shut down any hooks that are still active
+    for (TezDAGHook hook : hooks) {
+      hook.stop();
+    }
 
     super.serviceStop();
   }
@@ -2599,7 +2604,13 @@ public class DAGAppMaster extends AbstractService {
   private void startDAGExecution(DAG dag, final Map<String, LocalResource> 
additionalAmResources)
       throws TezException {
     currentDAG = dag;
-    tezThreadDumpHelper = 
TezThreadDumpHelper.getInstance(dag.getConf()).start(dag.getID().toString());
+    final Configuration conf = dag.getConf();
+    final String[] hookClasses = 
conf.getStrings(TezConfiguration.TEZ_AM_HOOKS, new String[0]);
+    hooks = new TezDAGHook[hookClasses.length];
+    for (int i = 0; i < hooks.length; i++) {
+      hooks[i] = ReflectionUtils.createClazzInstance(hookClasses[i]);
+      hooks[i].start(dag.getID(), conf);
+    }
 
     // Try localizing the actual resources.
     List<URL> additionalUrlsForClasspath;
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java
new file mode 100644
index 000000000..ff657e47f
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.runtime.TezThreadDumpHelper;
+import org.apache.tez.runtime.hook.TezDAGHook;
+
+/**
+ * A DAG hook which dumps thread information periodically.
+ */
+public class ThreadDumpDAGHook implements TezDAGHook {
+  private TezThreadDumpHelper helper;
+
+  @Override
+  public void start(TezDAGID id, Configuration conf) {
+    helper = TezThreadDumpHelper.getInstance(conf).start(id.toString());
+  }
+
+  @Override
+  public void stop() {
+    helper.stop();
+  }
+}
diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java
 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java
index 6f3e9fec1..022186a4b 100644
--- 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java
+++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java
@@ -24,8 +24,10 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Appender;
+import org.apache.tez.common.Preconditions;
 import org.apache.tez.common.TezContainerLogAppender;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,10 +47,9 @@ import static 
org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL_D
 
 public class TezThreadDumpHelper {
 
-  public static final NoopTezThreadDumpHelper NOOP_TEZ_THREAD_DUMP_HELPER = 
new NoopTezThreadDumpHelper();
-  private long duration = 0L;
-  private Path basePath = null;
-  private FileSystem fs = null;
+  private final long duration;
+  private final Path basePath;
+  private final FileSystem fs;
 
   private static final ThreadMXBean THREAD_BEAN = 
ManagementFactory.getThreadMXBean();
   private static final Logger LOG = 
LoggerFactory.getLogger(TezThreadDumpHelper.class);
@@ -70,21 +71,17 @@ public class TezThreadDumpHelper {
         "path: {}", duration, basePath);
   }
 
-  public TezThreadDumpHelper() {
-  }
-
   public static TezThreadDumpHelper getInstance(Configuration conf) {
-    long periodicThreadDumpFrequency =
-        conf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL, 
TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
-
-    if (periodicThreadDumpFrequency > 0) {
-      try {
-        return new TezThreadDumpHelper(periodicThreadDumpFrequency, conf);
-      } catch (IOException e) {
-        LOG.warn("Can not initialize periodic thread dump service", e);
-      }
+    long periodicThreadDumpFrequency = 
conf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL,
+        TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
+    Preconditions.checkArgument(periodicThreadDumpFrequency > 0, "%s must be 
positive duration",
+        TEZ_THREAD_DUMP_INTERVAL);
+
+    try {
+      return new TezThreadDumpHelper(periodicThreadDumpFrequency, conf);
+    } catch (IOException e) {
+      throw new TezUncheckedException("Can not initialize periodic thread dump 
service", e);
     }
-    return NOOP_TEZ_THREAD_DUMP_HELPER;
   }
 
   public TezThreadDumpHelper start(String name) {
@@ -178,18 +175,4 @@ public class TezThreadDumpHelper {
       return id + " (" + taskName + ")";
     }
   }
-
-  private static class NoopTezThreadDumpHelper extends TezThreadDumpHelper {
-
-    @Override
-    public TezThreadDumpHelper start(String name) {
-      // Do Nothing
-      return this;
-    }
-
-    @Override
-    public void stop() {
-      // Do Nothing
-    }
-  }
 }
diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 98b07100a..ed14bd880 100644
--- 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -52,6 +52,7 @@ import 
org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.log4j.helpers.ThreadLocalMap;
 import org.apache.tez.common.ContainerContext;
 import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezExecutors;
 import org.apache.tez.common.TezLocalResource;
@@ -69,10 +70,10 @@ import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.RelocalizationUtils;
 import org.apache.tez.hadoop.shim.HadoopShim;
 import org.apache.tez.hadoop.shim.HadoopShimsLoader;
-import org.apache.tez.runtime.TezThreadDumpHelper;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
+import org.apache.tez.runtime.hook.TezTaskAttemptHook;
 import org.apache.tez.runtime.internals.api.TaskReporterInterface;
 import org.apache.tez.util.LoggingUtils;
 
@@ -120,7 +121,6 @@ public class TezChild {
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
   private final String user;
   private final boolean updateSysCounters;
-  private TezThreadDumpHelper tezThreadDumpHelper = 
TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER;
 
   private Multimap<String, String> startedInputsMap = HashMultimap.create();
   private final boolean ownUmbilical;
@@ -295,7 +295,13 @@ public class TezChild {
             hadoopShim, sharedExecutor);
 
         boolean shouldDie;
-        tezThreadDumpHelper = 
TezThreadDumpHelper.getInstance(taskConf).start(attemptId.toString());
+        final String[] hookClasses = taskConf
+            .getStrings(TezConfiguration.TEZ_TASK_ATTEMPT_HOOKS, new 
String[0]);
+        final TezTaskAttemptHook[] hooks = new 
TezTaskAttemptHook[hookClasses.length];
+        for (int i = 0; i < hooks.length; i++) {
+          hooks[i] = ReflectionUtils.createClazzInstance(hookClasses[i]);
+          hooks[i].start(attemptId, taskConf);
+        }
         try {
           TaskRunner2Result result = taskRunner.run();
           LOG.info("TaskRunner2Result: {}", result);
@@ -314,7 +320,9 @@ public class TezChild {
                 e, "TaskExecutionFailure: " + e.getMessage());
           }
         } finally {
-          tezThreadDumpHelper.stop();
+          for (TezTaskAttemptHook hook : hooks) {
+            hook.stop();
+          }
           FileSystem.closeAllForUGI(childUGI);
         }
       }
diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java
 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java
new file mode 100644
index 000000000..dd41cee9d
--- /dev/null
+++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.TezThreadDumpHelper;
+import org.apache.tez.runtime.hook.TezTaskAttemptHook;
+
+/**
+ * A task attempt hook which dumps thread information periodically.
+ */
+public class ThreadDumpTaskAttemptHook implements TezTaskAttemptHook {
+  private TezThreadDumpHelper helper;
+
+  @Override
+  public void start(TezTaskAttemptID id, Configuration conf) {
+    helper = TezThreadDumpHelper.getInstance(conf).start(id.toString());
+  }
+
+  @Override
+  public void stop() {
+    helper.stop();
+  }
+}
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java 
b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index 892629f29..ee717f33c 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -18,7 +18,9 @@
 
 package org.apache.tez.test;
 
+import static org.apache.tez.dag.api.TezConfiguration.TEZ_AM_HOOKS;
 import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL;
+import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_ATTEMPT_HOOKS;
 import static org.apache.tez.dag.api.TezConstants.TEZ_CONTAINER_LOGGER_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -59,11 +61,13 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.dag.app.ThreadDumpDAGHook;
 import org.apache.tez.mapreduce.examples.CartesianProduct;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import 
org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager;
 import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -555,6 +559,8 @@ public class TestTezJobs {
       org.apache.log4j.Logger.getRootLogger().addAppender(appender);
       appender.setName(TEZ_CONTAINER_LOGGER_NAME);
       appender.setContainerLogDir(logPath.toString());
+      newConf.set(TEZ_AM_HOOKS, ThreadDumpDAGHook.class.getName());
+      newConf.set(TEZ_TASK_ATTEMPT_HOOKS, 
ThreadDumpTaskAttemptHook.class.getName());
       newConf.set(TEZ_THREAD_DUMP_INTERVAL, "1ms");
     }
     sortMergeJoinExample.setConf(newConf);

Reply via email to