This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b3b0ef0da822b19b5eaf53dc3f8f8fb952bf1313
Author: Andrey Yegorov <[email protected]>
AuthorDate: Sat Oct 23 21:34:38 2021 -0700

    Stop OffsetStore when stopping the connector (#12457)
    
    Source connectors based on KCA (all debezium ones) don't stop properly on 
error / don't restart.
    https://github.com/apache/pulsar/pull/12441 fixes one problem, this PR 
fixes another: ofsetStore is not closed on connector stop() and 
producer/consumer aren't closed too, preventing the connector from shutting 
down.
    
    Closing offset store on connector stop.
    
    (cherry picked from commit 63454e9b2573b8f7ba6a023402c92a17b033ee56)
---
 .../io/kafka/connect/AbstractKafkaConnectSource.java    |  6 ++++++
 .../io/kafka/connect/PulsarOffsetBackingStore.java      | 17 ++++++++++-------
 2 files changed, 16 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index 1fbacfb..395f779 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -167,6 +167,12 @@ public abstract class AbstractKafkaConnectSource<T> 
implements Source<T> {
     public void close() {
         if (sourceTask != null) {
             sourceTask.stop();
+            sourceTask = null;
+        }
+
+        if (offsetStore != null) {
+            offsetStore.stop();
+            offsetStore = null;
         }
     }
 
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
index 46da98a..3757bf9 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
@@ -159,12 +159,19 @@ public class PulsarOffsetBackingStore implements 
OffsetBackingStore {
 
     @Override
     public void stop() {
+        log.info("Stopping PulsarOffsetBackingStore");
         if (null != producer) {
             try {
+                producer.flush();
+            } catch (PulsarClientException pce) {
+                log.warn("Failed to flush the producer", pce);
+            }
+            try {
                 producer.close();
             } catch (PulsarClientException e) {
                 log.warn("Failed to close producer", e);
             }
+            producer = null;
         }
         if (null != reader) {
             try {
@@ -172,14 +179,10 @@ public class PulsarOffsetBackingStore implements 
OffsetBackingStore {
             } catch (IOException e) {
                 log.warn("Failed to close reader", e);
             }
+            reader = null;
         }
-        if (null != client) {
-            try {
-                client.close();
-            } catch (IOException e) {
-                log.warn("Failed to close client", e);
-            }
-        }
+
+        // do not close the client, it is provided by the sink context
     }
 
     @Override

Reply via email to