This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 09c6f11 [GOBBLIN-1420] Log uncaught exceptions from StreamModelTaskRunner[]
09c6f11 is described below
commit 09c6f11620ecbd8ec2fe986f820d97a0c2617e73
Author: suvasude <[email protected]>
AuthorDate: Thu Apr 15 12:38:04 2021 -0700
[GOBBLIN-1420] Log uncaught exceptions from StreamModelTaskRunner[]
Closes #3256 from sv2000/logUncaughtExceptions
---
.../java/org/apache/gobblin/runtime/StreamModelTaskRunner.java | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
index faee814..9e3a7f7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
@@ -39,6 +39,7 @@ import org.apache.gobblin.runtime.fork.Fork;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.StreamingExtractor;
import org.apache.gobblin.util.ExponentialBackoff;
+import org.apache.gobblin.util.LoggingUncaughtExceptionHandler;
import org.apache.gobblin.writer.AcknowledgableWatermark;
import org.apache.gobblin.writer.FineGrainedWatermarkTracker;
import org.apache.gobblin.writer.WatermarkManager;
@@ -151,9 +152,11 @@ public class StreamModelTaskRunner {
this.task.configureStreamingFork(fork);
}
}
- new Thread(() -> {
- connectableStream.connect();
- }).start();
+ Thread thread = new Thread(() -> connectableStream.connect());
+ thread.setName(this.getClass().getSimpleName());
+ //Log uncaught exceptions (e.g.OOMEs) to prevent threads from dying silently
+ thread.setUncaughtExceptionHandler(new
LoggingUncaughtExceptionHandler(Optional.absent()));
+ thread.start();
if (!ExponentialBackoff.awaitCondition().callable(() ->
this.forks.keySet().stream().map(Optional::get).allMatch(Fork::isDone)).
initialDelay(initialDelay).maxDelay(initialDelay).maxWait(TimeUnit.MINUTES.toMillis(maxWaitInMinute)).await())
{