davidsealand-toast opened a new issue, #25220: URL: https://github.com/apache/pulsar/issues/25220
### Search before reporting

- [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Read release policy

- [x] I understand that [unsupported versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions) don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

### User environment

- Broker version: latest
- Broker Java version: 21
- Client library type: Java
- Client library version: 4.0.8 (reproduced starting at 4.0.1)
- Client Java version: 21

### Issue Description

We observed in 4.0.5 that after negatively acknowledging a chunked message, the consumer stopped receiving messages. We expected it to consume the redelivered chunked message as well as any future incoming messages.

This appears to have been introduced by changing `private HashMap<MessageId, Long> nackedMessages = null;` to `private ConcurrentLongLongPairHashMap nackedMessages = null;` in this [PR](https://github.com/apache/pulsar/pull/23582/changes#diff-c6bdcfb1326d517455010ce62ef0b08b2f75d1c86773f6888abb165d6741454f). `nackedMessages` no longer supports a ChunkedMessageId as a key; its key is now just a LongLongPair for a single message.

When the consumer attempts to find unacked messages to redeliver [here](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1527-L1532), it searches the map using a ChunkedMessageId, which is not how the chunked message was stored in the map.
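To make the suspected mismatch concrete, here is a minimal, self-contained sketch. It is not Pulsar's code: `PlainId`, `ChunkedId`, and `lookupAfterRoundTrip` are hypothetical stand-ins, and the sketch only assumes that a chunked id carries more state than a single (ledgerId, entryId) pair and that the two id types never compare equal. Under those assumptions, a key that has been flattened to a pair can no longer match an entry stored under the full chunked id.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class NackKeyMismatchSketch {

    /** Hypothetical plain id: a single entry position. */
    static class PlainId {
        final long ledgerId;
        final long entryId;

        PlainId(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public boolean equals(Object o) {
            if (o == null || getClass() != o.getClass()) {
                return false; // a plain id never equals a chunked id
            }
            PlainId other = (PlainId) o;
            return ledgerId == other.ledgerId && entryId == other.entryId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(ledgerId, entryId);
        }
    }

    /** Hypothetical chunked id: last chunk position plus the first chunk position. */
    static final class ChunkedId extends PlainId {
        final PlainId firstChunk;

        ChunkedId(PlainId firstChunk, long lastLedgerId, long lastEntryId) {
            super(lastLedgerId, lastEntryId);
            this.firstChunk = firstChunk;
        }

        @Override
        public boolean equals(Object o) {
            return super.equals(o) && firstChunk.equals(((ChunkedId) o).firstChunk);
        }

        @Override
        public int hashCode() {
            return Objects.hash(super.hashCode(), firstChunk);
        }
    }

    /** Returns true if the id handed back by the (simulated) tracker still finds the chunk entry. */
    public static boolean lookupAfterRoundTrip(boolean preserveChunkedId) {
        ChunkedId received = new ChunkedId(new PlainId(5, 0), 5, 9);

        // Consumer side (assumed): chunk bookkeeping is keyed by the full chunked id.
        Map<PlainId, Integer> chunkCounts = new HashMap<>();
        chunkCounts.put(received, 10);

        // Tracker side (assumed): either keep the original id, or flatten it to a long pair
        // and rebuild a plain id from that pair when the redelivery timer fires.
        PlainId handedBack = preserveChunkedId
                ? received
                : new PlainId(received.ledgerId, received.entryId);

        return chunkCounts.containsKey(handedBack);
    }

    public static void main(String[] args) {
        System.out.println("flattened key finds chunks: " + lookupAfterRoundTrip(false)); // false
        System.out.println("preserved key finds chunks: " + lookupAfterRoundTrip(true));  // true
    }
}
```

In this model the lookup only succeeds when both sides agree on the key format, which is the choice the two proposed fixes are making from opposite directions.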
### Error messages

```text

```

### Reproducing the issue

Branch with the test below that reproduces the issue: [branch with test](https://github.com/davidsealand-toast/pulsar/blob/b8dca6be92a00e0eab43fdd0e801638d7dc67e06/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java#L601)

```java
@Test
public void testNegativeAckChunkedMessage() throws Exception {
    cleanup();
    setup();
    String topic = BrokerTestUtil.newUniqueName("testNegativeAcksWithChunks");

    @Cleanup
    Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
            .topic(topic)
            .subscriptionName("sub1")
            .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
            .subscriptionType(SubscriptionType.Shared)
            .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
            .subscribe();

    @Cleanup
    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .topic(topic)
            .enableBatching(false)
            .enableChunking(true)
            .chunkMaxMessageSize(1024) // 1KB max - forces chunking for larger messages
            .create();

    // 10KB payload, so the message is split into multiple chunks
    String longMessage = "X".repeat(10 * 1024);
    producer.sendAsync(longMessage);
    producer.flush();

    // negative ack the chunked message
    consumer.negativeAcknowledge(consumer.receive());

    // the nack redelivery delay is 1s, so the message should be
    // redelivered within the 2s receive timeout
    Message<String> msg1 = consumer.receive(2, TimeUnit.SECONDS);
    assertNotNull(msg1);
}
```

### Additional information

Two possible solutions could be:

- Modify the ConsumerImpl `unAckedChunkedMessageIdSequenceMap` to use the same key format that NegativeAcksTracker passes.
- Modify the NegativeAcksTracker `nackedMessages` to preserve the chunked message id sequence format.

Please advise on the recommended fix.

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!
