liangyepianzhou commented on code in PR #18273:
URL: https://github.com/apache/pulsar/pull/18273#discussion_r1095340496


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AbortTxnProcessorTest.java:
##########
@@ -0,0 +1,147 @@
+package org.apache.pulsar.broker.transaction;
+
+import java.lang.reflect.Field;
+import java.util.LinkedList;
+import java.util.NavigableMap;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.commons.collections4.map.LinkedMap;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import 
org.apache.pulsar.broker.transaction.buffer.impl.SnapshotSegmentAbortedTxnProcessorImpl;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class AbortTxnProcessorTest extends TransactionTestBase {
+
+    private static final String PROCESSOR_TOPIC = "persistent://" + NAMESPACE1 
+ "/abortedTxnProcessor";
+    private static final int SEGMENT_SIZE = 5;
+    private PulsarService pulsarService = null;
+
+    @Override
+    @BeforeClass
+    protected void setup() throws Exception {
+        setUpBase(1, 1, PROCESSOR_TOPIC, 0);
+        this.pulsarService = getPulsarServiceList().get(0);
+        
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
+        
this.pulsarService.getConfig().setTransactionBufferSnapshotSegmentSize(8 + 
PROCESSOR_TOPIC.length() + 5 * 3);
+    }
+
+    @Override
+    @AfterClass
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /**
+     * Test api:
+     *   1. putAbortedTxnAndPosition
+     *   2. checkAbortedTransaction
+     *   3. takeAbortedTxnsSnapshot
+     *   4. recoverFromSnapshot
+     *   5. trimExpiredAbortedTxns
+     * @throws Exception
+     */
+    @Test
+    public void testPutAbortedTxnIntoProcessor() throws Exception {
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsarService.getBrokerService()
+                .getTopic(PROCESSOR_TOPIC, false).get().get();
+        AbortedTxnProcessor processor = new 
SnapshotSegmentAbortedTxnProcessorImpl(persistentTopic);
+        //1. prepare test data.
+        //1.1 Put 10 aborted txn IDs to persistent two sealed segments.
+        for (int i = 0; i < 10; i++) {
+            TxnID txnID = new TxnID(0, i);
+            PositionImpl position = new PositionImpl(0,i);
+            processor.putAbortedTxnAndPosition(txnID, position);
+        }
+        //1.2 Put 4 aborted txn IDs into the unsealed segment.
+        for (int i = 10; i < 14; i++) {
+            TxnID txnID = new TxnID(0, i);
+            PositionImpl position = new PositionImpl(0,i);
+            processor.putAbortedTxnAndPosition(txnID, position);
+        }
+        //1.3 Verify the common data flow
+        verifyAbortedTxnIDAndSegmentIndex(processor,0,14);
+        //2. Take the latest snapshot and verify recover from snapshot
+        AbortedTxnProcessor newProcessor = new 
SnapshotSegmentAbortedTxnProcessorImpl(persistentTopic);
+        PositionImpl maxReadPosition = new PositionImpl(0, 14);
+        //2.1 Avoid update operation being canceled.
+        waitTaskExecuteCompletely(processor);
+        //2.2 take the latest snapshot
+        processor.takeAbortedTxnsSnapshot(maxReadPosition).get();
+        newProcessor.recoverFromSnapshot().get();
+        //Verify the recovery data flow
+        verifyAbortedTxnIDAndSegmentIndex(newProcessor,0,14);
+        //3. Delete the ledgers and then verify the date.
+        Field ledgersField = 
ManagedLedgerImpl.class.getDeclaredField("ledgers");
+        ledgersField.setAccessible(true);
+        NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers 
=
+                (NavigableMap<Long, 
MLDataFormats.ManagedLedgerInfo.LedgerInfo>)
+                        ledgersField.get(persistentTopic.getManagedLedger());
+        ledgers.forEach((k, v) -> {

Review Comment:
   There is also a test in `testPutAbortedTxnIntoProcessor` that deletes a ledger 
and verifies the snapshot segment deletion:
   
   ```java
      //3. Delete the ledgers and then verify the date.
           Field ledgersField = 
ManagedLedgerImpl.class.getDeclaredField("ledgers");
           ledgersField.setAccessible(true);
           NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> 
ledgers =
                   (NavigableMap<Long, 
MLDataFormats.ManagedLedgerInfo.LedgerInfo>)
                           ledgersField.get(persistentTopic.getManagedLedger());
           ledgers.forEach((k, v) -> {
               ledgers.remove(k);
           });
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to