This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 756c03df2d3 [fix][client] Clean up unacked messages when unsubscribing 
a topic with ack timeout backoff (#25916)
756c03df2d3 is described below

commit 756c03df2d3ef17ce60ee124bbef81be03fc0191
Author: Dream95 <[email protected]>
AuthorDate: Wed Jun 3 16:52:06 2026 +0800

    [fix][client] Clean up unacked messages when unsubscribing a topic with ack 
timeout backoff (#25916)
    
    Signed-off-by: Dream95 <[email protected]>
---
 .../client/impl/MultiTopicsConsumerImpl.java       |  6 +-
 .../impl/UnAckedTopicMessageRedeliveryTracker.java |  5 +-
 .../UnAckedTopicMessageRedeliveryTrackerTest.java  | 79 ++++++++++++++++++++++
 3 files changed, 85 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index a768a697f5d..6d82d744308 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1300,8 +1300,10 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     });
 
                     removeTopic(topicName);
-                    if (unAckedMessageTracker instanceof 
UnAckedTopicMessageTracker) {
-                        ((UnAckedTopicMessageTracker) 
unAckedMessageTracker).removeTopicMessages(topicName);
+                    if (unAckedMessageTracker instanceof 
UnAckedTopicMessageTracker tracker) {
+                        tracker.removeTopicMessages(topicName);
+                    } else if (unAckedMessageTracker instanceof 
UnAckedTopicMessageRedeliveryTracker tracker){
+                        tracker.removeTopicMessages(topicName);
                     }
 
                     unsubscribeFuture.complete(null);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java
index 823dd4ad5f4..393557cd89a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTracker.java
@@ -44,7 +44,6 @@ public class UnAckedTopicMessageRedeliveryTracker extends 
UnAckedMessageRedelive
                 MessageId messageId = messageIdWrapper.getMessageId();
                 if (messageId instanceof TopicMessageId
                         && ((TopicMessageId) 
messageId).getOwnerTopic().contains(topicName)) {
-                    HashSet<UnackMessageIdWrapper> exist = 
redeliveryMessageIdPartitionMap.get(messageIdWrapper);
                     entry.getValue().remove(messageIdWrapper);
                     iterator.remove();
                     messageIdWrapper.recycle();
@@ -53,11 +52,11 @@ public class UnAckedTopicMessageRedeliveryTracker extends 
UnAckedMessageRedelive
             }
 
             Iterator<MessageId> iteratorAckTimeOut = 
ackTimeoutMessages.keySet().iterator();
-            while (iterator.hasNext()) {
+            while (iteratorAckTimeOut.hasNext()) {
                 MessageId messageId = iteratorAckTimeOut.next();
                 if (messageId instanceof TopicMessageId
                         && ((TopicMessageId) 
messageId).getOwnerTopic().contains(topicName)) {
-                    iterator.remove();
+                    iteratorAckTimeOut.remove();
                     removed++;
                 }
             }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTrackerTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTrackerTest.java
new file mode 100644
index 00000000000..9fdab39145b
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedTopicMessageRedeliveryTrackerTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.testng.annotations.Test;
+
+public class UnAckedTopicMessageRedeliveryTrackerTest {
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testRemoveTopicMessages() {
+        PulsarClientImpl client = mock(PulsarClientImpl.class);
+        ConnectionPool connectionPool = mock(ConnectionPool.class);
+        when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP);
+        when(client.getCnxPool()).thenReturn(connectionPool);
+        @Cleanup("stop")
+        Timer timer = new HashedWheelTimer(
+                new DefaultThreadFactory("pulsar-timer", 
Thread.currentThread().isDaemon()),
+                1, TimeUnit.MILLISECONDS);
+        when(client.timer()).thenReturn(timer);
+
+        ConsumerBase<byte[]> consumer = mock(ConsumerBase.class);
+        doNothing().when(consumer).onAckTimeoutSend(any());
+        doNothing().when(consumer).redeliverUnacknowledgedMessages(any());
+
+        ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
+        conf.setAckTimeoutMillis(1_000_000);
+        conf.setTickDurationMillis(100_000);
+        
conf.setAckTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder().build());
+
+        UnAckedTopicMessageRedeliveryTracker tracker =
+                new UnAckedTopicMessageRedeliveryTracker(client, consumer, 
conf);
+
+        String ownerTopic = "persistent://public/default/my-topic-partition-0";
+        TopicMessageIdImpl msgInPartition =
+                new TopicMessageIdImpl(ownerTopic, new MessageIdImpl(1L, 0L, 
-1));
+        TopicMessageIdImpl msgInAckTimeout =
+                new TopicMessageIdImpl(ownerTopic, new MessageIdImpl(2L, 0L, 
-1));
+
+        assertTrue(tracker.add(msgInPartition));
+        tracker.ackTimeoutMessages.put(msgInAckTimeout, 
System.currentTimeMillis() + 1_000_000L);
+        assertEquals(tracker.size(), 2);
+
+        assertEquals(tracker.removeTopicMessages("my-topic"), 2);
+        assertTrue(tracker.isEmpty());
+
+        tracker.close();
+    }
+
+}

Reply via email to