This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f42cd251f6c [fix][client] Send all chunkMessageIds to broker for 
redelivery (#25229)
f42cd251f6c is described below

commit f42cd251f6c0bb0968f909768282b51b18d77c81
Author: cai minjian <[email protected]>
AuthorDate: Mon Feb 9 18:24:40 2026 +0800

    [fix][client] Send all chunkMessageIds to broker for redelivery (#25229)
---
 .../client/impl/MessageChunkingSharedTest.java     | 33 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  1 -
 .../pulsar/client/impl/NegativeAcksTracker.java    |  2 +-
 3 files changed, 34 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
index 3d24d3746d6..203715ca7db 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
@@ -193,6 +193,39 @@ public class MessageChunkingSharedTest extends 
ProducerConsumerBase {
         assertEquals(receivedUuidList1, Arrays.asList("A-0", "B-0", "B-1", 
"A-1"));
     }
 
+    // Issue #25220
+    @Test
+    public void testNegativeAckChunkedMessage() throws Exception {
+        final String topic = 
"persistent://my-property/my-ns/test-negative-acknowledge-with-chunk";
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub1")
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+                .subscriptionType(SubscriptionType.Shared)
+                .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(false)
+                .enableChunking(true)
+                .chunkMaxMessageSize(1024) // 1KB max - forces chunking for 
larger messages
+                .create();
+        String longMessage = "X".repeat(10 * 1024);
+        producer.sendAsync(longMessage);
+        producer.flush();
+
+        // negative ack the first message
+        consumer.negativeAcknowledge(consumer.receive());
+
+        // now 2s has passed, the first message should be redelivered 1s later.
+        Message<String> msg1 = consumer.receive(2, TimeUnit.SECONDS);
+        assertNotNull(msg1);
+    }
+
     private Producer<String> createProducer(String topic) throws 
PulsarClientException {
         return pulsarClient.newProducer(Schema.STRING)
                 .topic(topic)
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index adecd97564f..7091b05151e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1508,7 +1508,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         // and return undecrypted payload
         if (isMessageUndecryptable || (numMessages == 1 && 
!msgMetadata.hasNumMessagesInBatch())) {
 
-            // right now, chunked messages are only supported by non-shared 
subscription
             if (isChunkedMessage) {
                 uncompressedPayload = processMessageChunk(uncompressedPayload, 
msgMetadata, msgId, messageId, cnx);
                 if (uncompressedPayload == null) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index e0ec16f507e..d975d22be7d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -158,7 +158,7 @@ class NegativeAcksTracker implements Closeable {
     private synchronized void add(MessageId messageId, int redeliveryCount) {
         if (messageId instanceof TraceableMessageId) {
             Span span = ((TraceableMessageId) messageId).getTracingSpan();
-            if (span != null) {
+            if (span != null || messageId instanceof ChunkMessageIdImpl) {
                 MessageIdAdv msgId = (MessageIdAdv) messageId;
                 nackedMessageIds.computeIfAbsent(msgId.getLedgerId(), k -> new 
Long2ObjectOpenHashMap<>())
                         .put(msgId.getEntryId(), messageId);

Reply via email to