This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 51190ba9187 [Fix][Txn]Fix transaction component recover fillQueue
(#15418)
51190ba9187 is described below
commit 51190ba9187645d9d39cdd787112f9172a38e58f
Author: Xiangying Meng <[email protected]>
AuthorDate: Fri May 6 17:00:35 2022 +0800
[Fix][Txn]Fix transaction component recover fillQueue (#15418)
### Motivation & Modification
Before issuing a read, fillQueue should check that the queue size +
NUMBER_OF_PER_READ_ENTRY is less than the capacity of the queue, instead of
only checking that the queue size is less than the capacity. Otherwise, if
the processing speed is lower than the read speed, the entries that exceed
the queue capacity will be ignored.
### About test
It is hard to add a test for this change.
The replay/recover/FillEntryQueueCallback classes are not public, so they
cannot be imported by a test.
Their fields therefore cannot be changed by reflection either.
With `Thread.sleep` allowed, we can only test the recovery of the TC by
calling `Thread.sleep` in `transactionLogReplayCallback`; we cannot test
TP/TB.
---
.../broker/transaction/buffer/impl/TopicTransactionBuffer.java | 8 ++++++--
.../broker/transaction/pendingack/impl/MLPendingAckStore.java | 6 ++++--
.../pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java | 6 ++++--
3 files changed, 14 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index b0749757e8c..3cbc3f14ea5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -698,6 +698,8 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
private volatile boolean isReadable = true;
+ private static final int NUMBER_OF_PER_READ_ENTRY = 100;
+
private FillEntryQueueCallback(SpscArrayQueue<Entry> entryQueue,
ManagedCursor cursor,
TopicTransactionBufferRecover recover) {
this.entryQueue = entryQueue;
@@ -705,10 +707,12 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
this.recover = recover;
}
boolean fillQueue() {
- if (entryQueue.size() < entryQueue.capacity() &&
outstandingReadsRequests.get() == 0) {
+ if (entryQueue.size() + NUMBER_OF_PER_READ_ENTRY <
entryQueue.capacity()
+ && outstandingReadsRequests.get() == 0) {
if (cursor.hasMoreEntries()) {
outstandingReadsRequests.incrementAndGet();
- cursor.asyncReadEntries(100, this, System.nanoTime(),
PositionImpl.LATEST);
+ cursor.asyncReadEntries(NUMBER_OF_PER_READ_ENTRY,
+ this, System.nanoTime(), PositionImpl.LATEST);
} else {
if (entryQueue.size() == 0) {
isReadable = false;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index 289cd14a716..46cb3c173be 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -364,12 +364,14 @@ public class MLPendingAckStore implements PendingAckStore
{
private volatile boolean isReadable = true;
private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
+ private static final int NUMBER_OF_PER_READ_ENTRY = 100;
boolean fillQueue() {
- if (entryQueue.size() < entryQueue.capacity() &&
outstandingReadsRequests.get() == 0) {
+ if (entryQueue.size() + NUMBER_OF_PER_READ_ENTRY <
entryQueue.capacity()
+ && outstandingReadsRequests.get() == 0) {
if (cursor.hasMoreEntries()) {
outstandingReadsRequests.incrementAndGet();
- readAsync(100, this);
+ readAsync(NUMBER_OF_PER_READ_ENTRY, this);
}
}
return isReadable;
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index 3da19eccf14..f9fd728f24b 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -237,12 +237,14 @@ public class MLTransactionLogImpl implements
TransactionLog {
private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
private volatile boolean isReadable = true;
+ private static final int NUMBER_OF_PER_READ_ENTRY = 100;
boolean fillQueue() {
- if (entryQueue.size() < entryQueue.capacity() &&
outstandingReadsRequests.get() == 0) {
+ if (entryQueue.size() + NUMBER_OF_PER_READ_ENTRY <
entryQueue.capacity()
+ && outstandingReadsRequests.get() == 0) {
if (cursor.hasMoreEntries()) {
outstandingReadsRequests.incrementAndGet();
- readAsync(100, this);
+ readAsync(NUMBER_OF_PER_READ_ENTRY, this);
return isReadable;
} else {
return false;