This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit bbceab4d0bac45f197c06a39eac00019d5bdc4a6 Author: Lari Hotari <[email protected]> AuthorDate: Mon Feb 9 13:17:31 2026 +0200 [improve][test][branch-3.0] Add test for issue #25220 --- .../client/impl/MessageChunkingSharedTest.java | 33 ++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java index 3d24d3746d6..203715ca7db 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java @@ -193,6 +193,39 @@ public class MessageChunkingSharedTest extends ProducerConsumerBase { assertEquals(receivedUuidList1, Arrays.asList("A-0", "B-0", "B-1", "A-1")); } + // Issue #25220 + @Test + public void testNegativeAckChunkedMessage() throws Exception { + final String topic = "persistent://my-property/my-ns/test-negative-acknowledge-with-chunk"; + + @Cleanup + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("sub1") + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .subscriptionType(SubscriptionType.Shared) + .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS) + .subscribe(); + + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .enableBatching(false) + .enableChunking(true) + .chunkMaxMessageSize(1024) // 1KB max - forces chunking for larger messages + .create(); + String longMessage = "X".repeat(10 * 1024); + producer.sendAsync(longMessage); + producer.flush(); + + // negative ack the first message + consumer.negativeAcknowledge(consumer.receive()); + + // the negatively acknowledged message should be redelivered after the 1s redelivery delay, i.e. within the 2s receive timeout below. 
+ Message<String> msg1 = consumer.receive(2, TimeUnit.SECONDS); + assertNotNull(msg1); + } + private Producer<String> createProducer(String topic) throws PulsarClientException { return pulsarClient.newProducer(Schema.STRING) .topic(topic)
