This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ed61fe  [GOBBLIN-1359] Fix the error where an Exception thrown by the writer
is swallowed by Reactivex and causes jobs to hang
1ed61fe is described below

commit 1ed61fe7aa4943d912d74b61c7936e7023b991f8
Author: Zihan Li <[email protected]>
AuthorDate: Wed Jan 27 18:17:17 2021 -0800

    [GOBBLIN-1359] Fix the error where an Exception thrown by the writer is swallowed
by Reactivex and causes jobs to hang
    
    [GOBBLIN-1359] second fix for exception swallowed
    by Reactivex
    
    remove unused import
    
    address comments and change extractor shutdown to
    make sure it can exit correctly
    
    address comments
    
    address comments
    
    address comments
    
    address comments
    
    Closes #3214 from ZihanLi58/GOBBLIN-1359-test
---
 .../org/apache/gobblin/configuration/ConfigurationKeys.java |  2 ++
 .../gobblin/source/extractor/extract/FlushingExtractor.java |  8 ++++++++
 .../extractor/extract/kafka/KafkaStreamingExtractor.java    |  1 +
 .../org/apache/gobblin/runtime/StreamModelTaskRunner.java   | 13 ++++++++-----
 .../src/main/java/org/apache/gobblin/runtime/fork/Fork.java |  7 +++++--
 .../org/apache/gobblin/yarn/GobblinYarnAppLauncher.java     |  3 ++-
 6 files changed, 26 insertions(+), 8 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index d3902fc..9830e46 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -357,6 +357,8 @@ public class ConfigurationKeys {
   public static final String DEFAULT_FORK_RECORD_QUEUE_TIMEOUT_UNIT = 
TimeUnit.MILLISECONDS.name();
   public static final String FORK_MAX_WAIT_MININUTES = "fork.max.wait.minutes";
   public static final long DEFAULT_FORK_MAX_WAIT_MININUTES = 60;
+  public static final String FORK_FINISHED_CHECK_INTERVAL = 
"fork.finished.check.interval";
+  public static final long DEFAULT_FORK_FINISHED_CHECK_INTERVAL = 1000;
   public static final String FORK_CLOSE_WRITER_ON_COMPLETION = 
"fork.closeWriterOnCompletion";
   public static final boolean DEFAULT_FORK_CLOSE_WRITER_ON_COMPLETION = false;
 
diff --git 
a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/FlushingExtractor.java
 
b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/FlushingExtractor.java
index 9391a30..1d7f80c 100644
--- 
a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/FlushingExtractor.java
+++ 
b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/FlushingExtractor.java
@@ -298,6 +298,14 @@ public abstract class FlushingExtractor<S, D> extends 
EventBasedExtractor<S, D>
     
this.flushPublisher.get().publish(Collections.singletonList(workUnitState));
   }
 
+  @Override
+  public void shutdown() {
+    // In case hasOutstandingFlush, we need to manually nack the ackable to 
make sure the CountDownLatch not hang
+    if (this.hasOutstandingFlush) {
+      this.lastFlushAckable.nack(new IOException("Extractor already 
shutdown"));
+    }
+  }
+
   /**
    * Persist the watermarks in {@link 
WatermarkTracker#unacknowledgedWatermarks(Map)} to {@link WatermarkStorage}.
    * The method is called when after a {@link FlushControlMessage} has been 
acknowledged. To make retrieval of
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
index 6aea04d..9f2ac76 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
@@ -129,6 +129,7 @@ public class KafkaStreamingExtractor<S> extends 
FlushingExtractor<S, DecodeableK
       log.error("Interrupted when attempting to shutdown metrics collection 
threads.");
     }
     this.shutdownRequested.set(true);
+    super.shutdown();
   }
 
   @ToString
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
index 66c8f10..faee814 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
@@ -76,6 +76,7 @@ public class StreamModelTaskRunner {
 
   protected void run() throws Exception {
     long maxWaitInMinute = 
taskState.getPropAsLong(ConfigurationKeys.FORK_MAX_WAIT_MININUTES, 
ConfigurationKeys.DEFAULT_FORK_MAX_WAIT_MININUTES);
+    long initialDelay = 
taskState.getPropAsLong(ConfigurationKeys.FORK_FINISHED_CHECK_INTERVAL, 
ConfigurationKeys.DEFAULT_FORK_FINISHED_CHECK_INTERVAL);
 
     // Get the fork operator. By default IdentityForkOperator is used with a 
single branch.
     ForkOperator forkOperator = 
closer.register(this.taskContext.getForkOperator());
@@ -85,8 +86,9 @@ public class StreamModelTaskRunner {
     ConnectableFlowable connectableStream = stream.getRecordStream().publish();
 
     // The cancel is not propagated to the extractor's record generator when 
it has been turned into a hot Flowable
-    // by publish, so set the shutdownRequested flag on cancel to stop the 
extractor
-    Flowable streamWithShutdownOnCancel = connectableStream.doOnCancel(() -> 
this.shutdownRequested.set(true));
+    // by publish, and in the case that extractor stuck in reading record when 
cancel get called,
+    // we directly call shutdown to force it instead of setting the 
shutdownRequested flag on cancel to stop the extractor
+    Flowable streamWithShutdownOnCancel = 
connectableStream.doOnCancel(this.extractor::shutdown);
 
     stream = stream.withRecordStream(streamWithShutdownOnCancel);
 
@@ -149,11 +151,12 @@ public class StreamModelTaskRunner {
         this.task.configureStreamingFork(fork);
       }
     }
-
-    connectableStream.connect();
+    new Thread(() -> {
+      connectableStream.connect();
+    }).start();
 
     if (!ExponentialBackoff.awaitCondition().callable(() -> 
this.forks.keySet().stream().map(Optional::get).allMatch(Fork::isDone)).
-        
initialDelay(1000L).maxDelay(1000L).maxWait(TimeUnit.MINUTES.toMillis(maxWaitInMinute)).await())
 {
+        
initialDelay(initialDelay).maxDelay(initialDelay).maxWait(TimeUnit.MINUTES.toMillis(maxWaitInMinute)).await())
 {
       throw new TimeoutException("Forks did not finish withing specified 
timeout.");
     }
   }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index d2ff89c..8de98f5 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
 import com.google.common.io.Closer;
 
 import edu.umd.cs.findbugs.annotations.SuppressWarnings;
@@ -228,8 +227,12 @@ public class Fork<S, D> implements Closeable, FinalState, 
RecordStreamConsumer<S
           r.ack();
         }
       }, e -> {
+          // Handle writer close in error case since onComplete will not call 
when exception happens
+          if (this.writer.isPresent()) {
+            this.writer.get().close();
+          }
           logger.error("Failed to process record.", e);
-          throw(new RuntimeException(e));
+          verifyAndSetForkState(ForkState.RUNNING, ForkState.FAILED);
           },
         () -> {
           if (this.writer.isPresent()) {
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 9c1f03b..fe24f0f 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -835,7 +835,8 @@ public class GobblinYarnAppLauncher {
       LOGGER.debug("All containerLaunchContext tokens: {} present in file {} 
", credentials.getAllTokens(), System.getenv(HADOOP_TOKEN_FILE_LOCATION));
     }
 
-    TokenUtils.getAllFSTokens(new Configuration(), credentials, renewerName, 
null, ConfigUtils.getStringList(this.config, TokenUtils.OTHER_NAMENODES));
+    TokenUtils.getAllFSTokens(new Configuration(), credentials, renewerName,
+        Optional.absent(), ConfigUtils.getStringList(this.config, 
TokenUtils.OTHER_NAMENODES));
 
     Closer closer = Closer.create();
     try {

Reply via email to