congbobo184 commented on a change in pull request #8750:
URL: https://github.com/apache/pulsar/pull/8750#discussion_r534241703



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -18,32 +18,126 @@
  */
 package org.apache.pulsar.broker.transaction.buffer.impl;
 
+import static org.apache.pulsar.common.protocol.Markers.isTxnMarker;
 import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
+import 
org.apache.pulsar.broker.transaction.buffer.TransactionBufferReplayCallback;
 import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarMarkers;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
 
 /**
  * Transaction buffer based on normal persistent topic.
  */
 @Slf4j
-public class TopicTransactionBuffer implements TransactionBuffer {
+public class TopicTransactionBuffer extends TopicTransactionBufferState 
implements TransactionBuffer {
 
     private final PersistentTopic topic;
 
-    public TopicTransactionBuffer(PersistentTopic topic) {
+    private final SpscArrayQueue<Entry> entryQueue;
+
+    private final ConcurrentOpenHashMap<TxnID, 
ConcurrentOpenHashSet<PositionImpl>> txnBufferCache = new 
ConcurrentOpenHashMap<>();
+
+    //this is for transaction buffer replay start position
+    //this will be stored in managed ledger properties and every 10000 will 
sync to zk by default.
+    private final ConcurrentSkipListSet<PositionImpl> positionsSort = new 
ConcurrentSkipListSet<>();
+
+    private final ManagedCursor cursor;
+
+    //this is for replay
+    private final PositionImpl lastConfirmedEntry;
+    //this if for replay
+    private PositionImpl currentLoadPosition;
+
+    private final AtomicInteger countToSyncPosition = new AtomicInteger(0);
+
+    private final static String TXN_ON_GOING_POSITION_SUFFIX = 
"-txnOnGoingPosition";
+
+    private final String txnOnGoingPositionName;
+
+    //TODO this can config
+    private int defaultCountToSyncPosition = 10000;
+
+    public TopicTransactionBuffer(PersistentTopic topic) throws 
ManagedLedgerException {
+        super(State.None);
+        this.entryQueue = new SpscArrayQueue<>(2000);
         this.topic = topic;
+        ManagedLedger managedLedger = topic.getManagedLedger();
+        this.lastConfirmedEntry = (PositionImpl) 
managedLedger.getLastConfirmedEntry();
+        this.txnOnGoingPositionName = topic.getName() + 
TXN_ON_GOING_POSITION_SUFFIX;
+        String positionString = 
managedLedger.getProperties().get(txnOnGoingPositionName);
+        if (positionString == null) {
+            this.currentLoadPosition = PositionImpl.earliest;
+        } else {
+            PositionImpl position = PositionImpl.earliest;
+            try {
+                position = 
PositionImpl.convertStringToPosition(positionString);
+            } catch (Exception e) {
+                log.error("Topic : [{}] transaction buffer get replay start 
position error!", topic.getName());
+            }
+            this.currentLoadPosition = position;
+        }
+        this.cursor = managedLedger.newNonDurableCursor(currentLoadPosition);
+
+        new Thread(() -> new TopicTransactionBufferReplayer(new 
TransactionBufferReplayCallback() {

Review comment:
       yes, you are right. we should control the thread start and close.
   
   ```
   one question isn't it too heavyweight to start a thread per each topic ?
   ```
   we async init TopicTransactionBuffer, do you have any better way to do it?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to