MarvinCai commented on a change in pull request #4265: [transaction][acknowledge] Introduce PENDING_ACK state in acknowledgement path URL: https://github.com/apache/pulsar/pull/4265#discussion_r286779388
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java ########## @@ -199,6 +245,27 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<St if (log.isDebugEnabled()) { log.debug("[{}][{}] Individual acks on {}", topicName, subName, positions); } + + synchronized (PersistentSubscription.this) { + positions.forEach(position -> { + // If single ack try to ack message in pending_ack status, fail the ack. + if (this.pendingAckMessages.contains(position)) { + log.warn("[{}][{}] Invalid acks position conflict with an ongoing transaction:{}{}.", + topicName, subName, this.txnID.getMostSigBits(), this.txnID.getLeastSigBits()); + return; + } + + // If single ack is within range of cumulative ack of an ongoing transaction, fail the ack. + if (null != this.pendingCumulativeAckMessage && + ((PositionImpl) position).compareTo((PositionImpl) this.pendingCumulativeAckMessage) < 0) { Review comment: I thought we mostly check type in catch clause or passed parameter is an Object, and PositionImpl is only implementation class of Position so I skip the check. But seems I do find some places in BK code base that we're checking if it's a PositionImpl instance. Also added here, just for safe. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services