This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1f7a8c4cac9 [fix][test]fix flaky test
TxnLogBufferedWriterTest.testMainProcess (#18290)
1f7a8c4cac9 is described below
commit 1f7a8c4cac91179ef8bb84a75fd618a40b940521
Author: fengyubiao <[email protected]>
AuthorDate: Wed Nov 2 12:27:17 2022 +0800
[fix][test]fix flaky test TxnLogBufferedWriterTest.testMainProcess (#18290)
---
.../coordinator/impl/TxnLogBufferedWriterTest.java | 15 +++++++++++----
1 file changed, 11 insertions(+), 4 deletions(-)
diff --git
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
index 3da61a3cd80..a690d52266e 100644
---
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
+++
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
@@ -34,6 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@@ -193,7 +194,7 @@ public class TxnLogBufferedWriterTest extends
MockedBookKeeperTestCase {
batchedWriteMaxDelayInMillis, batchEnabled,
DISABLED_BUFFERED_WRITER_METRICS);
// Store the param-context, param-position, param-exception of
callback function and complete-count for verify.
List<Integer> contextArrayOfCallback =
Collections.synchronizedList(new ArrayList<>());
- List<ManagedLedgerException> exceptionArrayOfCallback =
Collections.synchronizedList(new ArrayList<>());
+ Map<Integer, ManagedLedgerException> exceptionArrayOfCallback = new
ConcurrentHashMap<>();
Map<PositionImpl, List<Position>> positionsOfCallback =
Collections.synchronizedMap(new LinkedHashMap<>());
AtomicBoolean anyFlushCompleted = new AtomicBoolean();
TxnLogBufferedWriter.AddDataCallback callback = new
TxnLogBufferedWriter.AddDataCallback(){
@@ -215,7 +216,7 @@ public class TxnLogBufferedWriterTest extends
MockedBookKeeperTestCase {
return;
}
contextArrayOfCallback.add((int)ctx);
- exceptionArrayOfCallback.add(exception);
+ exceptionArrayOfCallback.put((int)ctx, exception);
}
};
// Write many times.
@@ -252,8 +253,14 @@ public class TxnLogBufferedWriterTest extends
MockedBookKeeperTestCase {
Collections.sort(contextArrayOfCallback);
}
assertEquals(contextArrayOfCallback.size(), writeCmdExecuteCount);
- for (int ctxIndex = 0; ctxIndex < writeCmdExecuteCount; ctxIndex++){
- assertEquals(contextArrayOfCallback.get(ctxIndex).intValue(),
ctxIndex);
+ for (int ctxIndex = 0, successIndex = 0; ctxIndex <
writeCmdExecuteCount; ctxIndex++){
+ // When calling `txnLogBufferedWriter.close`, all tasks in the
queue will fail immediately, this makes the
+ // callback of failure task earlier.
+ if (exceptionArrayOfCallback.containsKey(ctxIndex)){
+ continue;
+ }
+ assertEquals(contextArrayOfCallback.get(successIndex).intValue(),
ctxIndex);
+ successIndex++;
}
// if {@param bookieError} is true. verify the ex count.
// if {@param bookieError} is false. verify the param-position count.