Lanayx opened a new issue #7460:
URL: https://github.com/apache/pulsar/issues/7460


   **Describe the bug**
   The 2.6 client added batch index acknowledgment. A 2.5 cluster does not 
understand the AckSet field that was added for it and treats a partially 
acknowledged batch as fully acknowledged. As a result, negatively acknowledged 
messages in that batch are never redelivered.
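
To illustrate the mismatch, here is a simplified, self-contained model rather than actual Pulsar code. It assumes the ack set is a bit set in which a set bit marks a batch entry that is still unacknowledged. `AckSetModel` and its method names are hypothetical.

```java
import java.util.BitSet;

/** Simplified model of a batch index ack; not the actual Pulsar implementation. */
final class AckSetModel {

    /**
     * Builds the ack set sent when one index of a batch is acknowledged.
     * Assumed convention: a set bit means "still unacknowledged".
     */
    static BitSet ackSetFor(int batchSize, int ackedIndex) {
        BitSet remaining = new BitSet(batchSize);
        remaining.set(0, batchSize);   // start with every entry unacknowledged
        remaining.clear(ackedIndex);   // mark the acknowledged entry
        return remaining;
    }

    /** A broker that reads the ack set: the batch is done only when no bit remains. */
    static boolean batchDoneWithAckSet(BitSet ackSet) {
        return ackSet.isEmpty();
    }

    /** A broker that ignores the ack set: any ack for the batch marks all of it done. */
    static boolean batchDoneIgnoringAckSet(BitSet ackSet) {
        return true;
    }

    public static void main(String[] args) {
        // Acknowledge msg1 (index 0) of a two-message batch; msg2 stays pending.
        BitSet ackSet = ackSetFor(2, 0);
        System.out.println("ack-set-aware broker, batch done: " + batchDoneWithAckSet(ackSet));
        System.out.println("older broker, batch done: " + batchDoneIgnoringAckSet(ackSet));
    }
}
```

In this model the older broker drops the whole batch after the first ack, so there is nothing left to redeliver when the second message is negatively acknowledged.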
   
   **To Reproduce**
   Here is a sample program:
   
   ```
   package jtest;
   
   import java.time.Instant;
   import java.util.concurrent.TimeUnit;
   
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.PulsarClientException;
   import org.apache.pulsar.client.api.SubscriptionInitialPosition;
   
   /**
    * Reproduces the missing redelivery of a negatively acknowledged
    * message that belongs to a partially acknowledged batch.
    */
   public final class App {
   
       private App() {
       }
   
       public static void main(String[] args) {
           try {
               PulsarClient client = PulsarClient.builder()
                   .serviceUrl("pulsar://my-pulsar-cluster:30002")
                   .build();
               // Use a fresh topic on every run.
               String topicName = "persistent://public/default/" + Instant.now().toEpochMilli();
               Consumer<byte[]> consumer = client.newConsumer()
                   .topic(topicName)
                   .subscriptionName("test-subscription")
                   .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                   .negativeAckRedeliveryDelay(200, TimeUnit.MILLISECONDS)
                   .subscribe();
               // Batching is enabled so that both messages are expected to share one batch.
               Producer<byte[]> producer = client.newProducer()
                   .topic(topicName)
                   .enableBatching(true)
                   .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
                   .create();
   
               producer.sendAsync("Hello from Java 1".getBytes());
               producer.sendAsync("Hello from Java 2".getBytes());
   
               Message<byte[]> msg1 = consumer.receive();
               System.out.printf("MessageId received: %s\n", msg1.getMessageId());
               Message<byte[]> msg2 = consumer.receive();
               System.out.printf("MessageId received: %s\n", msg2.getMessageId());
   
               // Acknowledge the first message of the batch and reject the second.
               consumer.acknowledge(msg1);
               consumer.negativeAcknowledge(msg2);
   
               // Wait for the redelivery triggered by the negative acknowledgment.
               Message<byte[]> msg3 = consumer.receive();
               System.out.printf("MessageId received: %s\n", msg3.getMessageId());
               Message<byte[]> msg4 = consumer.receive();
               System.out.printf("MessageId received: %s\n", msg4.getMessageId());
           } catch (PulsarClientException e) {
               e.printStackTrace();
           }
       }
   }
   ```
   With the 2.5 client, the messages are redelivered and consumed as msg3 and 
msg4. With the 2.6 client against a 2.5 cluster, they are not delivered again.
   
   **Expected behavior**
   A 2.6 client connected to a 2.5 cluster should behave the same as a 2.5 client.
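
One way to picture this behavior is a client that holds back the ack for a batch until every message in it has been acknowledged, so a broker without ack set support never sees a partial ack. The sketch below is a minimal model under that assumption, not the client's actual code. `BatchAckTracker` is a hypothetical name.

```java
import java.util.BitSet;

/** Hypothetical client-side tracker for one batch; a sketch, not Pulsar code. */
final class BatchAckTracker {

    // A set bit marks a batch entry that has not been acknowledged locally yet.
    private final BitSet pending;

    BatchAckTracker(int batchSize) {
        pending = new BitSet(batchSize);
        pending.set(0, batchSize);
    }

    /**
     * Records a local ack for one batch index.
     *
     * @return true only when every entry is acknowledged, which is the point
     *         where an ack for the whole batch could be sent to the broker
     */
    boolean ack(int batchIndex) {
        pending.clear(batchIndex);
        return pending.isEmpty();
    }

    public static void main(String[] args) {
        BatchAckTracker tracker = new BatchAckTracker(2);
        System.out.println("send ack after msg1: " + tracker.ack(0));
        System.out.println("send ack after msg2: " + tracker.ack(1));
    }
}
```

With such a tracker, acknowledging only msg1 sends nothing to the broker, so the batch stays unacknowledged there and can still be redelivered after the negative acknowledgment of msg2.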
   
   **Additional context**
   This makes it a breaking change, so clients shouldn't upgrade to 2.6 until 
the cluster is also upgraded or this issue is fixed.
   
   cc @codelipenghui 
   

