This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
     new 143979d Fix: avoid continuous acking with invalid msgId for cumulative ack (#2498)
143979d is described below

commit 143979d15525520edeaededd6b7878b61f5c8bfe
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Fri Aug 31 15:49:09 2018 -0700

    Fix: avoid continuous acking with invalid msgId for cumulative ack (#2498)
---
 .../pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 0fefe9c..9a87d26 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -57,6 +57,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
      * Latest cumulative ack sent to broker
      */
     private volatile MessageIdImpl lastCumulativeAck = (MessageIdImpl) MessageId.earliest;
+    private volatile boolean cumulativeAckFulshRequired = false;
 
     private static final AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker, MessageIdImpl> LAST_CUMULATIVE_ACK_UPDATER = AtomicReferenceFieldUpdater
             .newUpdater(PersistentAcknowledgmentsGroupingTracker.class, MessageIdImpl.class, "lastCumulativeAck");
@@ -119,6 +120,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
                 if (msgId.compareTo(lastCumlativeAck) > 0) {
                     if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, lastCumlativeAck, msgId)) {
                         // Successfully updated the last cumlative ack. Next flush iteration will send this to broker.
+                        cumulativeAckFulshRequired = true;
                         return;
                     }
                 } else {
@@ -160,10 +162,11 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
             return;
         }
 
-        if (!lastCumulativeAck.equals(MessageId.earliest)) {
+        if (cumulativeAckFulshRequired) {
             ByteBuf cmd = Commands.newAck(consumer.consumerId, lastCumulativeAck.ledgerId, lastCumulativeAck.entryId,
                     AckType.Cumulative, null, Collections.emptyMap());
             cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+            cumulativeAckFulshRequired = false;
         }
 
         // Flush all individual acks