dao-jun commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1593288742


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -444,17 +447,39 @@ private void takeSnapshotByTimeout() {
                 takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
     }
 
-    void updateMaxReadPosition(TxnID txnID) {
-        PositionImpl preMaxReadPosition = this.maxReadPosition;
+    /**
+     * remove the specified transaction from ongoing transaction list and 
update the max read position.
+     * @param txnID
+     */
+    void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
         ongoingTxns.remove(txnID);
         if (!ongoingTxns.isEmpty()) {
             PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
-            maxReadPosition = ((ManagedLedgerImpl) 
topic.getManagedLedger()).getPreviousPosition(position);
+            updateMaxReadPosition(((ManagedLedgerImpl) 
topic.getManagedLedger()).getPreviousPosition(position), false);
         } else {
-            maxReadPosition = (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
+            updateMaxReadPosition((PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry(), false);
         }
-        if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
-            this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
+    }
+
+    /**
+     * update the max read position. if the new position is greater than the 
current max read position,
+     * we will trigger the callback, unless the disableCallback is true.
+     * Currently, we only use the callback to update the 
lastMaxReadPositionMovedForwardTimestamp.
+     * For non-transactional production, some marker messages will be sent to 
the topic, in which case we don't need
+     * to trigger the callback.
+     * @param newPosition new max read position to update.
+     * @param disableCallback whether disable the callback.
+     */
+    void updateMaxReadPosition(PositionImpl newPosition, boolean 
disableCallback) {
+        PositionImpl preMaxReadPosition = this.maxReadPosition;
+        this.maxReadPosition = newPosition;
+        if (preMaxReadPosition.compareTo(this.maxReadPosition) < 0) {
+            if (!checkIfNoSnapshot()) {

Review Comment:
   It looks a little strange, `checkIfNoSnapshot()` just means there are no 
snapshots to recover.
   
   ```java
    if (checkIfNoSnapshot()) { 
             this.maxReadPosition = position; 
   ```
   means if there are no transaction messages, all the messages are able to 
dispatch.
   
   I agree with @poorbarcode we don't need this check, once maxReadPosition 
moved forward, we need update changeMaxReadPositionAndAddAbortTimes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to