poorbarcode commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1590600568


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -444,17 +447,39 @@ private void takeSnapshotByTimeout() {
                 takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
     }
 
-    void updateMaxReadPosition(TxnID txnID) {
-        PositionImpl preMaxReadPosition = this.maxReadPosition;
+    /**
+     * remove the specified transaction from ongoing transaction list and update the max read position.
+     * @param txnID
+     */
+    void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
         ongoingTxns.remove(txnID);
         if (!ongoingTxns.isEmpty()) {
             PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
-            maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position);
+            updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position), false);
         } else {
-            maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
+            updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
         }
-        if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
-            this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
+    }
+
+    /**
+     * update the max read position. if the new position is greater than the current max read position,
+     * we will trigger the callback, unless the disableCallback is true.
+     * Currently, we only use the callback to update the lastMaxReadPositionMovedForwardTimestamp.
+     * For non-transactional production, some marker messages will be sent to the topic, in which case we don't need
+     * to trigger the callback.
+     * @param newPosition new max read position to update.
+     * @param disableCallback whether disable the callback.
+     */
+    void updateMaxReadPosition(PositionImpl newPosition, boolean disableCallback) {
+        PositionImpl preMaxReadPosition = this.maxReadPosition;
+        this.maxReadPosition = newPosition;
+        if (preMaxReadPosition.compareTo(this.maxReadPosition) < 0) {
+            if (!checkIfNoSnapshot()) {

Review Comment:
   The `!checkIfNoSnapshot()` check looks wrong here. We should remove it, right?
   Could you also add a test for this case?
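
   As a rough illustration of the behaviour such a test might pin down, here is a minimal sketch. It assumes the intent is that the callback fires whenever the max read position moves forward and `disableCallback` is false, independent of any snapshot state. `MaxReadPositionSketch`, the `long` positions, and the counter are hypothetical stand-ins, not Pulsar classes; a real test would go through `TopicTransactionBuffer` and `PositionImpl`.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical stand-in for the buffer's max-read-position bookkeeping.
 * Positions are modelled as plain longs; the "callback" is a counter.
 */
class MaxReadPositionSketch {
    private long maxReadPosition;
    private final AtomicLong movedForwardCount = new AtomicLong();

    MaxReadPositionSketch(long initialPosition) {
        this.maxReadPosition = initialPosition;
    }

    /** Mirrors the shape of the method in the diff, without the snapshot check. */
    void updateMaxReadPosition(long newPosition, boolean disableCallback) {
        long previous = this.maxReadPosition;
        // The position is always overwritten, as in the diff.
        this.maxReadPosition = newPosition;
        // The callback only fires on a forward move with callbacks enabled.
        if (previous < newPosition && !disableCallback) {
            movedForwardCount.incrementAndGet();
        }
    }

    long getMaxReadPosition() {
        return maxReadPosition;
    }

    long getMovedForwardCount() {
        return movedForwardCount.get();
    }
}
```

   The cases worth asserting are a forward move with the callback enabled, a forward move with it disabled, an unchanged position, and a backward move; only the first should trigger the callback.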


