This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 756c03df2d3 [fix][client] Clean up unacked messages when unsubscribing a topic with ack timeout backoff (#25916)
756c03df2d3 is described below
commit 756c03df2d3ef17ce60ee124bbef81be03fc0191
Author: Dream95 <[email protected]>
AuthorDate: Wed Jun 3 16:52:06 2026 +0800
[fix][client] Clean up unacked messages when unsubscribing a topic with ack timeout backoff (#25916)
Signed-off-by: Dream95 <[email protected]>
---
.../client/impl/MultiTopicsConsumerImpl.java | 6 +-
.../impl/UnAckedTopicMessageRedeliveryTracker.java | 5 +-
.../UnAckedTopicMessageRedeliveryTrackerTest.java | 79 ++++++++++++++++++++++
3 files changed, 85 insertions(+), 5 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index a768a697f5d..6d82d744308 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1300,8 +1300,10 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
});
removeTopic(topicName);
- if (unAckedMessageTracker instanceof
UnAckedTopicMessageTracker) {
- ((UnAckedTopicMessageTracker)
unAckedMessageTracker).removeTopicMessages(topicName);
+ if (unAckedMessageTracker instanceof
UnAckedTopicMessageTracker tracker) {
+ tracker.removeTopicMessages(topicName);
+ } else if (unAckedMessageTracker instanceof
UnAckedTopicMessageRedeliveryTracker tracker){
+ tracker.removeTopicMessages(topicName);
}
unsubscribeFuture.complete(null);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java
index 823dd4ad5f4..393557cd89a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java
@@ -44,7 +44,6 @@ public class UnAckedTopicMessageRedeliveryTracker extends
UnAckedMessageRedelive
MessageId messageId = messageIdWrapper.getMessageId();
if (messageId instanceof TopicMessageId
&& ((TopicMessageId)
messageId).getOwnerTopic().contains(topicName)) {
- HashSet<UnackMessageIdWrapper> exist =
redeliveryMessageIdPartitionMap.get(messageIdWrapper);
entry.getValue().remove(messageIdWrapper);
iterator.remove();
messageIdWrapper.recycle();
@@ -53,11 +52,11 @@ public class UnAckedTopicMessageRedeliveryTracker extends
UnAckedMessageRedelive
}
Iterator<MessageId> iteratorAckTimeOut =
ackTimeoutMessages.keySet().iterator();
- while (iterator.hasNext()) {
+ while (iteratorAckTimeOut.hasNext()) {
MessageId messageId = iteratorAckTimeOut.next();
if (messageId instanceof TopicMessageId
&& ((TopicMessageId)
messageId).getOwnerTopic().contains(topicName)) {
- iterator.remove();
+ iteratorAckTimeOut.remove();
removed++;
}
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTrackerTest.java
new file mode 100644
index 00000000000..9fdab39145b
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTrackerTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.testng.annotations.Test;
+
+public class UnAckedTopicMessageRedeliveryTrackerTest { // Unit test for UnAckedTopicMessageRedeliveryTracker#removeTopicMessages.
+
+ @Test
+ @SuppressWarnings("unchecked") // mock(ConsumerBase.class) is assigned to ConsumerBase<byte[]> below.
+ public void testRemoveTopicMessages() { // Checks that one removeTopicMessages call clears both tracked messages.
+ PulsarClientImpl client = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP);
+ when(client.getCnxPool()).thenReturn(connectionPool);
+ @Cleanup("stop") // Lombok invokes timer.stop() when the enclosing scope exits.
+ Timer timer = new HashedWheelTimer(
+ new DefaultThreadFactory("pulsar-timer",
Thread.currentThread().isDaemon()),
+ 1, TimeUnit.MILLISECONDS);
+ when(client.timer()).thenReturn(timer); // Real timer behind the mocked client; presumably read by the tracker constructor - confirm.
+
+ ConsumerBase<byte[]> consumer = mock(ConsumerBase.class);
+ doNothing().when(consumer).onAckTimeoutSend(any()); // Consumer callbacks are stubbed as no-ops.
+ doNothing().when(consumer).redeliverUnacknowledgedMessages(any());
+
+ ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
+ conf.setAckTimeoutMillis(1_000_000); // Large values; presumably chosen so no ack timeout fires during the test.
+ conf.setTickDurationMillis(100_000);
+
conf.setAckTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder().build()); // Configure an ack-timeout redelivery backoff.
+
+ UnAckedTopicMessageRedeliveryTracker tracker =
+ new UnAckedTopicMessageRedeliveryTracker(client, consumer,
conf);
+
+ String ownerTopic = "persistent://public/default/my-topic-partition-0";
+ TopicMessageIdImpl msgInPartition =
+ new TopicMessageIdImpl(ownerTopic, new MessageIdImpl(1L, 0L,
-1));
+ TopicMessageIdImpl msgInAckTimeout =
+ new TopicMessageIdImpl(ownerTopic, new MessageIdImpl(2L, 0L,
-1));
+
+ assertTrue(tracker.add(msgInPartition)); // First message is registered through add().
+ tracker.ackTimeoutMessages.put(msgInAckTimeout,
System.currentTimeMillis() + 1_000_000L); // Second message is placed directly into ackTimeoutMessages.
+ assertEquals(tracker.size(), 2);
+
+ assertEquals(tracker.removeTopicMessages("my-topic"), 2); // "my-topic" is a substring of ownerTopic; both entries must be counted as removed.
+ assertTrue(tracker.isEmpty());
+
+ tracker.close();
+ }
+
+}