congbobo184 commented on a change in pull request #9195:
URL: https://github.com/apache/pulsar/pull/9195#discussion_r557820479
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -43,6 +42,12 @@
private final PersistentTopic topic;
+ private volatile PositionImpl maxReadPosition = PositionImpl.latest;
+
+ private final LinkedMap<TxnID, PositionImpl> ongoingTxns = new LinkedMap<>();
+
+ private final LinkedMap<TxnID, PositionImpl> aborts = new LinkedMap<>();
Review comment:
When we handle lowWaterMark, we should take the first entry of the map and
compare its txnID with lowWaterMark. And because ongoingTxns is ordered, the
position of its first key should become the maxReadPosition.
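To illustrate the idea in this comment, here is a minimal sketch, not the code from the PR. It assumes java.util.LinkedHashMap in place of LinkedMap, plain long values in place of TxnID and PositionImpl, that txn ids are inserted in increasing order, and that txns with an id at or below lowWaterMark can be dropped. The class and method names are hypothetical.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for illustration; not Pulsar's TopicTransactionBuffer.
class MaxReadPositionSketch {
    // Insertion-ordered: txn sequence id -> position of that txn's first entry.
    private final LinkedHashMap<Long, Long> ongoingTxns = new LinkedHashMap<>();
    private long lastAppended = -1L;
    private long maxReadPosition = -1L;

    void append(long txnId, long position) {
        ongoingTxns.putIfAbsent(txnId, position);
        lastAppended = position;
        refresh();
    }

    // Assumption: txns with id <= lowWaterMark are finished and can be dropped.
    // Only the head of the ordered map is examined, entry by entry.
    void handleLowWaterMark(long lowWaterMark) {
        Iterator<Map.Entry<Long, Long>> it = ongoingTxns.entrySet().iterator();
        while (it.hasNext() && it.next().getKey() <= lowWaterMark) {
            it.remove();
        }
        refresh();
    }

    // The first ongoing txn's position becomes maxReadPosition; with no
    // ongoing txns, fall back to the last appended position (an assumption).
    private void refresh() {
        maxReadPosition = ongoingTxns.isEmpty()
                ? lastAppended
                : ongoingTxns.values().iterator().next();
    }

    long getMaxReadPosition() {
        return maxReadPosition;
    }
}
```

Whether the bound should be the first ongoing position itself or the position just before it is left open here; the sketch takes the position as-is.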
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -58,6 +63,12 @@ public TopicTransactionBuffer(PersistentTopic topic) {
topic.getManagedLedger().asyncAddEntry(buffer, new AsyncCallbacks.AddEntryCallback() {
@Override
public void addComplete(Position position, Object ctx) {
+ if (!ongoingTxns.containsKey(txnId)) {
+ ongoingTxns.put(txnId, (PositionImpl) position);
+ PositionImpl firstPosition = ongoingTxns.get(ongoingTxns.firstKey());
Review comment:
Yes, it is not a thread-safe container, so I lock all the logic that changes
this map.
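As a rough sketch of that locking approach (again not the PR's code): every method that mutates the map is synchronized on the same object, while maxReadPosition stays volatile so readers can see it without taking the lock. LinkedHashMap, the long-typed keys and values, and the names LockedTxnIndex, onAppend and onEnd are assumptions made for the example.

```java
import java.util.LinkedHashMap;

// Hypothetical stand-in for illustration; not Pulsar's TopicTransactionBuffer.
class LockedTxnIndex {
    // LinkedHashMap is not thread-safe, so it is only touched under the lock.
    private final LinkedHashMap<Long, Long> ongoingTxns = new LinkedHashMap<>();
    // volatile: written under the lock, read without it.
    private volatile long maxReadPosition = -1L;

    // Would be called from the add-entry callback, possibly on another thread.
    synchronized void onAppend(long txnId, long position) {
        if (!ongoingTxns.containsKey(txnId)) {
            ongoingTxns.put(txnId, position);
            maxReadPosition = ongoingTxns.values().iterator().next();
        }
    }

    // Would be called when a txn commits or aborts.
    synchronized void onEnd(long txnId) {
        ongoingTxns.remove(txnId);
        if (!ongoingTxns.isEmpty()) {
            maxReadPosition = ongoingTxns.values().iterator().next();
        }
    }

    synchronized int size() {
        return ongoingTxns.size();
    }

    long getMaxReadPosition() {
        return maxReadPosition;
    }
}
```

The check-then-put in onAppend sits inside one synchronized method, so no other writer can slip in between containsKey and put.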
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -43,6 +42,12 @@
private final PersistentTopic topic;
+ private volatile PositionImpl maxReadPosition = PositionImpl.latest;
+
+ private final LinkedMap<TxnID, PositionImpl> ongoingTxns = new LinkedMap<>();
+
+ private final LinkedMap<TxnID, PositionImpl> aborts = new LinkedMap<>();
Review comment:
I have added the LowWaterMarkTest, and also added abort and commit
MaxReadPosition tests.