liangyepianzhou commented on code in PR #16727:
URL: https://github.com/apache/pulsar/pull/16727#discussion_r927240282
##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -382,37 +398,49 @@ public Object answer(InvocationOnMock invocation) throws
Throwable {
return null;
}
}).when(managedLedger).asyncAddEntry(Mockito.any(ByteBuf.class),
Mockito.any(), Mockito.any());
- // Test threshold: writeMaxDelayInMillis.
- TxnLogBufferedWriter txnLogBufferedWriter1 = new
TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
- scheduledExecutorService, dataSerializer, 32, 1024 * 4, 100,
true);
TxnLogBufferedWriter.AddDataCallback callback =
Mockito.mock(TxnLogBufferedWriter.AddDataCallback.class);
- txnLogBufferedWriter1.asyncAddData(100, callback, 100);
+
+ // Test threshold: writeMaxDelayInMillis (use scheduled service).
+ TxnLogBufferedWriter txnLogBufferedWriter0 = new
TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
+ scheduledExecutorService, dataSerializer, 32, 1024 * 4, 100,
true);
+
+ txnLogBufferedWriter0.asyncAddData(100, callback, 100);
Thread.sleep(90);
// Verify does not refresh ahead of time.
Assert.assertEquals(dataArrayFlushedToBookie.size(), 0);
Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() ->
dataArrayFlushedToBookie.size() == 1);
Assert.assertEquals(dataArrayFlushedToBookie.get(0).intValue(), 100);
+ txnLogBufferedWriter0.close();
+
+ // Test threshold: writeMaxDelayInMillis (use timer).
+ TxnLogBufferedWriter txnLogBufferedWriter1 = new
TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
+ transactionTimer, dataSerializer, 32, 1024 * 4, 100, true);
+ txnLogBufferedWriter1.asyncAddData(100, callback, 100);
+ Thread.sleep(70);
Review Comment:
Why is this not equal to writeMaxDelayInMillis?
And if we can not use the Thread.sleep().
##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -165,19 +167,30 @@ public void testMainProcess(int batchedWriteMaxRecords,
int batchedWriteMaxSize,
}
OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder()
.numThreads(5).name("tx-threads").build();
- ScheduledExecutorService scheduledExecutorService =
- Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("tx-scheduler-threads"));
JsonDataSerializer dataSerializer = new
JsonDataSerializer(eachDataBytesLen);
/**
* Execute test task.
* 1. Write many times.
* 2. Store the param-context and param-position of callback
function for verify.
*/
// Create TxLogBufferedWriter.
- TxnLogBufferedWriter txnLogBufferedWriter = new
TxnLogBufferedWriter<Integer>(
- managedLedger, orderedExecutor,
scheduledExecutorService,
- dataSerializer, batchedWriteMaxRecords,
batchedWriteMaxSize,
- batchedWriteMaxDelayInMillis, batchEnabled);
+ HashedWheelTimer transactionTimer = new HashedWheelTimer(new
DefaultThreadFactory("transaction-timer"),
+ 1, TimeUnit.MILLISECONDS);
+ ScheduledExecutorService transactionScheduledService =
+ Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("tx-scheduler-threads"));
Review Comment:
```suggestion
Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("txn-scheduler-threads"));
```
##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -165,19 +167,30 @@ public void testMainProcess(int batchedWriteMaxRecords,
int batchedWriteMaxSize,
}
OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder()
.numThreads(5).name("tx-threads").build();
- ScheduledExecutorService scheduledExecutorService =
- Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("tx-scheduler-threads"));
JsonDataSerializer dataSerializer = new
JsonDataSerializer(eachDataBytesLen);
/**
* Execute test task.
* 1. Write many times.
* 2. Store the param-context and param-position of callback
function for verify.
*/
// Create TxLogBufferedWriter.
- TxnLogBufferedWriter txnLogBufferedWriter = new
TxnLogBufferedWriter<Integer>(
- managedLedger, orderedExecutor,
scheduledExecutorService,
- dataSerializer, batchedWriteMaxRecords,
batchedWriteMaxSize,
- batchedWriteMaxDelayInMillis, batchEnabled);
+ HashedWheelTimer transactionTimer = new HashedWheelTimer(new
DefaultThreadFactory("transaction-timer"),
+ 1, TimeUnit.MILLISECONDS);
+ ScheduledExecutorService transactionScheduledService =
+ Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("tx-scheduler-threads"));
Review Comment:
Please check other places.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]