congbobo184 commented on a change in pull request #9490:
URL: https://github.com/apache/pulsar/pull/9490#discussion_r575820287



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -207,6 +371,190 @@ public void 
syncMaxReadPositionForNormalPublish(PositionImpl position) {
 
     @Override
     public PositionImpl getMaxReadPosition() {
-        return this.maxReadPosition;
+        if (checkIfReady()) {
+            return this.maxReadPosition;
+        } else {
+            return PositionImpl.earliest;
+        }
+    }
+
+    @Override
+    public void run(Timeout timeout) {
+        if (checkIfReady()) {
+            takeSnapshotByTimeout();
+            this.timer.newTimeout(this, takeSnapshotIntervalTime, 
TimeUnit.MILLISECONDS);
+        }
+    }
+
+    static class TopicTransactionBufferRecover implements Runnable {
+
+        private final PersistentTopic topic;
+
+        private final TopicTransactionBufferRecoverCallBack callBack;
+
+        private Position startReadCursorPosition = PositionImpl.earliest;
+
+        private final SpscArrayQueue<Entry> entryQueue;
+
+        private final AtomicLong exceptionNumber = new AtomicLong();
+
+        // TODO: MAX_EXCEPTION_NUMBER can config
+        private static final int MAX_EXCEPTION_NUMBER = 500;
+
+        public static final String SUBSCRIPTION_NAME = 
"transaction-buffer-sub";
+
+        private final TopicTransactionBuffer topicTransactionBuffer;
+
+        private 
TopicTransactionBufferRecover(TopicTransactionBufferRecoverCallBack callBack, 
PersistentTopic topic,
+                                              TopicTransactionBuffer 
transactionBuffer) {
+            this.topic = topic;
+            this.callBack = callBack;
+            this.entryQueue = new SpscArrayQueue<>(2000);
+            this.topicTransactionBuffer = transactionBuffer;
+        }
+
+        @SneakyThrows
+        @Override
+        public void run() {
+            this.topicTransactionBuffer.changeToInitializingState();
+            
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotService()
+                    
.createReader(TopicName.get(topic.getName())).thenAcceptAsync(reader -> {
+                try {
+                    while (reader.hasMoreEvents()) {
+                        Message<TransactionBufferSnapshot> message = 
reader.readNext();
+                        TransactionBufferSnapshot transactionBufferSnapshot = 
message.getValue();
+                        if 
(topic.getName().equals(transactionBufferSnapshot.getTopicName())) {
+                            callBack.handleSnapshot(transactionBufferSnapshot);
+                            this.startReadCursorPosition = PositionImpl.get(
+                                    
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
+                                    
transactionBufferSnapshot.getMaxReadPositionEntryId());
+                        }
+                    }
+                } catch (PulsarClientException pulsarClientException) {
+                    log.error("[{}]Transaction buffer recover fail when read "
+                            + "transactionBufferSnapshot!", topic.getName(), 
pulsarClientException);
+                    reader.closeAsync().exceptionally(e -> {
+                        log.error("[{}]Transaction buffer reader close 
error!", topic.getName(), e);
+                        return null;
+                    });
+                    return;
+                }
+                reader.closeAsync().exceptionally(e -> {
+                    log.error("[{}]Transaction buffer reader close error!", 
topic.getName(), e);
+                    return null;
+                });
+
+                ManagedCursor managedCursor;
+                try {
+                    managedCursor = topic.getManagedLedger()
+                            .newNonDurableCursor(this.startReadCursorPosition, 
SUBSCRIPTION_NAME);
+                } catch (ManagedLedgerException e) {
+                    log.error("[{}]Transaction buffer recover fail when open 
cursor!", topic.getName(), e);
+                    return;
+                }
+                PositionImpl lastConfirmedEntry = (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
+                PositionImpl currentLoadPosition = (PositionImpl) 
this.startReadCursorPosition;
+                FillEntryQueueCallback fillEntryQueueCallback = new 
FillEntryQueueCallback(entryQueue, managedCursor,
+                        TopicTransactionBufferRecover.this);
+                if (lastConfirmedEntry.getEntryId() != -1) {
+                    while (lastConfirmedEntry.compareTo(currentLoadPosition) > 
0) {
+                        fillEntryQueueCallback.fillQueue();
+                        Entry entry = entryQueue.poll();
+                        if (entry != null) {
+                            try {
+                                currentLoadPosition = 
PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
+                                callBack.handleTxnEntry(entry);
+                            } finally {
+                                entry.release();
+                            }
+                        } else {
+                            if (exceptionNumber.get() > MAX_EXCEPTION_NUMBER) {
+                                log.error("[{}]Transaction buffer recover fail 
when "
+                                        + "replay message error number > {}!", 
topic.getName(), MAX_EXCEPTION_NUMBER);
+                                closeCursor(managedCursor);
+                                return;
+                            }
+                            try {
+                                Thread.sleep(1);
+                            } catch (InterruptedException e) {
+                                //no-op
+                            }
+                        }
+                    }
+                }
+
+                closeCursor(managedCursor);
+                callBack.replayComplete();
+            }).exceptionally(e -> {
+                log.error("[{}]Transaction buffer new snapshot reader fail!", 
topic.getName(), e);
+                return null;
+            });
+        }
+
+        private void closeCursor(ManagedCursor cursor) {
+            cursor.asyncClose(new AsyncCallbacks.CloseCallback() {
+                @Override
+                public void closeComplete(Object ctx) {
+                    log.info("[{}]Transaction buffer snapshot recover cursor 
close complete.", topic.getName());
+                }
+
+                @Override
+                public void closeFailed(ManagedLedgerException exception, 
Object ctx) {
+                    log.error("[{}]Transaction buffer snapshot recover cursor 
close fail.", topic.getName());
+                }
+            }, null);
+        }
+
+        private void callBackException(ManagedLedgerException e) {
+            log.error("Transaction buffer recover fail when recover 
transaction entry!", e);
+            this.exceptionNumber.getAndIncrement();
+        }
+    }
+
+    static class FillEntryQueueCallback implements 
AsyncCallbacks.ReadEntriesCallback {
+
+        private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
+
+        private final SpscArrayQueue<Entry> entryQueue;
+
+        private final ManagedCursor cursor;
+
+        private final TopicTransactionBufferRecover recover;
+
+        private FillEntryQueueCallback(SpscArrayQueue<Entry> entryQueue, 
ManagedCursor cursor,
+                                       TopicTransactionBufferRecover recover) {
+            this.entryQueue = entryQueue;
+            this.cursor = cursor;
+            this.recover = recover;
+        }
+        void fillQueue() {
+            if (entryQueue.size() < entryQueue.capacity() && 
outstandingReadsRequests.get() == 0) {

Review comment:
       if do with this method, we should make sure there is only one read 
operation outside.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to