This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8cae307 [GOBBLIN-791] Fix hanging stream on error in asynchronous
execution model
8cae307 is described below
commit 8cae30790c0a6a71a61d8b83424d733cdffb881b
Author: Hung Tran <[email protected]>
AuthorDate: Mon Jun 3 07:09:27 2019 -0700
[GOBBLIN-791] Fix hanging stream on error in asynchronous execution model
Closes #2659 from htran1/reactivex_stop_extractor
---
.../gobblin/runtime/StreamModelTaskRunner.java | 9 +-
.../java/org/apache/gobblin/runtime/fork/Fork.java | 6 +-
.../apache/gobblin/runtime/TaskContinuousTest.java | 96 ++++++++++++++++++++--
.../apache/gobblin/runtime/TestRecordStream.java | 4 +-
4 files changed, 107 insertions(+), 8 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
index f490997..66c8f10 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
@@ -44,6 +44,7 @@ import org.apache.gobblin.writer.FineGrainedWatermarkTracker;
import org.apache.gobblin.writer.WatermarkManager;
import org.apache.gobblin.writer.WatermarkStorage;
+import io.reactivex.Flowable;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.schedulers.Schedulers;
import lombok.AllArgsConstructor;
@@ -80,8 +81,14 @@ public class StreamModelTaskRunner {
ForkOperator forkOperator =
closer.register(this.taskContext.getForkOperator());
RecordStreamWithMetadata<?, ?> stream =
this.extractor.recordStream(this.shutdownRequested);
+ // This prevents emitting records until a connect() call is made on the
connectable stream
ConnectableFlowable connectableStream = stream.getRecordStream().publish();
- stream = stream.withRecordStream(connectableStream);
+
+ // The cancel is not propagated to the extractor's record generator when
it has been turned into a hot Flowable
+ // by publish, so set the shutdownRequested flag on cancel to stop the
extractor
+ Flowable streamWithShutdownOnCancel = connectableStream.doOnCancel(() ->
this.shutdownRequested.set(true));
+
+ stream = stream.withRecordStream(streamWithShutdownOnCancel);
stream = stream.mapRecords(r -> {
this.task.onRecordExtract();
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index bb5183f..c165825 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -207,7 +207,11 @@ public class Fork<S, D> implements Closeable, FinalState,
RecordStreamConsumer<S
}));
stream = stream.mapStream(s -> s.doOnSubscribe(subscription -> onStart()));
stream = stream.mapStream(s -> s.doOnComplete(() ->
verifyAndSetForkState(ForkState.RUNNING, ForkState.SUCCEEDED)));
- stream = stream.mapStream(s -> s.doOnCancel(() ->
verifyAndSetForkState(ForkState.RUNNING, ForkState.SUCCEEDED)));
+ stream = stream.mapStream(s -> s.doOnCancel(() -> {
+ // Errors don't propagate up from below the fork, but cancel the stream,
so use the failed state to indicate that
+ // the fork failed to complete, which will then fail the task.
+ verifyAndSetForkState(ForkState.RUNNING, ForkState.FAILED);
+ }));
stream = stream.mapStream(s -> s.doOnError(exc -> {
verifyAndSetForkState(ForkState.RUNNING, ForkState.FAILED);
this.logger.error(String.format("Fork %d of task %s failed to process
data records", this.index, this.taskId), exc);
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java
index 97da4df..046df9b 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java
@@ -267,7 +267,8 @@ public class TaskContinuousTest {
OneRecordExtractor oneRecordExtractor = new
OneRecordExtractor(testRecord);
- TaskContext mockTaskContext = getMockTaskContext(recordCollector,
oneRecordExtractor, taskExecutionSync);
+ TaskContext mockTaskContext = getMockTaskContext(recordCollector,
oneRecordExtractor, taskExecutionSync,
+ Integer.MAX_VALUE);
// Create a mock TaskPublisher
TaskPublisher mockTaskPublisher = mock(TaskPublisher.class);
@@ -336,7 +337,7 @@ public class TaskContinuousTest {
}
private TaskContext getMockTaskContext(ArrayList<Object> recordCollector,
- Extractor mockExtractor, Boolean taskExecutionSync)
+ Extractor mockExtractor, Boolean taskExecutionSync, int errorAtCount)
throws Exception {
TaskState taskState = getStreamingTaskState(taskExecutionSync);
@@ -364,7 +365,8 @@ public class TaskContinuousTest {
when(mockTaskContext.getRowLevelPolicyChecker()).thenReturn(mockRowLevelPolicyChecker);
when(mockTaskContext.getRowLevelPolicyChecker(anyInt())).thenReturn(mockRowLevelPolicyChecker);
when(mockTaskContext.getTaskLevelPolicyChecker(any(TaskState.class),
anyInt())).thenReturn(mock(TaskLevelPolicyChecker.class));
- when(mockTaskContext.getDataWriterBuilder(anyInt(),
anyInt())).thenReturn(new TestStreamingDataWriterBuilder(recordCollector));
+ when(mockTaskContext.getDataWriterBuilder(anyInt(),
anyInt())).thenReturn(new TestStreamingDataWriterBuilder(recordCollector,
+ errorAtCount));
return mockTaskContext;
}
@@ -399,7 +401,8 @@ public class TaskContinuousTest {
ContinuousExtractor continuousExtractor = new
ContinuousExtractor(perRecordExtractLatencyMillis);
- TaskContext mockTaskContext = getMockTaskContext(recordCollector,
continuousExtractor, taskExecutionSync);
+ TaskContext mockTaskContext = getMockTaskContext(recordCollector,
continuousExtractor, taskExecutionSync,
+ Integer.MAX_VALUE);
// Create a mock TaskStateTracker
TaskStateTracker mockTaskStateTracker = mock(TaskStateTracker.class);
@@ -454,12 +457,89 @@ public class TaskContinuousTest {
}
+ /**
+ * Test that a streaming task behaves correctly when the extractor is continuously producing records and the writer
+ * encounters an error.
+ *
+ * The task should exit in a failed state.
+ *
+ * No converters
+ * Identity fork
+ * One writer
+ * @throws Exception
+ */
+ @Test
+ public void testContinuousTaskError()
+ throws Exception {
+
+ for (Boolean taskExecutionSync: new Boolean[]{true, false}) {
+ ArrayList<Object> recordCollector = new ArrayList<>(100);
+ long perRecordExtractLatencyMillis = 1000; // 1 second per record
+
+ ContinuousExtractor continuousExtractor = new
ContinuousExtractor(perRecordExtractLatencyMillis);
+
+ TaskContext mockTaskContext = getMockTaskContext(recordCollector,
continuousExtractor, taskExecutionSync, 5);
+
+ // Create a mock TaskStateTracker
+ TaskStateTracker mockTaskStateTracker = mock(TaskStateTracker.class);
+
+ // Create a TaskExecutor - a real TaskExecutor must be created so a Fork
is run in a separate thread
+ TaskExecutor taskExecutor = new TaskExecutor(new Properties());
+
+ // Create the Task
+ Task task = new Task(mockTaskContext, mockTaskStateTracker,
taskExecutor, Optional.<CountDownLatch>absent());
+
+ ScheduledExecutorService taskRunner = new ScheduledThreadPoolExecutor(1,
ExecutorsUtils.newThreadFactory(Optional.of(log)));
+
+ taskRunner.execute(task);
+
+ // Let the task run for 10 seconds
+ int sleepIterations = 10;
+ int currentIteration = 0;
+
+ while (currentIteration < sleepIterations) {
+ Thread.sleep(1000);
+ currentIteration++;
+ Map<String, CheckpointableWatermark> externalWatermarkStorage =
mockTaskContext.getWatermarkStorage()
+ .getCommittedWatermarks(CheckpointableWatermark.class,
ImmutableList.of("default"));
+ if (!externalWatermarkStorage.isEmpty()) {
+ for (CheckpointableWatermark watermark :
externalWatermarkStorage.values()) {
+ log.info("Observed committed watermark: {}", watermark);
+ }
+ log.info("Task progress: {}", task.getProgress());
+ // Ensure that watermarks seem reasonable at each step
+ Assert.assertTrue(continuousExtractor.validateWatermarks(false,
externalWatermarkStorage));
+ }
+ }
+
+ boolean success = task.awaitShutdown(30000);
+ Assert.assertTrue(success, "Task should shutdown in 30 seconds");
+ log.info("Task done waiting to shutdown {}", success);
+
+ // Shutdown on error, so don't check for the exact watermark
+ Assert.assertTrue(continuousExtractor.validateWatermarks(false,
mockTaskContext.getWatermarkStorage()
+ .getCommittedWatermarks(CheckpointableWatermark.class,
ImmutableList.of("default"))));
+
+ task.commit();
+
+ // Task should be in failed state from the writer error
+ Assert.assertEquals(mockTaskContext.getTaskState().getWorkingState(),
WorkUnitState.WorkingState.FAILED);
+ // Shutdown the executor
+ taskRunner.shutdown();
+ taskRunner.awaitTermination(100, TimeUnit.MILLISECONDS);
+ }
+ }
+
+
private class TestStreamingDataWriterBuilder extends
org.apache.gobblin.writer.DataWriterBuilder {
private final List<Object> _recordCollector;
+ private final int _errorAtCount;
+ private int _recordCount = 0;
- TestStreamingDataWriterBuilder(List<Object> recordCollector) {
+ TestStreamingDataWriterBuilder(List<Object> recordCollector, int
errorAtCount) {
_recordCollector = recordCollector;
+ _errorAtCount = errorAtCount;
}
@Override
@@ -478,6 +558,11 @@ public class TaskContinuousTest {
@Override
public void writeEnvelope(RecordEnvelope<Object> recordEnvelope)
throws IOException {
+ _recordCount++;
+
+ if (_recordCount >= _errorAtCount) {
+ throw new IOException("Errored after record count " +
_errorAtCount);
+ }
_recordCollector.add(recordEnvelope.getRecord());
String source = recordEnvelope.getWatermark().getSource();
if (this.source.get() != null) {
@@ -488,6 +573,7 @@ public class TaskContinuousTest {
this.lastWatermark.set(recordEnvelope.getWatermark());
recordEnvelope.ack();
this.source.set(source);
+
}
@Override
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
index d48349a..abb8f8e 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
@@ -51,6 +51,7 @@ import org.apache.gobblin.records.ControlMessageHandler;
import org.apache.gobblin.records.FlushControlMessageHandler;
import org.apache.gobblin.records.RecordStreamProcessor;
import org.apache.gobblin.records.RecordStreamWithMetadata;
+import org.apache.gobblin.runtime.util.TaskMetrics;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.WorkUnit;
@@ -142,7 +143,7 @@ public class TestRecordStream {
Assert.assertNotNull(error);
task.commit();
- Assert.assertEquals(task.getTaskState().getWorkingState(),
WorkUnitState.WorkingState.SUCCESSFUL);
+ Assert.assertEquals(task.getTaskState().getWorkingState(),
WorkUnitState.WorkingState.FAILED);
Assert.assertEquals(converter.records, Lists.newArrayList("a", "b"));
Assert.assertEquals(converter.messages, Lists.newArrayList(
@@ -335,6 +336,7 @@ public class TestRecordStream {
when(mockTaskContext.getRowLevelPolicyChecker(anyInt())).
thenReturn(new RowLevelPolicyChecker(Lists.newArrayList(), "ss",
FileSystem.getLocal(new Configuration())));
when(mockTaskContext.getDataWriterBuilder(anyInt(),
anyInt())).thenReturn(writer);
+
when(mockTaskContext.getTaskMetrics()).thenReturn(TaskMetrics.get(taskState));
// Create a mock TaskPublisher
TaskPublisher mockTaskPublisher = mock(TaskPublisher.class);