This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f7a8c4cac9 [fix][test]fix flaky test 
TxnLogBufferedWriterTest.testMainProcess (#18290)
1f7a8c4cac9 is described below

commit 1f7a8c4cac91179ef8bb84a75fd618a40b940521
Author: fengyubiao <[email protected]>
AuthorDate: Wed Nov 2 12:27:17 2022 +0800

    [fix][test]fix flaky test TxnLogBufferedWriterTest.testMainProcess (#18290)
---
 .../coordinator/impl/TxnLogBufferedWriterTest.java        | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
 
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
index 3da61a3cd80..a690d52266e 100644
--- 
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
+++ 
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -193,7 +194,7 @@ public class TxnLogBufferedWriterTest extends 
MockedBookKeeperTestCase {
                     batchedWriteMaxDelayInMillis, batchEnabled, 
DISABLED_BUFFERED_WRITER_METRICS);
         // Store the param-context, param-position, param-exception of 
callback function and complete-count for verify.
         List<Integer> contextArrayOfCallback = 
Collections.synchronizedList(new ArrayList<>());
-        List<ManagedLedgerException> exceptionArrayOfCallback = 
Collections.synchronizedList(new ArrayList<>());
+        Map<Integer, ManagedLedgerException> exceptionArrayOfCallback = new 
ConcurrentHashMap<>();
         Map<PositionImpl, List<Position>> positionsOfCallback = 
Collections.synchronizedMap(new LinkedHashMap<>());
         AtomicBoolean anyFlushCompleted = new AtomicBoolean();
         TxnLogBufferedWriter.AddDataCallback callback = new 
TxnLogBufferedWriter.AddDataCallback(){
@@ -215,7 +216,7 @@ public class TxnLogBufferedWriterTest extends 
MockedBookKeeperTestCase {
                     return;
                 }
                 contextArrayOfCallback.add((int)ctx);
-                exceptionArrayOfCallback.add(exception);
+                exceptionArrayOfCallback.put((int)ctx, exception);
             }
         };
         // Write many times.
@@ -252,8 +253,14 @@ public class TxnLogBufferedWriterTest extends 
MockedBookKeeperTestCase {
             Collections.sort(contextArrayOfCallback);
         }
         assertEquals(contextArrayOfCallback.size(), writeCmdExecuteCount);
-        for (int ctxIndex = 0; ctxIndex < writeCmdExecuteCount; ctxIndex++){
-            assertEquals(contextArrayOfCallback.get(ctxIndex).intValue(), 
ctxIndex);
+        for (int ctxIndex = 0, successIndex = 0; ctxIndex < 
writeCmdExecuteCount; ctxIndex++){
+            // When calling `txnLogBufferedWriter.close`, all tasks in the 
queue will fail immediately, this makes the
+            // callback of failure task earlier.
+            if (exceptionArrayOfCallback.containsKey(ctxIndex)){
+                continue;
+            }
+            assertEquals(contextArrayOfCallback.get(successIndex).intValue(), 
ctxIndex);
+            successIndex++;
         }
         // if {@param bookieError} is true. verify the ex count.
         // if {@param bookieError} is false. verify the param-position count.

Reply via email to