This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 45cd065d32f09eb85cf71a6fde4ffbc562a34cce Author: Dream95 <[email protected]> AuthorDate: Fri Jun 5 18:33:24 2026 +0800 [improve][client] Clean up unacked message tracker when topics are removed in multi-topic consumers (#25923) Signed-off-by: Dream95 <[email protected]> (cherry picked from commit 26cf550bb9635a8107e5628800b465778c847a91) --- .../client/impl/MultiTopicsConsumerImpl.java | 17 +++++--- .../impl/PatternMultiTopicsConsumerImpl.java | 1 + .../client/impl/MultiTopicsConsumerImplTest.java | 51 ++++++++++++++++++++++ .../impl/PatternMultiTopicsConsumerImplTest.java | 41 +++++++++++++++++ 4 files changed, 104 insertions(+), 6 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index f1a543e95d4..f611abb179f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -974,6 +974,14 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } } + protected void removeTopicMessagesFromUnackedTracker(String topicName) { + if (unAckedMessageTracker instanceof UnAckedTopicMessageTracker tracker) { + tracker.removeTopicMessages(topicName); + } else if (unAckedMessageTracker instanceof UnAckedTopicMessageRedeliveryTracker tracker) { + tracker.removeTopicMessages(topicName); + } + } + /*** * Subscribe one more given topic. * @param topicName topic name without the partition suffix. @@ -1283,11 +1291,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { }); removeTopic(topicName); - if (unAckedMessageTracker instanceof UnAckedTopicMessageTracker tracker) { - tracker.removeTopicMessages(topicName); - } else if (unAckedMessageTracker instanceof UnAckedTopicMessageRedeliveryTracker tracker){ - tracker.removeTopicMessages(topicName); - } + removeTopicMessagesFromUnackedTracker(topicName); unsubscribeFuture.complete(null); log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer, allTopicPartitionsNumber: {}", @@ -1414,7 +1418,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } } - return FutureUtil.waitForAll(futures); + return FutureUtil.waitForAll(futures) + .thenRun(() -> removeTopicMessagesFromUnackedTracker(topicName)); } else if (oldPartitionNumber < currentPartitionNumber) { allTopicPartitionsNumber.addAndGet(currentPartitionNumber - oldPartitionNumber); partitionedTopics.put(topicName, currentPartitionNumber); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java index 3db37864986..cbfe4832907 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java @@ -367,6 +367,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T removedPartitionedTopicsForLog.add(String.format("%s with %s partitions", groupedTopicRemoved, partitions)); partitionedTopics.remove(groupedTopicRemoved, partitions); + removeTopicMessagesFromUnackedTracker(groupedTopicRemoved); } } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java index 54175613e3b..b4d007d855f 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java @@ -37,6 +37,7 @@ import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -266,4 +267,54 @@ public class MultiTopicsConsumerImplTest { verify(clientMock, times(3)).getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean()); } + @Test + @SuppressWarnings("unchecked") + public void testOnTopicsExtendedRemovedTopicCleansUnackedMessages() { + String topicName = "persistent://public/default/deleted-topic"; + String topicPartition0 = topicName + "-partition-0"; + String topicPartition1 = topicName + "-partition-1"; + String otherTopicPartition = "persistent://public/default/other-topic-partition-0"; + + ConsumerConfigurationData<byte[]> consumerConfData = new ConsumerConfigurationData<>(); + consumerConfData.setSubscriptionName("subscriptionName"); + consumerConfData.setAutoUpdatePartitions(true); + consumerConfData.setAutoUpdatePartitionsIntervalSeconds(60); + consumerConfData.setAckTimeoutMillis(1000); + + MultiTopicsConsumerImpl<byte[]> impl = createMultiTopicsConsumer(consumerConfData); + + impl.partitionedTopics.put(topicName, 2); + impl.allTopicPartitionsNumber.set(2); + + ConsumerImpl<byte[]> partitionConsumer0 = (ConsumerImpl<byte[]>) mock(ConsumerImpl.class); + ConsumerImpl<byte[]> partitionConsumer1 = (ConsumerImpl<byte[]>) mock(ConsumerImpl.class); + when(partitionConsumer0.getTopic()).thenReturn(topicPartition0); + when(partitionConsumer1.getTopic()).thenReturn(topicPartition1); + when(partitionConsumer0.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); + when(partitionConsumer1.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); + impl.consumers.put(topicPartition0, partitionConsumer0); + impl.consumers.put(topicPartition1, partitionConsumer1); + + TopicMessageIdImpl removedTopicMessageId = new TopicMessageIdImpl(topicPartition0, new MessageIdImpl(1, 1, 0)); + TopicMessageIdImpl otherTopicMessageId = + new TopicMessageIdImpl(otherTopicPartition, new MessageIdImpl(2, 2, 0)); + impl.getUnAckedMessageTracker().add(removedTopicMessageId); + impl.getUnAckedMessageTracker().add(otherTopicMessageId); + assertEquals(impl.getUnAckedMessageTracker().size(), 2); + + when(impl.client.getPartitionsForTopic(topicName, false)).thenReturn(CompletableFuture.completedFuture( + Collections.emptyList())); + + PartitionsChangedListener listener = impl.topicsPartitionChangedListener; + listener.onTopicsExtended(Collections.singleton(topicName)).join(); + + assertTrue(impl.getConsumers().isEmpty()); + assertEquals(impl.partitionedTopics.get(topicName), Integer.valueOf(0)); + assertEquals(impl.allTopicPartitionsNumber.get(), 0); + assertEquals(impl.getUnAckedMessageTracker().size(), 1); + assertTrue(impl.getUnAckedMessageTracker().remove(otherTopicMessageId)); + verify(partitionConsumer0).closeAsync(); + verify(partitionConsumer1).closeAsync(); + } + } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java index cd799edb3bd..84fdca77feb 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java @@ -310,6 +310,47 @@ public class PatternMultiTopicsConsumerImplTest { assertThat(invocationCount.get()).isEqualTo(5); } + @Test + @SuppressWarnings("unchecked") + public void testOnTopicsRemovedCleansUnackedMessagesForRemovedPartitionedTopic() { + String partitionedTopic = "persistent://tenant/namespace/deleted-topic"; + String partition0 = partitionedTopic + "-partition-0"; + String partition1 = partitionedTopic + "-partition-1"; + String otherTopicPartition = "persistent://tenant/namespace/other-topic-partition-0"; + TopicsPattern topicsPattern = + TopicsPatternFactory.create("persistent://tenant/namespace/.*", TopicsPattern.RegexImplementation.JDK); + ConsumerConfigurationData<byte[]> consumerConfData = createConsumerConfigurationData(); + consumerConfData.setAckTimeoutMillis(1000); + + PatternMultiTopicsConsumerImpl<byte[]> consumer = + createPatternMultiTopicsConsumer(consumerConfData, topicsPattern); + + consumer.partitionedTopics.put(partitionedTopic, 2); + + ConsumerImpl<byte[]> partitionConsumer0 = (ConsumerImpl<byte[]>) mock(ConsumerImpl.class); + ConsumerImpl<byte[]> partitionConsumer1 = (ConsumerImpl<byte[]>) mock(ConsumerImpl.class); + when(partitionConsumer0.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); + when(partitionConsumer1.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); + consumer.consumers.put(partition0, partitionConsumer0); + consumer.consumers.put(partition1, partitionConsumer1); + + TopicMessageIdImpl removedTopicMessageId = new TopicMessageIdImpl(partition0, new MessageIdImpl(1, 1, 0)); + TopicMessageIdImpl otherTopicMessageId = + new TopicMessageIdImpl(otherTopicPartition, new MessageIdImpl(2, 2, 0)); + consumer.getUnAckedMessageTracker().add(removedTopicMessageId); + consumer.getUnAckedMessageTracker().add(otherTopicMessageId); + assertThat(consumer.getUnAckedMessageTracker().size()).isEqualTo(2); + + consumer.topicsChangeListener.onTopicsRemoved(Arrays.asList(partition0, partition1)).join(); + + assertThat(consumer.partitionedTopics.containsKey(partitionedTopic)).isFalse(); + assertThat(consumer.consumers).doesNotContainKeys(partition0, partition1); + assertThat(consumer.getUnAckedMessageTracker().size()).isEqualTo(1); + assertThat(consumer.getUnAckedMessageTracker().remove(otherTopicMessageId)).isTrue(); + verify(partitionConsumer0).closeAsync(); + verify(partitionConsumer1).closeAsync(); + } + private static void runTimerTasks(Deque<TimerTask> tasks) throws Exception { // first drain the queue to a list to avoid an infinite loop List<TimerTask> taskList = new ArrayList<>();
