congbobo184 commented on a change in pull request #9195:
URL: https://github.com/apache/pulsar/pull/9195#discussion_r557820479



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -43,6 +42,12 @@
 
     private final PersistentTopic topic;
 
+    private volatile PositionImpl maxReadPosition = PositionImpl.latest;
+
+    private final LinkedMap<TxnID, PositionImpl> ongoingTxns = new LinkedMap<>();
+
+    private final LinkedMap<TxnID, PositionImpl> aborts = new LinkedMap<>();

Review comment:
       When we handle the lowWaterMark, we should take the first entry of the 
map and compare its txnID with the lowWaterMark. And since ongoingTxns is 
ordered, the position of its first key should become the maxReadPosition.
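
To make the idea above concrete, here is a minimal sketch, not the PR's actual code. It assumes a simplified model: txn IDs and positions are plain `long` values, `java.util.LinkedHashMap` stands in for the `LinkedMap` in the diff, txn IDs arrive in increasing order, and readers may read up to the position just before the first ongoing txn's first entry. The class and method names are hypothetical.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of the ordered ongoing-txn map.
class MaxReadPositionSketch {
    // Insertion-ordered: txnId -> position of that txn's first entry.
    private final LinkedHashMap<Long, Long> ongoingTxns = new LinkedHashMap<>();
    private long lastAppended = -1L;     // assumption: last position written
    private long maxReadPosition = -1L;

    void onAppend(long txnId, long position) {
        // Only the first entry of each txn is recorded.
        ongoingTxns.putIfAbsent(txnId, position);
        lastAppended = position;
        updateMaxReadPosition();
    }

    void onTxnEnd(long txnId) {
        ongoingTxns.remove(txnId);
        updateMaxReadPosition();
    }

    void handleLowWaterMark(long lowWaterMark) {
        // Walk from the first (oldest) entry and drop every txn whose ID is
        // at or below the low water mark; stop at the first one above it.
        Iterator<Map.Entry<Long, Long>> it = ongoingTxns.entrySet().iterator();
        while (it.hasNext() && it.next().getKey() <= lowWaterMark) {
            it.remove();
        }
        updateMaxReadPosition();
    }

    private void updateMaxReadPosition() {
        if (ongoingTxns.isEmpty()) {
            maxReadPosition = lastAppended;
        } else {
            // Assumption: readers stop just before the first ongoing txn.
            long first = ongoingTxns.values().iterator().next();
            maxReadPosition = first - 1;
        }
    }

    long getMaxReadPosition() {
        return maxReadPosition;
    }
}
```

Because the map is ordered, both the low-water-mark check and the max read position only ever need to look at the head of the map.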

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -58,6 +63,12 @@ public TopicTransactionBuffer(PersistentTopic topic) {
         topic.getManagedLedger().asyncAddEntry(buffer, new AsyncCallbacks.AddEntryCallback() {
             @Override
             public void addComplete(Position position, Object ctx) {
+                if (!ongoingTxns.containsKey(txnId)) {
+                    ongoingTxns.put(txnId, (PositionImpl) position);
+                    PositionImpl firstPosition = ongoingTxns.get(ongoingTxns.firstKey());

Review comment:
       Yes, it is not a thread-safe container, so I lock all of the logic that 
changes this map.
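
As a rough illustration of that locking approach (a sketch under assumptions, not the code in this PR): every mutation of the non-thread-safe map goes through one lock, and the derived value is published through a `volatile` field so readers do not need the lock. The class name, the `long` keys and values, and the use of `ReentrantLock` are all hypothetical choices for the example.

```java
import java.util.LinkedHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical wrapper: all writes to the plain map happen under one lock.
class GuardedTxnMap {
    private final LinkedHashMap<Long, Long> ongoingTxns = new LinkedHashMap<>();
    private final ReentrantLock lock = new ReentrantLock();
    // Published for lock-free reads; -1 means "no ongoing txn".
    private volatile long firstOngoingPosition = -1L;

    void recordFirstAppend(long txnId, long position) {
        lock.lock();
        try {
            ongoingTxns.putIfAbsent(txnId, position);
            refreshFirstPosition();
        } finally {
            lock.unlock();
        }
    }

    void end(long txnId) {
        lock.lock();
        try {
            ongoingTxns.remove(txnId);
            refreshFirstPosition();
        } finally {
            lock.unlock();
        }
    }

    // Must be called with the lock held.
    private void refreshFirstPosition() {
        firstOngoingPosition = ongoingTxns.isEmpty()
                ? -1L
                : ongoingTxns.values().iterator().next();
    }

    long firstOngoingPosition() {
        return firstOngoingPosition; // volatile read, no lock needed
    }
}
```

The check-then-put and the read of the first entry happen inside the same critical section, so no other thread can change the map between them.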

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -43,6 +42,12 @@
 
     private final PersistentTopic topic;
 
+    private volatile PositionImpl maxReadPosition = PositionImpl.latest;
+
+    private final LinkedMap<TxnID, PositionImpl> ongoingTxns = new LinkedMap<>();
+
+    private final LinkedMap<TxnID, PositionImpl> aborts = new LinkedMap<>();

Review comment:
       I have added the LowWaterMarkTest, and I have also added abort and 
commit tests for the maxReadPosition.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


