This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 26cf550bb96 [improve][client] Clean up unacked message tracker when topics are removed in multi-topic consumers (#25923)
26cf550bb96 is described below
commit 26cf550bb9635a8107e5628800b465778c847a91
Author: Dream95 <[email protected]>
AuthorDate: Fri Jun 5 18:33:24 2026 +0800
[improve][client] Clean up unacked message tracker when topics are removed in multi-topic consumers (#25923)
Signed-off-by: Dream95 <[email protected]>
---
.../client/impl/MultiTopicsConsumerImpl.java | 17 +++++---
.../impl/PatternMultiTopicsConsumerImpl.java | 1 +
.../client/impl/MultiTopicsConsumerImplTest.java | 51 ++++++++++++++++++++++
.../impl/PatternMultiTopicsConsumerImplTest.java | 41 +++++++++++++++++
4 files changed, 104 insertions(+), 6 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 6d82d744308..aa7b710ff3b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -982,6 +982,14 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
}
}
+ protected void removeTopicMessagesFromUnackedTracker(String topicName) {
+ if (unAckedMessageTracker instanceof UnAckedTopicMessageTracker
tracker) {
+ tracker.removeTopicMessages(topicName);
+ } else if (unAckedMessageTracker instanceof
UnAckedTopicMessageRedeliveryTracker tracker) {
+ tracker.removeTopicMessages(topicName);
+ }
+ }
+
/***
* Subscribe one more given topic.
* @param topicName topic name without the partition suffix.
@@ -1300,11 +1308,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
});
removeTopic(topicName);
- if (unAckedMessageTracker instanceof
UnAckedTopicMessageTracker tracker) {
- tracker.removeTopicMessages(topicName);
- } else if (unAckedMessageTracker instanceof
UnAckedTopicMessageRedeliveryTracker tracker){
- tracker.removeTopicMessages(topicName);
- }
+ removeTopicMessagesFromUnackedTracker(topicName);
unsubscribeFuture.complete(null);
log.info().attr("topic", topicName)
@@ -1431,7 +1435,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
}
}
- return FutureUtil.waitForAll(futures);
+ return FutureUtil.waitForAll(futures)
+ .thenRun(() ->
removeTopicMessagesFromUnackedTracker(topicName));
} else if (oldPartitionNumber < currentPartitionNumber) {
allTopicPartitionsNumber.addAndGet(currentPartitionNumber -
oldPartitionNumber);
partitionedTopics.put(topicName, currentPartitionNumber);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index e6923eb45d3..c636db4cd0b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -367,6 +367,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
removedPartitionedTopicsForLog.add(String.format("%s with %s partitions",
groupedTopicRemoved, partitions));
partitionedTopics.remove(groupedTopicRemoved,
partitions);
+
removeTopicMessagesFromUnackedTracker(groupedTopicRemoved);
}
}
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 6b9fd19e5e1..217b2782ea6 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -37,6 +37,7 @@ import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -269,4 +270,54 @@ public class MultiTopicsConsumerImplTest {
verify(clientMock, times(3)).getPartitionedTopicMetadata(any(),
anyBoolean(), anyBoolean());
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testOnTopicsExtendedRemovedTopicCleansUnackedMessages() {
+ String topicName = "persistent://public/default/deleted-topic";
+ String topicPartition0 = topicName + "-partition-0";
+ String topicPartition1 = topicName + "-partition-1";
+ String otherTopicPartition =
"persistent://public/default/other-topic-partition-0";
+
+ ConsumerConfigurationData<byte[]> consumerConfData = new
ConsumerConfigurationData<>();
+ consumerConfData.setSubscriptionName("subscriptionName");
+ consumerConfData.setAutoUpdatePartitions(true);
+ consumerConfData.setAutoUpdatePartitionsIntervalSeconds(60);
+ consumerConfData.setAckTimeoutMillis(1000);
+
+ MultiTopicsConsumerImpl<byte[]> impl =
createMultiTopicsConsumer(consumerConfData);
+
+ impl.partitionedTopics.put(topicName, 2);
+ impl.allTopicPartitionsNumber.set(2);
+
+ ConsumerImpl<byte[]> partitionConsumer0 = (ConsumerImpl<byte[]>)
mock(ConsumerImpl.class);
+ ConsumerImpl<byte[]> partitionConsumer1 = (ConsumerImpl<byte[]>)
mock(ConsumerImpl.class);
+ when(partitionConsumer0.getTopic()).thenReturn(topicPartition0);
+ when(partitionConsumer1.getTopic()).thenReturn(topicPartition1);
+
when(partitionConsumer0.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
when(partitionConsumer1.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+ impl.consumers.put(topicPartition0, partitionConsumer0);
+ impl.consumers.put(topicPartition1, partitionConsumer1);
+
+ TopicMessageIdImpl removedTopicMessageId = new
TopicMessageIdImpl(topicPartition0, new MessageIdImpl(1, 1, 0));
+ TopicMessageIdImpl otherTopicMessageId =
+ new TopicMessageIdImpl(otherTopicPartition, new
MessageIdImpl(2, 2, 0));
+ impl.getUnAckedMessageTracker().add(removedTopicMessageId);
+ impl.getUnAckedMessageTracker().add(otherTopicMessageId);
+ assertEquals(impl.getUnAckedMessageTracker().size(), 2);
+
+ when(impl.client.getPartitionsForTopic(topicName,
false)).thenReturn(CompletableFuture.completedFuture(
+ Collections.emptyList()));
+
+ PartitionsChangedListener listener =
impl.topicsPartitionChangedListener;
+ listener.onTopicsExtended(Collections.singleton(topicName)).join();
+
+ assertTrue(impl.getConsumers().isEmpty());
+ assertEquals(impl.partitionedTopics.get(topicName),
Integer.valueOf(0));
+ assertEquals(impl.allTopicPartitionsNumber.get(), 0);
+ assertEquals(impl.getUnAckedMessageTracker().size(), 1);
+
assertTrue(impl.getUnAckedMessageTracker().remove(otherTopicMessageId));
+ verify(partitionConsumer0).closeAsync();
+ verify(partitionConsumer1).closeAsync();
+ }
+
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java
index cd799edb3bd..84fdca77feb 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java
@@ -310,6 +310,47 @@ public class PatternMultiTopicsConsumerImplTest {
assertThat(invocationCount.get()).isEqualTo(5);
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void
testOnTopicsRemovedCleansUnackedMessagesForRemovedPartitionedTopic() {
+ String partitionedTopic =
"persistent://tenant/namespace/deleted-topic";
+ String partition0 = partitionedTopic + "-partition-0";
+ String partition1 = partitionedTopic + "-partition-1";
+ String otherTopicPartition =
"persistent://tenant/namespace/other-topic-partition-0";
+ TopicsPattern topicsPattern =
+
TopicsPatternFactory.create("persistent://tenant/namespace/.*",
TopicsPattern.RegexImplementation.JDK);
+ ConsumerConfigurationData<byte[]> consumerConfData =
createConsumerConfigurationData();
+ consumerConfData.setAckTimeoutMillis(1000);
+
+ PatternMultiTopicsConsumerImpl<byte[]> consumer =
+ createPatternMultiTopicsConsumer(consumerConfData,
topicsPattern);
+
+ consumer.partitionedTopics.put(partitionedTopic, 2);
+
+ ConsumerImpl<byte[]> partitionConsumer0 = (ConsumerImpl<byte[]>)
mock(ConsumerImpl.class);
+ ConsumerImpl<byte[]> partitionConsumer1 = (ConsumerImpl<byte[]>)
mock(ConsumerImpl.class);
+
when(partitionConsumer0.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
when(partitionConsumer1.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+ consumer.consumers.put(partition0, partitionConsumer0);
+ consumer.consumers.put(partition1, partitionConsumer1);
+
+ TopicMessageIdImpl removedTopicMessageId = new
TopicMessageIdImpl(partition0, new MessageIdImpl(1, 1, 0));
+ TopicMessageIdImpl otherTopicMessageId =
+ new TopicMessageIdImpl(otherTopicPartition, new
MessageIdImpl(2, 2, 0));
+ consumer.getUnAckedMessageTracker().add(removedTopicMessageId);
+ consumer.getUnAckedMessageTracker().add(otherTopicMessageId);
+ assertThat(consumer.getUnAckedMessageTracker().size()).isEqualTo(2);
+
+
consumer.topicsChangeListener.onTopicsRemoved(Arrays.asList(partition0,
partition1)).join();
+
+
assertThat(consumer.partitionedTopics.containsKey(partitionedTopic)).isFalse();
+ assertThat(consumer.consumers).doesNotContainKeys(partition0,
partition1);
+ assertThat(consumer.getUnAckedMessageTracker().size()).isEqualTo(1);
+
assertThat(consumer.getUnAckedMessageTracker().remove(otherTopicMessageId)).isTrue();
+ verify(partitionConsumer0).closeAsync();
+ verify(partitionConsumer1).closeAsync();
+ }
+
private static void runTimerTasks(Deque<TimerTask> tasks) throws Exception
{
// first drain the queue to a list to avoid an infinite loop
List<TimerTask> taskList = new ArrayList<>();