thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1591722666


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -479,17 +504,22 @@ public synchronized boolean isTxnAborted(TxnID txnID, PositionImpl readPosition)
         return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
     }
 
+    /**
+     * Sync max read position for normal publish.
+     * @param position {@link PositionImpl} the position to sync.
+     * @param isMarkerMessage whether the message is marker message, in such case, we
+     *                       don't need to trigger the callback to update lastMaxReadPositionMovedForwardTimestamp.
+     */
     @Override
-    public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
+    public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
         // when ongoing transaction is empty, proved that lastAddConfirm is can read max position, because callback
         // thread is the same tread, in this time the lastAddConfirm don't content transaction message.
         synchronized (TopicTransactionBuffer.this) {
             if (checkIfNoSnapshot()) {
-                this.maxReadPosition = position;
+                updateMaxReadPosition(position, isMarkerMessage);
             } else if (checkIfReady()) {
                 if (ongoingTxns.isEmpty()) {
-                    maxReadPosition = position;
-                    changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
+                    updateMaxReadPosition(position, isMarkerMessage);

Review Comment:
   At step 3, `lastMaxReadPositionMovedForwardTimestamp` is updated through the
   following call chain, not through `syncMaxReadPositionForNormalPublish`:

   `org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer#commitTxn`
    -> `org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer#removeTxnAndUpdateMaxReadPosition`
    -> `org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer#updateMaxReadPosition`
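
   To make the call chain concrete, here is a minimal, self-contained sketch of the two paths. It is a simplified model, not the actual `TopicTransactionBuffer` code: the class name `MaxReadPositionSketch`, the `long` positions, the string transaction IDs, the injected `clock`, and the `beginTxn` helper are all assumptions made for illustration, and the snapshot/ready checks are omitted.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.LongSupplier;

// Hypothetical, simplified model of the transaction buffer (NOT the Pulsar class).
class MaxReadPositionSketch {
    private final Set<String> ongoingTxns = new HashSet<>();
    private final LongSupplier clock; // assumed time source, injected for determinism
    long maxReadPosition = -1L;
    long lastMaxReadPositionMovedForwardTimestamp = 0L;

    MaxReadPositionSketch(LongSupplier clock) {
        this.clock = clock;
    }

    // Illustration-only helper that registers an open transaction.
    synchronized void beginTxn(String txnId) {
        ongoingTxns.add(txnId);
    }

    // Publish path: only moves the max read position when no transaction is ongoing.
    synchronized void syncMaxReadPositionForNormalPublish(long position, boolean isMarkerMessage) {
        if (ongoingTxns.isEmpty()) {
            updateMaxReadPosition(position, isMarkerMessage);
        }
    }

    // Commit path: commitTxn -> removeTxnAndUpdateMaxReadPosition -> updateMaxReadPosition.
    synchronized void commitTxn(String txnId, long lastConfirmedPosition) {
        removeTxnAndUpdateMaxReadPosition(txnId, lastConfirmedPosition);
    }

    private void removeTxnAndUpdateMaxReadPosition(String txnId, long lastConfirmedPosition) {
        ongoingTxns.remove(txnId);
        if (ongoingTxns.isEmpty()) {
            // Not a marker-driven update, so the timestamp refresh is allowed.
            updateMaxReadPosition(lastConfirmedPosition, false);
        }
    }

    private void updateMaxReadPosition(long newPosition, boolean isMarkerMessage) {
        long previous = maxReadPosition;
        maxReadPosition = newPosition;
        // Refresh the timestamp only when the position moved forward
        // and the update was not caused by a marker message.
        if (newPosition > previous && !isMarkerMessage) {
            lastMaxReadPositionMovedForwardTimestamp = clock.getAsLong();
        }
    }
}
```

   In this model, a marker message published while a transaction is still open changes nothing; the commit then moves the position and refreshes the timestamp through the chain above. A marker published with no ongoing transaction moves the position but leaves the timestamp untouched.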


