codelipenghui commented on a change in pull request #9490:
URL: https://github.com/apache/pulsar/pull/9490#discussion_r580278208



##########
File path: conf/broker.conf
##########
@@ -1203,6 +1203,13 @@ brokerServicePurgeInactiveFrequencyInSeconds=60
 transactionCoordinatorEnabled=false
 
transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider
 
+# Transaction buffer take snapshot interval number
+transactionBufferTakeSnapshotIntervalNumber=1000
+
+# Transaction buffer take snapshot interval time
+# Unit : millisecond
+transactionBufferTakeSnapshotIntervalTime=5000

Review comment:
       ```suggestion
   transactionBufferSnapshotMinTimeInMills=5000
   ```

##########
File path: conf/broker.conf
##########
@@ -1203,6 +1203,13 @@ brokerServicePurgeInactiveFrequencyInSeconds=60
 transactionCoordinatorEnabled=false
 
transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider
 
+# Transaction buffer take snapshot interval number
+transactionBufferTakeSnapshotIntervalNumber=1000

Review comment:
       Is this number a transaction count or an entry count? Also, would 
`max` be a better prefix?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -118,6 +219,15 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
 
     @Override
     public CompletableFuture<Void> abortTxn(TxnID txnID, long lowWaterMark) {
+        if (!checkIfReady()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}]Transaction buffer not recover complete!", topic.getName());
+            }
+            return FutureUtil.failedFuture(
+                    new ServiceUnitNotReadyException("[{" + topic.getName()

Review comment:
       If this returns ServiceUnitNotReadyException, the client will redo the 
lookup. It should be a transaction exception instead.
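
       For illustration only, a minimal sketch of failing the future with a 
transaction-specific exception. `TransactionBufferNotReadyException`, 
`NotReadyExample`, and the `ready` flag are hypothetical names, not types from 
this PR or from Pulsar; the real exception type is up to the author.

```java
import java.util.concurrent.CompletableFuture;

public class NotReadyExample {

    // Hypothetical exception type, used here only to show the shape of the change.
    public static class TransactionBufferNotReadyException extends RuntimeException {
        public TransactionBufferNotReadyException(String message) {
            super(message);
        }
    }

    // Stand-in for abortTxn: fails with a transaction-level error while the buffer is recovering.
    public static CompletableFuture<Void> abortTxn(boolean ready, String topicName) {
        if (!ready) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new TransactionBufferNotReadyException(
                    "[" + topicName + "] Transaction buffer recovery is not complete"));
            return failed;
        }
        // The real abort logic would go here; the sketch just completes successfully.
        return CompletableFuture.completedFuture(null);
    }
}
```

The point is that the caller can tell "buffer still recovering" apart from 
"topic not owned by this broker", so it can retry without a new lookup.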

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -168,7 +280,53 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
         }
     }
 
+    private synchronized void takeSnapshotByChangeTimes() {
+        if (changeMaxReadPositionAndAddAbortTimes.get() >= takeSnapshotIntervalNumber) {
+            takeSnapshot();

Review comment:
       `takeSnapshot` is an async method, so we need to avoid concurrent snapshots.
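
       One possible way to do that is a compare-and-set guard around the async 
call. This is only a sketch: `SnapshotGuard`, `snapshotInProgress`, and 
`tryTakeSnapshot` are hypothetical names, and the supplier stands in for the 
PR's `takeSnapshot`, assuming it can return a `CompletableFuture<Void>`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical helper, not part of the PR: allows at most one in-flight async snapshot.
public class SnapshotGuard {

    private final AtomicBoolean snapshotInProgress = new AtomicBoolean(false);

    /**
     * Starts the snapshot only if none is currently running.
     * Returns true if a snapshot was started, false if it was skipped.
     */
    public boolean tryTakeSnapshot(Supplier<CompletableFuture<Void>> takeSnapshotAsync) {
        if (!snapshotInProgress.compareAndSet(false, true)) {
            return false; // a previous snapshot has not completed yet
        }
        try {
            // Release the guard when the async snapshot finishes, whether it succeeded or failed.
            takeSnapshotAsync.get().whenComplete((ignored, error) -> snapshotInProgress.set(false));
        } catch (RuntimeException e) {
            // The snapshot could not even be started; release the guard before rethrowing.
            snapshotInProgress.set(false);
            throw e;
        }
        return true;
    }
}
```

Skipping (rather than queueing) a snapshot while one is in flight seems 
acceptable here, since the next trigger by count or by timer would pick up the 
newer state anyway.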

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -207,6 +369,192 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
 
     @Override
     public PositionImpl getMaxReadPosition() {
-        return this.maxReadPosition;
+        if (checkIfReady()) {
+            return this.maxReadPosition;
+        } else {
+            return PositionImpl.earliest;
+        }
+    }
+
+    @Override
+    public void run(Timeout timeout) {
+        if (checkIfReady()) {
+            takeSnapshotByTimeout();
+            this.timer.newTimeout(this, takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    // we store the maxReadPosition from snapshot then open the non-durable cursor by this topic's manageLedger.
+    // the non-durable cursor will read to lastConfirmedEntry.
+    static class TopicTransactionBufferRecover implements Runnable {
+
+        private final PersistentTopic topic;
+
+        private final TopicTransactionBufferRecoverCallBack callBack;
+
+        private Position startReadCursorPosition = PositionImpl.earliest;
+
+        private final SpscArrayQueue<Entry> entryQueue;
+
+        private final AtomicLong exceptionNumber = new AtomicLong();
+
+        // TODO: MAX_EXCEPTION_NUMBER can config
+        private static final int MAX_EXCEPTION_NUMBER = 500;

Review comment:
       What is the expected behavior if the number of exceptions stays below 
MAX_EXCEPTION_NUMBER? In that case, if the transaction buffer is recovered, 
will it lose some transaction state?





