poorbarcode commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1590612538
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -479,17 +504,22 @@ public synchronized boolean isTxnAborted(TxnID txnID, PositionImpl readPosition)
return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
}
+ /**
+ * Sync max read position for normal publish.
+ * @param position {@link PositionImpl} the position to sync.
+ * @param isMarkerMessage whether the message is marker message, in such case, we
+ * don't need to trigger the callback to update lastMaxReadPositionMovedForwardTimestamp.
+ */
@Override
- public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
+ public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
// when ongoing transaction is empty, proved that lastAddConfirm is can read max position, because callback
// thread is the same tread, in this time the lastAddConfirm don't content transaction message.
synchronized (TopicTransactionBuffer.this) {
if (checkIfNoSnapshot()) {
- this.maxReadPosition = position;
+ updateMaxReadPosition(position, isMarkerMessage);
} else if (checkIfReady()) {
if (ongoingTxns.isEmpty()) {
- maxReadPosition = position;
- changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
+ updateMaxReadPosition(position, isMarkerMessage);
Review Comment:
With this change, `lastMaxReadPositionMovedForwardTimestamp` is updated only
when there are no ongoing TXNs, which is wrong. We should only call
`MaxReadPositionCallBack` after committing/aborting transactions. Please also
add a test for this case.
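To make the suggestion concrete, here is a rough sketch of the behavior I have in mind. It is a simplified stand-in, not the real `TopicTransactionBuffer`: the class `MaxReadPositionSketch`, its methods, the use of `long` positions, and the counter standing in for `MaxReadPositionCallBack` are all hypothetical. The normal-publish path may still move the max read position, but only the commit/abort path fires the callback.

```java
import java.util.TreeSet;

// Hypothetical, simplified model for illustration only; the names below are
// assumptions and do not exist in Pulsar.
class MaxReadPositionSketch {
    // First position of each ongoing transaction (assumed representation).
    private final TreeSet<Long> ongoingTxns = new TreeSet<>();
    private long maxReadPosition = -1L;
    // Stands in for invocations of MaxReadPositionCallBack.
    private int callbackCount = 0;

    synchronized void beginTxn(long firstPosition) {
        ongoingTxns.add(firstPosition);
    }

    // Normal publish: may advance maxReadPosition, never fires the callback.
    synchronized void onNormalPublish(long position) {
        if (ongoingTxns.isEmpty()) {
            maxReadPosition = position;
        }
    }

    // Commit or abort: the only path that fires the callback.
    synchronized void onTxnEnd(long firstPosition, long lastConfirmed) {
        ongoingTxns.remove(firstPosition);
        long previous = maxReadPosition;
        // Readers may go up to just before the oldest ongoing transaction,
        // or up to the last confirmed entry when none remain.
        maxReadPosition = ongoingTxns.isEmpty() ? lastConfirmed : ongoingTxns.first() - 1;
        if (maxReadPosition > previous) {
            callbackCount++;
        }
    }

    synchronized long getMaxReadPosition() {
        return maxReadPosition;
    }

    synchronized int getCallbackCount() {
        return callbackCount;
    }
}
```

A test for this case could publish normal messages with and without an ongoing transaction, check that the callback is not triggered by them, and then check that it is triggered once the transaction is committed or aborted.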