liangyepianzhou commented on code in PR #21406:
URL: https://github.com/apache/pulsar/pull/21406#discussion_r1712508690


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java:
##########
@@ -405,4 +415,185 @@ private void assertGetLastMessageId(Consumer<?> consumer, 
MessageIdImpl expected
         assertEquals(expected.getLedgerId(), actual.getLedgerId());
     }
 
+    /**
+     * This test verifies the state changes of a TransactionBuffer within a 
topic under different conditions.
+     * Initially, the TransactionBuffer is in a NoSnapshot state upon topic 
creation.
+     * It remains in the NoSnapshot state even after a normal message is sent.
+     * The state changes to Ready only after a transactional message is sent.
+     * The test also ensures that the TransactionBuffer can be correctly 
recovered after the topic is unloaded.
+     */
+    @Test
+    public void testWriteSnapshotWhenFirstTxnMessageSend() throws Exception {
+        // 1. Prepare test environment.
+        String topic = "persistent://" + NAMESPACE1 + 
"/testWriteSnapshotWhenFirstTxnMessageSend";
+        String txnMsg = "transaction message";
+        String normalMsg = "normal message";
+        admin.topics().createNonPartitionedTopic(topic);
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsarServiceList.get(0).getBrokerService()
+                .getTopic(topic, false)
+                .get()
+                .get();
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("my-sub")
+                .subscribe();
+        // 2. Test the state of transaction buffer after building producer 
with no new messages.
+        // The TransactionBuffer should be in NoSnapshot state before 
transaction message sent.
+        TopicTransactionBuffer topicTransactionBuffer = 
(TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(topicTransactionBuffer.getState(), 
TopicTransactionBufferState.State.NoSnapshot);
+        });
+        // 3. Test the state of transaction buffer after sending normal 
messages.
+        // The TransactionBuffer should still be in NoSnapshot state after a 
normal message is sent.
+        producer.newMessage().value(normalMsg).send();
+        Assert.assertEquals(topicTransactionBuffer.getState(), 
TopicTransactionBufferState.State.NoSnapshot);
+        // 4. Test the state of transaction buffer after sending transaction 
messages.
+        // The transaction buffer should be in Ready state at this time.
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.HOURS)
+                .build()
+                .get();
+        producer.newMessage(transaction).value(txnMsg).send();
+        Assert.assertEquals(topicTransactionBuffer.getState(), 
TopicTransactionBufferState.State.Ready);
+        // 5. Test transaction buffer can be recovered correctly.
+        // There are 3 messages sent to this topic, 2 normal messages and 1 
transaction message |m1|m2-txn1|m3|.
+        // After aborting the transaction, unloading the topic and redelivering 
unacked messages,
+        // only the normal messages can be received.
+        transaction.abort().get(5, TimeUnit.SECONDS);
+        producer.newMessage().value(normalMsg).send();
+        admin.topics().unload(topic);
+        PersistentTopic persistentTopic2 = (PersistentTopic) 
pulsarServiceList.get(0).getBrokerService()
+                .getTopic(topic, false)
+                .get()
+                .get();
+        TopicTransactionBuffer topicTransactionBuffer2 = 
(TopicTransactionBuffer) persistentTopic2
+                .getTransactionBuffer();
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(topicTransactionBuffer2.getState(), 
TopicTransactionBufferState.State.Ready);
+        });
+        consumer.redeliverUnacknowledgedMessages();
+        for (int i = 0; i < 2; i++) {
+            Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
+            Assert.assertEquals(message.getValue(), normalMsg);
+        }
+        Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
+        Assert.assertNull(message);
+    }
+
+    @RequiredArgsConstructor
+    static class MockTransactionBufferProvider implements 
TransactionBufferProvider {
+
+        private final Function<PersistentTopic, TransactionBuffer> function;
+
+        @Override
+        public TransactionBuffer newTransactionBuffer(Topic originTopic) {
+            return function.apply((PersistentTopic) originTopic);
+        }
+    }
+
+    /**
+     * Send some messages before transaction buffer ready and then send some 
messages after transaction buffer ready,
+     * these messages should be received in order.
+     */
+    @Test
+    public void testMessagePublishInOrder() throws Exception {
+        // 1. Prepare test environment.
+        final var transactionBufferFuture = new 
AtomicReference<CompletableFuture<Void>>();
+        transactionBufferFuture.set(new CompletableFuture<>());
+        pulsarServiceList.get(0).setTransactionExecutorProvider(new 
MockTransactionBufferProvider(topic -> {
+            final var transactionBuffer = spy(new 
TopicTransactionBuffer(topic));
+            
when(transactionBuffer.getTransactionBufferFuture()).thenReturn(transactionBufferFuture.get());
+            
when(transactionBuffer.getState()).thenReturn(TopicTransactionBufferState.State.Ready);
+            return transactionBuffer;
+        }));
+
+        String topic = "persistent://" + NAMESPACE1 + 
"/testMessagePublishInOrder" + RandomUtils.nextLong();
+        admin.topics().createNonPartitionedTopic(topic);
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsarServiceList.get(0).getBrokerService()
+                .getTopic(topic, false)
+                .get()
+                .get();
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .create();
+        @Cleanup
+        Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscribe();
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.HOURS)
+                .build().get();
+
+        // 2. Set a new future in transaction buffer as 
`transactionBufferFuture` to simulate whether the
+        // transaction buffer recover completely.
+        // Register this topic to the transaction in advance to avoid the 
sending request pending here.
+        ((TransactionImpl) transaction).registerProducedTopic(topic).get(5, 
TimeUnit.SECONDS);
+        // 3. Test the messages sent before transaction buffer ready is in 
order.
+        for (int i = 0; i < 50; i++) {
+            producer.newMessage(transaction).value(i).sendAsync();
+        }
+        // 4. Test the messages sent after transaction buffer ready is in 
order.
+        transactionBufferFuture.get().complete(null);
+        for (int i = 50; i < 100; i++) {
+            producer.newMessage(transaction).value(i).sendAsync();
+        }
+        transaction.commit().get();
+        for (int i = 0; i < 100; i++) {
+            Message<Integer> message = consumer.receive(5, TimeUnit.SECONDS);
+            Assert.assertEquals(message.getValue(), i);
+        }
+    }
+
+    /**
+     * `testMessagePublishInOrder` already verifies that the ref count works as 
expected with no exception.
+     * This test additionally checks for memory leaks caused by the ref count.
+     */
+    @Test
+    public void testRefCountWhenAppendBufferToTxn() throws Exception {
+        // 1. Prepare test resource
+        this.pulsarServiceList.forEach(pulsarService ->  {
+            pulsarService.setTransactionExecutorProvider(new 
TransactionBufferTestProvider());
+        });
+        String topic = "persistent://" + NAMESPACE1 + 
"/testRefCountWhenAppendBufferToTxn";
+        admin.topics().createNonPartitionedTopic(topic);
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsarServiceList.get(0).getBrokerService()
+                .getTopic(topic, false)
+                .get()
+                .get();
+        TransactionBufferTestImpl topicTransactionBuffer = 
(TransactionBufferTestImpl) persistentTopic
+                .getTransactionBuffer();

Review Comment:
   After discussing with @BewareMyPower: `Mockito.spy` currently has a bug, so 
use subclasses uniformly for testing instead of spies.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to