This is an automated email from the ASF dual-hosted git repository.
zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new de9928a5bb4 [fix][clients]Check pendingIndividualBatchIndexAcks size
in doIndividualBatchAckAsync (#15877)
de9928a5bb4 is described below
commit de9928a5bb4a35c3e6129fa43cc90c2a6b454666
Author: gaozhangmin <[email protected]>
AuthorDate: Sat Jun 11 01:07:06 2022 +0800
[fix][clients]Check pendingIndividualBatchIndexAcks size in
doIndividualBatchAckAsync (#15877)
* Check pendingIndividualBatchIndexAcks size and flush in
doIndividualBatchAckAsync
* put logic into method doIndividualBatchAck
---
.../impl/PersistentAcknowledgmentsGroupingTracker.java | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index f0f0cfd7548..005f2e6b74b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -312,9 +312,15 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
return this.currentIndividualAckFuture;
} finally {
this.lock.readLock().unlock();
+ if (pendingIndividualBatchIndexAcks.size() >=
MAX_ACK_GROUP_SIZE) {
+ flush();
+ }
}
} else {
doIndividualBatchAckAsync(batchMessageId);
+ if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
+ flush();
+ }
return CompletableFuture.completedFuture(null);
}
}
@@ -337,15 +343,9 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
return this.currentCumulativeAckFuture;
} finally {
this.lock.readLock().unlock();
- if (pendingIndividualBatchIndexAcks.size() >=
MAX_ACK_GROUP_SIZE) {
- flush();
- }
}
} else {
doCumulativeAckAsync(messageId, bitSet);
- if (pendingIndividualBatchIndexAcks.size() >=
MAX_ACK_GROUP_SIZE) {
- flush();
- }
return CompletableFuture.completedFuture(null);
}
}