shibd commented on PR #22610: URL: https://github.com/apache/pulsar/pull/22610#issuecomment-2081749388
hi, @tjiuming Thanks for your PR. I thinks we can change this test to cover all related transactions get lastmessage id case. https://github.com/apache/pulsar/blob/a761b97b733142b1ade525e1d1c06785e98face1/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java#L264 Change to: ```java @Test public void testGetLastMessageIdsWithOngoingTransactions() throws Exception { // 1. Prepare environment String topic = "persistent://" + NAMESPACE1 + "/testGetLastMessageIdsWithOngoingTransactions"; String subName = "my-subscription"; @Cleanup Producer<byte[]> producer = pulsarClient.newProducer() .topic(topic) .create(); Consumer<byte[]> consumer = pulsarClient.newConsumer() .topic(topic) .subscriptionName(subName) .subscribe(); // 2. Test last max read position can be required correctly. // 2.1 Case1: send 3 original messages. |1:0|1:1|1:2| MessageIdImpl expectedLastMessageID = null; for (int i = 0; i < 3; i++) { expectedLastMessageID = (MessageIdImpl) producer.newMessage().send(); } assertMessageId(consumer, expectedLastMessageID); // 2.2 Case2: send 2 ongoing transactional messages and 2 original messages. // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5. Transaction txn1 = pulsarClient.newTransaction() .withTransactionTimeout(5, TimeUnit.HOURS) .build() .get(); Transaction txn2 = pulsarClient.newTransaction() .withTransactionTimeout(5, TimeUnit.HOURS) .build() .get(); // |1:0|1:1|1:2|txn1:1:3| producer.newMessage(txn1).send(); // |1:0|1:1|1:2|txn1:1:3|1:4| MessageIdImpl expectedLastMessageID1 = (MessageIdImpl) producer.newMessage().send(); // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5| producer.newMessage(txn2).send(); // 2.2.1 Last message ID will not change when txn1 and txn2 do not end. assertMessageId(consumer, expectedLastMessageID); // 2.2.2 Last message ID will update to 1:4 when txn1 committed. // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6| txn1.commit().get(5, TimeUnit.SECONDS); assertMessageId(consumer, expectedLastMessageID1); // 2.2.3 Last message ID will still to 1:4 when txn2 aborted. // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6|tx2:abort->1:7| txn2.abort().get(5, TimeUnit.SECONDS); assertMessageId(consumer, expectedLastMessageID1); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
