codelipenghui commented on code in PR #21406:
URL: https://github.com/apache/pulsar/pull/21406#discussion_r1382902778


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java:
##########
@@ -179,4 +187,94 @@ public void testCloseTransactionBufferWhenTimeout() throws Exception {
         Assert.assertTrue(f.isCompletedExceptionally());
     }
 
+    /**
+     * This test verifies the state changes of a TransactionBuffer within a topic under different conditions.
+     * Initially, the TransactionBuffer is in a NoSnapshot state upon topic creation.
+     * It remains in the NoSnapshot state even after a normal message is sent.
+     * The state changes to Ready only after a transactional message is sent.
+     * The test also ensures that the TransactionBuffer can be correctly recovered after the topic is unloaded.
+     */
+    @Test
+    public void testWriteSnapshotWhenFirstTxnMessageSend() throws Exception {
+        String topic = "persistent://" + NAMESPACE1 + "/testWriteSnapshotWhenFirstTxnMessageSend";
+        admin.topics().createNonPartitionedTopic(topic);
+        PersistentTopic persistentTopic = (PersistentTopic) pulsarServiceList.get(0).getBrokerService()
+                .getTopic(topic, false)
+                .get()
+                .get();
+
+        TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
+        // The TransactionBuffer should be in NoSnapshot state when the topic is initially created
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(topicTransactionBuffer.getState(), TopicTransactionBufferState.State.NoSnapshot);
+        });
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+        producer.newMessage().send();
+        // The TransactionBuffer should still be in NoSnapshot state after a normal message is sent by the producer connected to the topic
+        Assert.assertEquals(topicTransactionBuffer.getState(), TopicTransactionBufferState.State.NoSnapshot);
+
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.HOURS)
+                .build()
+                .get();
+        producer.newMessage(transaction).send();
+
+        MessageIdAdv messageId = (MessageIdAdv) producer.newMessage(transaction).send();
+        // Only after the producer sends a transactional message to this topic, the TransactionBuffer will be in Ready state
+        Assert.assertEquals(topicTransactionBuffer.getState(), TopicTransactionBufferState.State.Ready);
+        transaction.commit().get();
+        admin.topics().unload(topic);
+        PersistentTopic persistentTopic2 = (PersistentTopic) pulsarServiceList.get(0).getBrokerService()
+                .getTopic(topic, false)
+                .get()
+                .get();
+        TopicTransactionBuffer topicTransactionBuffer2 = (TopicTransactionBuffer) persistentTopic2
+                .getTransactionBuffer();
+        // After the topic is unloaded, the TransactionBuffer can be correctly recovered
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(topicTransactionBuffer2.getState(), TopicTransactionBufferState.State.Ready);
+            Assert.assertTrue(topicTransactionBuffer2.getMaxReadPosition()
+                    .compareTo(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())) > 0);
+        });
+    }
+
+    @Test
+    public void testMessagePublishInOrder() throws Exception {
+        String topic = "persistent://" + NAMESPACE1 + "/testMessagePublishInOrder";
+        admin.topics().createNonPartitionedTopic(topic);
+
+        PersistentTopic persistentTopic = (PersistentTopic) pulsarServiceList.get(0).getBrokerService()
+                .getTopic(topic, false)
+                .get()
+                .get();
+
+        TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
+        // Simulated recovery is not completed.
+        CompletableFuture<Void> transactionBufferFuture = new CompletableFuture<>();
+        Field publishFutureField = TopicTransactionBuffer.class.getDeclaredField("publishFuture");
+        publishFutureField.setAccessible(true);
+        publishFutureField.set(topicTransactionBuffer, transactionBufferFuture.thenApply(__ -> PositionImpl.EARLIEST));
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .create();
+        @Cleanup
+        Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)

Review Comment:
   This test verifies nothing unless it sends transactional messages. When I
debugged with this test, the broker never executed
`TopicTransactionBuffer.appendBufferToTxn()`, yet the test still passed.
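
   To illustrate the concern, here is a minimal, self-contained sketch of how
an ordering check can pass without ever touching the transactional path. It
does not use Pulsar's real classes: `ToyTransactionBuffer`, `ToyProducer`, and
the call counter are hypothetical stand-ins, and only the method name
`appendBufferToTxn` is borrowed from the broker. The assumption is that normal
messages bypass that method, which matches what I saw while debugging.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the broker-side buffer; not Pulsar's real class.
class ToyTransactionBuffer {
    private final List<String> log = new ArrayList<>();
    private int appendBufferToTxnCalls = 0;

    // Path taken by normal (non-transactional) messages.
    void appendBuffer(String payload) {
        log.add(payload);
    }

    // Path taken only by transactional messages.
    void appendBufferToTxn(String txnId, String payload) {
        appendBufferToTxnCalls++;
        log.add(txnId + ":" + payload);
    }

    int appendBufferToTxnCalls() {
        return appendBufferToTxnCalls;
    }

    List<String> log() {
        return log;
    }
}

// Hypothetical producer: a null txnId models producer.newMessage(),
// a non-null txnId models producer.newMessage(transaction).
class ToyProducer {
    private final ToyTransactionBuffer buffer;

    ToyProducer(ToyTransactionBuffer buffer) {
        this.buffer = buffer;
    }

    void send(String txnId, String payload) {
        if (txnId == null) {
            buffer.appendBuffer(payload);
        } else {
            buffer.appendBufferToTxn(txnId, payload);
        }
    }
}

class VacuousTestDemo {
    // The visible outcome the test checks: messages appear in send order.
    static boolean inOrder(List<String> log, int expectedCount) {
        if (log.size() != expectedCount) {
            return false;
        }
        for (int i = 0; i < expectedCount; i++) {
            if (!log.get(i).endsWith("msg-" + i)) {
                return false;
            }
        }
        return true;
    }

    // Sends three messages; a null txnId means no transaction is ever opened.
    static ToyTransactionBuffer run(String txnId) {
        ToyTransactionBuffer buffer = new ToyTransactionBuffer();
        ToyProducer producer = new ToyProducer(buffer);
        for (int i = 0; i < 3; i++) {
            producer.send(txnId, "msg-" + i);
        }
        return buffer;
    }

    public static void main(String[] args) {
        ToyTransactionBuffer plain = run(null);
        ToyTransactionBuffer txn = run("txn-1");
        // Both runs pass the ordering check, but only one exercises the txn path.
        System.out.println("plain: inOrder=" + inOrder(plain.log(), 3)
                + ", txnCalls=" + plain.appendBufferToTxnCalls());
        System.out.println("txn:   inOrder=" + inOrder(txn.log(), 3)
                + ", txnCalls=" + txn.appendBufferToTxnCalls());
    }
}
```

   In this sketch the ordering check succeeds in both runs, so it cannot tell
them apart; only the call count shows whether the transactional path ran. In
the real test, the analogous fix would be to publish with
`producer.newMessage(transaction)` so that the code under test is reached.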



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

