liangyepianzhou commented on code in PR #16727:
URL: https://github.com/apache/pulsar/pull/16727#discussion_r927240282


##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -382,37 +398,49 @@ public Object answer(InvocationOnMock invocation) throws 
Throwable {
                 return null;
             }
         }).when(managedLedger).asyncAddEntry(Mockito.any(ByteBuf.class), 
Mockito.any(), Mockito.any());
-        // Test threshold: writeMaxDelayInMillis.
-        TxnLogBufferedWriter txnLogBufferedWriter1 = new 
TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
-                scheduledExecutorService, dataSerializer, 32, 1024 * 4, 100, 
true);
         TxnLogBufferedWriter.AddDataCallback callback = 
Mockito.mock(TxnLogBufferedWriter.AddDataCallback.class);
-        txnLogBufferedWriter1.asyncAddData(100, callback, 100);
+
+        // Test threshold: writeMaxDelayInMillis (use scheduled service).
+        TxnLogBufferedWriter txnLogBufferedWriter0 = new 
TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
+                scheduledExecutorService, dataSerializer, 32, 1024 * 4, 100, 
true);
+
+        txnLogBufferedWriter0.asyncAddData(100, callback, 100);
         Thread.sleep(90);
         // Verify does not refresh ahead of time.
         Assert.assertEquals(dataArrayFlushedToBookie.size(), 0);
         Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> 
dataArrayFlushedToBookie.size() == 1);
         Assert.assertEquals(dataArrayFlushedToBookie.get(0).intValue(), 100);
+        txnLogBufferedWriter0.close();
+
+        // Test threshold: writeMaxDelayInMillis (use timer).
+        TxnLogBufferedWriter txnLogBufferedWriter1 = new 
TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
+                transactionTimer, dataSerializer, 32, 1024 * 4, 100, true);
+        txnLogBufferedWriter1.asyncAddData(100, callback, 100);
+        Thread.sleep(70);

Review Comment:
   Why is this not equal to writeMaxDelayInMillis?
   And if we can not use the Thread.sleep().



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -165,19 +167,30 @@ public void testMainProcess(int batchedWriteMaxRecords, 
int batchedWriteMaxSize,
         }
         OrderedExecutor orderedExecutor =  OrderedExecutor.newBuilder()
                 .numThreads(5).name("tx-threads").build();
-        ScheduledExecutorService scheduledExecutorService =
-                Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("tx-scheduler-threads"));
         JsonDataSerializer dataSerializer = new 
JsonDataSerializer(eachDataBytesLen);
         /**
          * Execute test task.
          *   1. Write many times.
          *   2. Store the param-context and param-position of callback 
function for verify.
          */
         // Create TxLogBufferedWriter.
-        TxnLogBufferedWriter txnLogBufferedWriter = new 
TxnLogBufferedWriter<Integer>(
-                        managedLedger, orderedExecutor, 
scheduledExecutorService,
-                        dataSerializer, batchedWriteMaxRecords, 
batchedWriteMaxSize,
-                        batchedWriteMaxDelayInMillis, batchEnabled);
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new 
DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+        ScheduledExecutorService transactionScheduledService =
+                Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("tx-scheduler-threads"));

Review Comment:
   ```suggestion
                   Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("txn-scheduler-threads"));
   ```



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -165,19 +167,30 @@ public void testMainProcess(int batchedWriteMaxRecords, 
int batchedWriteMaxSize,
         }
         OrderedExecutor orderedExecutor =  OrderedExecutor.newBuilder()
                 .numThreads(5).name("tx-threads").build();
-        ScheduledExecutorService scheduledExecutorService =
-                Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("tx-scheduler-threads"));
         JsonDataSerializer dataSerializer = new 
JsonDataSerializer(eachDataBytesLen);
         /**
          * Execute test task.
          *   1. Write many times.
          *   2. Store the param-context and param-position of callback 
function for verify.
          */
         // Create TxLogBufferedWriter.
-        TxnLogBufferedWriter txnLogBufferedWriter = new 
TxnLogBufferedWriter<Integer>(
-                        managedLedger, orderedExecutor, 
scheduledExecutorService,
-                        dataSerializer, batchedWriteMaxRecords, 
batchedWriteMaxSize,
-                        batchedWriteMaxDelayInMillis, batchEnabled);
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new 
DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+        ScheduledExecutorService transactionScheduledService =
+                Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("tx-scheduler-threads"));

Review Comment:
   Please check other places.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to