liangyepianzhou opened a new issue, #15832:
URL: https://github.com/apache/pulsar/issues/15832

   **Describe the bug**
   If the subscription type is not Shared or Key_Shared, `getBatchSize` 
always returns the default batch size of 1. As a result, the transaction 
pending ack handler cannot detect conflicts between batch-index acknowledgments.
   ```java
       private long getBatchSize(MessageIdData msgId) {
           long batchSize = 1;
           if (Subscription.isIndividualAckMode(subType)) {
               LongPair longPair = pendingAcks.get(msgId.getLedgerId(), msgId.getEntryId());
               // Consumer may ack the msg that not belongs to it.
               if (longPair == null) {
                   Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
                   longPair = ackOwnerConsumer.getPendingAcks().get(msgId.getLedgerId(), msgId.getEntryId());
                   if (longPair != null) {
                       batchSize = longPair.first;
                   }
               } else {
                   batchSize = longPair.first;
               }
           }
           return batchSize;
       }
   ```
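
To illustrate why the batch size matters for conflict checking, here is a minimal, self-contained sketch. It is a toy model, not the broker's code: the class `AckOverlapSketch` and its methods are hypothetical, and it assumes an ack-set convention in which a cleared bit marks an acknowledged batch index and only the range `[0, batchSize)` is meaningful.

```java
import java.util.BitSet;

public class AckOverlapSketch {

    // Hypothetical helper: collect the acknowledged indexes from an ack set.
    // Assumption: a cleared bit means "acknowledged", and only indexes in
    // [0, batchSize) are considered.
    static BitSet ackedIndexes(BitSet ackSet, int batchSize) {
        BitSet acked = new BitSet(batchSize);
        for (int i = 0; i < batchSize; i++) {
            if (!ackSet.get(i)) {
                acked.set(i);
            }
        }
        return acked;
    }

    // Two acks on the same entry conflict if they acknowledge a common index.
    static boolean conflicts(BitSet pendingAckSet, BitSet incomingAckSet, int batchSize) {
        return ackedIndexes(pendingAckSet, batchSize)
                .intersects(ackedIndexes(incomingAckSet, batchSize));
    }

    public static void main(String[] args) {
        // Ack set for "message2" (batch index 1) in a batch of 3 messages.
        BitSet ackSet = new BitSet(3);
        ackSet.set(0, 3);
        ackSet.clear(1);

        // With the real batch size, the repeated ack of index 1 is detected.
        System.out.println(conflicts(ackSet, ackSet, 3));
        // With a batch size of 1, only index 0 is inspected and the overlap is missed.
        System.out.println(conflicts(ackSet, ackSet, 1));
    }
}
```

Under these assumptions, a batch size of 1 restricts the comparison to index 0, so two acknowledgments of index 1 never appear to overlap.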
   
   **To Reproduce**
   1. Produce 3 messages in a single batch.
   2. Receive message1.
   3. Receive message2.
   4. Ack message2 with transaction1.
   5. Ack message2 with transaction2.
   6. Commit transaction1.
   7. Commit transaction2.
   ```java
       @Test
       public void testGetPositionStatsInPendingAckStatsFroBatch() throws Exception {
           String topic = "persistent://public/default/test";
           String subscriptionName = "my-subscription-batch";
           initTransaction(1);
           pulsar.getBrokerService()
                   .getManagedLedgerConfig(TopicName.get(topic)).get()
                   .setDeletionAtBatchIndexLevelEnabled(true);
   
           @Cleanup
           Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                   .enableBatching(true)
                   .batchingMaxMessages(3)
                   // Set the batch max publish delay big enough to make sure the entry has 3 messages.
                   .batchingMaxPublishDelay(10, TimeUnit.SECONDS)
                   .topic(topic).create();
   
           @Cleanup
           Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                   .subscriptionName(subscriptionName)
                   .enableBatchIndexAcknowledgment(true)
                   .subscriptionType(SubscriptionType.Exclusive)
                   .isAckReceiptEnabled(true)
                   .topic(topic)
                   .subscribe();
   
           List<MessageId> messageIds = new ArrayList<>();
           List<CompletableFuture<MessageId>> futureMessageIds = new ArrayList<>();
   
           List<String> messages = new ArrayList<>();
           for (int i = 0; i < 3; i++) {
               String message = "my-message-" + i;
               messages.add(message);
               CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(message);
               futureMessageIds.add(messageIdCompletableFuture);
           }
   
           for (CompletableFuture<MessageId> futureMessageId : futureMessageIds) {
               MessageId messageId = futureMessageId.get();
               messageIds.add(messageId);
           }
   
           Transaction transaction = pulsarClient.newTransaction()
                   .withTransactionTimeout(5, TimeUnit.DAYS)
                   .build()
                   .get();
   
           Message<String> message1 = consumer.receive();
           Message<String> message2 = consumer.receive();
   
           BatchMessageIdImpl messageId = (BatchMessageIdImpl) message2.getMessageId();
           consumer.acknowledgeAsync(messageId, transaction).get();
   
           Transaction transaction2 = pulsarClient.newTransaction()
                   .withTransactionTimeout(5, TimeUnit.DAYS)
                   .build()
                   .get();
   
           consumer.acknowledgeAsync(messageId, transaction2).get();
           transaction.commit().get();
           transaction2.commit().get();
       }
   ```
   **Expected behavior**
   A transaction conflict exception should be thrown, because message2 is 
acknowledged by two different transactions.
   

