This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f42cd251f6c [fix][client] Send all chunkMessageIds to broker for
redelivery (#25229)
f42cd251f6c is described below
commit f42cd251f6c0bb0968f909768282b51b18d77c81
Author: cai minjian <[email protected]>
AuthorDate: Mon Feb 9 18:24:40 2026 +0800
[fix][client] Send all chunkMessageIds to broker for redelivery (#25229)
---
.../client/impl/MessageChunkingSharedTest.java | 33 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 1 -
.../pulsar/client/impl/NegativeAcksTracker.java | 2 +-
3 files changed, 34 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
index 3d24d3746d6..203715ca7db 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
@@ -193,6 +193,39 @@ public class MessageChunkingSharedTest extends
ProducerConsumerBase {
assertEquals(receivedUuidList1, Arrays.asList("A-0", "B-0", "B-1",
"A-1"));
}
+ // Issue #25220: a negatively acknowledged chunked message must be redelivered to a Shared consumer.
+ @Test
+ public void testNegativeAckChunkedMessage() throws Exception {
+ final String topic =
"persistent://my-property/my-ns/test-negative-acknowledge-with-chunk";
+
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub1")
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+ .subscriptionType(SubscriptionType.Shared)
+ .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
+ .subscribe();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .enableBatching(false)
+ .enableChunking(true)
+ .chunkMaxMessageSize(1024) // 1KB max - forces chunking for
larger messages
+ .create();
+ String longMessage = "X".repeat(10 * 1024);
+ producer.sendAsync(longMessage);
+ producer.flush();
+
+ // Negatively acknowledge the first (chunked) message as soon as it is received.
+ consumer.negativeAcknowledge(consumer.receive());
+
+ // The nack redelivery delay is 1s, so the message must arrive again within the 2s receive timeout.
+ Message<String> msg1 = consumer.receive(2, TimeUnit.SECONDS);
+ assertNotNull(msg1);
+ }
+
private Producer<String> createProducer(String topic) throws
PulsarClientException {
return pulsarClient.newProducer(Schema.STRING)
.topic(topic)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index adecd97564f..7091b05151e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1508,7 +1508,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
// and return undecrypted payload
if (isMessageUndecryptable || (numMessages == 1 &&
!msgMetadata.hasNumMessagesInBatch())) {
- // right now, chunked messages are only supported by non-shared
subscription
if (isChunkedMessage) {
uncompressedPayload = processMessageChunk(uncompressedPayload,
msgMetadata, msgId, messageId, cnx);
if (uncompressedPayload == null) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index e0ec16f507e..d975d22be7d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -158,7 +158,7 @@ class NegativeAcksTracker implements Closeable {
private synchronized void add(MessageId messageId, int redeliveryCount) {
if (messageId instanceof TraceableMessageId) {
Span span = ((TraceableMessageId) messageId).getTracingSpan();
- if (span != null) {
+ if (span != null || messageId instanceof ChunkMessageIdImpl) {
MessageIdAdv msgId = (MessageIdAdv) messageId;
nackedMessageIds.computeIfAbsent(msgId.getLedgerId(), k -> new
Long2ObjectOpenHashMap<>())
.put(msgId.getEntryId(), messageId);