codelipenghui commented on a change in pull request #9195:
URL: https://github.com/apache/pulsar/pull/9195#discussion_r557342672



##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -1789,6 +1794,11 @@ private void internalReadFromLedger(ReadHandle ledger, 
OpReadEntry opReadEntry)
             lastEntryInLedger = ledger.getLastAddConfirmed();
         }
 
+        // can read max position entryId
+        if (ledger.getId() == opReadEntry.maxPosition.getLedgerId()) {

Review comment:
       Same as the above comment

##########
File path: pulsar-common/src/main/proto/PulsarMarkers.proto
##########
@@ -84,6 +84,5 @@ message MarkersMessageIdData {
 
 
 /// --- Transaction marker ---
-message TxnCommitMarker {
-    repeated MarkersMessageIdData message_id = 1;
+message TxnMarker {

Review comment:
       If not useful, please remove it from the proto.

##########
File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -1774,6 +1774,11 @@ public void asyncReadEntry(PositionImpl position, 
ReadEntryCallback callback, Ob
     }
 
     private void internalReadFromLedger(ReadHandle ledger, OpReadEntry 
opReadEntry) {
+
+        if (opReadEntry.readPosition.compareTo(opReadEntry.maxPosition) > 0) {

Review comment:
       The maxPostion of the OpReadEntry might be null?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
##########
@@ -347,4 +347,19 @@ private void addTxnToTxnIdex(TxnID txnId, long 
committedAtLedgerId) {
         return CompletableFuture.completedFuture(null);
     }
 
+    @Override
+    public boolean isTxnAborted(TxnID txnID) {
+        return false;
+    }
+
+    @Override
+    public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
+        //no-op
+    }
+
+    @Override
+    public PositionImpl getMaxReadPosition() {
+        return PositionImpl.latest;

Review comment:
       Why the in-memory transaction buffer returns latest?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -58,6 +63,12 @@ public TopicTransactionBuffer(PersistentTopic topic) {
         topic.getManagedLedger().asyncAddEntry(buffer, new 
AsyncCallbacks.AddEntryCallback() {
             @Override
             public void addComplete(Position position, Object ctx) {
+                if (!ongoingTxns.containsKey(txnId)) {
+                    ongoingTxns.put(txnId, (PositionImpl) position);
+                    PositionImpl firstPosition = 
ongoingTxns.get(ongoingTxns.firstKey());

Review comment:
       The LinkedMap will throw NoSuchElementException while the map is empty. 
And I think the LinkedMap is not a thread-safe container?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -43,6 +42,12 @@
 
     private final PersistentTopic topic;
 
+    private volatile PositionImpl maxReadPosition = PositionImpl.latest;
+
+    private final LinkedMap<TxnID, PositionImpl> ongoingTxns = new 
LinkedMap<>();
+
+    private final LinkedMap<TxnID, PositionImpl> aborts = new LinkedMap<>();

Review comment:
       Why this also need LinkedMap ? Please give comment for these two maps 
especially the value of the map, I think the first map is the min message 
position of the transaction and the second map is the max message position of 
the transaction?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -123,14 +137,37 @@ public void addFailed(ManagedLedgerException exception, 
Object ctx) {
         return completableFuture;
     }
 
-    private List<MarkersMessageIdData> 
getMessageIdDataList(List<MessageIdData> sendMessageIdList) {
-        List<MarkersMessageIdData> messageIdDataList = new 
ArrayList<>(sendMessageIdList.size());
-        for (MessageIdData msgIdData : sendMessageIdList) {
-            messageIdDataList.add(new MarkersMessageIdData()
-                            .setLedgerId(msgIdData.getLedgerId())
-                            .setEntryId(msgIdData.getEntryId()));
+    private void handleLowWaterMark(TxnID txnID, long lowWaterMark) {
+        if (!ongoingTxns.isEmpty()) {
+            TxnID firstTxn = ongoingTxns.firstKey();
+            if (firstTxn.getMostSigBits() == txnID.getMostSigBits() && 
lowWaterMark >= firstTxn.getLeastSigBits()) {
+                ByteBuf abortMarker = Markers.newTxnAbortMarker(-1L,
+                        txnID.getMostSigBits(), txnID.getLeastSigBits());
+                topic.getManagedLedger().asyncAddEntry(abortMarker, new 
AsyncCallbacks.AddEntryCallback() {
+                    @Override
+                    public void addComplete(Position position, Object ctx) {
+                        aborts.put(firstTxn, (PositionImpl) position);
+                        changeMaxReadPosition(firstTxn);
+                    }
+
+                    @Override
+                    public void addFailed(ManagedLedgerException exception, 
Object ctx) {
+                        log.error("Failed to abort low water mark for txn {}", 
txnID, exception);
+                    }
+                }, null);
+            }
+        }
+    }
+
+    void changeMaxReadPosition(TxnID txnID) {

Review comment:
       ```suggestion
       void updateMaxReadPosition(TxnID txnID) {
   ```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -43,6 +42,12 @@
 
     private final PersistentTopic topic;
 
+    private volatile PositionImpl maxReadPosition = PositionImpl.latest;
+
+    private final LinkedMap<TxnID, PositionImpl> ongoingTxns = new 
LinkedMap<>();
+
+    private final LinkedMap<TxnID, PositionImpl> aborts = new LinkedMap<>();

Review comment:
       Is there any test coverage? We'd better to add some unit test to make 
sure the data should be removed correctly to avoid memory lead and make the 
maxReadPosition is correct.

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
##########
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+import com.google.common.collect.Sets;
+
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Pulsar client transaction test.
+ */
+@Slf4j
+public class TransactionStablePositionTest extends TransactionTestBase {
+
+    private final static String TENANT = "tnx";
+    private final static String NAMESPACE1 = TENANT + "/ns1";
+    private final static String TOPIC = NAMESPACE1 + "/test-topic";
+
+    @BeforeMethod
+    protected void setup() throws Exception {
+        internalSetup();
+
+        String[] brokerServiceUrlArr = 
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
+        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length 
-1];
+        admin.clusters().createCluster(CLUSTER_NAME, new 
ClusterData("http://localhost:"; + webServicePort));
+        admin.tenants().createTenant(TENANT,
+                new TenantInfo(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
+        admin.namespaces().createNamespace(NAMESPACE1);
+        admin.topics().createNonPartitionedTopic(TOPIC);
+        
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+                new TenantInfo(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
+        
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 16);
+
+        pulsarClient = PulsarClient.builder()
+                
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .enableTransaction(true)
+                .build();
+
+        Thread.sleep(1000 * 3);

Review comment:
       Please try to avoid use sleep here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to