liangyepianzhou commented on code in PR #22707:
URL: https://github.com/apache/pulsar/pull/22707#discussion_r1623371207
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java:
##########
@@ -377,8 +379,11 @@ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {
@Override
public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
- if (!isMarkerMessage && maxReadPositionCallBack != null) {
- maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
+ if (!isMarkerMessage) {
+ updateLastDispatchablePosition(position);
Review Comment:
This way of writing it is odd. Please keep the previous approach (the removed `maxReadPositionCallBack` null check and `maxReadPositionMovedForward` call).
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java:
##########
@@ -331,6 +332,7 @@ public CompletableFuture<Void> abortTxn(TxnID txnID, long lowWaterMark) {
TxnBuffer txnBuffer = getTxnBufferOrThrowNotFoundException(txnID);
txnBuffer.abort();
buffers.remove(txnID, txnBuffer);
+ updateLastDispatchablePosition(null);
Review Comment:
We should not set the position to null, even when a transaction fails to commit.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java:
##########
@@ -307,6 +307,7 @@ public CompletableFuture<Void> commitTxn(TxnID txnID, long lowWaterMark) {
txnBuffer.commitAt(committedAtLedgerId, committedAtEntryId);
addTxnToTxnIdex(txnID, committedAtLedgerId);
}
+ updateLastDispatchablePosition(null);
Review Comment:
Why is this position being updated to null here?
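To make the concern in the two null-related comments concrete, here is a minimal sketch of a position holder that ignores null and never moves backward. It is only an illustration under assumptions: `LastDispatchableTracker`, `Pos`, `update`, and `get` are hypothetical names, not Pulsar classes or the actual `updateLastDispatchablePosition` implementation in this PR.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration only; not part of Pulsar.
public class LastDispatchableTracker {

    // Hypothetical stand-in for a (ledgerId, entryId) position.
    public static final class Pos implements Comparable<Pos> {
        public final long ledgerId;
        public final long entryId;

        public Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Pos other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    private final AtomicReference<Pos> last = new AtomicReference<>();

    // Ignores null so a commit or abort path can never erase a known position,
    // and only accepts a candidate that is strictly ahead of the current value.
    public void update(Pos candidate) {
        if (candidate == null) {
            return;
        }
        last.accumulateAndGet(candidate,
                (current, next) -> current == null || next.compareTo(current) > 0 ? next : current);
    }

    // Returns the latest accepted position, or null if none was ever set.
    public Pos get() {
        return last.get();
    }

    public static void main(String[] args) {
        LastDispatchableTracker tracker = new LastDispatchableTracker();
        tracker.update(new Pos(1, 5));
        tracker.update(null);          // no effect
        tracker.update(new Pos(1, 3)); // behind the current position, no effect
        System.out.println(tracker.get());
    }
}
```

With a guard like this, passing null from `commitTxn` or `abortTxn` would be a no-op, which suggests those calls could simply be dropped.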