liangyepianzhou commented on code in PR #18273:
URL: https://github.com/apache/pulsar/pull/18273#discussion_r1095362131


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java:
##########
@@ -818,9 +820,98 @@ public void 
openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Ob
         //verify snapshot
         assertEquals(snapshot.getTopicName(), snapshotTopic);
         assertEquals(snapshot.getSequenceId(), 2L);
-        assertEquals(snapshot.getMaxReadPositionLedgerId(), 2L);
-        assertEquals(snapshot.getMaxReadPositionEntryId(), 3L);
+        assertEquals(snapshot.getPersistentPositionLedgerId(), 2L);
+        assertEquals(snapshot.getPersistentPositionEntryId(), 3L);
         assertEquals(snapshot.getAborts().toArray()[0], new TxnIDData(1, 1));
     }
 
+    //Verify the snapshotSegmentProcessor end to end
+    @Test
+    public void testSnapshotSegment() throws Exception {
+        String topic = NAMESPACE1 + "/testSnapshotSegment";
+        String subName = "testSnapshotSegment";
+
+        LinkedMap<Transaction, MessageId> ongoingTxns = new LinkedMap<>();
+        LinkedList<MessageId> abortedTxns = new LinkedList<>();
+        // 0. Modify the configurations, enabling the segment snapshot and set 
the size of the snapshot segment.
+        int theSizeOfSegment = 10;
+        int theCountOfSnapshotMaxTxnCount = 3;
+        
this.getPulsarServiceList().get(0).getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
+        
this.getPulsarServiceList().get(0).getConfig().setTransactionBufferSnapshotSegmentSize(theSizeOfSegment);
+        this.getPulsarServiceList().get(0).getConfig()
+                
.setTransactionBufferSnapshotMaxTransactionCount(theCountOfSnapshotMaxTxnCount);
+        // 1. Build producer and consumer
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscribe();
+
+        // 2. Check the AbortedTxnProcessor workflow 10 times
+        int messageSize = theSizeOfSegment * 4;
+        for (int i = 0; i < 10; i++) {
+            MessageId maxReadMessage = null;
+            int abortedTxnSize = 0;
+            for (int j = 0; j < messageSize; j++) {
+                Transaction transaction = pulsarClient.newTransaction()
+                        .withTransactionTimeout(5, 
TimeUnit.MINUTES).build().get();
+                //Half common message and half transaction message.
+                if (j % 2 == 0) {
+                    MessageId messageId = 
producer.newMessage(transaction).value(i * 10 + j).send();
+                    //And the transaction message have a half which are 
aborted.
+                    if (RandomUtils.nextInt() % 2 == 0) {
+                        transaction.abort().get();
+                        abortedTxns.add(messageId);
+                        abortedTxnSize++;
+                    } else {
+                        ongoingTxns.put(transaction, messageId);
+                        if (maxReadMessage == null) {
+                            //The except number of the messages that can be 
read
+                            maxReadMessage = messageId;
+                        }
+                    }
+                } else {
+                    producer.newMessage().value(i * 10 + j).send();
+                    transaction.commit().get();
+                }
+            }
+            // 2.1 Receive all message before the maxReadPosition to verify 
the correctness of the max read position.
+            int hasReceived = 0;
+            while (true) {
+                Message<Integer> message = consumer.receive(2, 
TimeUnit.SECONDS);
+                if (message != null) {
+                    
Assert.assertTrue(message.getMessageId().compareTo(maxReadMessage) < 0);
+                    hasReceived ++;
+                } else {
+                    break;
+                }
+            }
+            //2.2 Commit all ongoing transaction and verify that the consumer 
can receive all rest message
+            // expect for aborted txn message.
+            for (Transaction ongoingTxn: ongoingTxns.keySet()) {
+                ongoingTxn.commit().get();
+            }
+            ongoingTxns.clear();
+            for (int k = hasReceived; k < messageSize - abortedTxnSize; k++) {
+                Message<Integer> message = consumer.receive(2, 
TimeUnit.SECONDS);
+                assertNotNull(message);
+                assertFalse(abortedTxns.contains(message.getMessageId()));
+            }
+        }
+        // 3. After the topic unload, the consumer can receive all the 
messages in the 10 tests
+        // expect for the aborted transaction messages.

Review Comment:
   > before unload, check the aborted txn, segment, index stats from the 
processor
   The processor's internal stat verity can be found in 
`SegmentAbortedTxnProcessorTest`.
   This test will verify the processor can run properly end to end.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to