This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit d40cfc6ce62873be6de9e6313fd6dac34688f089 Author: Yufan Sheng <[email protected]> AuthorDate: Fri Sep 10 12:10:12 2021 +0800 [FLINK-23864][connector/pulsar] Release Pulsar Message if the user enables the poolMessage option. --- .../pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java | 3 +++ .../source/reader/split/PulsarUnorderedPartitionSplitReader.java | 3 +++ 2 files changed, 6 insertions(+) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java index ac1821b..af650ea 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java @@ -74,6 +74,9 @@ public class PulsarOrderedPartitionSplitReader<OUT> extends PulsarPartitionSplit protected void finishedPollMessage(Message<byte[]> message) { // Nothing to do here. 
LOG.debug("Finished polling message {}", message); + + // Release message + message.release(); } @Override diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java index efa383f..9c8e3d7 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java @@ -118,6 +118,9 @@ public class PulsarUnorderedPartitionSplitReader<OUT> extends PulsarPartitionSpl if (sourceConfiguration.isEnableAutoAcknowledgeMessage()) { sneakyClient(() -> pulsarConsumer.acknowledge(message)); } + + // Release message + message.release(); } @Override
