This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f7893309ed6 [fix][broker] Fix chunked message loss when no consumers 
are available (#25077)
f7893309ed6 is described below

commit f7893309ed6154c12e0ba31cf38d7eaf16a32f3a
Author: zhenJiangWang <[email protected]>
AuthorDate: Fri Jan 2 19:11:29 2026 +0800

    [fix][broker] Fix chunked message loss when no consumers are available 
(#25077)
    
    Co-authored-by: zjxxzjwang <[email protected]>
    (cherry picked from commit df89b60518da8a376c8423b2962196347b650680)
---
 .../broker/service/SharedConsumerAssignor.java     | 10 +++-
 .../PersistentDispatcherMultipleConsumers.java     |  2 +-
 .../broker/service/SharedConsumerAssignorTest.java | 58 +++++++++++++++++++++-
 3 files changed, 67 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
index 2161e418dff..bbf8dfd2b10 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
@@ -27,11 +27,13 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Supplier;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 
 /**
  * The assigner to assign entries to the proper {@link Consumer} in the shared 
subscription.
  */
+@Slf4j
 @RequiredArgsConstructor
 public class SharedConsumerAssignor {
 
@@ -50,6 +52,8 @@ public class SharedConsumerAssignor {
     // Process the unassigned messages, e.g. adding them to the replay queue
     private final java.util.function.Consumer<EntryAndMetadata> 
unassignedMessageProcessor;
 
+    private final Subscription subscription;
+
     public Map<Consumer, List<EntryAndMetadata>> assign(final 
List<EntryAndMetadata> entryAndMetadataList,
                                                         final int 
numConsumers) {
         assert numConsumers >= 0;
@@ -58,7 +62,11 @@ public class SharedConsumerAssignor {
 
         Consumer consumer = getConsumer(numConsumers);
         if (consumer == null) {
-            entryAndMetadataList.forEach(EntryAndMetadata::release);
+            if (subscription != null) {
+                log.info("No consumer found to assign in topic:{}, 
subscription:{}, redelivering {} messages.",
+                        subscription.getTopic().getName(), 
subscription.getName(), entryAndMetadataList.size());
+            }
+            entryAndMetadataList.forEach(unassignedMessageProcessor);
             return consumerToEntries;
         }
         // The actual available permits might change, here we use the permits 
at the moment to assign entries
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index b5a1e163771..76fd2c35e2b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -148,7 +148,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                 : RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
         this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
         this.initializeDispatchRateLimiterIfNeeded();
-        this.assignor = new SharedConsumerAssignor(this::getNextConsumer, 
this::addMessageToReplay);
+        this.assignor = new SharedConsumerAssignor(this::getNextConsumer, 
this::addMessageToReplay, subscription);
         this.readFailureBackoff = new Backoff(
                 
topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(),
                 TimeUnit.MILLISECONDS,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
index 1b253df0f37..d9e675fa7e5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
@@ -58,7 +58,7 @@ public class SharedConsumerAssignorTest {
         roundRobinConsumerSelector.clear();
         entryAndMetadataList.clear();
         replayQueue.clear();
-        assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, 
replayQueue::add);
+        assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, 
replayQueue::add, null);
         final AtomicLong entryId = new AtomicLong(0L);
         final MockProducer producerA = new MockProducer("A", entryId, 
entryAndMetadataList);
         final MockProducer producerB = new MockProducer("B", entryId, 
entryAndMetadataList);
@@ -238,4 +238,60 @@ public class SharedConsumerAssignorTest {
         }
         return metadata;
     }
+
+    /**
+     * When there are no consumers online, chunk messages will not be directly 
lost.
+     */
+    @Test
+    public void testChunkMessagesNotBeLostNoConsumer() {
+        // 1. No consumer initially
+        Map<Consumer, List<EntryAndMetadata>> result = 
assignor.assign(entryAndMetadataList, 1);
+        assertTrue(result.isEmpty());
+        assertEquals(replayQueue.size(), entryAndMetadataList.size());
+        assertEquals(toString(replayQueue), toString(entryAndMetadataList));
+
+        // 2. Two Consumers come online
+        final Consumer consumerA = new Consumer("A", 100);
+        final Consumer consumerB = new Consumer("B", 100);
+        roundRobinConsumerSelector.addConsumers(consumerA, consumerB);
+
+        // 3. Retry messages from replay queue
+        List<EntryAndMetadata> retryList = new ArrayList<>(replayQueue);
+        replayQueue.clear();
+
+        // Use a larger batch size to ensure we can process enough messages
+        result = assignor.assign(retryList, 10);
+
+        // 4. Verify consumer receives all messages
+        int totalReceived = 
result.values().stream().mapToInt(List::size).sum();
+        assertEquals(totalReceived, retryList.size());
+
+        // Verify that chunks are assigned to the same consumer
+        List<String> entriesA = toString(result.getOrDefault(consumerA, 
Collections.emptyList()));
+        List<String> entriesB = toString(result.getOrDefault(consumerB, 
Collections.emptyList()));
+
+        // Check A-1 chunks (0:1, 0:2, 0:5)
+        boolean a1InA = entriesA.stream().anyMatch(s -> s.contains("A-1"));
+        if (a1InA) {
+            assertTrue(entriesA.containsAll(Arrays.asList("0:1@A-1-0-3", 
"0:2@A-1-1-3", "0:5@A-1-2-3")));
+            assertTrue(entriesB.stream().noneMatch(s -> s.contains("A-1")));
+        } else {
+            assertTrue(entriesB.containsAll(Arrays.asList("0:1@A-1-0-3", 
"0:2@A-1-1-3", "0:5@A-1-2-3")));
+            assertTrue(entriesA.stream().noneMatch(s -> s.contains("A-1")));
+        }
+
+        // Check B-1 chunks (0:4, 0:6)
+        boolean b1InA = entriesA.stream().anyMatch(s -> s.contains("B-1"));
+        if (b1InA) {
+            assertTrue(entriesA.containsAll(Arrays.asList("0:4@B-1-0-2", 
"0:6@B-1-1-2")));
+            assertTrue(entriesB.stream().noneMatch(s -> s.contains("B-1")));
+        } else {
+            assertTrue(entriesB.containsAll(Arrays.asList("0:4@B-1-0-2", 
"0:6@B-1-1-2")));
+            assertTrue(entriesA.stream().noneMatch(s -> s.contains("B-1")));
+        }
+
+        // 5. Verify internal state is clean (since all chunks are completed)
+        assertTrue(assignor.getUuidToConsumer().isEmpty());
+    }
+
 }

Reply via email to