This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 143979d  Fix: avoid continuous acking with invalid msgId for cumulative ack (#2498)
143979d is described below

commit 143979d15525520edeaededd6b7878b61f5c8bfe
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Fri Aug 31 15:49:09 2018 -0700

    Fix: avoid continuous acking with invalid msgId for cumulative ack (#2498)
---
 .../pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 0fefe9c..9a87d26 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -57,6 +57,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
      * Latest cumulative ack sent to broker
      */
     private volatile MessageIdImpl lastCumulativeAck = (MessageIdImpl) 
MessageId.earliest;
+    private volatile boolean cumulativeAckFulshRequired = false; 
 
     private static final 
AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker, 
MessageIdImpl> LAST_CUMULATIVE_ACK_UPDATER = AtomicReferenceFieldUpdater
             .newUpdater(PersistentAcknowledgmentsGroupingTracker.class, 
MessageIdImpl.class, "lastCumulativeAck");
@@ -119,6 +120,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
             if (msgId.compareTo(lastCumlativeAck) > 0) {
                 if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, 
lastCumlativeAck, msgId)) {
                     // Successfully updated the last cumlative ack. Next flush 
iteration will send this to broker.
+                    cumulativeAckFulshRequired = true;
                     return;
                 }
             } else {
@@ -160,10 +162,11 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
             return;
         }
 
-        if (!lastCumulativeAck.equals(MessageId.earliest)) {
+        if (cumulativeAckFulshRequired) {
             ByteBuf cmd = Commands.newAck(consumer.consumerId, 
lastCumulativeAck.ledgerId, lastCumulativeAck.entryId,
                     AckType.Cumulative, null, Collections.emptyMap());
             cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+            cumulativeAckFulshRequired = false;
         }
 
         // Flush all individual acks

Reply via email to