This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit de813bf35afc7038465ecdf61921bf554293aa1b Author: Rajan Dhabalia <[email protected]> AuthorDate: Mon Aug 23 19:34:17 2021 -0700 [pulsar-client] clean up MultiTopicsConsumerImpl reference on consumer creation failure (#11754) (cherry picked from commit f154de74830ca2eaca67d01322fcb9a557d649ce) --- .../pulsar/client/impl/MultiTopicsConsumerImpl.java | 15 ++++++++++++--- .../apache/pulsar/client/impl/UnAckedMessageTracker.java | 1 + 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index ce84376..c9e2067 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -103,7 +103,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final ConsumerStatsRecorder stats; - private final UnAckedMessageTracker unAckedMessageTracker; + private UnAckedMessageTracker unAckedMessageTracker; private final ConsumerConfigurationData<T> internalConfig; private volatile BatchMessageIdImpl startMessageId = null; @@ -543,7 +543,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { @Override public CompletableFuture<Void> closeAsync() { if (getState() == State.Closing || getState() == State.Closed) { - unAckedMessageTracker.close(); + if (unAckedMessageTracker != null) { + unAckedMessageTracker.close(); + } return CompletableFuture.completedFuture(null); } setState(State.Closing); @@ -580,7 +582,14 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } private void cleanupMultiConsumer() { - unAckedMessageTracker.close(); + if (unAckedMessageTracker != null) { + unAckedMessageTracker.close(); + unAckedMessageTracker = null; + } + if (partitionsAutoUpdateTimeout != null) { + partitionsAutoUpdateTimeout.cancel(); + partitionsAutoUpdateTimeout = null; + } client.cleanupConsumer(this); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java index a244366..db616f2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java @@ -248,6 +248,7 @@ public class UnAckedMessageTracker implements Closeable { try { if (timeout != null && !timeout.isCancelled()) { timeout.cancel(); + timeout = null; } this.clear(); } finally {
