This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 962e4458a92 [fix] [txn] [PIP-160] Avoid the timing task burdening the ledger thread and leading to an avalanche (#16679)
962e4458a92 is described below

commit 962e4458a92191bdc937e5647b216787a241d2c9
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jul 20 23:15:21 2022 +0800

    [fix] [txn] [PIP-160] Avoid the timing task burdening the ledger thread and leading to an avalanche (#16679)
    
    Master Issue: #15370
    
    ### Motivation
    
    see #15370
    
    ### Modifications
    
    ####  Managed Ledger I/O thread gets busier and busier.
    
    In the original implementation, `TxnLogBufferedWriter` has two thread pools:

    - `ExecutorService singleThreadExecutorForWrite` is a single-threaded executor used to perform Managed Ledger I/O operations. It runs the following tasks:
      - `internalAsyncAddData`: each call to `asyncAddData` adds one of these tasks.
      - `doFlush`: `trigFlush` sometimes adds one, and a scheduled task also adds them periodically.
    - `ScheduledExecutorService scheduledExecutorService` is used to periodically add `doFlush` tasks to `singleThreadExecutorForWrite`, whether `singleThreadExecutorForWrite` is idle or not.
    
    If `singleThreadExecutorForWrite` stays busy for a period of time, `scheduledExecutorService` keeps adding `doFlush` tasks throughout that period. `singleThreadExecutorForWrite` then spends its time on the newly added `doFlush` tasks and allocates fewer resources to `internalAsyncAddData`, so `singleThreadExecutorForWrite` accumulates more and more tasks, even if the `doFlush` task appended by `scheduledExecuto [...]
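    The accumulation can be reproduced outside Pulsar with a minimal sketch (hypothetical names, not the Pulsar code): a fixed-rate timer submits "flush" tasks to a single-threaded executor whose tasks take longer than the timer period, so the executor's queue only ever grows.

    ```java
    import java.util.concurrent.Executors;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class FixedRatePileUp {

        /** Runs the scenario for the given time and returns the writer's backlog. */
        static int runFor(long runMillis) throws InterruptedException {
            LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
            ExecutorService writer = new ThreadPoolExecutor(1, 1,
                    0, TimeUnit.MILLISECONDS, queue);
            ScheduledExecutorService timer =
                    Executors.newSingleThreadScheduledExecutor();
            // Timer fires every 5 ms, but each simulated "doFlush" takes ~20 ms,
            // so the single writer thread can never keep up.
            timer.scheduleAtFixedRate(() -> writer.execute(() -> {
                try {
                    Thread.sleep(20);
                } catch (InterruptedException ignored) {
                    Thread.currentThread().interrupt();
                }
            }), 0, 5, TimeUnit.MILLISECONDS);
            Thread.sleep(runMillis);
            timer.shutdownNow();
            int backlog = queue.size();
            writer.shutdownNow();
            return backlog;
        }

        public static void main(String[] args) throws InterruptedException {
            // The backlog grows roughly (60 submitted - 15 completed) in 300 ms.
            System.out.println("backlog after 300 ms: " + runFor(300));
        }
    }
    ```

    The exact backlog depends on scheduler jitter, but it is always clearly positive, which is the avalanche this patch prevents.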
    
    #### Imprecise timing task
    If new data is added immediately after a scheduled task runs, that data cannot be flushed by the next scheduled task. For example, with the max delay time set to 1 second, suppose the scheduled tasks and the new-data task execute in the following sequence:
    
    ```
    1. scheduled task running at 12:00:00.000
    2. scheduled task running at 12:00:01.000
    3. add data at 12:00:01.005
    4. scheduled task running at 12:00:02.000
    ```
    In step 4, the flush task will not flush the data to the bookie, because the expression `System.currentTimeMillis() - firstAsyncAddArgs.addedTime >= batchedWriteMaxDelayInMillis` evaluates to `false` (only 995 ms have elapsed). The data is actually flushed only by the next scheduled task, at `12:00:03.000`.
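    The step-4 check can be evaluated against the hypothetical timestamps above (a sketch, not the Pulsar code; `millis` is a helper invented for readability):

    ```java
    public class TimingWindow {
        public static void main(String[] args) {
            long batchedWriteMaxDelayInMillis = 1_000;
            long addedTime = millis("12:00:01.005");
            long scheduledRunAt = millis("12:00:02.000");
            // Only 995 ms have elapsed when the 12:00:02.000 task runs,
            // so the condition is false and the data waits for 12:00:03.000.
            boolean flush = scheduledRunAt - addedTime >= batchedWriteMaxDelayInMillis;
            System.out.println(flush); // false
        }

        // Hypothetical helper: "HH:mm:ss.SSS" to milliseconds within one day.
        static long millis(String t) {
            String[] p = t.split("[:.]");
            return ((Long.parseLong(p[0]) * 60 + Long.parseLong(p[1])) * 60
                    + Long.parseLong(p[2])) * 1_000 + Long.parseLong(p[3]);
        }
    }
    ```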
    
    
    ### Changes:
    
    ####  Managed Ledger I/O thread gets busier and busier.
    
    Since all of these tasks are executed on the same thread, we can change the logic to "after a scheduled task has executed, schedule the next one". This reduces the density of timer-triggered `doFlush` executions, at the cost of some timing accuracy (the original implementation was not completely accurate either).
    
    This change only avoids the task accumulation in `TxnLogBufferedWriter` caused by too many scheduled tasks; it cannot improve the write performance of the Managed Ledger.
    
    #### Imprecise timing task
    The flush triggered by the scheduled task no longer checks whether the first node's added time meets the delay condition. To compensate for the imprecise execution time of the scheduled task, the maximum delay time is still checked (and a flush performed if it is exceeded) in the flush task triggered by `asyncAddData`.
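    The "schedule the next trigger only after the current one runs" pattern can be sketched as follows (a hypothetical class, not the Pulsar code): at most one timing trigger is ever pending, so a slow writer thread cannot accumulate timer tasks.

    ```java
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class SelfRescheduling {
        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
        private final long delayMillis;
        private volatile boolean closed = false;
        final CountDownLatch flushes = new CountDownLatch(3);

        SelfRescheduling(long delayMillis) {
            this.delayMillis = delayMillis;
        }

        void nextTimingTrigger() {
            if (closed) {
                return;
            }
            scheduler.schedule(() -> {
                try {
                    doFlush();
                } finally {
                    nextTimingTrigger(); // re-arm only after this run finishes
                }
            }, delayMillis, TimeUnit.MILLISECONDS);
        }

        void doFlush() {
            flushes.countDown();
        }

        void close() {
            closed = true;
            scheduler.shutdown();
        }

        public static void main(String[] args) throws Exception {
            SelfRescheduling s = new SelfRescheduling(10);
            s.nextTimingTrigger();
            // The trigger keeps re-arming itself; wait for three flushes.
            System.out.println(s.flushes.await(2, TimeUnit.SECONDS)); // true
            s.close();
        }
    }
    ```

    Unlike `scheduleAtFixedRate`, a slow `doFlush` here simply delays the next trigger instead of letting triggers pile up behind it.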
---
 .../coordinator/impl/TxnLogBufferedWriter.java     |  85 ++++++++----
 .../coordinator/impl/TxnLogBufferedWriterTest.java | 148 +++++++++++++++++++--
 2 files changed, 193 insertions(+), 40 deletions(-)

diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
index 685fbba9b2f..b3e4a7a8641 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
@@ -73,6 +73,8 @@ public class TxnLogBufferedWriter<T> implements 
AsyncCallbacks.AddEntryCallback,
 
     private final ManagedLedger managedLedger;
 
+    private final ScheduledExecutorService scheduledExecutorService;
+
     /** All write operation will be executed on single thread. **/
     private final ExecutorService singleThreadExecutorForWrite;
 
@@ -137,14 +139,35 @@ public class TxnLogBufferedWriter<T> implements 
AsyncCallbacks.AddEntryCallback,
         this.batchedWriteMaxDelayInMillis = batchedWriteMaxDelayInMillis;
         this.flushContext = FlushContext.newInstance();
         this.dataArray = new ArrayList<>();
+        this.scheduledExecutorService = scheduledExecutorService;
         // scheduler task.
-        if (batchEnabled) {
-            this.scheduledFuture = 
scheduledExecutorService.scheduleAtFixedRate(() -> trigFlush(false),
-                    batchedWriteMaxDelayInMillis, 
batchedWriteMaxDelayInMillis, TimeUnit.MICROSECONDS);
+        if (this.batchEnabled) {
+            nextTimingTrigger();
         }
         this.state = State.OPEN;
     }
 
+    /***
+     * Why not use {@link 
ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} ?
+     * Because: when the {@link #singleThreadExecutorForWrite} thread 
processes slowly, the scheduleAtFixedRate task
+     * will continue to append tasks to the ledger thread, this burdens the 
ledger thread and leads to an avalanche.
+     * see: https://github.com/apache/pulsar/pull/16679.
+     */
+    private void nextTimingTrigger(){
+        try {
+            if (state == State.CLOSING || state == State.CLOSED){
+                return;
+            }
+            scheduledFuture = scheduledExecutorService.schedule(() -> 
trigFlush(false, true),
+                    batchedWriteMaxDelayInMillis, TimeUnit.MILLISECONDS);
+        } catch (Exception e){
+            log.error("Start timing flush trigger failed."
+                    + " managedLedger: " + managedLedger.getName(), e);
+        }
+    }
+
+
+
     /**
      * Append a new entry to the end of a managed ledger. All writes will be 
performed in the same thread. Callbacks are
      * executed in strict write order,but after {@link #close()}, callbacks 
that fail by state check will execute
@@ -178,7 +201,7 @@ public class TxnLogBufferedWriter<T> implements 
AsyncCallbacks.AddEntryCallback,
         int len = dataSerializer.getSerializedSize(data);
         if (len >= batchedWriteMaxSize){
             if (!flushContext.asyncAddArgsList.isEmpty()) {
-                doTrigFlush(true);
+                doTrigFlush(true, false);
             }
             ByteBuf byteBuf = dataSerializer.serialize(data);
             managedLedger.asyncAddEntry(byteBuf, 
DisabledBatchCallback.INSTANCE,
@@ -193,7 +216,7 @@ public class TxnLogBufferedWriter<T> implements 
AsyncCallbacks.AddEntryCallback,
         // Calculate bytes-size.
         this.bytesSize += len;
         // trig flush.
-        doTrigFlush(false);
+        doTrigFlush(false, false);
     }
 
     /***
@@ -231,29 +254,39 @@ public class TxnLogBufferedWriter<T> implements 
AsyncCallbacks.AddEntryCallback,
     /**
      * Trigger write to bookie once, If the conditions are not met, nothing 
will be done.
      */
-    public void trigFlush(final boolean force){
-        singleThreadExecutorForWrite.execute(() -> doTrigFlush(force));
+    public void trigFlush(final boolean force, boolean byScheduleThreads){
+        singleThreadExecutorForWrite.execute(() -> doTrigFlush(force, 
byScheduleThreads));
     }
 
-    private void doTrigFlush(boolean force){
-        if (flushContext.asyncAddArgsList.isEmpty()) {
-            return;
-        }
-        if (force){
-            doFlush();
-            return;
-        }
-        AsyncAddArgs firstAsyncAddArgs = flushContext.asyncAddArgsList.get(0);
-        if (System.currentTimeMillis() - firstAsyncAddArgs.addedTime > 
batchedWriteMaxDelayInMillis){
-            doFlush();
-            return;
-        }
-        if (this.flushContext.asyncAddArgsList.size() >= 
batchedWriteMaxRecords){
-            doFlush();
-            return;
-        }
-        if (this.bytesSize >= batchedWriteMaxSize){
-            doFlush();
+    private void doTrigFlush(boolean force, boolean byScheduleThreads){
+        try {
+            if (flushContext.asyncAddArgsList.isEmpty()) {
+                return;
+            }
+            if (force) {
+                doFlush();
+                return;
+            }
+            if (byScheduleThreads) {
+                doFlush();
+                return;
+            }
+            AsyncAddArgs firstAsyncAddArgs = 
flushContext.asyncAddArgsList.get(0);
+            if (System.currentTimeMillis() - firstAsyncAddArgs.addedTime >= 
batchedWriteMaxDelayInMillis) {
+                doFlush();
+                return;
+            }
+            if (this.flushContext.asyncAddArgsList.size() >= 
batchedWriteMaxRecords) {
+                doFlush();
+                return;
+            }
+            if (this.bytesSize >= batchedWriteMaxSize) {
+                doFlush();
+            }
+        } finally {
+            if (byScheduleThreads) {
+                nextTimingTrigger();
+            }
         }
     }
 
diff --git 
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
 
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
index 78bd4e8a57b..f7ee32a0937 100644
--- 
a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
+++ 
b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
@@ -29,10 +29,14 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
@@ -88,9 +92,9 @@ public class TxnLogBufferedWriterTest extends 
MockedBookKeeperTestCase {
         // The number of data writes is large.
         provider[2] = new Object[]{512, 1024 * 1024, 100, true, 20000, 5, 4, 
BookieErrorType.NO_ERROR, false};
         // Big data writes.
-        provider[3] = new Object[]{512, 1024, 100, true, 3000, 4, 1024, 
BookieErrorType.NO_ERROR, false};
+        provider[3] = new Object[]{512, 1024, 100, true, 3000, 6, 1024, 
BookieErrorType.NO_ERROR, false};
         // A batch has only one data
-        provider[4] = new Object[]{1, 1024 * 1024, 100, true, 2000, 4, 4, 
BookieErrorType.NO_ERROR, false};
+        provider[4] = new Object[]{1, 1024 * 1024, 100, true, 2000, 6, 4, 
BookieErrorType.NO_ERROR, false};
         // A batch has only two data
         provider[5] = new Object[]{2, 1024 * 1024, 100, true, 1999, 4, 4, 
BookieErrorType.NO_ERROR, false};
         // Disabled the batch feature
@@ -207,7 +211,7 @@ public class TxnLogBufferedWriterTest extends 
MockedBookKeeperTestCase {
             txnLogBufferedWriter.asyncAddData(i, callback, i);
             // Ensure flush at least once before close buffered writer.
             if (closeBufferedWriter && i == 0){
-                txnLogBufferedWriter.trigFlush(true);
+                txnLogBufferedWriter.trigFlush(true, false);
             }
             if (closeBufferedWriter && bufferedWriteCloseAtIndex == i){
                 // Wait for any complete callback, avoid unstable.
@@ -378,28 +382,39 @@ public class TxnLogBufferedWriterTest extends 
MockedBookKeeperTestCase {
                 return null;
             }
         }).when(managedLedger).asyncAddEntry(Mockito.any(ByteBuf.class), 
Mockito.any(), Mockito.any());
-        // Start tests.
-        TxnLogBufferedWriter txnLogBufferedWriter = new 
TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
+        // Test threshold: writeMaxDelayInMillis.
+        TxnLogBufferedWriter txnLogBufferedWriter1 = new 
TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
                 scheduledExecutorService, dataSerializer, 32, 1024 * 4, 100, 
true);
         TxnLogBufferedWriter.AddDataCallback callback = 
Mockito.mock(TxnLogBufferedWriter.AddDataCallback.class);
-        // Test threshold: writeMaxDelayInMillis.
-        txnLogBufferedWriter.asyncAddData(100, callback, 100);
-        Thread.sleep(101);
+        txnLogBufferedWriter1.asyncAddData(100, callback, 100);
+        Thread.sleep(90);
+        // Verify does not refresh ahead of time.
+        Assert.assertEquals(dataArrayFlushedToBookie.size(), 0);
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> 
dataArrayFlushedToBookie.size() == 1);
+        Assert.assertEquals(dataArrayFlushedToBookie.get(0).intValue(), 100);
+        txnLogBufferedWriter1.close();
+
         // Test threshold: batchedWriteMaxRecords.
+        TxnLogBufferedWriter txnLogBufferedWriter2 = new 
TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
+                scheduledExecutorService, dataSerializer, 32, 1024 * 4, 10000, 
true);
         for (int i = 0; i < 32; i++){
-            txnLogBufferedWriter.asyncAddData(1, callback, 1);
+            txnLogBufferedWriter2.asyncAddData(1, callback, 1);
         }
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> 
dataArrayFlushedToBookie.size() == 2);
+        Assert.assertEquals(dataArrayFlushedToBookie.get(1).intValue(), 32);
+        txnLogBufferedWriter2.close();
+
         // Test threshold: batchedWriteMaxSize.
-        TxnLogBufferedWriter txnLogBufferedWriter2 = new 
TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
-                scheduledExecutorService, dataSerializer, 1024, 64 * 4, 100, 
true);
+        TxnLogBufferedWriter txnLogBufferedWriter3 = new 
TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
+                scheduledExecutorService, dataSerializer, 1024, 64 * 4, 10000, 
true);
         for (int i = 0; i < 64; i++){
-            txnLogBufferedWriter2.asyncAddData(1, callback, 1);
+            txnLogBufferedWriter3.asyncAddData(1, callback, 1);
         }
         // Assert 4 flush.
         Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> 
dataArrayFlushedToBookie.size() == 3);
-        Assert.assertEquals(dataArrayFlushedToBookie.get(0).intValue(), 100);
-        Assert.assertEquals(dataArrayFlushedToBookie.get(1).intValue(), 32);
         Assert.assertEquals(dataArrayFlushedToBookie.get(2).intValue(), 64);
+        txnLogBufferedWriter3.close();
+
         // Assert all resources released
         dataSerializer.assertAllByteBufHasBeenReleased();
         // clean up.
@@ -408,6 +423,111 @@ public class TxnLogBufferedWriterTest extends 
MockedBookKeeperTestCase {
         orderedExecutor.shutdown();
     }
 
+    /**
+     * The use of {@link 
ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} 
for timed
+     * tasks in the original implementation caused this problem:
+     *   When the writer thread processes slowly, the scheduleAtFixedRate task 
will continue to append tasks to the
+     *   ledger thread, this burdens the ledger thread and leads to an 
avalanche.
+     * This method is used to verify the fix for the above problem. see: 
https://github.com/apache/pulsar/pull/16679.
+     */
+    @Test
+    public void testPendingScheduleTriggerTaskCount() throws Exception {
+        // Create components.
+        String managedLedgerName = "-";
+        ManagedLedger managedLedger = Mockito.mock(ManagedLedger.class);
+        Mockito.when(managedLedger.getName()).thenReturn(managedLedgerName);
+        OrderedExecutor orderedExecutor =  Mockito.mock(OrderedExecutor.class);
+        ArrayBlockingQueue<Runnable> workQueue = new 
ArrayBlockingQueue<>(65536 * 2);
+        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 
5, TimeUnit.SECONDS, workQueue);
+        
Mockito.when(orderedExecutor.chooseThread(Mockito.anyString())).thenReturn(threadPoolExecutor);
+        ScheduledExecutorService scheduledExecutorService =
+                Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("tx-scheduler-threads"));
+        SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+        // Count the number of tasks that have been submitted to bookie for 
later validation.
+        AtomicInteger completeFlushTaskCounter = new AtomicInteger();
+        Mockito.doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable 
{
+                completeFlushTaskCounter.incrementAndGet();
+                ByteBuf byteBuf = (ByteBuf)invocation.getArguments()[0];
+                byteBuf.skipBytes(4);
+                AsyncCallbacks.AddEntryCallback callback =
+                        (AsyncCallbacks.AddEntryCallback) 
invocation.getArguments()[1];
+                callback.addComplete(PositionImpl.get(1,1), byteBuf,
+                        invocation.getArguments()[2]);
+                return null;
+            }
+        }).when(managedLedger).asyncAddEntry(Mockito.any(ByteBuf.class), 
Mockito.any(), Mockito.any());
+        // Start tests.
+        TxnLogBufferedWriter txnLogBufferedWriter = new 
TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
+                scheduledExecutorService, dataSerializer, 2, 1024 * 4, 1, 
true);
+        TxnLogBufferedWriter.AddDataCallback callback = 
Mockito.mock(TxnLogBufferedWriter.AddDataCallback.class);
+        // Append heavier tasks to the Ledger thread.
+        final ExecutorService executorService = 
orderedExecutor.chooseThread(managedLedgerName);
+        AtomicInteger heavierTaskCounter = new AtomicInteger();
+        Thread heavierTask = new Thread(() -> {
+            while (true) {
+                executorService.execute(() -> {
+                    try {
+                        heavierTaskCounter.incrementAndGet();
+                        Thread.sleep(19);
+                    } catch (InterruptedException e) {
+                    }
+                    heavierTaskCounter.decrementAndGet();
+                });
+                try {
+                    Thread.sleep(20);
+                } catch (InterruptedException e) {
+                    break;
+                }
+            }
+        });
+        // Append normal tasks to ledger thread.
+        AtomicInteger addAsyncDataTaskCounter = new AtomicInteger();
+        AtomicInteger normalFlushCounter = new AtomicInteger();
+        Thread normalWriteTask = new Thread(() -> {
+            while (true) {
+                for (int i = 0; i < 2; i++) {
+                    addAsyncDataTaskCounter.incrementAndGet();
+                    txnLogBufferedWriter.asyncAddData(1, callback, 1);
+                }
+                normalFlushCounter.incrementAndGet();
+                try {
+                    Thread.sleep(10);
+                } catch (InterruptedException e) {
+                    break;
+                }
+            }
+        });
+        heavierTask.start();
+        normalWriteTask.start();
+        // Running 100 millis.
+        Thread.sleep(100);
+        heavierTask.interrupt();
+        normalWriteTask.interrupt();
+        /**
+         * Calculates the expected maximum number of remaining tasks.
+         * 1. add the task async add.
+         * 2. add the task flush by records count limit.
+         * 3. sub the task already complete.
+         * 4. add the heavier task count.
+         */
+        int maxCountOfRemainingTasks = 0;
+        maxCountOfRemainingTasks += normalFlushCounter.get();
+        maxCountOfRemainingTasks += addAsyncDataTaskCounter.get();
+        maxCountOfRemainingTasks -= completeFlushTaskCounter.get() * 3;
+        maxCountOfRemainingTasks += heavierTaskCounter.get();
+        // In addition to the above tasks, is the timing tasks.
+        // Assert the timing task count. The above calculation is not 
accurate, so leave a margin.
+        Assert.assertTrue(workQueue.size() - maxCountOfRemainingTasks < 10);
+        // clean up.
+        txnLogBufferedWriter.close();
+        dataSerializer.cleanup();
+        threadPoolExecutor.shutdown();
+        scheduledExecutorService.shutdown();
+        orderedExecutor.shutdown();
+    }
+
     private static class JsonDataSerializer implements 
TxnLogBufferedWriter.DataSerializer<Integer>{
 
         private static ObjectMapper objectMapper = new ObjectMapper();
