MarvinCai commented on a change in pull request #4265: [transaction][acknowledge] Introduce PENDING_ACK state in acknowledgement path URL: https://github.com/apache/pulsar/pull/4265#discussion_r286782915
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java ########## @@ -199,6 +245,27 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<St if (log.isDebugEnabled()) { log.debug("[{}][{}] Individual acks on {}", topicName, subName, positions); } + + synchronized (PersistentSubscription.this) { + positions.forEach(position -> { + // If single ack try to ack message in pending_ack status, fail the ack. + if (this.pendingAckMessages.contains(position)) { + log.warn("[{}][{}] Invalid acks position conflict with an ongoing transaction:{}{}.", + topicName, subName, this.txnID.getMostSigBits(), this.txnID.getLeastSigBits()); + return; + } + + // If single ack is within range of cumulative ack of an ongoing transaction, fail the ack. + if (null != this.pendingCumulativeAckMessage && + ((PositionImpl) position).compareTo((PositionImpl) this.pendingCumulativeAckMessage) < 0) { Review comment: yes, markDelete include the last position, so should be <=. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services