This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 1084699dd TEZ-4527: Add generic and pluggable hooks for DAGs and task
attempts (#324) (Shohei Okumiya reviewed by Laszlo Bodor)
1084699dd is described below
commit 1084699ddb1baf6e4a6df97d988ac696d33f41e8
Author: Shohei Okumiya <[email protected]>
AuthorDate: Sun Dec 22 20:10:24 2024 +0900
TEZ-4527: Add generic and pluggable hooks for DAGs and task attempts (#324)
(Shohei Okumiya reviewed by Laszlo Bodor)
---
.../org/apache/tez/dag/api/TezConfiguration.java | 21 +++++++++-
.../org/apache/tez/runtime/hook/TezDAGHook.java | 44 +++++++++++++++++++++
.../tez/runtime/hook/TezTaskAttemptHook.java | 44 +++++++++++++++++++++
.../org/apache/tez/runtime/hook/package-info.java | 22 +++++++++++
.../java/org/apache/tez/dag/app/DAGAppMaster.java | 23 ++++++++---
.../org/apache/tez/dag/app/ThreadDumpDAGHook.java | 41 ++++++++++++++++++++
.../apache/tez/runtime/TezThreadDumpHelper.java | 45 +++++++---------------
.../java/org/apache/tez/runtime/task/TezChild.java | 16 ++++++--
.../runtime/task/ThreadDumpTaskAttemptHook.java | 41 ++++++++++++++++++++
.../test/java/org/apache/tez/test/TestTezJobs.java | 6 +++
10 files changed, 260 insertions(+), 43 deletions(-)
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 7e8685363..8862f4b7d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -2297,12 +2297,14 @@ public class TezConfiguration extends Configuration {
public static final String TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES =
"tez.mrreader.config.update.properties";
/**
- * Frequency at which thread dump should be captured. Supports TimeUnits.
+ * Frequency at which thread dump should be captured. Supports TimeUnits. This is effective only
+ * when org.apache.tez.dag.app.ThreadDumpDAGHook is configured to tez.am.hooks or
+ * org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook is configured to tez.task.attempt.hooks.
*/
@ConfigurationScope(Scope.DAG)
@ConfigurationProperty
public static final String TEZ_THREAD_DUMP_INTERVAL =
"tez.thread.dump.interval";
- public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "0ms";
+ public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "100ms";
/**
* Limits the amount of data that can be written to LocalFileSystem by a Task.
@@ -2312,4 +2314,19 @@ public class TezConfiguration extends Configuration {
public static final String TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES =
"tez.task.local-fs.write-limit.bytes";
public static final long TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT = -1;
+ /**
+ * Comma-separated list of hook classes implementing org.apache.tez.runtime.hook.TezDAGHook.
+ * e.g. org.apache.tez.dag.app.ThreadDumpDAGHook
+ */
+ @ConfigurationScope(Scope.AM)
+ @ConfigurationProperty
+ public static final String TEZ_AM_HOOKS = TEZ_AM_PREFIX + "hooks";
+
+ /**
+ * Comma-separated list of hook classes implementing org.apache.tez.runtime.hook.TezTaskAttemptHook.
+ * e.g. org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook
+ */
+ @ConfigurationScope(Scope.DAG)
+ @ConfigurationProperty
+ public static final String TEZ_TASK_ATTEMPT_HOOKS = TEZ_TASK_PREFIX +
"attempt.hooks";
}
diff --git
a/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java
b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java
new file mode 100644
index 000000000..7fb015bdb
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.hook;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.records.TezDAGID;
+
+/**
+ * A hook which is instantiated and triggered before and after a DAG is executed.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface TezDAGHook {
+ /**
+ * Invoked before the DAG starts.
+ *
+ * @param id the DAG id
+ * @param conf the conf
+ */
+ void start(TezDAGID id, Configuration conf);
+
+ /**
+ * Invoked after the DAG finishes.
+ */
+ void stop();
+}
diff --git
a/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java
b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java
new file mode 100644
index 000000000..54931b64d
--- /dev/null
+++
b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.hook;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+/**
+ * A hook which is instantiated and triggered before and after a task attempt is executed.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface TezTaskAttemptHook {
+ /**
+ * Invoked before the task attempt starts.
+ *
+ * @param id the task attempt id
+ * @param conf the conf
+ */
+ void start(TezTaskAttemptID id, Configuration conf);
+
+ /**
+ * Invoked after the task attempt finishes.
+ */
+ void stop();
+}
diff --git
a/tez-common/src/main/java/org/apache/tez/runtime/hook/package-info.java
b/tez-common/src/main/java/org/apache/tez/runtime/hook/package-info.java
new file mode 100644
index 000000000..d977897d8
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/runtime/hook/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@Private
+package org.apache.tez.runtime.hook;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
\ No newline at end of file
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 9c7cc18b6..4172a5a36 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.Utils;
import org.apache.tez.client.CallerContext;
import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.SessionNotRunning;
@@ -187,7 +188,7 @@ import org.apache.tez.dag.utils.RelocalizationUtils;
import org.apache.tez.dag.utils.Simple2LevelVersionComparator;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.hadoop.shim.HadoopShimsLoader;
-import org.apache.tez.runtime.TezThreadDumpHelper;
+import org.apache.tez.runtime.hook.TezDAGHook;
import org.apache.tez.util.LoggingUtils;
import org.apache.tez.util.TezMxBeanResourceCalculator;
import org.codehaus.jettison.json.JSONException;
@@ -343,7 +344,7 @@ public class DAGAppMaster extends AbstractService {
Map<Service, ServiceWithDependency> services =
new LinkedHashMap<Service, ServiceWithDependency>();
private ThreadLocalMap mdcContext;
- private TezThreadDumpHelper tezThreadDumpHelper =
TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER;
+ private TezDAGHook[] hooks = {};
public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
@@ -770,7 +771,9 @@ public class DAGAppMaster extends AbstractService {
"DAGAppMaster Internal Error occurred");
break;
case DAG_FINISHED:
- tezThreadDumpHelper.stop();
+ for (TezDAGHook hook : hooks) {
+ hook.stop();
+ }
DAGAppMasterEventDAGFinished finishEvt =
(DAGAppMasterEventDAGFinished) event;
String timeStamp = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss").format(Calendar.getInstance().getTime());
@@ -2226,8 +2229,10 @@ public class DAGAppMaster extends AbstractService {
execService.shutdownNow();
}
- // Check if the thread dump service is up in any case, if yes attempt a shutdown
- tezThreadDumpHelper.stop();
+ // Try to shut down any hooks that are still active
+ for (TezDAGHook hook : hooks) {
+ hook.stop();
+ }
super.serviceStop();
}
@@ -2599,7 +2604,13 @@ public class DAGAppMaster extends AbstractService {
private void startDAGExecution(DAG dag, final Map<String, LocalResource>
additionalAmResources)
throws TezException {
currentDAG = dag;
- tezThreadDumpHelper =
TezThreadDumpHelper.getInstance(dag.getConf()).start(dag.getID().toString());
+ final Configuration conf = dag.getConf();
+ final String[] hookClasses =
conf.getStrings(TezConfiguration.TEZ_AM_HOOKS, new String[0]);
+ hooks = new TezDAGHook[hookClasses.length];
+ for (int i = 0; i < hooks.length; i++) {
+ hooks[i] = ReflectionUtils.createClazzInstance(hookClasses[i]);
+ hooks[i].start(dag.getID(), conf);
+ }
// Try localizing the actual resources.
List<URL> additionalUrlsForClasspath;
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java
new file mode 100644
index 000000000..ff657e47f
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.runtime.TezThreadDumpHelper;
+import org.apache.tez.runtime.hook.TezDAGHook;
+
+/**
+ * A DAG hook which dumps thread information periodically.
+ */
+public class ThreadDumpDAGHook implements TezDAGHook {
+ private TezThreadDumpHelper helper;
+
+ @Override
+ public void start(TezDAGID id, Configuration conf) {
+ helper = TezThreadDumpHelper.getInstance(conf).start(id.toString());
+ }
+
+ @Override
+ public void stop() {
+ helper.stop();
+ }
+}
diff --git
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java
index 6f3e9fec1..022186a4b 100644
---
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java
+++
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java
@@ -24,8 +24,10 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Appender;
+import org.apache.tez.common.Preconditions;
import org.apache.tez.common.TezContainerLogAppender;
import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezUncheckedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,10 +47,9 @@ import static
org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL_D
public class TezThreadDumpHelper {
- public static final NoopTezThreadDumpHelper NOOP_TEZ_THREAD_DUMP_HELPER =
new NoopTezThreadDumpHelper();
- private long duration = 0L;
- private Path basePath = null;
- private FileSystem fs = null;
+ private final long duration;
+ private final Path basePath;
+ private final FileSystem fs;
private static final ThreadMXBean THREAD_BEAN =
ManagementFactory.getThreadMXBean();
private static final Logger LOG =
LoggerFactory.getLogger(TezThreadDumpHelper.class);
@@ -70,21 +71,17 @@ public class TezThreadDumpHelper {
"path: {}", duration, basePath);
}
- public TezThreadDumpHelper() {
- }
-
public static TezThreadDumpHelper getInstance(Configuration conf) {
- long periodicThreadDumpFrequency =
- conf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL,
TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
-
- if (periodicThreadDumpFrequency > 0) {
- try {
- return new TezThreadDumpHelper(periodicThreadDumpFrequency, conf);
- } catch (IOException e) {
- LOG.warn("Can not initialize periodic thread dump service", e);
- }
+ long periodicThreadDumpFrequency =
conf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL,
+ TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
+ Preconditions.checkArgument(periodicThreadDumpFrequency > 0, "%s must be
positive duration",
+ TEZ_THREAD_DUMP_INTERVAL);
+
+ try {
+ return new TezThreadDumpHelper(periodicThreadDumpFrequency, conf);
+ } catch (IOException e) {
+ throw new TezUncheckedException("Can not initialize periodic thread dump
service", e);
}
- return NOOP_TEZ_THREAD_DUMP_HELPER;
}
public TezThreadDumpHelper start(String name) {
@@ -178,18 +175,4 @@ public class TezThreadDumpHelper {
return id + " (" + taskName + ")";
}
}
-
- private static class NoopTezThreadDumpHelper extends TezThreadDumpHelper {
-
- @Override
- public TezThreadDumpHelper start(String name) {
- // Do Nothing
- return this;
- }
-
- @Override
- public void stop() {
- // Do Nothing
- }
- }
}
diff --git
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 98b07100a..ed14bd880 100644
---
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -52,6 +52,7 @@ import
org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.log4j.helpers.ThreadLocalMap;
import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.TezLocalResource;
@@ -69,10 +70,10 @@ import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.RelocalizationUtils;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.hadoop.shim.HadoopShimsLoader;
-import org.apache.tez.runtime.TezThreadDumpHelper;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
+import org.apache.tez.runtime.hook.TezTaskAttemptHook;
import org.apache.tez.runtime.internals.api.TaskReporterInterface;
import org.apache.tez.util.LoggingUtils;
@@ -120,7 +121,6 @@ public class TezChild {
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private final String user;
private final boolean updateSysCounters;
- private TezThreadDumpHelper tezThreadDumpHelper =
TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER;
private Multimap<String, String> startedInputsMap = HashMultimap.create();
private final boolean ownUmbilical;
@@ -295,7 +295,13 @@ public class TezChild {
hadoopShim, sharedExecutor);
boolean shouldDie;
- tezThreadDumpHelper =
TezThreadDumpHelper.getInstance(taskConf).start(attemptId.toString());
+ final String[] hookClasses = taskConf
+ .getStrings(TezConfiguration.TEZ_TASK_ATTEMPT_HOOKS, new
String[0]);
+ final TezTaskAttemptHook[] hooks = new
TezTaskAttemptHook[hookClasses.length];
+ for (int i = 0; i < hooks.length; i++) {
+ hooks[i] = ReflectionUtils.createClazzInstance(hookClasses[i]);
+ hooks[i].start(attemptId, taskConf);
+ }
try {
TaskRunner2Result result = taskRunner.run();
LOG.info("TaskRunner2Result: {}", result);
@@ -314,7 +320,9 @@ public class TezChild {
e, "TaskExecutionFailure: " + e.getMessage());
}
} finally {
- tezThreadDumpHelper.stop();
+ for (TezTaskAttemptHook hook : hooks) {
+ hook.stop();
+ }
FileSystem.closeAllForUGI(childUGI);
}
}
diff --git
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java
new file mode 100644
index 000000000..dd41cee9d
--- /dev/null
+++
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.TezThreadDumpHelper;
+import org.apache.tez.runtime.hook.TezTaskAttemptHook;
+
+/**
+ * A task attempt hook which dumps thread information periodically.
+ */
+public class ThreadDumpTaskAttemptHook implements TezTaskAttemptHook {
+ private TezThreadDumpHelper helper;
+
+ @Override
+ public void start(TezTaskAttemptID id, Configuration conf) {
+ helper = TezThreadDumpHelper.getInstance(conf).start(id.toString());
+ }
+
+ @Override
+ public void stop() {
+ helper.stop();
+ }
+}
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index 892629f29..ee717f33c 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -18,7 +18,9 @@
package org.apache.tez.test;
+import static org.apache.tez.dag.api.TezConfiguration.TEZ_AM_HOOKS;
import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL;
+import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_ATTEMPT_HOOKS;
import static org.apache.tez.dag.api.TezConstants.TEZ_CONTAINER_LOGGER_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -59,11 +61,13 @@ import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.dag.app.ThreadDumpDAGHook;
import org.apache.tez.mapreduce.examples.CartesianProduct;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import
org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -555,6 +559,8 @@ public class TestTezJobs {
org.apache.log4j.Logger.getRootLogger().addAppender(appender);
appender.setName(TEZ_CONTAINER_LOGGER_NAME);
appender.setContainerLogDir(logPath.toString());
+ newConf.set(TEZ_AM_HOOKS, ThreadDumpDAGHook.class.getName());
+ newConf.set(TEZ_TASK_ATTEMPT_HOOKS,
ThreadDumpTaskAttemptHook.class.getName());
newConf.set(TEZ_THREAD_DUMP_INTERVAL, "1ms");
}
sortMergeJoinExample.setConf(newConf);