congbobo184 commented on a change in pull request #9229:
URL: https://github.com/apache/pulsar/pull/9229#discussion_r560702974



##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
##########
@@ -123,4 +130,92 @@ public void testAddAckedPartitionToTxn() throws 
ExecutionException, InterruptedE
         
transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(0));
         
Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 0);
     }
+
+    @Test
+    public void testTimeoutTracker() throws Exception {
+        
pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+        Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS)
+                .until(() -> pulsar.getTransactionMetadataStoreService()
+                        .getStores().get(TransactionCoordinatorID.get(0)) != 
null);
+        MLTransactionMetadataStore transactionMetadataStore =
+                (MLTransactionMetadataStore) 
pulsar.getTransactionMetadataStoreService()
+                        .getStores().get(TransactionCoordinatorID.get(0));
+        Method method = 
TransactionMetadataStoreState.class.getDeclaredMethod("checkIfReady");
+        method.setAccessible(true);
+        Awaitility.await().atMost(1000, TimeUnit.MILLISECONDS)
+                .until(() -> (Boolean) 
method.invoke(transactionMetadataStore));
+        Field field = 
MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
+        field.setAccessible(true);
+        ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMap =
+                (ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>>) 
field.get(transactionMetadataStore);
+        for (int i = 0; i < 1000; i ++) {
+            transactionMetadataStore.newTransaction(5).get();
+        }
+
+        txnMap.forEach((txnID, txnMetaListPair) -> {
+            Assert.assertEquals(txnMetaListPair.getLeft().status(), 
TxnStatus.OPEN);
+        });
+        Awaitility.await().atLeast(5000, TimeUnit.MICROSECONDS).atMost(10000, 
TimeUnit.MILLISECONDS)
+                .until(() -> txnMap.size() == 0);
+    }
+
+    @Test
+    public void testTimeoutTrackerMultiThreading() throws Exception {
+        
pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+        Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS)
+                .until(() -> pulsar.getTransactionMetadataStoreService()
+                        .getStores().get(TransactionCoordinatorID.get(0)) != 
null);
+        MLTransactionMetadataStore transactionMetadataStore =
+                (MLTransactionMetadataStore) 
pulsar.getTransactionMetadataStoreService()
+                        .getStores().get(TransactionCoordinatorID.get(0));
+        Method method = 
TransactionMetadataStoreState.class.getDeclaredMethod("checkIfReady");
+        method.setAccessible(true);
+        Awaitility.await().atMost(1000, TimeUnit.MILLISECONDS)
+                .until(() -> (Boolean) 
method.invoke(transactionMetadataStore));
+        Field field = 
MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
+        field.setAccessible(true);
+        ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMap =
+                (ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>>) 
field.get(transactionMetadataStore);
+        new Thread(() -> {
+            for (int i = 0; i < 100; i ++) {
+                try {
+                    transactionMetadataStore.newTransaction(1);
+                } catch (Exception e) {
+                    //no operation
+                }
+            }
+        }).start();
+
+        new Thread(() -> {
+            for (int i = 0; i < 100; i ++) {
+                try {
+                    transactionMetadataStore.newTransaction(3);
+                } catch (Exception e) {
+                    //no operation
+                }
+            }
+        }).start();
+
+        new Thread(() -> {
+            for (int i = 0; i < 100; i ++) {
+                try {
+                    transactionMetadataStore.newTransaction(2);
+                } catch (Exception e) {
+                    //no operation
+                }
+            }
+        }).start();
+
+        new Thread(() -> {
+            for (int i = 0; i < 100; i ++) {
+                try {
+                    transactionMetadataStore.newTransaction(10);
+                } catch (Exception e) {
+                    //no operation
+                }
+            }
+        }).start();

Review comment:
       In order to create 100 new transactions, we should also change the `for` loop to a `while` loop.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to