This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new f49c7b288a1 [fix][client] Send all chunkMessageIds to broker for
redelivery (#25229)
f49c7b288a1 is described below
commit f49c7b288a1d0f62cafe2bf80b25aa53c097dc2a
Author: cai minjian <[email protected]>
AuthorDate: Mon Feb 9 18:24:40 2026 +0800
[fix][client] Send all chunkMessageIds to broker for redelivery (#25229)
(cherry picked from commit 0a0ce6d012412f003b04f148548f6350fdcfb58c)
---
.../client/impl/MessageChunkingSharedTest.java | 33 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 1 -
.../pulsar/client/impl/NegativeAcksTracker.java | 22 ++++++++++++++-
3 files changed, 54 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
index 3d24d3746d6..203715ca7db 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
@@ -193,6 +193,39 @@ public class MessageChunkingSharedTest extends
ProducerConsumerBase {
assertEquals(receivedUuidList1, Arrays.asList("A-0", "B-0", "B-1",
"A-1"));
}
+ // Issue #25220
+ @Test
+ public void testNegativeAckChunkedMessage() throws Exception {
+ final String topic =
"persistent://my-property/my-ns/test-negative-acknowledge-with-chunk";
+
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub1")
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+ .subscriptionType(SubscriptionType.Shared)
+ .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
+ .subscribe();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .enableBatching(false)
+ .enableChunking(true)
+ .chunkMaxMessageSize(1024) // 1KB max - forces chunking for
larger messages
+ .create();
+ String longMessage = "X".repeat(10 * 1024);
+ producer.sendAsync(longMessage);
+ producer.flush();
+
+ // negative ack the first message
+ consumer.negativeAcknowledge(consumer.receive());
+
+ // now 2s has passed, the first message should be redelivered 1s later.
+ Message<String> msg1 = consumer.receive(2, TimeUnit.SECONDS);
+ assertNotNull(msg1);
+ }
+
private Producer<String> createProducer(String topic) throws
PulsarClientException {
return pulsarClient.newProducer(Schema.STRING)
.topic(topic)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c1e8df4d35c..50c5cccf855 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1486,7 +1486,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
// and return undecrypted payload
if (isMessageUndecryptable || (numMessages == 1 &&
!msgMetadata.hasNumMessagesInBatch())) {
- // right now, chunked messages are only supported by non-shared
subscription
if (isChunkedMessage) {
uncompressedPayload = processMessageChunk(uncompressedPayload,
msgMetadata, msgId, messageId, cnx);
if (uncompressedPayload == null) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 273880569c3..c6983d047a2 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -47,6 +47,7 @@ class NegativeAcksTracker implements Closeable {
// different timestamp, there will be multiple entries in the map
// RB Tree -> LongOpenHashMap -> Roaring64Bitmap
private Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>>
nackedMessages = null;
+ private final Long2ObjectMap<Long2ObjectMap<MessageId>> nackedMessageIds =
new Long2ObjectOpenHashMap<>();
private final ConsumerBase<?> consumer;
private final Timer timer;
@@ -89,7 +90,17 @@ class NegativeAcksTracker implements Closeable {
long ledgerId = ledgerEntry.getLongKey();
Roaring64Bitmap entrySet = ledgerEntry.getValue();
entrySet.forEach(entryId -> {
- MessageId msgId = new MessageIdImpl(ledgerId, entryId,
DUMMY_PARTITION_INDEX);
+ MessageId msgId = null;
+ Long2ObjectMap<MessageId> entryMap =
nackedMessageIds.get(ledgerId);
+ if (entryMap != null) {
+ msgId = entryMap.remove(entryId);
+ if (entryMap.isEmpty()) {
+ nackedMessageIds.remove(ledgerId);
+ }
+ }
+ if (msgId == null) {
+ msgId = new MessageIdImpl(ledgerId, entryId,
DUMMY_PARTITION_INDEX);
+ }
addChunkedMessageIdsAndRemoveFromSequenceMap(msgId,
messagesToRedeliver, this.consumer);
messagesToRedeliver.add(msgId);
});
@@ -143,6 +154,12 @@ class NegativeAcksTracker implements Closeable {
}
private synchronized void add(MessageId messageId, int redeliveryCount) {
+ if (messageId instanceof ChunkMessageIdImpl) {
+ MessageIdAdv msgId = (MessageIdAdv) messageId;
+ nackedMessageIds.computeIfAbsent(msgId.getLedgerId(), k -> new
Long2ObjectOpenHashMap<>())
+ .put(msgId.getEntryId(), messageId);
+ }
+
if (nackedMessages == null) {
nackedMessages = new Long2ObjectAVLTreeMap<>();
}
@@ -201,5 +218,8 @@ class NegativeAcksTracker implements Closeable {
nackedMessages.clear();
nackedMessages = null;
}
+ if (nackedMessageIds != null) {
+ nackedMessageIds.clear();
+ }
}
}