This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b3b0ef0da822b19b5eaf53dc3f8f8fb952bf1313 Author: Andrey Yegorov <[email protected]> AuthorDate: Sat Oct 23 21:34:38 2021 -0700 Stop OffsetStore when stopping the connector (#12457) Source connectors based on KCA (all debezium ones) don't stop properly on error / don't restart. https://github.com/apache/pulsar/pull/12441 fixes one problem; this PR fixes another: offsetStore is not closed on connector stop() and producer/consumer aren't closed either, preventing the connector from shutting down. Closing offset store on connector stop. (cherry picked from commit 63454e9b2573b8f7ba6a023402c92a17b033ee56) --- .../io/kafka/connect/AbstractKafkaConnectSource.java | 6 ++++++ .../io/kafka/connect/PulsarOffsetBackingStore.java | 17 ++++++++++------- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java index 1fbacfb..395f779 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java @@ -167,6 +167,12 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> { public void close() { if (sourceTask != null) { sourceTask.stop(); + sourceTask = null; + } + + if (offsetStore != null) { + offsetStore.stop(); + offsetStore = null; } } diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java index 46da98a..3757bf9 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java +++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java @@ -159,12 +159,19 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore { @Override public void stop() { + log.info("Stopping PulsarOffsetBackingStore"); if (null != producer) { try { + producer.flush(); + } catch (PulsarClientException pce) { + log.warn("Failed to flush the producer", pce); + } + try { producer.close(); } catch (PulsarClientException e) { log.warn("Failed to close producer", e); } + producer = null; } if (null != reader) { try { @@ -172,14 +179,10 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore { } catch (IOException e) { log.warn("Failed to close reader", e); } + reader = null; } - if (null != client) { - try { - client.close(); - } catch (IOException e) { - log.warn("Failed to close client", e); - } - } + + // do not close the client, it is provided by the sink context } @Override
