davidsealand-toast opened a new issue, #25220:
URL: https://github.com/apache/pulsar/issues/25220

   ### Search before reporting
   
   - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Read release policy
   
   - [x] I understand that [unsupported 
versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions)
 don't get bug fixes. I will attempt to reproduce the issue on a supported 
version of Pulsar client and Pulsar broker.
   
   
   ### User environment
   
   Broker version - latest
   Broker Java version - 21
   Client library type - Java
   Client library version - 4.0.8 (reproduced starting at 4.0.1)
   Client Java version - 21
   
   ### Issue Description
   
   We observed in 4.0.5 that after negatively acknowledging a chunked message, 
the consumer stopped receiving messages. We expected it to consume the 
redelivered chunked message as well as any future incoming messages.
   
   This appears to have been introduced when 
`private HashMap<MessageId, Long> nackedMessages = null;` was changed to 
`private ConcurrentLongLongPairHashMap nackedMessages = null;` in this 
[PR](https://github.com/apache/pulsar/pull/23582/changes#diff-c6bdcfb1326d517455010ce62ef0b08b2f75d1c86773f6888abb165d6741454f).
 `nackedMessages` no longer accepts a ChunkedMessageId as a key; each key is 
now just a LongLongPair identifying a single message.
   
   When the consumer attempts to find unacked messages to redeliver 
[here](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1527-L1532),
 it searches the map using a ChunkedMessageId which is not how the chunked 
message was stored in the map.
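
The key mismatch described above can be pictured with a toy model. This is only a sketch: `Position`, `ChunkedId`, and both methods are hypothetical stand-ins, not Pulsar client classes, and which chunk position survives in the pair-keyed map is an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for illustration only; these are NOT the Pulsar client classes.
class NackKeyMismatchDemo {
    // A plain message position, reduced to a pair of longs.
    record Position(long ledgerId, long entryId) {}

    // A chunked id that remembers the positions of its first and last chunk.
    record ChunkedId(Position firstChunk, Position lastChunk) {}

    // Object-keyed tracker: the chunked id is stored and found as-is.
    static boolean foundWithObjectKeys(ChunkedId id) {
        Map<Object, Long> nacked = new HashMap<>();
        nacked.put(id, 1_000L); // value: hypothetical redelivery deadline
        return nacked.containsKey(id);
    }

    // Pair-keyed tracker: only one position survives (assumed here to be the
    // last chunk), so a lookup in a map keyed by ChunkedId cannot match it.
    static boolean foundWithPairKeys(ChunkedId id) {
        Map<ChunkedId, String> unackedChunkSequences = new HashMap<>();
        unackedChunkSequences.put(id, "all chunk ids of this message");
        Position restored = id.lastChunk(); // chunk information is lost here
        return unackedChunkSequences.containsKey(restored);
    }
}
```

In this model the first lookup succeeds and the second misses, because a plain position and a chunked id are different kinds of key.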
   
   ### Error messages
   
   ```text
   
   ```
   
   ### Reproducing the issue
   
   Branch with the test below that reproduces the issue: [branch with 
test](https://github.com/davidsealand-toast/pulsar/blob/b8dca6be92a00e0eab43fdd0e801638d7dc67e06/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java#L601)
   
   ```java
   @Test
   public void testNegativeAckChunkedMessage() throws Exception {
       cleanup();
       setup();
       String topic = BrokerTestUtil.newUniqueName("testNegativeAcksWithChunks");
   
       @Cleanup
       Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
               .topic(topic)
               .subscriptionName("sub1")
               .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
               .subscriptionType(SubscriptionType.Shared)
               .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
               .subscribe();
   
       @Cleanup
       Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
               .topic(topic)
               .enableBatching(false)
               .enableChunking(true)
               // 1KB max - forces chunking for larger messages
               .chunkMaxMessageSize(1024)
               .create();
       // 10KB payload, so the message is split into multiple chunks
       String longMessage = "X".repeat(10 * 1024);
       producer.sendAsync(longMessage);
       producer.flush();
   
       // negative ack the first (chunked) message
       consumer.negativeAcknowledge(consumer.receive());
   
       // The nacked message should be redelivered after the 1s delay;
       // wait up to 2s for it.
       Message<String> msg1 = consumer.receive(2, TimeUnit.SECONDS);
       assertNotNull(msg1);
   }
   ```
   
   ### Additional information
   
   Two possible solutions could be:
   - modify the ConsumerImpl `unAckedChunkedMessageIdSequenceMap` to use the 
same format of key that NegativeAcksTracker passes
   - modify the NegativeAcksTracker `nackedMessages` to preserve the chunked 
message id sequence format
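
As a rough illustration of the first option, the chunk sequences could be keyed by the same plain position that the tracker hands back. This is a sketch under assumptions, not the actual `ConsumerImpl` code: the class, the `Position` record, and the choice of the last chunk's position as the key are all hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a chunk-sequence map keyed by a plain position.
class PairKeyedChunkSequences {
    // Stand-in for a plain message position; not a Pulsar client class.
    record Position(long ledgerId, long entryId) {}

    // Key: position of the last chunk (assumption). Value: all chunk positions.
    private final Map<Position, List<Position>> chunksByLastPosition = new HashMap<>();

    // Remembers the chunks of one message; expects a non-empty list.
    void track(List<Position> chunkPositions) {
        Position last = chunkPositions.get(chunkPositions.size() - 1);
        chunksByLastPosition.put(last, List.copyOf(chunkPositions));
    }

    // Expands a nacked position into every position to redeliver.
    // A position with no tracked chunks is returned on its own.
    List<Position> expandForRedelivery(Position nacked) {
        return chunksByLastPosition.getOrDefault(nacked, List.of(nacked));
    }
}
```

With keys in this shape, a lookup by the plain position finds the chunk list without needing a chunked id. The second option would instead keep the chunk information on the tracker side.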
   
   Please advise on the recommended fix.
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!

