This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 1ed61fe [GOBBLIN-1359] Fix the error where exceptions thrown by the writer
are swallowed by ReactiveX and cause jobs to hang
1ed61fe is described below
commit 1ed61fe7aa4943d912d74b61c7936e7023b991f8
Author: Zihan Li <[email protected]>
AuthorDate: Wed Jan 27 18:17:17 2021 -0800
[GOBBLIN-1359] Fix the error where exceptions thrown by the writer are swallowed
by ReactiveX and cause jobs to hang
[GOBBLIN-1359] second fix for exceptions swallowed
by ReactiveX
remove unused import
address comments and change extractor shutdown to
make sure the extractor can exit correctly
address comments
address comments
address comments
address comments
Closes #3214 from ZihanLi58/GOBBLIN-1359-test
---
.../org/apache/gobblin/configuration/ConfigurationKeys.java | 2 ++
.../gobblin/source/extractor/extract/FlushingExtractor.java | 8 ++++++++
.../extractor/extract/kafka/KafkaStreamingExtractor.java | 1 +
.../org/apache/gobblin/runtime/StreamModelTaskRunner.java | 13 ++++++++-----
.../src/main/java/org/apache/gobblin/runtime/fork/Fork.java | 7 +++++--
.../org/apache/gobblin/yarn/GobblinYarnAppLauncher.java | 3 ++-
6 files changed, 26 insertions(+), 8 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index d3902fc..9830e46 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -357,6 +357,8 @@ public class ConfigurationKeys {
public static final String DEFAULT_FORK_RECORD_QUEUE_TIMEOUT_UNIT =
TimeUnit.MILLISECONDS.name();
public static final String FORK_MAX_WAIT_MININUTES = "fork.max.wait.minutes";
public static final long DEFAULT_FORK_MAX_WAIT_MININUTES = 60;
+ public static final String FORK_FINISHED_CHECK_INTERVAL =
"fork.finished.check.interval";
+ public static final long DEFAULT_FORK_FINISHED_CHECK_INTERVAL = 1000;
public static final String FORK_CLOSE_WRITER_ON_COMPLETION =
"fork.closeWriterOnCompletion";
public static final boolean DEFAULT_FORK_CLOSE_WRITER_ON_COMPLETION = false;
diff --git
a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/FlushingExtractor.java
b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/FlushingExtractor.java
index 9391a30..1d7f80c 100644
---
a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/FlushingExtractor.java
+++
b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/FlushingExtractor.java
@@ -298,6 +298,14 @@ public abstract class FlushingExtractor<S, D> extends
EventBasedExtractor<S, D>
this.flushPublisher.get().publish(Collections.singletonList(workUnitState));
}
+ @Override
+ public void shutdown() {
+ // In case hasOutstandingFlush, we need to manually nack the ackable to
make sure the CountDownLatch not hang
+ if (this.hasOutstandingFlush) {
+ this.lastFlushAckable.nack(new IOException("Extractor already
shutdown"));
+ }
+ }
+
/**
* Persist the watermarks in {@link
WatermarkTracker#unacknowledgedWatermarks(Map)} to {@link WatermarkStorage}.
* The method is called when after a {@link FlushControlMessage} has been
acknowledged. To make retrieval of
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
index 6aea04d..9f2ac76 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
@@ -129,6 +129,7 @@ public class KafkaStreamingExtractor<S> extends
FlushingExtractor<S, DecodeableK
log.error("Interrupted when attempting to shutdown metrics collection
threads.");
}
this.shutdownRequested.set(true);
+ super.shutdown();
}
@ToString
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
index 66c8f10..faee814 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
@@ -76,6 +76,7 @@ public class StreamModelTaskRunner {
protected void run() throws Exception {
long maxWaitInMinute =
taskState.getPropAsLong(ConfigurationKeys.FORK_MAX_WAIT_MININUTES,
ConfigurationKeys.DEFAULT_FORK_MAX_WAIT_MININUTES);
+ long initialDelay =
taskState.getPropAsLong(ConfigurationKeys.FORK_FINISHED_CHECK_INTERVAL,
ConfigurationKeys.DEFAULT_FORK_FINISHED_CHECK_INTERVAL);
// Get the fork operator. By default IdentityForkOperator is used with a
single branch.
ForkOperator forkOperator =
closer.register(this.taskContext.getForkOperator());
@@ -85,8 +86,9 @@ public class StreamModelTaskRunner {
ConnectableFlowable connectableStream = stream.getRecordStream().publish();
// The cancel is not propagated to the extractor's record generator when
it has been turned into a hot Flowable
- // by publish, so set the shutdownRequested flag on cancel to stop the
extractor
- Flowable streamWithShutdownOnCancel = connectableStream.doOnCancel(() ->
this.shutdownRequested.set(true));
+ // by publish, and in the case that extractor stuck in reading record when
cancel get called,
+ // we directly call shutdown to force it instead of setting the
shutdownRequested flag on cancel to stop the extractor
+ Flowable streamWithShutdownOnCancel =
connectableStream.doOnCancel(this.extractor::shutdown);
stream = stream.withRecordStream(streamWithShutdownOnCancel);
@@ -149,11 +151,12 @@ public class StreamModelTaskRunner {
this.task.configureStreamingFork(fork);
}
}
-
- connectableStream.connect();
+ new Thread(() -> {
+ connectableStream.connect();
+ }).start();
if (!ExponentialBackoff.awaitCondition().callable(() ->
this.forks.keySet().stream().map(Optional::get).allMatch(Fork::isDone)).
-
initialDelay(1000L).maxDelay(1000L).maxWait(TimeUnit.MINUTES.toMillis(maxWaitInMinute)).await())
{
+
initialDelay(initialDelay).maxDelay(initialDelay).maxWait(TimeUnit.MINUTES.toMillis(maxWaitInMinute)).await())
{
throw new TimeoutException("Forks did not finish withing specified
timeout.");
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index d2ff89c..8de98f5 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
import com.google.common.io.Closer;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
@@ -228,8 +227,12 @@ public class Fork<S, D> implements Closeable, FinalState,
RecordStreamConsumer<S
r.ack();
}
}, e -> {
+ // Handle writer close in error case since onComplete will not call
when exception happens
+ if (this.writer.isPresent()) {
+ this.writer.get().close();
+ }
logger.error("Failed to process record.", e);
- throw(new RuntimeException(e));
+ verifyAndSetForkState(ForkState.RUNNING, ForkState.FAILED);
},
() -> {
if (this.writer.isPresent()) {
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 9c1f03b..fe24f0f 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -835,7 +835,8 @@ public class GobblinYarnAppLauncher {
LOGGER.debug("All containerLaunchContext tokens: {} present in file {}
", credentials.getAllTokens(), System.getenv(HADOOP_TOKEN_FILE_LOCATION));
}
- TokenUtils.getAllFSTokens(new Configuration(), credentials, renewerName,
null, ConfigUtils.getStringList(this.config, TokenUtils.OTHER_NAMENODES));
+ TokenUtils.getAllFSTokens(new Configuration(), credentials, renewerName,
+ Optional.absent(), ConfigUtils.getStringList(this.config,
TokenUtils.OTHER_NAMENODES));
Closer closer = Closer.create();
try {