MarvinCai commented on a change in pull request #4265: 
[transaction][acknowledge] Introduce PENDING_ACK state in acknowledgement path
URL: https://github.com/apache/pulsar/pull/4265#discussion_r286782915
 
 

 ##########
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 ##########
 @@ -199,6 +245,27 @@ public void acknowledgeMessage(List<Position> positions, 
AckType ackType, Map<St
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Individual acks on {}", topicName, 
subName, positions);
             }
+
+            synchronized (PersistentSubscription.this) {
+                positions.forEach(position -> {
+                    // If single ack try to ack message in pending_ack status, 
fail the ack.
+                    if (this.pendingAckMessages.contains(position)) {
+                        log.warn("[{}][{}] Invalid acks position conflict with 
an ongoing transaction:{}{}.",
+                                topicName, subName, 
this.txnID.getMostSigBits(), this.txnID.getLeastSigBits());
+                        return;
+                    }
+
+                    // If single ack is within range of cumulative ack of an 
ongoing transaction, fail the ack.
+                    if (null != this.pendingCumulativeAckMessage &&
+                            ((PositionImpl) position).compareTo((PositionImpl) 
this.pendingCumulativeAckMessage) < 0) {
 
 Review comment:
   yes, markDelete include the last position, so should be <=.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to