liangyepianzhou commented on code in PR #17847:
URL: https://github.com/apache/pulsar/pull/17847#discussion_r1008072382


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -625,108 +520,76 @@ public static class TopicTransactionBufferRecover 
implements Runnable {
 
         private final TopicTransactionBuffer topicTransactionBuffer;
 
-        private final 
CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>> 
takeSnapshotWriter;
+        private final AbortedTxnProcessor abortedTxnProcessor;
 
         private 
TopicTransactionBufferRecover(TopicTransactionBufferRecoverCallBack callBack, 
PersistentTopic topic,
-                                              TopicTransactionBuffer 
transactionBuffer, CompletableFuture<
-                SystemTopicClient.Writer<TransactionBufferSnapshot>> 
takeSnapshotWriter) {
+                                              TopicTransactionBuffer 
transactionBuffer,
+                                              AbortedTxnProcessor 
abortedTxnProcessor) {
             this.topic = topic;
             this.callBack = callBack;
             this.entryQueue = new SpscArrayQueue<>(2000);
             this.topicTransactionBuffer = transactionBuffer;
-            this.takeSnapshotWriter = takeSnapshotWriter;
+            this.abortedTxnProcessor = abortedTxnProcessor;
         }
 
         @SneakyThrows
         @Override
         public void run() {
-            this.takeSnapshotWriter.thenRunAsync(() -> {
-                if (!this.topicTransactionBuffer.changeToInitializingState()) {
-                    log.warn("TransactionBuffer {} of topic {} can not change 
state to Initializing",
-                            this, topic.getName());
+            if (!this.topicTransactionBuffer.changeToInitializingState()) {
+                log.warn("TransactionBuffer {} of topic {} can not change 
state to Initializing",
+                        this, topic.getName());
+                return;
+            }
+            
abortedTxnProcessor.recoverFromSnapshot().thenAcceptAsync(startReadCursorPosition
 -> {
+                //Transaction is not enable for this topic, so just make 
maxReadPosition as LAC.

Review Comment:
   Yes, the logic of recovery is actually that. What is wrong?
   ```
       
abortedTxnProcessor.recoverFromSnapshot().thenAcceptAsync(startReadCursorPosition
 -> {
           //Transaction is not enable for this topic, so just make 
maxReadPosition as LAC.
           if (startReadCursorPosition == null) {
               callBack.noNeedToRecover();
               return;
           } else {
   
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to