This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2ee8d1f [GOBBLIN-1351] Ensure KafkaStreamingExtractor shutdown is
invoked on task shutdown[]
2ee8d1f is described below
commit 2ee8d1f5d85030df786ca56e214e7782c3c1a4ab
Author: suvasude <[email protected]>
AuthorDate: Wed Jan 6 11:32:48 2021 -0800
[GOBBLIN-1351] Ensure KafkaStreamingExtractor shutdown is invoked on task
shutdown[]
Closes #3190 from sv2000/taskCancel
---
.../instrumented/extractor/InstrumentedExtractorBase.java | 8 +++++++-
.../source/extractor/extract/kafka/KafkaStreamingExtractor.java | 9 +++++++++
2 files changed, 16 insertions(+), 1 deletion(-)
diff --git
a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/extractor/InstrumentedExtractorBase.java
b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/extractor/InstrumentedExtractorBase.java
index 1dcb1d1..b951bfc 100644
---
a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/extractor/InstrumentedExtractorBase.java
+++
b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/extractor/InstrumentedExtractorBase.java
@@ -40,6 +40,7 @@ import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.MetricNames;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.records.RecordStreamWithMetadata;
+import org.apache.gobblin.runtime.JobShutdownException;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.stream.RecordEnvelope;
@@ -156,7 +157,12 @@ public abstract class InstrumentedExtractorBase<S, D>
S schema = getSchema();
Flowable<StreamEntity<D>> recordStream = Flowable.generate(() ->
shutdownRequest, (BiConsumer<AtomicBoolean, Emitter<StreamEntity<D>>>) (state,
emitter) -> {
if (state.get()) {
- emitter.onComplete();
+ // shutdown requested
+ try {
+ shutdown();
+ } catch (JobShutdownException exc) {
+ emitter.onError(exc);
+ }
}
try {
long startTimeNanos = 0;
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
index 8412592..6aea04d 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
@@ -119,6 +119,15 @@ public class KafkaStreamingExtractor<S> extends
FlushingExtractor<S, DecodeableK
@Override
public void shutdown() {
+ this.scheduledExecutorService.shutdownNow();
+ try {
+ boolean shutdown = this.scheduledExecutorService.awaitTermination(5,
TimeUnit.SECONDS);
+ if (!shutdown) {
+ log.error("Could not shutdown metrics collection threads in 5
seconds.");
+ }
+ } catch (InterruptedException e) {
+ log.error("Interrupted when attempting to shutdown metrics collection
threads.");
+ }
this.shutdownRequested.set(true);
}