This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 9941edb  Stop OffsetStore when stopping the connector (#12457)
9941edb is described below

commit 9941edbaa1212ec7447d2c1f9571c93cfeed15ac
Author: Andrey Yegorov <[email protected]>
AuthorDate: Sat Oct 23 21:34:38 2021 -0700

    Stop OffsetStore when stopping the connector (#12457)
    
    ### Motivation
    
    Source connectors based on KCA (the Kafka Connect adaptor; this includes 
all Debezium ones) don't stop properly on error / don't restart.
    https://github.com/apache/pulsar/pull/12441 fixes one problem; this PR 
fixes another: offsetStore is not closed on connector stop(), so its 
producer/reader aren't closed either, preventing the connector from shutting 
down.
    
    ### Modifications
    
    Closing offset store on connector stop.
    
    (cherry picked from commit 63454e9b2573b8f7ba6a023402c92a17b033ee56)
---
 .../pulsar/io/kafka/connect/AbstractKafkaConnectSource.java    |  6 ++++++
 .../pulsar/io/kafka/connect/PulsarOffsetBackingStore.java      | 10 ++++++++++
 2 files changed, 16 insertions(+)

diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index 3901c5f..5bd85a0 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -180,6 +180,12 @@ public abstract class AbstractKafkaConnectSource<T> 
implements Source<T> {
     public void close() {
         if (sourceTask != null) {
             sourceTask.stop();
+            sourceTask = null;
+        }
+
+        if (offsetStore != null) {
+            offsetStore.stop();
+            offsetStore = null;
         }
     }
 
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
index b1338f8..495c8b9 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
@@ -158,12 +158,19 @@ public class PulsarOffsetBackingStore implements 
OffsetBackingStore {
 
     @Override
     public void stop() {
+        log.info("Stopping PulsarOffsetBackingStore");
         if (null != producer) {
             try {
+                producer.flush();
+            } catch (PulsarClientException pce) {
+                log.warn("Failed to flush the producer", pce);
+            }
+            try {
                 producer.close();
             } catch (PulsarClientException e) {
                 log.warn("Failed to close producer", e);
             }
+            producer = null;
         }
         if (null != reader) {
             try {
@@ -171,7 +178,10 @@ public class PulsarOffsetBackingStore implements 
OffsetBackingStore {
             } catch (IOException e) {
                 log.warn("Failed to close reader", e);
             }
+            reader = null;
         }
+
+        // do not close the client, it is provided by the sink context
     }
 
     @Override

Reply via email to