This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 51190ba9187 [Fix][Txn]Fix transaction component recover fillQueue 
(#15418)
51190ba9187 is described below

commit 51190ba9187645d9d39cdd787112f9172a38e58f
Author: Xiangying Meng <[email protected]>
AuthorDate: Fri May 6 17:00:35 2022 +0800

    [Fix][Txn]Fix transaction component recover fillQueue (#15418)
    
    ### Motivation & Modification
    A new read should only be issued when the queue size + 
NUMBER_OF_PER_READ_ENTRY is within the capacity of the queue, rather than 
whenever the queue size alone is within the capacity. Otherwise, if the 
processing speed is lower than the read speed, the entries that exceed the 
queue capacity will be ignored.
    ### About test
    It is hard to add a test for this change.
    The replay/recover/FillEntryQueueCallback classes are not public, so they 
cannot be imported by tests.
    As a result, their fields cannot be changed by reflection either.
    Even with `Thread.sleep` allowed, we can only test the recovery of the TC 
by calling `Thread.sleep` in `transactionLogReplayCallback`, and cannot test 
the TP/TB.
---
 .../broker/transaction/buffer/impl/TopicTransactionBuffer.java    | 8 ++++++--
 .../broker/transaction/pendingack/impl/MLPendingAckStore.java     | 6 ++++--
 .../pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java | 6 ++++--
 3 files changed, 14 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index b0749757e8c..3cbc3f14ea5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -698,6 +698,8 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
 
         private volatile boolean isReadable = true;
 
+        private static final int NUMBER_OF_PER_READ_ENTRY = 100;
+
         private FillEntryQueueCallback(SpscArrayQueue<Entry> entryQueue, 
ManagedCursor cursor,
                                        TopicTransactionBufferRecover recover) {
             this.entryQueue = entryQueue;
@@ -705,10 +707,12 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
             this.recover = recover;
         }
         boolean fillQueue() {
-            if (entryQueue.size() < entryQueue.capacity() && 
outstandingReadsRequests.get() == 0) {
+            if (entryQueue.size() + NUMBER_OF_PER_READ_ENTRY < 
entryQueue.capacity()
+                    && outstandingReadsRequests.get() == 0) {
                 if (cursor.hasMoreEntries()) {
                     outstandingReadsRequests.incrementAndGet();
-                    cursor.asyncReadEntries(100, this, System.nanoTime(), 
PositionImpl.LATEST);
+                    cursor.asyncReadEntries(NUMBER_OF_PER_READ_ENTRY,
+                            this, System.nanoTime(), PositionImpl.LATEST);
                 } else {
                     if (entryQueue.size() == 0) {
                         isReadable = false;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index 289cd14a716..46cb3c173be 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -364,12 +364,14 @@ public class MLPendingAckStore implements PendingAckStore 
{
 
         private volatile boolean isReadable = true;
         private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
+        private static final int NUMBER_OF_PER_READ_ENTRY = 100;
 
         boolean fillQueue() {
-            if (entryQueue.size() < entryQueue.capacity() && 
outstandingReadsRequests.get() == 0) {
+            if (entryQueue.size() + NUMBER_OF_PER_READ_ENTRY < 
entryQueue.capacity()
+                    && outstandingReadsRequests.get() == 0) {
                 if (cursor.hasMoreEntries()) {
                     outstandingReadsRequests.incrementAndGet();
-                    readAsync(100, this);
+                    readAsync(NUMBER_OF_PER_READ_ENTRY, this);
                 }
             }
             return isReadable;
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index 3da19eccf14..f9fd728f24b 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -237,12 +237,14 @@ public class MLTransactionLogImpl implements 
TransactionLog {
 
         private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
         private volatile boolean isReadable = true;
+        private static final int NUMBER_OF_PER_READ_ENTRY = 100;
 
         boolean fillQueue() {
-            if (entryQueue.size() < entryQueue.capacity() && 
outstandingReadsRequests.get() == 0) {
+            if (entryQueue.size() + NUMBER_OF_PER_READ_ENTRY < 
entryQueue.capacity()
+                    && outstandingReadsRequests.get() == 0) {
                 if (cursor.hasMoreEntries()) {
                     outstandingReadsRequests.incrementAndGet();
-                    readAsync(100, this);
+                    readAsync(NUMBER_OF_PER_READ_ENTRY, this);
                     return isReadable;
                 } else {
                     return false;

Reply via email to