poorbarcode commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1590590340
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -444,17 +447,39 @@ private void takeSnapshotByTimeout() {
takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
}
- void updateMaxReadPosition(TxnID txnID) {
- PositionImpl preMaxReadPosition = this.maxReadPosition;
+ /**
+ * remove the specified transaction from ongoing transaction list and
update the max read position.
+ * @param txnID
+ */
+ void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
ongoingTxns.remove(txnID);
if (!ongoingTxns.isEmpty()) {
PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
- maxReadPosition = ((ManagedLedgerImpl)
topic.getManagedLedger()).getPreviousPosition(position);
+ updateMaxReadPosition(((ManagedLedgerImpl)
topic.getManagedLedger()).getPreviousPosition(position), false);
} else {
- maxReadPosition = (PositionImpl)
topic.getManagedLedger().getLastConfirmedEntry();
+ updateMaxReadPosition((PositionImpl)
topic.getManagedLedger().getLastConfirmedEntry(), false);
}
- if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
- this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
+ }
+
+ /**
+ * update the max read position. if the new position is greater than the
current max read position,
+ * we will trigger the callback, unless the disableCallback is true.
+ * Currently, we only use the callback to update the
lastMaxReadPositionMovedForwardTimestamp.
+ * For non-transactional production, some marker messages will be sent to
the topic, in which case we don't need
+ * to trigger the callback.
+ * @param newPosition new max read position to update.
+ * @param disableCallback whether disable the callback.
+ */
+ void updateMaxReadPosition(PositionImpl newPosition, boolean
disableCallback) {
+ PositionImpl preMaxReadPosition = this.maxReadPosition;
+ this.maxReadPosition = newPosition;
Review Comment:
<del>Why do you set `maxReadPosition` at this line? It will make all the
messages readable.</del>
Please ignore this comment; it was my mistake.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]