congbobo184 commented on a change in pull request #8750:
URL: https://github.com/apache/pulsar/pull/8750#discussion_r534243188



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -18,32 +18,126 @@
  */
 package org.apache.pulsar.broker.transaction.buffer.impl;
 
+import static org.apache.pulsar.common.protocol.Markers.isTxnMarker;
 import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
+import 
org.apache.pulsar.broker.transaction.buffer.TransactionBufferReplayCallback;
 import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarMarkers;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
 
 /**
  * Transaction buffer based on normal persistent topic.
  */
 @Slf4j
-public class TopicTransactionBuffer implements TransactionBuffer {
+public class TopicTransactionBuffer extends TopicTransactionBufferState 
implements TransactionBuffer {
 
     private final PersistentTopic topic;
 
-    public TopicTransactionBuffer(PersistentTopic topic) {
+    private final SpscArrayQueue<Entry> entryQueue;
+
+    private final ConcurrentOpenHashMap<TxnID, 
ConcurrentOpenHashSet<PositionImpl>> txnBufferCache = new 
ConcurrentOpenHashMap<>();
+
+    //this is for transaction buffer replay start position
+    //this will be stored in managed ledger properties and every 10000 will 
sync to zk by default.
+    private final ConcurrentSkipListSet<PositionImpl> positionsSort = new 
ConcurrentSkipListSet<>();
+
+    private final ManagedCursor cursor;
+
+    //this is for replay
+    private final PositionImpl lastConfirmedEntry;
+    //this if for replay
+    private PositionImpl currentLoadPosition;
+
+    private final AtomicInteger countToSyncPosition = new AtomicInteger(0);
+
+    private final static String TXN_ON_GOING_POSITION_SUFFIX = 
"-txnOnGoingPosition";
+
+    private final String txnOnGoingPositionName;
+
+    //TODO this can config
+    private int defaultCountToSyncPosition = 10000;
+
+    public TopicTransactionBuffer(PersistentTopic topic) throws 
ManagedLedgerException {
+        super(State.None);
+        this.entryQueue = new SpscArrayQueue<>(2000);
         this.topic = topic;
+        ManagedLedger managedLedger = topic.getManagedLedger();
+        this.lastConfirmedEntry = (PositionImpl) 
managedLedger.getLastConfirmedEntry();
+        this.txnOnGoingPositionName = topic.getName() + 
TXN_ON_GOING_POSITION_SUFFIX;
+        String positionString = 
managedLedger.getProperties().get(txnOnGoingPositionName);
+        if (positionString == null) {
+            this.currentLoadPosition = PositionImpl.earliest;
+        } else {
+            PositionImpl position = PositionImpl.earliest;
+            try {
+                position = 
PositionImpl.convertStringToPosition(positionString);
+            } catch (Exception e) {
+                log.error("Topic : [{}] transaction buffer get replay start 
position error!", topic.getName());
+            }
+            this.currentLoadPosition = position;
+        }
+        this.cursor = managedLedger.newNonDurableCursor(currentLoadPosition);
+
+        new Thread(() -> new TopicTransactionBufferReplayer(new 
TransactionBufferReplayCallback() {
+
+            @Override
+            public void replayComplete() {
+                if (!changeToReadyState()) {
+                    log.error("Managed ledger transaction metadata store 
change state error when replay complete");
+                }
+            }
+
+            @Override
+            public void handleMetadataEntry(Position position, MessageMetadata 
messageMetadata) {
+                if (!messageMetadata.hasTxnidMostBits() || 
!messageMetadata.hasTxnidLeastBits()) {
+                    return;
+                }
+                TxnID txnID = new TxnID(messageMetadata.getTxnidMostBits(),
+                        messageMetadata.getTxnidLeastBits());
+                if (isTxnMarker(messageMetadata)) {
+                    ConcurrentOpenHashSet<PositionImpl> positions = 
txnBufferCache.remove(txnID);
+                    positionsSort.removeAll(positions.values());
+                } else {
+                    ConcurrentOpenHashSet<PositionImpl> positions =
+                            txnBufferCache.computeIfAbsent(txnID, (v) -> new 
ConcurrentOpenHashSet<>());
+                    positions.add((PositionImpl) position);
+                    positionsSort.add((PositionImpl) position);
+                }
+            }
+        }).start()).start();

Review comment:
       It is a good idea; I will change it. :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to