This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8fb92f163afca8c3674b2e03d4ef538f04d38a8c
Author: chenlin <[email protected]>
AuthorDate: Sun Oct 31 17:14:00 2021 +0800

    Add close method in the class of NegativeAcksTracker (#12469)

    (cherry picked from commit 3694aa1554e7f408a90eda2ba46eae17b425140a)
---
 .../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 1 +
 .../apache/pulsar/client/impl/NegativeAcksTracker.java | 16 +++++++++++++++-
 .../org/apache/pulsar/client/impl/ConsumerImplTest.java | 14 ++++++++++++++
 3 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 352ee86..ce4711d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -960,6 +960,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         if (batchReceiveTimeout != null) {
             batchReceiveTimeout.cancel();
         }
+        negativeAcksTracker.close();
         stats.getStatTimeout().ifPresent(Timeout::cancel);
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 16cfa0c..a062009 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import io.netty.util.Timeout;
 import io.netty.util.Timer;
 
+import java.io.Closeable;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
@@ -30,7 +31,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import static org.apache.pulsar.client.impl.UnAckedMessageTracker.addChunkedMessageIdsAndRemoveFromSequenceMap;
 
-class NegativeAcksTracker {
+class NegativeAcksTracker implements Closeable {
 
     private HashMap<MessageId, Long> nackedMessages = null;
 
@@ -93,4 +94,17 @@ class NegativeAcksTracker {
             this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
         }
     }
+
+    @Override
+    public synchronized void close() {
+        if (timeout != null && !timeout.isCancelled()) {
+            timeout.cancel();
+            timeout = null;
+        }
+
+        if (nackedMessages != null) {
+            nackedMessages.clear();
+            nackedMessages = null;
+        }
+    }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 37c9e0c..8a9e665 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -187,4 +187,18 @@ public class ConsumerImplTest {
         // then
         Assert.assertFalse(consumer.hasPendingBatchReceive());
     }
+
+    @Test
+    public void testClose() {
+        Exception checkException = null;
+        try {
+            if (consumer != null) {
+                consumer.negativeAcknowledge(new MessageIdImpl(-1, -1, -1));
+                consumer.close();
+            }
+        } catch (Exception e) {
+            checkException = e;
+        }
+        Assert.assertNull(checkException);
+    }
 }
