This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a5cd5302 [GOBBLIN-1988] Fail the streaming container when OOM issue 
happens (#3862)
3a5cd5302 is described below

commit 3a5cd5302d1b30b782f82e75b5be3bd20f3c5e19
Author: Zihan Li <[email protected]>
AuthorDate: Fri Jan 19 14:37:45 2024 -0800

    [GOBBLIN-1988] Fail the streaming container when OOM issue happens (#3862)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * add unit test
    
    * fix typo
    
    * [GOBBLIN-1956] Make Kafka streaming pipeline be able to config the max 
poll records during runtime
    
    * small refactor
    
    * [GOBBLIN-1988] Fail the streaming container when OOM issue happens
    
    * improve code style
    
    * add unit test
    
    * fix code style
    
    ---------
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../gobblin/runtime/StreamModelTaskRunner.java     | 11 +++++--
 .../apache/gobblin/runtime/TestRecordStream.java   | 37 ++++++++++++++++++++++
 .../util/LoggingUncaughtExceptionHandler.java      |  4 +++
 3 files changed, 49 insertions(+), 3 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
index 9e3a7f70c..1ee365d99 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.runtime;
 
+import com.google.api.client.repackaged.com.google.common.base.Throwables;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
@@ -155,13 +156,17 @@ public class StreamModelTaskRunner {
     Thread thread = new Thread(() -> connectableStream.connect());
     thread.setName(this.getClass().getSimpleName());
     //Log uncaught exceptions (e.g.OOMEs) to prevent threads from dying 
silently
-    thread.setUncaughtExceptionHandler(new 
LoggingUncaughtExceptionHandler(Optional.absent()));
+    LoggingUncaughtExceptionHandler exceptionHandler = new 
LoggingUncaughtExceptionHandler(Optional.absent());
+    thread.setUncaughtExceptionHandler(exceptionHandler);
     thread.start();
-
-    if (!ExponentialBackoff.awaitCondition().callable(() -> 
this.forks.keySet().stream().map(Optional::get).allMatch(Fork::isDone)).
+    if (!ExponentialBackoff.awaitCondition().callable(() ->
+        this.forks.keySet().stream().map(Optional::get).allMatch(Fork::isDone) 
|| exceptionHandler.getException() != null).
         
initialDelay(initialDelay).maxDelay(initialDelay).maxWait(TimeUnit.MINUTES.toMillis(maxWaitInMinute)).await())
 {
       throw new TimeoutException("Forks did not finish withing specified 
timeout.");
     }
+    if (exceptionHandler.getException() != null) {
+      throw Throwables.propagate(exceptionHandler.getException());
+    }
   }
 
 }
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
index b418493b4..4f3fd6ce4 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
@@ -24,6 +24,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
@@ -95,6 +97,27 @@ public class TestRecordStream {
     Assert.assertEquals(writer.messages, Lists.newArrayList(new 
BasicTestControlMessage("1"), new BasicTestControlMessage("2")));
   }
 
+  @Test
+  public void testExceptionsFailTheJob() throws Exception {
+    MyExtractor extractor = new MyExtractor(new StreamEntity[]{new 
RecordEnvelope<>("a"),
+        FlushControlMessage.builder().flushReason("flush1").build(), new 
RecordEnvelope<>("b"),
+        FlushControlMessage.builder().flushReason("flush2").build()});
+    MyConverter converter = new MyConverter();
+    MyFlushDataWriterWithException writer = new 
MyFlushDataWriterWithException();
+
+    Task task = setupTask(extractor, writer, converter);
+    //Single fork which is the same as streaming pipeline
+    
task.getTaskState().setProp(TaskConfigurationKeys.TASK_IS_SINGLE_BRANCH_SYNCHRONOUS,
 true);
+
+    //It should fail the task directly without hanging therer for a long time
+    Executors.newSingleThreadExecutor().submit(() -> {
+      task.run();
+      task.commit();
+    }).get(60, TimeUnit.SECONDS);
+    Assert.assertEquals(task.getTaskState().getWorkingState(), 
WorkUnitState.WorkingState.FAILED);
+  }
+
+
   @Test
   public void testFlushControlMessages() throws Exception {
     MyExtractor extractor = new MyExtractor(new StreamEntity[]{new 
RecordEnvelope<>("a"),
@@ -596,6 +619,20 @@ public class TestRecordStream {
     }
   }
 
+  static class MyFlushDataWriterWithException extends MyDataWriter {
+    private List<String> flush_messages = new ArrayList<>();
+
+    @Override
+    public ControlMessageHandler getMessageHandler() {
+      return new MyFlushControlMessageHandler(this);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      throw new OutOfMemoryError("test Error");
+    }
+  }
+
   static class MyDataWriterWithSchemaCheck extends MyDataWriter {
     @Override
     public void write(String record) throws IOException {
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/LoggingUncaughtExceptionHandler.java
 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/LoggingUncaughtExceptionHandler.java
index be6118d3c..41b56990f 100644
--- 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/LoggingUncaughtExceptionHandler.java
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/LoggingUncaughtExceptionHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.util;
 
+import lombok.Getter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,6 +32,8 @@ import com.google.common.base.Optional;
 public class LoggingUncaughtExceptionHandler implements 
Thread.UncaughtExceptionHandler {
 
   private final Logger logger;
+  @Getter
+  private Throwable exception = null;
 
   public LoggingUncaughtExceptionHandler(Optional<Logger> logger) {
     if (logger.isPresent()) {
@@ -42,6 +45,7 @@ public class LoggingUncaughtExceptionHandler implements 
Thread.UncaughtException
 
   @Override
   public void uncaughtException(Thread t, Throwable e) {
+    exception = e;
     this.logger.error(String.format("Thread %s threw an uncaught exception: 
%s", t, e), e);
   }
 }

Reply via email to