This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cae307  [GOBBLIN-791] Fix hanging stream on error in asynchronous 
execution model
8cae307 is described below

commit 8cae30790c0a6a71a61d8b83424d733cdffb881b
Author: Hung Tran <[email protected]>
AuthorDate: Mon Jun 3 07:09:27 2019 -0700

    [GOBBLIN-791] Fix hanging stream on error in asynchronous execution model
    
    Closes #2659 from htran1/reactivex_stop_extractor
---
 .../gobblin/runtime/StreamModelTaskRunner.java     |  9 +-
 .../java/org/apache/gobblin/runtime/fork/Fork.java |  6 +-
 .../apache/gobblin/runtime/TaskContinuousTest.java | 96 ++++++++++++++++++++--
 .../apache/gobblin/runtime/TestRecordStream.java   |  4 +-
 4 files changed, 107 insertions(+), 8 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
index f490997..66c8f10 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
@@ -44,6 +44,7 @@ import org.apache.gobblin.writer.FineGrainedWatermarkTracker;
 import org.apache.gobblin.writer.WatermarkManager;
 import org.apache.gobblin.writer.WatermarkStorage;
 
+import io.reactivex.Flowable;
 import io.reactivex.flowables.ConnectableFlowable;
 import io.reactivex.schedulers.Schedulers;
 import lombok.AllArgsConstructor;
@@ -80,8 +81,14 @@ public class StreamModelTaskRunner {
     ForkOperator forkOperator = 
closer.register(this.taskContext.getForkOperator());
 
     RecordStreamWithMetadata<?, ?> stream = 
this.extractor.recordStream(this.shutdownRequested);
+    // This prevents emitting records until a connect() call is made on the 
connectable stream
     ConnectableFlowable connectableStream = stream.getRecordStream().publish();
-    stream = stream.withRecordStream(connectableStream);
+
+    // The cancel is not propagated to the extractor's record generator when 
it has been turned into a hot Flowable
+    // by publish, so set the shutdownRequested flag on cancel to stop the 
extractor
+    Flowable streamWithShutdownOnCancel = connectableStream.doOnCancel(() -> 
this.shutdownRequested.set(true));
+
+    stream = stream.withRecordStream(streamWithShutdownOnCancel);
 
     stream = stream.mapRecords(r -> {
       this.task.onRecordExtract();
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index bb5183f..c165825 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -207,7 +207,11 @@ public class Fork<S, D> implements Closeable, FinalState, 
RecordStreamConsumer<S
     }));
     stream = stream.mapStream(s -> s.doOnSubscribe(subscription -> onStart()));
     stream = stream.mapStream(s -> s.doOnComplete(() -> 
verifyAndSetForkState(ForkState.RUNNING, ForkState.SUCCEEDED)));
-    stream = stream.mapStream(s -> s.doOnCancel(() -> 
verifyAndSetForkState(ForkState.RUNNING, ForkState.SUCCEEDED)));
+    stream = stream.mapStream(s -> s.doOnCancel(() -> {
+      // Errors from below the fork don't propagate up; they cancel the stream instead, so use the failed state to
+      // indicate that the fork failed to complete, which will then fail the task.
+      verifyAndSetForkState(ForkState.RUNNING, ForkState.FAILED);
+    }));
     stream = stream.mapStream(s -> s.doOnError(exc -> {
       verifyAndSetForkState(ForkState.RUNNING, ForkState.FAILED);
       this.logger.error(String.format("Fork %d of task %s failed to process 
data records", this.index, this.taskId), exc);
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java
index 97da4df..046df9b 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java
@@ -267,7 +267,8 @@ public class TaskContinuousTest {
 
       OneRecordExtractor oneRecordExtractor = new 
OneRecordExtractor(testRecord);
 
-      TaskContext mockTaskContext = getMockTaskContext(recordCollector, 
oneRecordExtractor, taskExecutionSync);
+      TaskContext mockTaskContext = getMockTaskContext(recordCollector, 
oneRecordExtractor, taskExecutionSync,
+          Integer.MAX_VALUE);
 
       // Create a mock TaskPublisher
       TaskPublisher mockTaskPublisher = mock(TaskPublisher.class);
@@ -336,7 +337,7 @@ public class TaskContinuousTest {
   }
 
   private TaskContext getMockTaskContext(ArrayList<Object> recordCollector,
-      Extractor mockExtractor, Boolean taskExecutionSync)
+      Extractor mockExtractor, Boolean taskExecutionSync, int errorAtCount)
       throws Exception {
 
     TaskState taskState = getStreamingTaskState(taskExecutionSync);
@@ -364,7 +365,8 @@ public class TaskContinuousTest {
     
when(mockTaskContext.getRowLevelPolicyChecker()).thenReturn(mockRowLevelPolicyChecker);
     
when(mockTaskContext.getRowLevelPolicyChecker(anyInt())).thenReturn(mockRowLevelPolicyChecker);
     when(mockTaskContext.getTaskLevelPolicyChecker(any(TaskState.class), 
anyInt())).thenReturn(mock(TaskLevelPolicyChecker.class));
-    when(mockTaskContext.getDataWriterBuilder(anyInt(), 
anyInt())).thenReturn(new TestStreamingDataWriterBuilder(recordCollector));
+    when(mockTaskContext.getDataWriterBuilder(anyInt(), 
anyInt())).thenReturn(new TestStreamingDataWriterBuilder(recordCollector,
+        errorAtCount));
     return mockTaskContext;
   }
 
@@ -399,7 +401,8 @@ public class TaskContinuousTest {
 
       ContinuousExtractor continuousExtractor = new 
ContinuousExtractor(perRecordExtractLatencyMillis);
 
-      TaskContext mockTaskContext = getMockTaskContext(recordCollector, 
continuousExtractor, taskExecutionSync);
+      TaskContext mockTaskContext = getMockTaskContext(recordCollector, 
continuousExtractor, taskExecutionSync,
+          Integer.MAX_VALUE);
 
       // Create a mock TaskStateTracker
       TaskStateTracker mockTaskStateTracker = mock(TaskStateTracker.class);
@@ -454,12 +457,89 @@ public class TaskContinuousTest {
 
   }
 
+  /**
+   * Test that a streaming task shuts down correctly when the writer throws an error while the extractor is
+   * continuously producing records.
+   *
+   * The task should exit in a failed state.
+   *
+   * No converters
+   * Identity fork
+   * One writer
+   * @throws Exception
+   */
+  @Test
+  public void testContinuousTaskError()
+      throws Exception {
+
+    for (Boolean taskExecutionSync: new Boolean[]{true, false}) {
+      ArrayList<Object> recordCollector = new ArrayList<>(100);
+      long perRecordExtractLatencyMillis = 1000; // 1 second per record
+
+      ContinuousExtractor continuousExtractor = new 
ContinuousExtractor(perRecordExtractLatencyMillis);
+
+      TaskContext mockTaskContext = getMockTaskContext(recordCollector, 
continuousExtractor, taskExecutionSync, 5);
+
+      // Create a mock TaskStateTracker
+      TaskStateTracker mockTaskStateTracker = mock(TaskStateTracker.class);
+
+      // Create a TaskExecutor - a real TaskExecutor must be created so a Fork 
is run in a separate thread
+      TaskExecutor taskExecutor = new TaskExecutor(new Properties());
+
+      // Create the Task
+      Task task = new Task(mockTaskContext, mockTaskStateTracker, 
taskExecutor, Optional.<CountDownLatch>absent());
+
+      ScheduledExecutorService taskRunner = new ScheduledThreadPoolExecutor(1, 
ExecutorsUtils.newThreadFactory(Optional.of(log)));
+
+      taskRunner.execute(task);
+
+      // Let the task run for 10 seconds
+      int sleepIterations = 10;
+      int currentIteration = 0;
+
+      while (currentIteration < sleepIterations) {
+        Thread.sleep(1000);
+        currentIteration++;
+        Map<String, CheckpointableWatermark> externalWatermarkStorage = 
mockTaskContext.getWatermarkStorage()
+            .getCommittedWatermarks(CheckpointableWatermark.class, 
ImmutableList.of("default"));
+        if (!externalWatermarkStorage.isEmpty()) {
+          for (CheckpointableWatermark watermark : 
externalWatermarkStorage.values()) {
+            log.info("Observed committed watermark: {}", watermark);
+          }
+          log.info("Task progress: {}", task.getProgress());
+          // Ensure that watermarks seem reasonable at each step
+          Assert.assertTrue(continuousExtractor.validateWatermarks(false, 
externalWatermarkStorage));
+        }
+      }
+
+      boolean success = task.awaitShutdown(30000);
+      Assert.assertTrue(success, "Task should shutdown in 30 seconds");
+      log.info("Task done waiting to shutdown {}", success);
+
+      // Shutdown on error, so don't check for the exact watermark
+      Assert.assertTrue(continuousExtractor.validateWatermarks(false, 
mockTaskContext.getWatermarkStorage()
+          .getCommittedWatermarks(CheckpointableWatermark.class, 
ImmutableList.of("default"))));
+
+      task.commit();
+
+      // Task should be in failed state from writer error
+      Assert.assertEquals(mockTaskContext.getTaskState().getWorkingState(), 
WorkUnitState.WorkingState.FAILED);
+      // Shut down the executor
+      taskRunner.shutdown();
+      taskRunner.awaitTermination(100, TimeUnit.MILLISECONDS);
+    }
+  }
+
+
   private class TestStreamingDataWriterBuilder extends 
org.apache.gobblin.writer.DataWriterBuilder {
 
     private final List<Object> _recordCollector;
+    private final int _errorAtCount;
+    private int _recordCount = 0;
 
-    TestStreamingDataWriterBuilder(List<Object> recordCollector) {
+    TestStreamingDataWriterBuilder(List<Object> recordCollector, int 
errorAtCount) {
       _recordCollector = recordCollector;
+      _errorAtCount = errorAtCount;
     }
 
     @Override
@@ -478,6 +558,11 @@ public class TaskContinuousTest {
         @Override
         public void writeEnvelope(RecordEnvelope<Object> recordEnvelope)
             throws IOException {
+          _recordCount++;
+
+          if (_recordCount >= _errorAtCount) {
+            throw new IOException("Errored after record count " + 
_errorAtCount);
+          }
           _recordCollector.add(recordEnvelope.getRecord());
           String source = recordEnvelope.getWatermark().getSource();
           if (this.source.get() != null) {
@@ -488,6 +573,7 @@ public class TaskContinuousTest {
           this.lastWatermark.set(recordEnvelope.getWatermark());
           recordEnvelope.ack();
           this.source.set(source);
+
         }
 
         @Override
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
index d48349a..abb8f8e 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
@@ -51,6 +51,7 @@ import org.apache.gobblin.records.ControlMessageHandler;
 import org.apache.gobblin.records.FlushControlMessageHandler;
 import org.apache.gobblin.records.RecordStreamProcessor;
 import org.apache.gobblin.records.RecordStreamWithMetadata;
+import org.apache.gobblin.runtime.util.TaskMetrics;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.source.workunit.Extract;
 import org.apache.gobblin.source.workunit.WorkUnit;
@@ -142,7 +143,7 @@ public class TestRecordStream {
     Assert.assertNotNull(error);
 
     task.commit();
-    Assert.assertEquals(task.getTaskState().getWorkingState(), 
WorkUnitState.WorkingState.SUCCESSFUL);
+    Assert.assertEquals(task.getTaskState().getWorkingState(), 
WorkUnitState.WorkingState.FAILED);
 
     Assert.assertEquals(converter.records, Lists.newArrayList("a", "b"));
     Assert.assertEquals(converter.messages, Lists.newArrayList(
@@ -335,6 +336,7 @@ public class TestRecordStream {
     when(mockTaskContext.getRowLevelPolicyChecker(anyInt())).
         thenReturn(new RowLevelPolicyChecker(Lists.newArrayList(), "ss", 
FileSystem.getLocal(new Configuration())));
     when(mockTaskContext.getDataWriterBuilder(anyInt(), 
anyInt())).thenReturn(writer);
+    
when(mockTaskContext.getTaskMetrics()).thenReturn(TaskMetrics.get(taskState));
 
     // Create a mock TaskPublisher
     TaskPublisher mockTaskPublisher = mock(TaskPublisher.class);

Reply via email to