This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 09c6f11  [GOBBLIN-1420] Log uncaught exceptions from 
StreamModelTaskRunner[]
09c6f11 is described below

commit 09c6f11620ecbd8ec2fe986f820d97a0c2617e73
Author: suvasude <[email protected]>
AuthorDate: Thu Apr 15 12:38:04 2021 -0700

    [GOBBLIN-1420] Log uncaught exceptions from StreamModelTaskRunner[]
    
    Closes #3256 from sv2000/logUncaughtExceptions
---
 .../java/org/apache/gobblin/runtime/StreamModelTaskRunner.java   | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
index faee814..9e3a7f7 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
@@ -39,6 +39,7 @@ import org.apache.gobblin.runtime.fork.Fork;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.source.extractor.StreamingExtractor;
 import org.apache.gobblin.util.ExponentialBackoff;
+import org.apache.gobblin.util.LoggingUncaughtExceptionHandler;
 import org.apache.gobblin.writer.AcknowledgableWatermark;
 import org.apache.gobblin.writer.FineGrainedWatermarkTracker;
 import org.apache.gobblin.writer.WatermarkManager;
@@ -151,9 +152,11 @@ public class StreamModelTaskRunner {
         this.task.configureStreamingFork(fork);
       }
     }
-    new Thread(() -> {
-      connectableStream.connect();
-    }).start();
+    Thread thread = new Thread(() -> connectableStream.connect());
+    thread.setName(this.getClass().getSimpleName());
+    //Log uncaught exceptions (e.g.OOMEs) to prevent threads from dying 
silently
+    thread.setUncaughtExceptionHandler(new 
LoggingUncaughtExceptionHandler(Optional.absent()));
+    thread.start();
 
     if (!ExponentialBackoff.awaitCondition().callable(() -> 
this.forks.keySet().stream().map(Optional::get).allMatch(Fork::isDone)).
         
initialDelay(initialDelay).maxDelay(initialDelay).maxWait(TimeUnit.MINUTES.toMillis(maxWaitInMinute)).await())
 {

Reply via email to