This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 3a5cd5302 [GOBBLIN-1988] Fail the streaming container when OOM issue
happens (#3862)
3a5cd5302 is described below
commit 3a5cd5302d1b30b782f82e75b5be3bd20f3c5e19
Author: Zihan Li <[email protected]>
AuthorDate: Fri Jan 19 14:37:45 2024 -0800
[GOBBLIN-1988] Fail the streaming container when OOM issue happens (#3862)
* address comments
* use connectionmanager when httpclient is not closeable
* add unit test
* fix typo
* [GOBBLIN-1956] Make Kafka streaming pipeline be able to config the max
poll records during runtime
* small refactor
* [GOBBLIN-1988] Fail the streaming container when OOM issue happens
* improve code style
* add unit test
* fix code style
---------
Co-authored-by: Zihan Li <[email protected]>
---
.../gobblin/runtime/StreamModelTaskRunner.java | 11 +++++--
.../apache/gobblin/runtime/TestRecordStream.java | 37 ++++++++++++++++++++++
.../util/LoggingUncaughtExceptionHandler.java | 4 +++
3 files changed, 49 insertions(+), 3 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
index 9e3a7f70c..1ee365d99 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.runtime;
+import com.google.api.client.repackaged.com.google.common.base.Throwables;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
@@ -155,13 +156,17 @@ public class StreamModelTaskRunner {
Thread thread = new Thread(() -> connectableStream.connect());
thread.setName(this.getClass().getSimpleName());
//Log uncaught exceptions (e.g.OOMEs) to prevent threads from dying
silently
- thread.setUncaughtExceptionHandler(new
LoggingUncaughtExceptionHandler(Optional.absent()));
+ LoggingUncaughtExceptionHandler exceptionHandler = new
LoggingUncaughtExceptionHandler(Optional.absent());
+ thread.setUncaughtExceptionHandler(exceptionHandler);
thread.start();
-
- if (!ExponentialBackoff.awaitCondition().callable(() ->
this.forks.keySet().stream().map(Optional::get).allMatch(Fork::isDone)).
+ if (!ExponentialBackoff.awaitCondition().callable(() ->
+ this.forks.keySet().stream().map(Optional::get).allMatch(Fork::isDone)
|| exceptionHandler.getException() != null).
initialDelay(initialDelay).maxDelay(initialDelay).maxWait(TimeUnit.MINUTES.toMillis(maxWaitInMinute)).await())
{
throw new TimeoutException("Forks did not finish withing specified
timeout.");
}
+ if (exceptionHandler.getException() != null) {
+ throw Throwables.propagate(exceptionHandler.getException());
+ }
}
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
index b418493b4..4f3fd6ce4 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
@@ -24,6 +24,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
@@ -95,6 +97,27 @@ public class TestRecordStream {
Assert.assertEquals(writer.messages, Lists.newArrayList(new
BasicTestControlMessage("1"), new BasicTestControlMessage("2")));
}
+ @Test
+ public void testExceptionsFailTheJob() throws Exception {
+ MyExtractor extractor = new MyExtractor(new StreamEntity[]{new
RecordEnvelope<>("a"),
+ FlushControlMessage.builder().flushReason("flush1").build(), new
RecordEnvelope<>("b"),
+ FlushControlMessage.builder().flushReason("flush2").build()});
+ MyConverter converter = new MyConverter();
+ MyFlushDataWriterWithException writer = new
MyFlushDataWriterWithException();
+
+ Task task = setupTask(extractor, writer, converter);
+ //Single fork which is the same as streaming pipeline
+
task.getTaskState().setProp(TaskConfigurationKeys.TASK_IS_SINGLE_BRANCH_SYNCHRONOUS,
true);
+
+ //It should fail the task directly without hanging therer for a long time
+ Executors.newSingleThreadExecutor().submit(() -> {
+ task.run();
+ task.commit();
+ }).get(60, TimeUnit.SECONDS);
+ Assert.assertEquals(task.getTaskState().getWorkingState(),
WorkUnitState.WorkingState.FAILED);
+ }
+
+
@Test
public void testFlushControlMessages() throws Exception {
MyExtractor extractor = new MyExtractor(new StreamEntity[]{new
RecordEnvelope<>("a"),
@@ -596,6 +619,20 @@ public class TestRecordStream {
}
}
+ static class MyFlushDataWriterWithException extends MyDataWriter {
+ private List<String> flush_messages = new ArrayList<>();
+
+ @Override
+ public ControlMessageHandler getMessageHandler() {
+ return new MyFlushControlMessageHandler(this);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ throw new OutOfMemoryError("test Error");
+ }
+ }
+
static class MyDataWriterWithSchemaCheck extends MyDataWriter {
@Override
public void write(String record) throws IOException {
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/LoggingUncaughtExceptionHandler.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/LoggingUncaughtExceptionHandler.java
index be6118d3c..41b56990f 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/util/LoggingUncaughtExceptionHandler.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/LoggingUncaughtExceptionHandler.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.util;
+import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,6 +32,8 @@ import com.google.common.base.Optional;
public class LoggingUncaughtExceptionHandler implements
Thread.UncaughtExceptionHandler {
private final Logger logger;
+ @Getter
+ private Throwable exception = null;
public LoggingUncaughtExceptionHandler(Optional<Logger> logger) {
if (logger.isPresent()) {
@@ -42,6 +45,7 @@ public class LoggingUncaughtExceptionHandler implements
Thread.UncaughtException
@Override
public void uncaughtException(Thread t, Throwable e) {
+ exception = e;
this.logger.error(String.format("Thread %s threw an uncaught exception:
%s", t, e), e);
}
}