This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fe34738bcbd1c0e220b3911aa17979824387719f
Author: fengyubiao <[email protected]>
AuthorDate: Thu Aug 11 16:53:32 2022 +0800

    [fix][flaky-test]TxnLogBufferedWriterTest.testMainProcess (#16999)
---
 .../coordinator/impl/TxnLogBufferedWriterTest.java | 29 +++++++++++-----------
 1 file changed, 15 insertions(+), 14 deletions(-)

diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
index 1220c92fd55..23fc04fda93 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
@@ -179,9 +180,9 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
                     dataSerializer, batchedWriteMaxRecords, batchedWriteMaxSize,
                     batchedWriteMaxDelayInMillis, batchEnabled);
         // Store the param-context, param-position, param-exception of callback function and complete-count for verify.
-        ArrayList<Integer> contextArrayOfCallback = new ArrayList<>();
-        ArrayList<ManagedLedgerException> exceptionArrayOfCallback = new ArrayList<>();
-        LinkedHashMap<PositionImpl, ArrayList<Position>> positionsOfCallback = new LinkedHashMap<>();
+        List<Integer> contextArrayOfCallback = Collections.synchronizedList(new ArrayList<>());
+        List<ManagedLedgerException> exceptionArrayOfCallback = Collections.synchronizedList(new ArrayList<>());
+        Map<PositionImpl, List<Position>> positionsOfCallback = Collections.synchronizedMap(new LinkedHashMap<>());
         AtomicBoolean anyFlushCompleted = new AtomicBoolean();
         TxnLogBufferedWriter.AddDataCallback callback = new TxnLogBufferedWriter.AddDataCallback(){
             @Override
@@ -192,7 +193,8 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
                 }
                 contextArrayOfCallback.add((int)ctx);
                 PositionImpl lightPosition = PositionImpl.get(position.getLedgerId(), position.getEntryId());
-                positionsOfCallback.computeIfAbsent(lightPosition, p -> new ArrayList<>());
+                positionsOfCallback.computeIfAbsent(lightPosition,
+                        p -> Collections.synchronizedList(new ArrayList<>()));
                 positionsOfCallback.get(lightPosition).add(position);
             }
             @Override
@@ -234,7 +236,8 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
         Awaitility.await().atMost(maxWaitSeconds, TimeUnit.SECONDS)
                 .until(() -> contextArrayOfCallback.size() == writeCmdExecuteCount);
         // Assert callback param-context, verify that all callbacks are executed in strict order.
-        if (closeBufferedWriter){
+        // If exception occurs, the failure callback be executed earlier. So sorted contextArrayOfCallback.
+        if (closeBufferedWriter || bookieErrorType == BookieErrorType.SOMETIMES_ERROR){
             Collections.sort(contextArrayOfCallback);
         }
         Assert.assertEquals(contextArrayOfCallback.size(), writeCmdExecuteCount);
@@ -246,8 +249,6 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
         int exceptionCallbackCount = exceptionArrayOfCallback.size();
         int positionCallbackCount = (int) positionsOfCallback.values().stream().flatMap(l -> l.stream()).count();
         if (BookieErrorType.SOMETIMES_ERROR == bookieErrorType ||  closeBufferedWriter){
-            Assert.assertTrue(exceptionCallbackCount > 0);
-            Assert.assertTrue(positionCallbackCount > 0);
             Assert.assertEquals(exceptionCallbackCount + positionCallbackCount, writeCmdExecuteCount);
         } else if (BookieErrorType.NO_ERROR == bookieErrorType){
             Assert.assertEquals(positionCallbackCount, writeCmdExecuteCount);
@@ -256,13 +257,13 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
         }
         // if enabled batch-feature, will verify the attributes (batchSize, batchIndex) of callback param-position.
         if (exactlyBatched && BookieErrorType.ALWAYS_ERROR != bookieErrorType){
-            Iterator<ArrayList<Position>> callbackPositionIterator = positionsOfCallback.values().iterator();
+            Iterator<List<Position>> callbackPositionIterator = positionsOfCallback.values().iterator();
             List<String> exactlyFlushedDataArray = dataSerializer.getGeneratedJsonArray();
             for (int batchedEntryIndex = 0; batchedEntryIndex < exactlyFlushedDataArray.size() - exceptionCallbackCount;
                  batchedEntryIndex++) {
                 String json = exactlyFlushedDataArray.get(batchedEntryIndex);
                 List<Integer> batchedData = JsonDataSerializer.deserializeMergedData(json);
-                ArrayList<Position> innerPositions = callbackPositionIterator.next();
+                List<Position> innerPositions = callbackPositionIterator.next();
                 for (int i = 0; i < batchedData.size(); i++) {
                     TxnBatchedPositionImpl innerPosition =
                             (TxnBatchedPositionImpl) innerPositions.get(i);
@@ -368,7 +369,7 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
                 1, TimeUnit.MILLISECONDS);
         SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
         // Cache the data flush to Bookie for Asserts.
-        List<Integer> dataArrayFlushedToBookie = new ArrayList<>();
+        List<Integer> dataArrayFlushedToBookie = Collections.synchronizedList(new ArrayList<>());
         Mockito.doAnswer(new Answer() {
             @Override
             public Object answer(InvocationOnMock invocation) throws Throwable {
@@ -531,10 +532,10 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
 
         private static ObjectMapper objectMapper = new ObjectMapper();
 
-        private ArrayList<ByteBuf> generatedByteBufArray = new ArrayList<>();
+        private List<ByteBuf> generatedByteBufArray = Collections.synchronizedList(new ArrayList<>());
 
         @Getter
-        private ArrayList<String> generatedJsonArray = new ArrayList<>();
+        private List<String> generatedJsonArray = Collections.synchronizedList(new ArrayList<>());
 
         private int eachDataBytesLen = 4;
 
@@ -590,8 +591,8 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
 
         protected void cleanup(){
             // Just for GC.
-            generatedByteBufArray = new ArrayList<>();
-            generatedJsonArray = new ArrayList<>();
+            generatedByteBufArray = Collections.synchronizedList(new ArrayList<>());
+            generatedJsonArray = Collections.synchronizedList(new ArrayList<>());
         }
 
         protected void assertAllByteBufHasBeenReleased(){

Reply via email to