This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new b0421a6f453 [fix][broker] Fix chunked message loss when no consumers 
are available (#25077)
b0421a6f453 is described below

commit b0421a6f4534e61bd130593f6e68219dd6a7e5ef
Author: zhenJiangWang <[email protected]>
AuthorDate: Fri Jan 2 19:11:29 2026 +0800

    [fix][broker] Fix chunked message loss when no consumers are available 
(#25077)
    
    Co-authored-by: zjxxzjwang <[email protected]>
    (cherry picked from commit df89b60518da8a376c8423b2962196347b650680)
---
 .../broker/service/SharedConsumerAssignor.java     | 10 +++-
 .../PersistentDispatcherMultipleConsumers.java     |  2 +-
 ...rsistentDispatcherMultipleConsumersClassic.java |  2 +-
 .../broker/service/SharedConsumerAssignorTest.java | 58 +++++++++++++++++++++-
 4 files changed, 68 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
index 2161e418dff..bbf8dfd2b10 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
@@ -27,11 +27,13 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Supplier;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 
 /**
  * The assigner to assign entries to the proper {@link Consumer} in the shared 
subscription.
  */
+@Slf4j
 @RequiredArgsConstructor
 public class SharedConsumerAssignor {
 
@@ -50,6 +52,8 @@ public class SharedConsumerAssignor {
     // Process the unassigned messages, e.g. adding them to the replay queue
     private final java.util.function.Consumer<EntryAndMetadata> 
unassignedMessageProcessor;
 
+    private final Subscription subscription;
+
     public Map<Consumer, List<EntryAndMetadata>> assign(final 
List<EntryAndMetadata> entryAndMetadataList,
                                                         final int 
numConsumers) {
         assert numConsumers >= 0;
@@ -58,7 +62,11 @@ public class SharedConsumerAssignor {
 
         Consumer consumer = getConsumer(numConsumers);
         if (consumer == null) {
-            entryAndMetadataList.forEach(EntryAndMetadata::release);
+            if (subscription != null) {
+                log.info("No consumer found to assign in topic:{}, 
subscription:{}, redelivering {} messages.",
+                        subscription.getTopic().getName(), 
subscription.getName(), entryAndMetadataList.size());
+            }
+            entryAndMetadataList.forEach(unassignedMessageProcessor);
             return consumerToEntries;
         }
         // The actual available permits might change, here we use the permits 
at the moment to assign entries
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index c35d802f43d..b9a93e2c4fe 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -164,7 +164,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractPersistentDis
                 : RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
         this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
         this.initializeDispatchRateLimiterIfNeeded();
-        this.assignor = new SharedConsumerAssignor(this::getNextConsumer, 
this::addEntryToReplay);
+        this.assignor = new SharedConsumerAssignor(this::getNextConsumer, 
this::addEntryToReplay, subscription);
         ServiceConfiguration serviceConfiguration = 
topic.getBrokerService().pulsar().getConfiguration();
         this.readFailureBackoff = new Backoff(
                 
serviceConfiguration.getDispatcherReadFailureBackoffInitialTimeInMs(),
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
index 0f496e461b8..0746b7215b1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
@@ -157,7 +157,7 @@ public class PersistentDispatcherMultipleConsumersClassic 
extends AbstractPersis
                 : RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
         this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
         this.initializeDispatchRateLimiterIfNeeded();
-        this.assignor = new SharedConsumerAssignor(this::getNextConsumer, 
this::addMessageToReplay);
+        this.assignor = new SharedConsumerAssignor(this::getNextConsumer, 
this::addMessageToReplay, subscription);
         this.readFailureBackoff = new Backoff(
                 
topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(),
                 TimeUnit.MILLISECONDS,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
index 1b253df0f37..d9e675fa7e5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
@@ -58,7 +58,7 @@ public class SharedConsumerAssignorTest {
         roundRobinConsumerSelector.clear();
         entryAndMetadataList.clear();
         replayQueue.clear();
-        assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, 
replayQueue::add);
+        assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, 
replayQueue::add, null);
         final AtomicLong entryId = new AtomicLong(0L);
         final MockProducer producerA = new MockProducer("A", entryId, 
entryAndMetadataList);
         final MockProducer producerB = new MockProducer("B", entryId, 
entryAndMetadataList);
@@ -238,4 +238,60 @@ public class SharedConsumerAssignorTest {
         }
         return metadata;
     }
+
+    /**
+     * When there are no consumers online, chunk messages will not be directly 
lost.
+     */
+    @Test
+    public void testChunkMessagesNotBeLostNoConsumer() {
+        // 1. No consumer initially
+        Map<Consumer, List<EntryAndMetadata>> result = 
assignor.assign(entryAndMetadataList, 1);
+        assertTrue(result.isEmpty());
+        assertEquals(replayQueue.size(), entryAndMetadataList.size());
+        assertEquals(toString(replayQueue), toString(entryAndMetadataList));
+
+        // 2. Two Consumers come online
+        final Consumer consumerA = new Consumer("A", 100);
+        final Consumer consumerB = new Consumer("B", 100);
+        roundRobinConsumerSelector.addConsumers(consumerA, consumerB);
+
+        // 3. Retry messages from replay queue
+        List<EntryAndMetadata> retryList = new ArrayList<>(replayQueue);
+        replayQueue.clear();
+
+        // Use a larger batch size to ensure we can process enough messages
+        result = assignor.assign(retryList, 10);
+
+        // 4. Verify consumer receives all messages
+        int totalReceived = 
result.values().stream().mapToInt(List::size).sum();
+        assertEquals(totalReceived, retryList.size());
+
+        // Verify that chunks are assigned to the same consumer
+        List<String> entriesA = toString(result.getOrDefault(consumerA, 
Collections.emptyList()));
+        List<String> entriesB = toString(result.getOrDefault(consumerB, 
Collections.emptyList()));
+
+        // Check A-1 chunks (0:1, 0:2, 0:5)
+        boolean a1InA = entriesA.stream().anyMatch(s -> s.contains("A-1"));
+        if (a1InA) {
+            assertTrue(entriesA.containsAll(Arrays.asList("0:1@A-1-0-3", 
"0:2@A-1-1-3", "0:5@A-1-2-3")));
+            assertTrue(entriesB.stream().noneMatch(s -> s.contains("A-1")));
+        } else {
+            assertTrue(entriesB.containsAll(Arrays.asList("0:1@A-1-0-3", 
"0:2@A-1-1-3", "0:5@A-1-2-3")));
+            assertTrue(entriesA.stream().noneMatch(s -> s.contains("A-1")));
+        }
+
+        // Check B-1 chunks (0:4, 0:6)
+        boolean b1InA = entriesA.stream().anyMatch(s -> s.contains("B-1"));
+        if (b1InA) {
+            assertTrue(entriesA.containsAll(Arrays.asList("0:4@B-1-0-2", 
"0:6@B-1-1-2")));
+            assertTrue(entriesB.stream().noneMatch(s -> s.contains("B-1")));
+        } else {
+            assertTrue(entriesB.containsAll(Arrays.asList("0:4@B-1-0-2", 
"0:6@B-1-1-2")));
+            assertTrue(entriesA.stream().noneMatch(s -> s.contains("B-1")));
+        }
+
+        // 5. Verify internal state is clean (since all chunks are completed)
+        assertTrue(assignor.getUuidToConsumer().isEmpty());
+    }
+
 }

Reply via email to