MarvinCai commented on a change in pull request #4265: 
[transaction][acknowledge] Introduce PENDING_ACK state in acknowledgement path
URL: https://github.com/apache/pulsar/pull/4265#discussion_r286779388
 
 

 ##########
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 ##########
 @@ -199,6 +245,27 @@ public void acknowledgeMessage(List<Position> positions, 
AckType ackType, Map<St
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Individual acks on {}", topicName, 
subName, positions);
             }
+
+            synchronized (PersistentSubscription.this) {
+                positions.forEach(position -> {
+                    // If single ack try to ack message in pending_ack status, 
fail the ack.
+                    if (this.pendingAckMessages.contains(position)) {
+                        log.warn("[{}][{}] Invalid acks position conflict with 
an ongoing transaction:{}{}.",
+                                topicName, subName, 
this.txnID.getMostSigBits(), this.txnID.getLeastSigBits());
+                        return;
+                    }
+
+                    // If single ack is within range of cumulative ack of an 
ongoing transaction, fail the ack.
+                    if (null != this.pendingCumulativeAckMessage &&
+                            ((PositionImpl) position).compareTo((PositionImpl) 
this.pendingCumulativeAckMessage) < 0) {
 
 Review comment:
   I thought we mostly check type in catch clause or passed parameter is an 
Object, and PositionImpl is only implementation class of Position so I skip the 
check.
   But seems I do find some places in BK code base that we're checking if it's 
a PositionImpl instance. Also added here, just for safe.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to