This is an automated email from the ASF dual-hosted git repository. bogong pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 65ab3aba1dd0c5a52ec6a9674f68d19925c5f9ef Author: Xiangying Meng <[email protected]> AuthorDate: Fri May 6 17:00:35 2022 +0800 [Fix][Txn]Fix transaction component recover fillQueue (#15418) ### Motivation & Modification The queue size + NUMBER_OF_PER_READ_ENTRY should be <= the capacity of the queue, instead of only the queue size being <= the capacity of the queue. If the processing speed is less than the read speed, the part that exceeds the queue capacity will be ignored. ### About test It is hard to add a test for this change. The replay/recover/FillEntryQueueCallback classes are not public, so they cannot be imported by a test, and their fields cannot be changed by reflection. Even with `Thread.sleep` allowed, we can only test the recovery of TC by calling `Thread.sleep` in `transactionLogReplayCallback`, and cannot test TP/TB. (cherry picked from commit 51190ba9187645d9d39cdd787112f9172a38e58f) --- .../broker/transaction/buffer/impl/TopicTransactionBuffer.java | 8 ++++++-- .../broker/transaction/pendingack/impl/MLPendingAckStore.java | 6 ++++-- .../pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java | 6 ++++-- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index b0749757e8c..3cbc3f14ea5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -698,6 +698,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen private volatile boolean isReadable = true; + private static final int NUMBER_OF_PER_READ_ENTRY = 100; + private FillEntryQueueCallback(SpscArrayQueue<Entry> entryQueue, ManagedCursor cursor, TopicTransactionBufferRecover recover) { 
this.entryQueue = entryQueue; @@ -705,10 +707,12 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen this.recover = recover; } boolean fillQueue() { - if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) { + if (entryQueue.size() + NUMBER_OF_PER_READ_ENTRY < entryQueue.capacity() + && outstandingReadsRequests.get() == 0) { if (cursor.hasMoreEntries()) { outstandingReadsRequests.incrementAndGet(); - cursor.asyncReadEntries(100, this, System.nanoTime(), PositionImpl.LATEST); + cursor.asyncReadEntries(NUMBER_OF_PER_READ_ENTRY, + this, System.nanoTime(), PositionImpl.LATEST); } else { if (entryQueue.size() == 0) { isReadable = false; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java index ccc1628154c..ee5cc22f340 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java @@ -367,12 +367,14 @@ public class MLPendingAckStore implements PendingAckStore { private volatile boolean isReadable = true; private final AtomicLong outstandingReadsRequests = new AtomicLong(0); + private static final int NUMBER_OF_PER_READ_ENTRY = 100; boolean fillQueue() { - if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) { + if (entryQueue.size() + NUMBER_OF_PER_READ_ENTRY < entryQueue.capacity() + && outstandingReadsRequests.get() == 0) { if (cursor.hasMoreEntries()) { outstandingReadsRequests.incrementAndGet(); - readAsync(100, this); + readAsync(NUMBER_OF_PER_READ_ENTRY, this); } } return isReadable; diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java index 3da19eccf14..f9fd728f24b 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java @@ -237,12 +237,14 @@ public class MLTransactionLogImpl implements TransactionLog { private final AtomicLong outstandingReadsRequests = new AtomicLong(0); private volatile boolean isReadable = true; + private static final int NUMBER_OF_PER_READ_ENTRY = 100; boolean fillQueue() { - if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) { + if (entryQueue.size() + NUMBER_OF_PER_READ_ENTRY < entryQueue.capacity() + && outstandingReadsRequests.get() == 0) { if (cursor.hasMoreEntries()) { outstandingReadsRequests.incrementAndGet(); - readAsync(100, this); + readAsync(NUMBER_OF_PER_READ_ENTRY, this); return isReadable; } else { return false;
