This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a77c9f5 [GOBBLIN-685] Add dump jstack for EmbeddedGobblin
a77c9f5 is described below
commit a77c9f5adf70163dbdb7d63db708deadae9ba4af
Author: Kuai Yu <[email protected]>
AuthorDate: Fri Feb 15 16:59:50 2019 -0800
[GOBBLIN-685] Add dump jstack for EmbeddedGobblin
Closes #2557 from yukuai518/jstack
---
.../gobblin/runtime/AbstractJobLauncher.java | 2 +-
.../gobblin/runtime/embedded/EmbeddedGobblin.java | 37 ++++++++++++++++++++++
2 files changed, 38 insertions(+), 1 deletion(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 5b008d4..f0c2891 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -184,7 +184,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
this.eventBus.register(this.jobContext);
this.cancellationExecutor = Executors.newSingleThreadExecutor(
- ExecutorsUtils.newThreadFactory(Optional.of(LOG),
Optional.of("CancellationExecutor")));
+ ExecutorsUtils.newDaemonThreadFactory(Optional.of(LOG),
Optional.of("CancellationExecutor")));
this.runtimeMetricContext =
this.jobContext.getJobMetricsOptional().transform(new
Function<JobMetrics, MetricContext>() {
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
index a78b149..b6cc3b5 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
@@ -18,12 +18,17 @@
package org.apache.gobblin.runtime.embedded;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -139,6 +144,7 @@ public class EmbeddedGobblin {
private FullTimeout launchTimeout = new FullTimeout(10, TimeUnit.SECONDS);
private FullTimeout jobTimeout = new FullTimeout(10, TimeUnit.DAYS);
private FullTimeout shutdownTimeout = new FullTimeout(10, TimeUnit.SECONDS);
+ private boolean dumpJStackOnTimeout = false;
private List<GobblinInstancePluginFactory> plugins = Lists.newArrayList();
private Optional<Path> jobFile = Optional.absent();
@@ -353,6 +359,14 @@ public class EmbeddedGobblin {
}
/**
+ * Enable dumping jstack when a launch or shutdown timeout happens.
+ */
+ public EmbeddedGobblin setDumpJStackOnTimeout(boolean dumpJStackOnTimeout) {
+ this.dumpJStackOnTimeout = dumpJStackOnTimeout;
+ return this;
+ }
+
+ /**
* Enable state store.
*/
public EmbeddedGobblin useStateStore(String rootDir) {
@@ -458,6 +472,7 @@ public class EmbeddedGobblin {
boolean started = listener.awaitStarted(this.launchTimeout.getTimeout(),
this.launchTimeout.getTimeUnit());
if (!started) {
+ dumpJStackOnTimeout("Launch");
log.warn("Timeout waiting for job to start. Aborting.");
driver.stopAsync();
driver.awaitTerminated(this.shutdownTimeout.getTimeout(),
this.shutdownTimeout.getTimeUnit());
@@ -484,6 +499,7 @@ public class EmbeddedGobblin {
driver.awaitTerminated(EmbeddedGobblin.this.shutdownTimeout.getTimeout(),
EmbeddedGobblin.this.shutdownTimeout
.getTimeUnit());
} catch (TimeoutException te) {
+ dumpJStackOnTimeout("stop gobblin instance driver");
log.error("Failed to shutdown Gobblin instance driver.");
}
}
@@ -492,6 +508,27 @@ public class EmbeddedGobblin {
return listener.getJobDriver();
}
+ private void dumpJStackOnTimeout(String loc) {
+ if (this.dumpJStackOnTimeout) {
+ log.info("=== Dump jstack ({}) ===", loc);
+ ThreadMXBean bean = ManagementFactory.getThreadMXBean();
+ ThreadInfo[] infos = bean.dumpAllThreads(true, true);
+ Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
+ Map<Long, Thread> threadMap = new HashMap<>();
+ for (Thread t : threadSet) {
+ threadMap.put(t.getId(), t);
+ }
+
+ for (ThreadInfo info : infos) {
+ Thread thread = threadMap.get(info.getThreadId());
+ log.info("({}) {}",
+ thread == null ? "Unknown" : thread.isDaemon() ? "Daemon" :
"Non-Daemon", info.toString());
+ }
+ } else {
+ log.info("Dump jstack ({}) is disabled.", loc);
+ }
+ }
+
private Configurable getSysConfig() {
return
DefaultConfigurableImpl.createFromConfig(ConfigFactory.parseMap(this.sysConfigOverrides).withFallback(this.defaultSysConfig));
}