This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fe34738bcbd1c0e220b3911aa17979824387719f Author: fengyubiao <[email protected]> AuthorDate: Thu Aug 11 16:53:32 2022 +0800 [fix][flaky-test]TxnLogBufferedWriterTest.testMainProcess (#16999) --- .../coordinator/impl/TxnLogBufferedWriterTest.java | 29 +++++++++++----------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java index 1220c92fd55..23fc04fda93 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; @@ -179,9 +180,9 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase { dataSerializer, batchedWriteMaxRecords, batchedWriteMaxSize, batchedWriteMaxDelayInMillis, batchEnabled); // Store the param-context, param-position, param-exception of callback function and complete-count for verify. - ArrayList<Integer> contextArrayOfCallback = new ArrayList<>(); - ArrayList<ManagedLedgerException> exceptionArrayOfCallback = new ArrayList<>(); - LinkedHashMap<PositionImpl, ArrayList<Position>> positionsOfCallback = new LinkedHashMap<>(); + List<Integer> contextArrayOfCallback = Collections.synchronizedList(new ArrayList<>()); + List<ManagedLedgerException> exceptionArrayOfCallback = Collections.synchronizedList(new ArrayList<>()); + Map<PositionImpl, List<Position>> positionsOfCallback = Collections.synchronizedMap(new LinkedHashMap<>()); AtomicBoolean anyFlushCompleted = new AtomicBoolean(); TxnLogBufferedWriter.AddDataCallback callback = new TxnLogBufferedWriter.AddDataCallback(){ @Override @@ -192,7 +193,8 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase { } contextArrayOfCallback.add((int)ctx); PositionImpl lightPosition = PositionImpl.get(position.getLedgerId(), position.getEntryId()); - positionsOfCallback.computeIfAbsent(lightPosition, p -> new ArrayList<>()); + positionsOfCallback.computeIfAbsent(lightPosition, + p -> Collections.synchronizedList(new ArrayList<>())); positionsOfCallback.get(lightPosition).add(position); } @Override @@ -234,7 +236,8 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase { Awaitility.await().atMost(maxWaitSeconds, TimeUnit.SECONDS) .until(() -> contextArrayOfCallback.size() == writeCmdExecuteCount); // Assert callback param-context, verify that all callbacks are executed in strict order. - if (closeBufferedWriter){ + // If exception occurs, the failure callback be executed earlier. So sorted contextArrayOfCallback. + if (closeBufferedWriter || bookieErrorType == BookieErrorType.SOMETIMES_ERROR){ Collections.sort(contextArrayOfCallback); } Assert.assertEquals(contextArrayOfCallback.size(), writeCmdExecuteCount); @@ -246,8 +249,6 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase { int exceptionCallbackCount = exceptionArrayOfCallback.size(); int positionCallbackCount = (int) positionsOfCallback.values().stream().flatMap(l -> l.stream()).count(); if (BookieErrorType.SOMETIMES_ERROR == bookieErrorType || closeBufferedWriter){ - Assert.assertTrue(exceptionCallbackCount > 0); - Assert.assertTrue(positionCallbackCount > 0); Assert.assertEquals(exceptionCallbackCount + positionCallbackCount, writeCmdExecuteCount); } else if (BookieErrorType.NO_ERROR == bookieErrorType){ Assert.assertEquals(positionCallbackCount, writeCmdExecuteCount); @@ -256,13 +257,13 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase { } // if enabled batch-feature, will verify the attributes (batchSize, batchIndex) of callback param-position. if (exactlyBatched && BookieErrorType.ALWAYS_ERROR != bookieErrorType){ - Iterator<ArrayList<Position>> callbackPositionIterator = positionsOfCallback.values().iterator(); + Iterator<List<Position>> callbackPositionIterator = positionsOfCallback.values().iterator(); List<String> exactlyFlushedDataArray = dataSerializer.getGeneratedJsonArray(); for (int batchedEntryIndex = 0; batchedEntryIndex < exactlyFlushedDataArray.size() - exceptionCallbackCount; batchedEntryIndex++) { String json = exactlyFlushedDataArray.get(batchedEntryIndex); List<Integer> batchedData = JsonDataSerializer.deserializeMergedData(json); - ArrayList<Position> innerPositions = callbackPositionIterator.next(); + List<Position> innerPositions = callbackPositionIterator.next(); for (int i = 0; i < batchedData.size(); i++) { TxnBatchedPositionImpl innerPosition = (TxnBatchedPositionImpl) innerPositions.get(i); @@ -368,7 +369,7 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase { 1, TimeUnit.MILLISECONDS); SumStrDataSerializer dataSerializer = new SumStrDataSerializer(); // Cache the data flush to Bookie for Asserts. - List<Integer> dataArrayFlushedToBookie = new ArrayList<>(); + List<Integer> dataArrayFlushedToBookie = Collections.synchronizedList(new ArrayList<>()); Mockito.doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { @@ -531,10 +532,10 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase { private static ObjectMapper objectMapper = new ObjectMapper(); - private ArrayList<ByteBuf> generatedByteBufArray = new ArrayList<>(); + private List<ByteBuf> generatedByteBufArray = Collections.synchronizedList(new ArrayList<>()); @Getter - private ArrayList<String> generatedJsonArray = new ArrayList<>(); + private List<String> generatedJsonArray = Collections.synchronizedList(new ArrayList<>()); private int eachDataBytesLen = 4; @@ -590,8 +591,8 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase { protected void cleanup(){ // Just for GC. - generatedByteBufArray = new ArrayList<>(); - generatedJsonArray = new ArrayList<>(); + generatedByteBufArray = Collections.synchronizedList(new ArrayList<>()); + generatedJsonArray = Collections.synchronizedList(new ArrayList<>()); } protected void assertAllByteBufHasBeenReleased(){
