shibd commented on PR #22610:
URL: https://github.com/apache/pulsar/pull/22610#issuecomment-2081749388

   hi, @tjiuming Thanks for your PR.
   
   I thinks we can change this test to cover all related transactions get 
lastmessage id case.
   
   
https://github.com/apache/pulsar/blob/a761b97b733142b1ade525e1d1c06785e98face1/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java#L264
   
   Change to:
   
   ```java
   @Test
       public void testGetLastMessageIdsWithOngoingTransactions() throws 
Exception {
           // 1. Prepare environment
           String topic = "persistent://" + NAMESPACE1 + 
"/testGetLastMessageIdsWithOngoingTransactions";
           String subName = "my-subscription";
           @Cleanup
           Producer<byte[]> producer = pulsarClient.newProducer()
                   .topic(topic)
                   .create();
           Consumer<byte[]> consumer = pulsarClient.newConsumer()
                   .topic(topic)
                   .subscriptionName(subName)
                   .subscribe();
   
           // 2. Test last max read position can be required correctly.
           // 2.1 Case1: send 3 original messages. |1:0|1:1|1:2|
           MessageIdImpl expectedLastMessageID = null;
           for (int i = 0; i < 3; i++) {
               expectedLastMessageID = (MessageIdImpl) 
producer.newMessage().send();
           }
           assertMessageId(consumer, expectedLastMessageID);
           // 2.2 Case2: send 2 ongoing transactional messages and 2 original 
messages.
           // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5.
           Transaction txn1 = pulsarClient.newTransaction()
                   .withTransactionTimeout(5, TimeUnit.HOURS)
                   .build()
                   .get();
           Transaction txn2 = pulsarClient.newTransaction()
                   .withTransactionTimeout(5, TimeUnit.HOURS)
                   .build()
                   .get();
   
           // |1:0|1:1|1:2|txn1:1:3|
           producer.newMessage(txn1).send();
           
           // |1:0|1:1|1:2|txn1:1:3|1:4|
           MessageIdImpl expectedLastMessageID1 = (MessageIdImpl) 
producer.newMessage().send();
   
           // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|
           producer.newMessage(txn2).send();
           
           // 2.2.1 Last message ID will not change when txn1 and txn2 do not 
end.
           assertMessageId(consumer, expectedLastMessageID);
   
           // 2.2.2 Last message ID will update to 1:4 when txn1 committed.
           // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6|
           txn1.commit().get(5, TimeUnit.SECONDS);
           assertMessageId(consumer, expectedLastMessageID1);
   
           // 2.2.3 Last message ID will still to 1:4 when txn2 aborted.
           // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6|tx2:abort->1:7|
           txn2.abort().get(5, TimeUnit.SECONDS);
           assertMessageId(consumer, expectedLastMessageID1);
       }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to