codelipenghui commented on a change in pull request #8347:
URL: https://github.com/apache/pulsar/pull/8347#discussion_r511806283



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
+import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarMarkers;
+import org.apache.pulsar.common.protocol.Markers;
+
+/**
+ * Transaction buffer based on normal persistent topic.
+ */
+@Slf4j
+public class TopicTransactionBuffer implements TransactionBuffer {
+
+    private PersistentTopic topic;

Review comment:
       ```suggestion
       private final PersistentTopic topic;
   ```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
+import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarMarkers;
+import org.apache.pulsar.common.protocol.Markers;
+
+/**
+ * Transaction buffer based on normal persistent topic.
+ */
+@Slf4j
+public class TopicTransactionBuffer implements TransactionBuffer {
+
+    private PersistentTopic topic;
+
+    public TopicTransactionBuffer(PersistentTopic topic) {
+        this.topic = topic;
+    }
+
+    @Override
+    public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long 
sequenceId, ByteBuf buffer) {
+        CompletableFuture<Position> completableFuture = new 
CompletableFuture<>();
+        topic.publishMessage(buffer, (e, ledgerId, entryId) -> {
+            if (e != null) {
+                log.error("Failed to appendBufferToTxn for txn {}", txnId, e);

Review comment:
       ```suggestion
                   log.error("Failed to append buffer to txn {}", txnId, e);
   ```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferReader.java
##########
@@ -66,15 +66,15 @@ public InMemTransactionBufferReader(TxnID txnId, 
Iterator<Entry<Long, ByteBuf>>
         int i = 0;
         while (i < numEntries && entries.hasNext()) {
             Entry<Long, ByteBuf> entry = entries.next();
-            TransactionEntry txnEntry = new TransactionEntryImpl(
-                txnId,
-                entry.getKey(),
-                EntryImpl.create(-1L, -1L, entry.getValue()),
-                committedAtLedgerId,
-                committedAtEntryId,
-                -1
-            );
-            txnEntries.add(txnEntry);
+//            TransactionEntry txnEntry = new TransactionEntryImpl(
+//                txnId,
+//                entry.getKey(),
+//                EntryImpl.create(-1L, -1L, entry.getValue()),
+//                committedAtLedgerId,
+//                committedAtEntryId,
+//                -1
+//            );
+//            txnEntries.add(txnEntry);

Review comment:
       Why should we delete these lines?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
+import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarMarkers;
+import org.apache.pulsar.common.protocol.Markers;
+
+/**
+ * Transaction buffer based on normal persistent topic.
+ */
+@Slf4j
+public class TopicTransactionBuffer implements TransactionBuffer {
+
+    private PersistentTopic topic;
+
+    public TopicTransactionBuffer(PersistentTopic topic) {
+        this.topic = topic;
+    }
+
+    @Override
+    public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long 
sequenceId, ByteBuf buffer) {
+        CompletableFuture<Position> completableFuture = new 
CompletableFuture<>();
+        topic.publishMessage(buffer, (e, ledgerId, entryId) -> {
+            if (e != null) {
+                log.error("Failed to appendBufferToTxn for txn {}", txnId, e);
+                completableFuture.completeExceptionally(e);
+                return;
+            }
+            completableFuture.complete(PositionImpl.get(ledgerId, entryId));
+        });
+        return completableFuture;
+    }
+
+    @Override
+    public CompletableFuture<TransactionBufferReader> 
openTransactionBufferReader(TxnID txnID, long startSequenceId) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> commitTxn(TxnID txnID, List<MessageIdData> 
sendMessageIdList) {
+        if (log.isDebugEnabled()) {
+            log.debug("Transaction {} commit on topic {}.", txnID.toString(), 
topic.getName());
+        }
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+
+        ByteBuf commitMarker = Markers.newTxnCommitMarker(-1L, 
txnID.getMostSigBits(),
+                txnID.getLeastSigBits(), 
getMessageIdDataList(sendMessageIdList));
+        topic.publishMessage(commitMarker, (e, ledgerId, entryId) -> {
+            if (e != null) {
+                log.error("Failed to commit for txn {}", txnID, e);
+                completableFuture.completeExceptionally(e);
+                return;
+            }
+            completableFuture.complete(null);
+        });
+        return completableFuture;
+    }
+
+    @Override
+    public CompletableFuture<Void> abortTxn(TxnID txnID, List<MessageIdData> 
sendMessageIdList) {
+        if (log.isDebugEnabled()) {
+            log.debug("Transaction {} abort on topic {}.", txnID.toString(), 
topic.getName());
+        }
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+
+        ByteBuf abortMarker = Markers.newTxnAbortMarker(
+                -1L, txnID.getMostSigBits(), txnID.getLeastSigBits(), 
getMessageIdDataList(sendMessageIdList));
+        topic.publishMessage(abortMarker, (e, ledgerId, entryId) -> {
+            if (e != null) {
+                log.error("Failed to abort for txn {}", txnID, e);
+                completableFuture.completeExceptionally(e);
+                return;
+            }
+            completableFuture.complete(null);
+        });
+        return completableFuture;
+    }
+
+    private List<PulsarMarkers.MessageIdData> 
getMessageIdDataList(List<MessageIdData> sendMessageIdList) {
+        List<PulsarMarkers.MessageIdData> messageIdDataList = new 
ArrayList<>();
+        for (MessageIdData msgIdData : sendMessageIdList) {
+            messageIdDataList.add(
+                    PulsarMarkers.MessageIdData.newBuilder()
+                            .setLedgerId(msgIdData.getLedgerId())
+                            .setEntryId(msgIdData.getEntryId()).build());
+        }
+        return messageIdDataList;

Review comment:
       Should the MessageIdData list be recycled before returning?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
+import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarMarkers;
+import org.apache.pulsar.common.protocol.Markers;
+
+/**
+ * Transaction buffer based on normal persistent topic.
+ */
+@Slf4j
+public class TopicTransactionBuffer implements TransactionBuffer {
+
+    private PersistentTopic topic;
+
+    public TopicTransactionBuffer(PersistentTopic topic) {
+        this.topic = topic;
+    }
+
+    @Override
+    public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long 
sequenceId, ByteBuf buffer) {
+        CompletableFuture<Position> completableFuture = new 
CompletableFuture<>();
+        topic.publishMessage(buffer, (e, ledgerId, entryId) -> {
+            if (e != null) {
+                log.error("Failed to appendBufferToTxn for txn {}", txnId, e);
+                completableFuture.completeExceptionally(e);
+                return;
+            }
+            completableFuture.complete(PositionImpl.get(ledgerId, entryId));
+        });
+        return completableFuture;
+    }
+
+    @Override
+    public CompletableFuture<TransactionBufferReader> 
openTransactionBufferReader(TxnID txnID, long startSequenceId) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> commitTxn(TxnID txnID, List<MessageIdData> 
sendMessageIdList) {
+        if (log.isDebugEnabled()) {
+            log.debug("Transaction {} commit on topic {}.", txnID.toString(), 
topic.getName());
+        }
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+
+        ByteBuf commitMarker = Markers.newTxnCommitMarker(-1L, 
txnID.getMostSigBits(),
+                txnID.getLeastSigBits(), 
getMessageIdDataList(sendMessageIdList));
+        topic.publishMessage(commitMarker, (e, ledgerId, entryId) -> {
+            if (e != null) {
+                log.error("Failed to commit for txn {}", txnID, e);
+                completableFuture.completeExceptionally(e);
+                return;
+            }
+            completableFuture.complete(null);
+        });
+        return completableFuture;

Review comment:
       I'm not sure where the MessageId list returned by 
`getMessageIdDataList(sendMessageIdList)` will be recycled.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
+import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarMarkers;
+import org.apache.pulsar.common.protocol.Markers;
+
+/**
+ * Transaction buffer based on normal persistent topic.
+ */
+@Slf4j
+public class TopicTransactionBuffer implements TransactionBuffer {
+
+    private PersistentTopic topic;
+
+    public TopicTransactionBuffer(PersistentTopic topic) {
+        this.topic = topic;
+    }
+
+    @Override
+    public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long 
sequenceId, ByteBuf buffer) {
+        CompletableFuture<Position> completableFuture = new 
CompletableFuture<>();
+        topic.publishMessage(buffer, (e, ledgerId, entryId) -> {
+            if (e != null) {
+                log.error("Failed to appendBufferToTxn for txn {}", txnId, e);
+                completableFuture.completeExceptionally(e);
+                return;
+            }
+            completableFuture.complete(PositionImpl.get(ledgerId, entryId));
+        });
+        return completableFuture;
+    }
+
+    @Override
+    public CompletableFuture<TransactionBufferReader> 
openTransactionBufferReader(TxnID txnID, long startSequenceId) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> commitTxn(TxnID txnID, List<MessageIdData> 
sendMessageIdList) {
+        if (log.isDebugEnabled()) {
+            log.debug("Transaction {} commit on topic {}.", txnID.toString(), 
topic.getName());
+        }
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+
+        ByteBuf commitMarker = Markers.newTxnCommitMarker(-1L, 
txnID.getMostSigBits(),
+                txnID.getLeastSigBits(), 
getMessageIdDataList(sendMessageIdList));
+        topic.publishMessage(commitMarker, (e, ledgerId, entryId) -> {
+            if (e != null) {
+                log.error("Failed to commit for txn {}", txnID, e);
+                completableFuture.completeExceptionally(e);
+                return;
+            }
+            completableFuture.complete(null);
+        });
+        return completableFuture;
+    }
+
+    @Override
+    public CompletableFuture<Void> abortTxn(TxnID txnID, List<MessageIdData> 
sendMessageIdList) {
+        if (log.isDebugEnabled()) {
+            log.debug("Transaction {} abort on topic {}.", txnID.toString(), 
topic.getName());
+        }
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+
+        ByteBuf abortMarker = Markers.newTxnAbortMarker(
+                -1L, txnID.getMostSigBits(), txnID.getLeastSigBits(), 
getMessageIdDataList(sendMessageIdList));
+        topic.publishMessage(abortMarker, (e, ledgerId, entryId) -> {
+            if (e != null) {
+                log.error("Failed to abort for txn {}", txnID, e);
+                completableFuture.completeExceptionally(e);
+                return;
+            }
+            completableFuture.complete(null);
+        });
+        return completableFuture;
+    }
+
+    private List<PulsarMarkers.MessageIdData> 
getMessageIdDataList(List<MessageIdData> sendMessageIdList) {
+        List<PulsarMarkers.MessageIdData> messageIdDataList = new 
ArrayList<>();

Review comment:
       ```suggestion
           List<PulsarMarkers.MessageIdData> messageIdDataList = new 
ArrayList<>(sendMessageIdList.size());
   ```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
+import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarMarkers;
+import org.apache.pulsar.common.protocol.Markers;
+
+/**
+ * Transaction buffer based on normal persistent topic.
+ */
+@Slf4j
+public class TopicTransactionBuffer implements TransactionBuffer {
+
+    private PersistentTopic topic;
+
+    public TopicTransactionBuffer(PersistentTopic topic) {
+        this.topic = topic;
+    }
+
+    @Override
+    public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
+        return null;
+    }

Review comment:
       Do we need this method?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferProvider.java
##########
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A provider that provides topic implementations of {@link TransactionBuffer}.
+ */
+public class TopicTransactionBufferProvider implements 
TransactionBufferProvider {
+    @Override
+    public CompletableFuture<TransactionBuffer> newTransactionBuffer() {
+        return CompletableFuture.completedFuture(null);

Review comment:
       Should this throw a "not supported" exception instead?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -229,12 +233,24 @@ public void 
removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
                 completableFutureList.add(actionFuture);
             });
 
+            List<MessageId> messageIdList = new ArrayList<>();
+            for (PulsarApi.MessageIdData messageIdData : messageIdDataList) {
+                messageIdList.add(new MessageIdImpl(
+                        messageIdData.getLedgerId(), 
messageIdData.getEntryId(), messageIdData.getPartition()));
+            }
+

Review comment:
       Please recycle the Pulsar command once it is confirmed to be no longer in use. 
Please check all occurrences.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -206,7 +210,7 @@ public void 
removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
         return completableFuture;
     }
 
-    private CompletableFuture<Void> endToTB(TxnID txnID, int txnAction) {
+    private CompletableFuture<Void> endToTB(TxnID txnID, int txnAction, 
List<PulsarApi.MessageIdData> messageIdDataList) {

Review comment:
       ```suggestion
       private CompletableFuture<Void> endTxnInTransactionBuffer(TxnID txnID, 
int txnAction, List<PulsarApi.MessageIdData> messageIdDataList) {
   ```

##########
File path: pulsar-common/src/main/proto/PulsarMarkers.proto
##########
@@ -85,5 +85,5 @@ message MessageIdData {
 
 /// --- Transaction marker ---
 message TxnCommitMarker {
-    required MessageIdData message_id = 1;
+    repeated MessageIdData message_id = 1;

Review comment:
       ```suggestion
       repeated MessageIdData message_ids = 1;
   ```

##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -767,6 +767,7 @@ message CommandEndTxn {
     optional uint64 txnid_least_bits = 2 [default = 0];
     optional uint64 txnid_most_bits = 3 [default = 0];
     optional TxnAction txn_action = 4;
+    repeated MessageIdData messageIdList = 5;

Review comment:
       ```suggestion
       repeated MessageIdData message_ids = 5;
   ```

##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -783,6 +784,7 @@ message CommandEndTxnOnPartition {
     optional uint64 txnid_most_bits = 3 [default = 0];
     optional string topic = 4;
     optional TxnAction txn_action = 5;
+    repeated MessageIdData messageIdList = 6;

Review comment:
       ```suggestion
       repeated MessageIdData message_ids = 6;
   ```

##########
File path: 
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
##########
@@ -309,24 +306,24 @@ private static ByteBuf newTxnMarker(MarkerType 
markerType, long sequenceId, long
         MessageMetadata msgMetadata = msgMetadataBuilder.build();
 
         ByteBuf payload;
-        if (messageIdData.isPresent()) {
-            PulsarMarkers.TxnCommitMarker commitMarker = 
PulsarMarkers.TxnCommitMarker.newBuilder()
-                                                                               
       .setMessageId(messageIdData.get())
-                                                                               
       .build();
-            int size = commitMarker.getSerializedSize();
-            payload = PooledByteBufAllocator.DEFAULT.buffer(size);
-            ByteBufCodedOutputStream outStream = 
ByteBufCodedOutputStream.get(payload);
-            commitMarker.writeTo(outStream);
-        } else {
-            payload = PooledByteBufAllocator.DEFAULT.buffer();
-        }
+        PulsarMarkers.TxnCommitMarker.Builder commitMarkerBuilder = 
PulsarMarkers.TxnCommitMarker.newBuilder();
+
+        messageIdDataList.ifPresent(commitMarkerBuilder::addAllMessageId);
+        PulsarMarkers.TxnCommitMarker commitMarker = 
commitMarkerBuilder.build();
+        int size = commitMarker.getSerializedSize();
+        payload = PooledByteBufAllocator.DEFAULT.buffer(size);
+        ByteBufCodedOutputStream outStream = 
ByteBufCodedOutputStream.get(payload);
+        commitMarker.writeTo(outStream);
 
         try {
             return Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, 
msgMetadata, payload);
         } finally {
             payload.release();
             msgMetadata.recycle();
             msgMetadataBuilder.recycle();
+            if (commitMarkerBuilder != null) {

Review comment:
       The commitMarkerBuilder is never null here.

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -255,7 +266,17 @@ public void 
handleAddSubscriptionToTxnResponse(PulsarApi.CommandAddSubscriptionT
             return callback;
         }
         long requestId = client.newRequestId();
-        ByteBuf cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), 
txnID.getMostSigBits(), PulsarApi.TxnAction.ABORT);
+
+        List<PulsarApi.MessageIdData> messageIdDataList = new ArrayList<>();

Review comment:
       This should be recycled once it is no longer used.
   
   

##########
File path: 
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java
##########
@@ -309,24 +306,24 @@ private static ByteBuf newTxnMarker(MarkerType 
markerType, long sequenceId, long
         MessageMetadata msgMetadata = msgMetadataBuilder.build();
 
         ByteBuf payload;
-        if (messageIdData.isPresent()) {
-            PulsarMarkers.TxnCommitMarker commitMarker = 
PulsarMarkers.TxnCommitMarker.newBuilder()
-                                                                               
       .setMessageId(messageIdData.get())
-                                                                               
       .build();
-            int size = commitMarker.getSerializedSize();
-            payload = PooledByteBufAllocator.DEFAULT.buffer(size);
-            ByteBufCodedOutputStream outStream = 
ByteBufCodedOutputStream.get(payload);
-            commitMarker.writeTo(outStream);
-        } else {
-            payload = PooledByteBufAllocator.DEFAULT.buffer();
-        }
+        PulsarMarkers.TxnCommitMarker.Builder commitMarkerBuilder = 
PulsarMarkers.TxnCommitMarker.newBuilder();
+
+        messageIdDataList.ifPresent(commitMarkerBuilder::addAllMessageId);
+        PulsarMarkers.TxnCommitMarker commitMarker = 
commitMarkerBuilder.build();

Review comment:
       The commitMarker should also be recycled.

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
##########
@@ -236,7 +238,16 @@ public void 
handleAddSubscriptionToTxnResponse(PulsarApi.CommandAddSubscriptionT
             return callback;
         }
         long requestId = client.newRequestId();
-        ByteBuf cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), 
txnID.getMostSigBits(), PulsarApi.TxnAction.COMMIT);
+        List<PulsarApi.MessageIdData> messageIdDataList = new ArrayList<>();

Review comment:
       This should be recycled once it is no longer used.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
##########
@@ -71,13 +75,21 @@ public TransactionBufferHandlerImpl(ConnectionPool 
connectionPool, NamespaceServ
     }
 
     @Override
-    public CompletableFuture<TxnID> endTxnOnTopic(String topic, long 
txnIdMostBits, long txnIdLeastBits, PulsarApi.TxnAction action) {
+    public CompletableFuture<TxnID> endTxnOnTopic(String topic, long 
txnIdMostBits, long txnIdLeastBits, PulsarApi.TxnAction action, List<MessageId> 
messageIdList) {
         CompletableFuture<TxnID> cb = new CompletableFuture<>();
         if (!canSendRequest(cb)) {
             return cb;
         }
         long requestId = requestIdGenerator.getAndIncrement();
-        ByteBuf cmd = Commands.newEndTxnOnPartition(requestId, txnIdLeastBits, 
txnIdMostBits, topic, action);
+        List<PulsarApi.MessageIdData> messageIdDataList = new ArrayList<>();

Review comment:
       This should be recycled once it is no longer used.
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to