This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f7893309ed6 [fix][broker] Fix chunked message loss when no consumers
are available (#25077)
f7893309ed6 is described below
commit f7893309ed6154c12e0ba31cf38d7eaf16a32f3a
Author: zhenJiangWang <[email protected]>
AuthorDate: Fri Jan 2 19:11:29 2026 +0800
[fix][broker] Fix chunked message loss when no consumers are available
(#25077)
Co-authored-by: zjxxzjwang <[email protected]>
(cherry picked from commit df89b60518da8a376c8423b2962196347b650680)
---
.../broker/service/SharedConsumerAssignor.java | 10 +++-
.../PersistentDispatcherMultipleConsumers.java | 2 +-
.../broker/service/SharedConsumerAssignorTest.java | 58 +++++++++++++++++++++-
3 files changed, 67 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
index 2161e418dff..bbf8dfd2b10 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java
@@ -27,11 +27,13 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.api.proto.MessageMetadata;
/**
* The assigner to assign entries to the proper {@link Consumer} in the shared
subscription.
*/
+@Slf4j
@RequiredArgsConstructor
public class SharedConsumerAssignor {
@@ -50,6 +52,8 @@ public class SharedConsumerAssignor {
// Process the unassigned messages, e.g. adding them to the replay queue
private final java.util.function.Consumer<EntryAndMetadata>
unassignedMessageProcessor;
+ private final Subscription subscription;
+
public Map<Consumer, List<EntryAndMetadata>> assign(final
List<EntryAndMetadata> entryAndMetadataList,
final int
numConsumers) {
assert numConsumers >= 0;
@@ -58,7 +62,11 @@ public class SharedConsumerAssignor {
Consumer consumer = getConsumer(numConsumers);
if (consumer == null) {
- entryAndMetadataList.forEach(EntryAndMetadata::release);
+ if (subscription != null) {
+ log.info("No consumer found to assign in topic:{},
subscription:{}, redelivering {} messages.",
+ subscription.getTopic().getName(),
subscription.getName(), entryAndMetadataList.size());
+ }
+ entryAndMetadataList.forEach(unassignedMessageProcessor);
return consumerToEntries;
}
// The actual available permits might change, here we use the permits
at the moment to assign entries
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index b5a1e163771..76fd2c35e2b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -148,7 +148,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
: RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.initializeDispatchRateLimiterIfNeeded();
- this.assignor = new SharedConsumerAssignor(this::getNextConsumer,
this::addMessageToReplay);
+ this.assignor = new SharedConsumerAssignor(this::getNextConsumer,
this::addMessageToReplay, subscription);
this.readFailureBackoff = new Backoff(
topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(),
TimeUnit.MILLISECONDS,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
index 1b253df0f37..d9e675fa7e5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
@@ -58,7 +58,7 @@ public class SharedConsumerAssignorTest {
roundRobinConsumerSelector.clear();
entryAndMetadataList.clear();
replayQueue.clear();
- assignor = new SharedConsumerAssignor(roundRobinConsumerSelector,
replayQueue::add);
+ assignor = new SharedConsumerAssignor(roundRobinConsumerSelector,
replayQueue::add, null);
final AtomicLong entryId = new AtomicLong(0L);
final MockProducer producerA = new MockProducer("A", entryId,
entryAndMetadataList);
final MockProducer producerB = new MockProducer("B", entryId,
entryAndMetadataList);
@@ -238,4 +238,60 @@ public class SharedConsumerAssignorTest {
}
return metadata;
}
+
+ /**
+ * When there are no consumers online, chunk messages will not be directly
lost.
+ */
+ @Test
+ public void testChunkMessagesNotBeLostNoConsumer() {
+ // 1. No consumer initially
+ Map<Consumer, List<EntryAndMetadata>> result =
assignor.assign(entryAndMetadataList, 1);
+ assertTrue(result.isEmpty());
+ assertEquals(replayQueue.size(), entryAndMetadataList.size());
+ assertEquals(toString(replayQueue), toString(entryAndMetadataList));
+
+ // 2. Two Consumers come online
+ final Consumer consumerA = new Consumer("A", 100);
+ final Consumer consumerB = new Consumer("B", 100);
+ roundRobinConsumerSelector.addConsumers(consumerA, consumerB);
+
+ // 3. Retry messages from replay queue
+ List<EntryAndMetadata> retryList = new ArrayList<>(replayQueue);
+ replayQueue.clear();
+
+ // Use a larger batch size to ensure we can process enough messages
+ result = assignor.assign(retryList, 10);
+
+ // 4. Verify consumer receives all messages
+ int totalReceived =
result.values().stream().mapToInt(List::size).sum();
+ assertEquals(totalReceived, retryList.size());
+
+ // Verify that chunks are assigned to the same consumer
+ List<String> entriesA = toString(result.getOrDefault(consumerA,
Collections.emptyList()));
+ List<String> entriesB = toString(result.getOrDefault(consumerB,
Collections.emptyList()));
+
+ // Check A-1 chunks (0:1, 0:2, 0:5)
+ boolean a1InA = entriesA.stream().anyMatch(s -> s.contains("A-1"));
+ if (a1InA) {
+ assertTrue(entriesA.containsAll(Arrays.asList("0:1@A-1-0-3",
"0:2@A-1-1-3", "0:5@A-1-2-3")));
+ assertTrue(entriesB.stream().noneMatch(s -> s.contains("A-1")));
+ } else {
+ assertTrue(entriesB.containsAll(Arrays.asList("0:1@A-1-0-3",
"0:2@A-1-1-3", "0:5@A-1-2-3")));
+ assertTrue(entriesA.stream().noneMatch(s -> s.contains("A-1")));
+ }
+
+ // Check B-1 chunks (0:4, 0:6)
+ boolean b1InA = entriesA.stream().anyMatch(s -> s.contains("B-1"));
+ if (b1InA) {
+ assertTrue(entriesA.containsAll(Arrays.asList("0:4@B-1-0-2",
"0:6@B-1-1-2")));
+ assertTrue(entriesB.stream().noneMatch(s -> s.contains("B-1")));
+ } else {
+ assertTrue(entriesB.containsAll(Arrays.asList("0:4@B-1-0-2",
"0:6@B-1-1-2")));
+ assertTrue(entriesA.stream().noneMatch(s -> s.contains("B-1")));
+ }
+
+ // 5. Verify internal state is clean (since all chunks are completed)
+ assertTrue(assignor.getUuidToConsumer().isEmpty());
+ }
+
}