This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
     new 962e4458a92 [fix] [txn] [PIP-160] Avoid timing task burdens the ledger thread and leads to an avalanche (#16679)
962e4458a92 is described below
commit 962e4458a92191bdc937e5647b216787a241d2c9
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jul 20 23:15:21 2022 +0800
[fix] [txn] [PIP-160] Avoid timing task burdens the ledger thread and leads to an avalanche (#16679)
Master Issue: #15370
### Motivation
see #15370
### Modifications
#### Managed Ledger I/O thread gets busier and busier.
In the original implementation, `TxnLogBufferedWriter` has two thread pools:
- `ExecutorService singleThreadExecutorForWrite` is a single-threaded executor used to perform Managed Ledger I/O operations. It runs the following tasks:
  - `internalAsyncAddData`: each call to `asyncAddData` adds one task.
  - `doFlush`: `trigFlush` sometimes adds a task, and the scheduled task also adds tasks.
- `ScheduledExecutorService scheduledExecutorService` is used to periodically add `doFlush` tasks to `singleThreadExecutorForWrite`, whether `singleThreadExecutorForWrite` is idle or not.
If `singleThreadExecutorForWrite` is busy for a period of time, `scheduledExecutorService` keeps adding `doFlush` tasks during that period. `singleThreadExecutorForWrite` then spends its time on the newly added `doFlush` tasks and has less capacity left for `internalAsyncAddData`, which causes `singleThreadExecutorForWrite` to accumulate more and more tasks, even if the `doFlush` task appended by `scheduledExecuto [...]
#### Imprecise timing task
If new data is added immediately after a scheduled task has executed, that data cannot be flushed by the next scheduled task. E.g., with the max delay time set to 1 second, suppose the scheduled tasks and new data tasks execute in the following sequence:
```
1. scheduled task running at 12:00:00.000
2. scheduled task running at 12:00:01.000
3. add data at 12:00:01.005
4. scheduled task running at 12:00:02.000
```
In step 4, the flush task will not flush the data to the bookie, because the expression `System.currentTimeMillis() - firstAsyncAddArgs.addedTime >= batchedWriteMaxDelayInMillis` evaluates to `false`; the data is actually flushed by the next scheduled task at `12:00:03.000`.
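The failing check in step 4 can be reproduced with plain arithmetic. The timestamps below are the ones from the sequence above, expressed as milliseconds relative to 12:00:00.000, with a 1000 ms max delay matching the one-second schedule interval (a standalone sketch, not Pulsar code):

```java
// Reproduces the delay check from the sequence above; millis are relative
// to 12:00:00.000 for readability.
public class DelayCheck {
    static final long MAX_DELAY_MILLIS = 1000;

    // Same shape as the writer's check:
    // now - firstAsyncAddArgs.addedTime >= batchedWriteMaxDelayInMillis
    static boolean shouldFlush(long nowMillis, long addedTimeMillis) {
        return nowMillis - addedTimeMillis >= MAX_DELAY_MILLIS;
    }

    public static void main(String[] args) {
        long addedAt = 1005;    // data added at 12:00:01.005
        long thirdTick = 2000;  // scheduled task at 12:00:02.000
        long fourthTick = 3000; // scheduled task at 12:00:03.000
        System.out.println(shouldFlush(thirdTick, addedAt));  // 995 ms elapsed -> false
        System.out.println(shouldFlush(fourthTick, addedAt)); // 1995 ms elapsed -> true
    }
}
```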
### Changes:
#### Managed Ledger I/O thread gets busier and busier.
Since all of these tasks are executed in the same thread, we can change the scheduling to "after a scheduled task has executed, then schedule the next one". This reduces the density of `doFlush` tasks triggered by the timer, at the cost of some timing accuracy (the original implementation was not completely accurate either).
This change only avoids the task accumulation in `TxnLogBufferedWriter` caused by too many scheduled tasks; it cannot improve the write performance of the Managed Ledger.
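A minimal standalone sketch of this "schedule the next trigger only after the current one has run" pattern (invented class and method names, not the Pulsar implementation) might look like:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: arms exactly one pending trigger at a time, so a slow flush
// can never cause timer tasks to pile up behind it.
public class SelfReschedulingTimer {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger flushes = new AtomicInteger();
    private final long delayMillis;
    private volatile boolean closed;

    SelfReschedulingTimer(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    void nextTimingTrigger() {
        if (closed) {
            return;
        }
        scheduler.schedule(() -> {
            try {
                flushes.incrementAndGet(); // stand-in for doFlush()
            } finally {
                nextTimingTrigger();       // arm the next trigger only now
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Runs the timer for totalMillis and returns how many triggers fired.
    static int runFor(long totalMillis, long delayMillis) {
        SelfReschedulingTimer timer = new SelfReschedulingTimer(delayMillis);
        timer.nextTimingTrigger();
        try {
            Thread.sleep(totalMillis);
            timer.closed = true;
            timer.scheduler.shutdown();
            timer.scheduler.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return timer.flushes.get();
    }

    public static void main(String[] args) {
        // With a 10 ms delay over ~200 ms, only ~20 triggers can ever fire,
        // no matter how slowly each flush runs.
        System.out.println("flushes: " + runFor(200, 10));
    }
}
```

The `finally` block matters: even if the flush stand-in throws, the next trigger is still armed, mirroring the `nextTimingTrigger()` call in the patch below.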
#### Imprecise timing task
A flush triggered by the scheduled task no longer checks whether the time of the first node satisfies the delay condition. To compensate for imprecise scheduled-task execution times, the maximum delay time is still checked, and a flush performed if needed, in the flush task triggered by `asyncAddData`.
---
.../coordinator/impl/TxnLogBufferedWriter.java | 85 ++++++++----
.../coordinator/impl/TxnLogBufferedWriterTest.java | 148 +++++++++++++++++++--
2 files changed, 193 insertions(+), 40 deletions(-)
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
index 685fbba9b2f..b3e4a7a8641 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java
@@ -73,6 +73,8 @@ public class TxnLogBufferedWriter<T> implements AsyncCallbacks.AddEntryCallback,
private final ManagedLedger managedLedger;
+ private final ScheduledExecutorService scheduledExecutorService;
+
/** All write operation will be executed on single thread. **/
private final ExecutorService singleThreadExecutorForWrite;
@@ -137,14 +139,35 @@ public class TxnLogBufferedWriter<T> implements AsyncCallbacks.AddEntryCallback,
this.batchedWriteMaxDelayInMillis = batchedWriteMaxDelayInMillis;
this.flushContext = FlushContext.newInstance();
this.dataArray = new ArrayList<>();
+ this.scheduledExecutorService = scheduledExecutorService;
// scheduler task.
- if (batchEnabled) {
-        this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> trigFlush(false),
-                batchedWriteMaxDelayInMillis, batchedWriteMaxDelayInMillis, TimeUnit.MICROSECONDS);
+ if (this.batchEnabled) {
+ nextTimingTrigger();
}
this.state = State.OPEN;
}
+ /***
+     * Why not use {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} ?
+     * Because: when the {@link #singleThreadExecutorForWrite} thread processes slowly, the scheduleAtFixedRate task
+     * will continue to append tasks to the ledger thread, this burdens the ledger thread and leads to an avalanche.
+ * see: https://github.com/apache/pulsar/pull/16679.
+ */
+ private void nextTimingTrigger(){
+ try {
+ if (state == State.CLOSING || state == State.CLOSED){
+ return;
+ }
+            scheduledFuture = scheduledExecutorService.schedule(() -> trigFlush(false, true),
+                    batchedWriteMaxDelayInMillis, TimeUnit.MILLISECONDS);
+ } catch (Exception e){
+ log.error("Start timing flush trigger failed."
+ + " managedLedger: " + managedLedger.getName(), e);
+ }
+ }
+
+
+
/**
     * Append a new entry to the end of a managed ledger. All writes will be performed in the same thread. Callbacks are
     * executed in strict write order,but after {@link #close()}, callbacks that fail by state check will execute
@@ -178,7 +201,7 @@ public class TxnLogBufferedWriter<T> implements AsyncCallbacks.AddEntryCallback,
int len = dataSerializer.getSerializedSize(data);
if (len >= batchedWriteMaxSize){
if (!flushContext.asyncAddArgsList.isEmpty()) {
- doTrigFlush(true);
+ doTrigFlush(true, false);
}
ByteBuf byteBuf = dataSerializer.serialize(data);
             managedLedger.asyncAddEntry(byteBuf, DisabledBatchCallback.INSTANCE,
@@ -193,7 +216,7 @@ public class TxnLogBufferedWriter<T> implements AsyncCallbacks.AddEntryCallback,
// Calculate bytes-size.
this.bytesSize += len;
// trig flush.
- doTrigFlush(false);
+ doTrigFlush(false, false);
}
/***
@@ -231,29 +254,39 @@ public class TxnLogBufferedWriter<T> implements AsyncCallbacks.AddEntryCallback,
/**
     * Trigger write to bookie once, If the conditions are not met, nothing will be done.
*/
- public void trigFlush(final boolean force){
- singleThreadExecutorForWrite.execute(() -> doTrigFlush(force));
+ public void trigFlush(final boolean force, boolean byScheduleThreads){
+        singleThreadExecutorForWrite.execute(() -> doTrigFlush(force, byScheduleThreads));
}
- private void doTrigFlush(boolean force){
- if (flushContext.asyncAddArgsList.isEmpty()) {
- return;
- }
- if (force){
- doFlush();
- return;
- }
- AsyncAddArgs firstAsyncAddArgs = flushContext.asyncAddArgsList.get(0);
-        if (System.currentTimeMillis() - firstAsyncAddArgs.addedTime > batchedWriteMaxDelayInMillis){
- doFlush();
- return;
- }
-        if (this.flushContext.asyncAddArgsList.size() >= batchedWriteMaxRecords){
- doFlush();
- return;
- }
- if (this.bytesSize >= batchedWriteMaxSize){
- doFlush();
+ private void doTrigFlush(boolean force, boolean byScheduleThreads){
+ try {
+ if (flushContext.asyncAddArgsList.isEmpty()) {
+ return;
+ }
+ if (force) {
+ doFlush();
+ return;
+ }
+ if (byScheduleThreads) {
+ doFlush();
+ return;
+ }
+            AsyncAddArgs firstAsyncAddArgs = flushContext.asyncAddArgsList.get(0);
+            if (System.currentTimeMillis() - firstAsyncAddArgs.addedTime >= batchedWriteMaxDelayInMillis) {
+ doFlush();
+ return;
+ }
+            if (this.flushContext.asyncAddArgsList.size() >= batchedWriteMaxRecords) {
+ doFlush();
+ return;
+ }
+ if (this.bytesSize >= batchedWriteMaxSize) {
+ doFlush();
+ }
+ } finally {
+ if (byScheduleThreads) {
+ nextTimingTrigger();
+ }
}
}
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
index 78bd4e8a57b..f7ee32a0937 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java
@@ -29,10 +29,14 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
@@ -88,9 +92,9 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
// The number of data writes is large.
         provider[2] = new Object[]{512, 1024 * 1024, 100, true, 20000, 5, 4, BookieErrorType.NO_ERROR, false};
// Big data writes.
-        provider[3] = new Object[]{512, 1024, 100, true, 3000, 4, 1024, BookieErrorType.NO_ERROR, false};
+        provider[3] = new Object[]{512, 1024, 100, true, 3000, 6, 1024, BookieErrorType.NO_ERROR, false};
         // A batch has only one data
-        provider[4] = new Object[]{1, 1024 * 1024, 100, true, 2000, 4, 4, BookieErrorType.NO_ERROR, false};
+        provider[4] = new Object[]{1, 1024 * 1024, 100, true, 2000, 6, 4, BookieErrorType.NO_ERROR, false};
         // A batch has only two data
         provider[5] = new Object[]{2, 1024 * 1024, 100, true, 1999, 4, 4, BookieErrorType.NO_ERROR, false};
// Disabled the batch feature
@@ -207,7 +211,7 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
txnLogBufferedWriter.asyncAddData(i, callback, i);
// Ensure flush at least once before close buffered writer.
if (closeBufferedWriter && i == 0){
- txnLogBufferedWriter.trigFlush(true);
+ txnLogBufferedWriter.trigFlush(true, false);
}
if (closeBufferedWriter && bufferedWriteCloseAtIndex == i){
// Wait for any complete callback, avoid unstable.
@@ -378,28 +382,39 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
return null;
}
         }).when(managedLedger).asyncAddEntry(Mockito.any(ByteBuf.class), Mockito.any(), Mockito.any());
-        // Start tests.
-        TxnLogBufferedWriter txnLogBufferedWriter = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
+        // Test threshold: writeMaxDelayInMillis.
+        TxnLogBufferedWriter txnLogBufferedWriter1 = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
                 scheduledExecutorService, dataSerializer, 32, 1024 * 4, 100, true);
         TxnLogBufferedWriter.AddDataCallback callback = Mockito.mock(TxnLogBufferedWriter.AddDataCallback.class);
-        // Test threshold: writeMaxDelayInMillis.
-        txnLogBufferedWriter.asyncAddData(100, callback, 100);
-        Thread.sleep(101);
+        txnLogBufferedWriter1.asyncAddData(100, callback, 100);
+        Thread.sleep(90);
+        // Verify does not refresh ahead of time.
+        Assert.assertEquals(dataArrayFlushedToBookie.size(), 0);
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> dataArrayFlushedToBookie.size() == 1);
+        Assert.assertEquals(dataArrayFlushedToBookie.get(0).intValue(), 100);
+        txnLogBufferedWriter1.close();
+
         // Test threshold: batchedWriteMaxRecords.
+        TxnLogBufferedWriter txnLogBufferedWriter2 = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
+                scheduledExecutorService, dataSerializer, 32, 1024 * 4, 10000, true);
         for (int i = 0; i < 32; i++){
-            txnLogBufferedWriter.asyncAddData(1, callback, 1);
+            txnLogBufferedWriter2.asyncAddData(1, callback, 1);
         }
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> dataArrayFlushedToBookie.size() == 2);
+        Assert.assertEquals(dataArrayFlushedToBookie.get(1).intValue(), 32);
+        txnLogBufferedWriter2.close();
+
         // Test threshold: batchedWriteMaxSize.
-        TxnLogBufferedWriter txnLogBufferedWriter2 = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
-                scheduledExecutorService, dataSerializer, 1024, 64 * 4, 100, true);
+        TxnLogBufferedWriter txnLogBufferedWriter3 = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
+                scheduledExecutorService, dataSerializer, 1024, 64 * 4, 10000, true);
         for (int i = 0; i < 64; i++){
-            txnLogBufferedWriter2.asyncAddData(1, callback, 1);
+            txnLogBufferedWriter3.asyncAddData(1, callback, 1);
         }
         // Assert 4 flush.
         Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> dataArrayFlushedToBookie.size() == 3);
-        Assert.assertEquals(dataArrayFlushedToBookie.get(0).intValue(), 100);
-        Assert.assertEquals(dataArrayFlushedToBookie.get(1).intValue(), 32);
         Assert.assertEquals(dataArrayFlushedToBookie.get(2).intValue(), 64);
+        txnLogBufferedWriter3.close();
+
// Assert all resources released
dataSerializer.assertAllByteBufHasBeenReleased();
// clean up.
@@ -408,6 +423,111 @@ public class TxnLogBufferedWriterTest extends MockedBookKeeperTestCase {
orderedExecutor.shutdown();
}
+ /**
+     * The use of {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} for timed
+ * tasks in the original implementation caused this problem:
+     * When the writer thread processes slowly, the scheduleAtFixedRate task will continue to append tasks to the
+     * ledger thread, this burdens the ledger thread and leads to an avalanche.
+     * This method is used to verify the fix for the above problem. see: https://github.com/apache/pulsar/pull/16679.
+ */
+ @Test
+ public void testPendingScheduleTriggerTaskCount() throws Exception {
+ // Create components.
+ String managedLedgerName = "-";
+ ManagedLedger managedLedger = Mockito.mock(ManagedLedger.class);
+ Mockito.when(managedLedger.getName()).thenReturn(managedLedgerName);
+ OrderedExecutor orderedExecutor = Mockito.mock(OrderedExecutor.class);
+        ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(65536 * 2);
+        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, workQueue);
+        Mockito.when(orderedExecutor.chooseThread(Mockito.anyString())).thenReturn(threadPoolExecutor);
+ ScheduledExecutorService scheduledExecutorService =
+                Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("tx-scheduler-threads"));
+ SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+        // Count the number of tasks that have been submitted to bookie for later validation.
+ AtomicInteger completeFlushTaskCounter = new AtomicInteger();
+ Mockito.doAnswer(new Answer() {
+ @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+ completeFlushTaskCounter.incrementAndGet();
+ ByteBuf byteBuf = (ByteBuf)invocation.getArguments()[0];
+ byteBuf.skipBytes(4);
+ AsyncCallbacks.AddEntryCallback callback =
+                        (AsyncCallbacks.AddEntryCallback) invocation.getArguments()[1];
+ callback.addComplete(PositionImpl.get(1,1), byteBuf,
+ invocation.getArguments()[2]);
+ return null;
+ }
+        }).when(managedLedger).asyncAddEntry(Mockito.any(ByteBuf.class), Mockito.any(), Mockito.any());
+ // Start tests.
+        TxnLogBufferedWriter txnLogBufferedWriter = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
+                scheduledExecutorService, dataSerializer, 2, 1024 * 4, 1, true);
+        TxnLogBufferedWriter.AddDataCallback callback = Mockito.mock(TxnLogBufferedWriter.AddDataCallback.class);
+ // Append heavier tasks to the Ledger thread.
+        final ExecutorService executorService = orderedExecutor.chooseThread(managedLedgerName);
+ AtomicInteger heavierTaskCounter = new AtomicInteger();
+ Thread heavierTask = new Thread(() -> {
+ while (true) {
+ executorService.execute(() -> {
+ try {
+ heavierTaskCounter.incrementAndGet();
+ Thread.sleep(19);
+ } catch (InterruptedException e) {
+ }
+ heavierTaskCounter.decrementAndGet();
+ });
+ try {
+ Thread.sleep(20);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ });
+ // Append normal tasks to ledger thread.
+ AtomicInteger addAsyncDataTaskCounter = new AtomicInteger();
+ AtomicInteger normalFlushCounter = new AtomicInteger();
+ Thread normalWriteTask = new Thread(() -> {
+ while (true) {
+ for (int i = 0; i < 2; i++) {
+ addAsyncDataTaskCounter.incrementAndGet();
+ txnLogBufferedWriter.asyncAddData(1, callback, 1);
+ }
+ normalFlushCounter.incrementAndGet();
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ });
+ heavierTask.start();
+ normalWriteTask.start();
+ // Running 100 millis.
+ Thread.sleep(100);
+ heavierTask.interrupt();
+ normalWriteTask.interrupt();
+ /**
+ * Calculates the expected maximum number of remaining tasks.
+ * 1. add the task async add.
+ * 2. add the task flush by records count limit.
+ * 3. sub the task already complete.
+ * 4. add the heavier task count.
+ */
+ int maxCountOfRemainingTasks = 0;
+ maxCountOfRemainingTasks += normalFlushCounter.get();
+ maxCountOfRemainingTasks += addAsyncDataTaskCounter.get();
+ maxCountOfRemainingTasks -= completeFlushTaskCounter.get() * 3;
+ maxCountOfRemainingTasks += heavierTaskCounter.get();
+ // In addition to the above tasks, is the timing tasks.
+        // Assert the timing task count. The above calculation is not accurate, so leave a margin.
+ Assert.assertTrue(workQueue.size() - maxCountOfRemainingTasks < 10);
+ // clean up.
+ txnLogBufferedWriter.close();
+ dataSerializer.cleanup();
+ threadPoolExecutor.shutdown();
+ scheduledExecutorService.shutdown();
+ orderedExecutor.shutdown();
+ }
+
    private static class JsonDataSerializer implements TxnLogBufferedWriter.DataSerializer<Integer>{
private static ObjectMapper objectMapper = new ObjectMapper();