This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a77c9f5  [GOBBLIN-685] Add dump jstack for EmbeddedGobblin
a77c9f5 is described below

commit a77c9f5adf70163dbdb7d63db708deadae9ba4af
Author: Kuai Yu <[email protected]>
AuthorDate: Fri Feb 15 16:59:50 2019 -0800

    [GOBBLIN-685] Add dump jstack for EmbeddedGobblin
    
    Closes #2557 from yukuai518/jstack
---
 .../gobblin/runtime/AbstractJobLauncher.java       |  2 +-
 .../gobblin/runtime/embedded/EmbeddedGobblin.java  | 37 ++++++++++++++++++++++
 2 files changed, 38 insertions(+), 1 deletion(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 5b008d4..f0c2891 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -184,7 +184,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
       this.eventBus.register(this.jobContext);
 
       this.cancellationExecutor = Executors.newSingleThreadExecutor(
-          ExecutorsUtils.newThreadFactory(Optional.of(LOG), 
Optional.of("CancellationExecutor")));
+          ExecutorsUtils.newDaemonThreadFactory(Optional.of(LOG), 
Optional.of("CancellationExecutor")));
 
       this.runtimeMetricContext =
           this.jobContext.getJobMetricsOptional().transform(new 
Function<JobMetrics, MetricContext>() {
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
index a78b149..b6cc3b5 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
@@ -18,12 +18,17 @@
 package org.apache.gobblin.runtime.embedded;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -139,6 +144,7 @@ public class EmbeddedGobblin {
   private FullTimeout launchTimeout = new FullTimeout(10, TimeUnit.SECONDS);
   private FullTimeout jobTimeout = new FullTimeout(10, TimeUnit.DAYS);
   private FullTimeout shutdownTimeout = new FullTimeout(10, TimeUnit.SECONDS);
+  private boolean dumpJStackOnTimeout = false;
   private List<GobblinInstancePluginFactory> plugins = Lists.newArrayList();
   private Optional<Path> jobFile = Optional.absent();
 
@@ -353,6 +359,14 @@ public class EmbeddedGobblin {
   }
 
   /**
+   * Enable dumping jstack when error happens.
+   */
+  public EmbeddedGobblin setDumpJStackOnTimeout(boolean dumpJStackOnTimeout) {
+    this.dumpJStackOnTimeout = dumpJStackOnTimeout;
+    return this;
+  }
+
+  /**
    * Enable state store.
    */
   public EmbeddedGobblin useStateStore(String rootDir) {
@@ -458,6 +472,7 @@ public class EmbeddedGobblin {
 
     boolean started = listener.awaitStarted(this.launchTimeout.getTimeout(), 
this.launchTimeout.getTimeUnit());
     if (!started) {
+      dumpJStackOnTimeout("Launch");
       log.warn("Timeout waiting for job to start. Aborting.");
       driver.stopAsync();
       driver.awaitTerminated(this.shutdownTimeout.getTimeout(), 
this.shutdownTimeout.getTimeUnit());
@@ -484,6 +499,7 @@ public class EmbeddedGobblin {
           
driver.awaitTerminated(EmbeddedGobblin.this.shutdownTimeout.getTimeout(), 
EmbeddedGobblin.this.shutdownTimeout
               .getTimeUnit());
         } catch (TimeoutException te) {
+          dumpJStackOnTimeout("stop gobblin instance driver");
           log.error("Failed to shutdown Gobblin instance driver.");
         }
       }
@@ -492,6 +508,27 @@ public class EmbeddedGobblin {
     return listener.getJobDriver();
   }
 
+  private void dumpJStackOnTimeout(String loc) {
+    if (this.dumpJStackOnTimeout) {
+      log.info("=== Dump jstack ({}) ===", loc);
+      ThreadMXBean bean = ManagementFactory.getThreadMXBean();
+      ThreadInfo[] infos = bean.dumpAllThreads(true, true);
+      Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
+      Map<Long, Thread> threadMap = new HashMap<>();
+      for (Thread t : threadSet) {
+        threadMap.put(t.getId(), t);
+      }
+
+      for (ThreadInfo info : infos) {
+        Thread thread = threadMap.get(info.getThreadId());
+        log.info("({}) {}",
+            thread == null ? "Unknown" : thread.isDaemon() ? "Daemon" : 
"Non-Daemon", info.toString());
+      }
+    } else {
+      log.info("Dump jstack ({}) is disabled.", loc);
+    }
+  }
+
   private Configurable getSysConfig() {
     return 
DefaultConfigurableImpl.createFromConfig(ConfigFactory.parseMap(this.sysConfigOverrides).withFallback(this.defaultSysConfig));
   }

Reply via email to