This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ee8d1f  [GOBBLIN-1351] Ensure KafkaStreamingExtractor shutdown is 
invoked on task shutdown[]
2ee8d1f is described below

commit 2ee8d1f5d85030df786ca56e214e7782c3c1a4ab
Author: suvasude <[email protected]>
AuthorDate: Wed Jan 6 11:32:48 2021 -0800

    [GOBBLIN-1351] Ensure KafkaStreamingExtractor shutdown is invoked on task 
shutdown[]
    
    Closes #3190 from sv2000/taskCancel
---
 .../instrumented/extractor/InstrumentedExtractorBase.java        | 8 +++++++-
 .../source/extractor/extract/kafka/KafkaStreamingExtractor.java  | 9 +++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git 
a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/extractor/InstrumentedExtractorBase.java
 
b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/extractor/InstrumentedExtractorBase.java
index 1dcb1d1..b951bfc 100644
--- 
a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/extractor/InstrumentedExtractorBase.java
+++ 
b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/extractor/InstrumentedExtractorBase.java
@@ -40,6 +40,7 @@ import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.MetricNames;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.records.RecordStreamWithMetadata;
+import org.apache.gobblin.runtime.JobShutdownException;
 import org.apache.gobblin.source.extractor.DataRecordException;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.stream.RecordEnvelope;
@@ -156,7 +157,12 @@ public abstract class InstrumentedExtractorBase<S, D>
     S schema = getSchema();
     Flowable<StreamEntity<D>> recordStream = Flowable.generate(() -> 
shutdownRequest, (BiConsumer<AtomicBoolean, Emitter<StreamEntity<D>>>) (state, 
emitter) -> {
       if (state.get()) {
-        emitter.onComplete();
+        // shutdown requested
+        try {
+          shutdown();
+        } catch (JobShutdownException exc) {
+          emitter.onError(exc);
+        }
       }
       try {
         long startTimeNanos = 0;
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
index 8412592..6aea04d 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
@@ -119,6 +119,15 @@ public class KafkaStreamingExtractor<S> extends 
FlushingExtractor<S, DecodeableK
 
   @Override
   public void shutdown() {
+    this.scheduledExecutorService.shutdownNow();
+    try {
+      boolean shutdown = this.scheduledExecutorService.awaitTermination(5, 
TimeUnit.SECONDS);
+      if (!shutdown) {
+        log.error("Could not shutdown metrics collection threads in 5 
seconds.");
+      }
+    } catch (InterruptedException e) {
+      log.error("Interrupted when attempting to shutdown metrics collection 
threads.");
+    }
     this.shutdownRequested.set(true);
   }
 

Reply via email to