This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 9941edb Stop OffsetStore when stopping the connector (#12457)
9941edb is described below
commit 9941edbaa1212ec7447d2c1f9571c93cfeed15ac
Author: Andrey Yegorov <[email protected]>
AuthorDate: Sat Oct 23 21:34:38 2021 -0700
Stop OffsetStore when stopping the connector (#12457)
### Motivation
Source connectors based on KCA (all debezium ones) don't stop properly on
error / don't restart.
https://github.com/apache/pulsar/pull/12441 fixes one problem; this PR
fixes another: offsetStore is not closed on connector stop() and the
producer/consumer aren't closed either, preventing the connector from shutting
down.
### Modifications
Closing offset store on connector stop.
(cherry picked from commit 63454e9b2573b8f7ba6a023402c92a17b033ee56)
---
.../pulsar/io/kafka/connect/AbstractKafkaConnectSource.java | 6 ++++++
.../pulsar/io/kafka/connect/PulsarOffsetBackingStore.java | 10 ++++++++++
2 files changed, 16 insertions(+)
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index 3901c5f..5bd85a0 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -180,6 +180,12 @@ public abstract class AbstractKafkaConnectSource<T>
implements Source<T> {
public void close() {
if (sourceTask != null) {
sourceTask.stop();
+ sourceTask = null;
+ }
+
+ if (offsetStore != null) {
+ offsetStore.stop();
+ offsetStore = null;
}
}
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
index b1338f8..495c8b9 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
@@ -158,12 +158,19 @@ public class PulsarOffsetBackingStore implements
OffsetBackingStore {
@Override
public void stop() {
+ log.info("Stopping PulsarOffsetBackingStore");
if (null != producer) {
try {
+ producer.flush();
+ } catch (PulsarClientException pce) {
+ log.warn("Failed to flush the producer", pce);
+ }
+ try {
producer.close();
} catch (PulsarClientException e) {
log.warn("Failed to close producer", e);
}
+ producer = null;
}
if (null != reader) {
try {
@@ -171,7 +178,10 @@ public class PulsarOffsetBackingStore implements
OffsetBackingStore {
} catch (IOException e) {
log.warn("Failed to close reader", e);
}
+ reader = null;
}
+
+ // do not close the client, it is provided by the sink context
}
@Override