gaoran10 commented on a change in pull request #9490:
URL: https://github.com/apache/pulsar/pull/9490#discussion_r573065541



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java
##########
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.systopic;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
+import 
org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+
+@Slf4j
+public class TransactionBufferSystemTopicClient extends 
SystemTopicClientBase<TransactionBufferSnapshot> {
+    private TransactionBufferSnapshotService transactionBufferSnapshotService;
+
+    public TransactionBufferSystemTopicClient(PulsarClient client, TopicName 
topicName,
+                                              TransactionBufferSnapshotService 
transactionBufferSnapshotService) {
+        super(client, topicName);
+        this.transactionBufferSnapshotService = 
transactionBufferSnapshotService;
+    }
+
+    @Override
+    protected CompletableFuture<Writer<TransactionBufferSnapshot>> 
newWriterAsyncInternal() {
+        return client.newProducer(Schema.AVRO(TransactionBufferSnapshot.class))
+                .topic(topicName.toString())
+                .createAsync().thenCompose(producer -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] A new transactionBufferSnapshot writer 
is created", topicName);
+                    }
+                    return CompletableFuture.completedFuture(
+                            new TransactionBufferSnapshotWriter(producer, 
this));
+                });
+    }
+
+    @Override
+    protected CompletableFuture<Reader<TransactionBufferSnapshot>> 
newReaderAsyncInternal() {
+        return client.newReader(Schema.AVRO(TransactionBufferSnapshot.class))
+                .topic(topicName.toString())
+                .startMessageId(MessageId.latest)

Review comment:
       Maybe the start message ID should be `MessageId.earliest`?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -207,6 +371,190 @@ public void 
syncMaxReadPositionForNormalPublish(PositionImpl position) {
 
     @Override
     public PositionImpl getMaxReadPosition() {
-        return this.maxReadPosition;
+        if (checkIfReady()) {
+            return this.maxReadPosition;
+        } else {
+            return PositionImpl.earliest;
+        }
+    }
+
+    @Override
+    public void run(Timeout timeout) {
+        if (checkIfReady()) {
+            takeSnapshotByTimeout();
+            this.timer.newTimeout(this, takeSnapshotIntervalTime, 
TimeUnit.MILLISECONDS);
+        }
+    }
+
+    static class TopicTransactionBufferRecover implements Runnable {
+
+        private final PersistentTopic topic;
+
+        private final TopicTransactionBufferRecoverCallBack callBack;
+
+        private Position startReadCursorPosition = PositionImpl.earliest;
+
+        private final SpscArrayQueue<Entry> entryQueue;
+
+        private final AtomicLong exceptionNumber = new AtomicLong();
+
+        // TODO: MAX_EXCEPTION_NUMBER can config
+        private static final int MAX_EXCEPTION_NUMBER = 500;
+
+        public static final String SUBSCRIPTION_NAME = 
"transaction-buffer-sub";
+
+        private final TopicTransactionBuffer topicTransactionBuffer;
+
+        private 
TopicTransactionBufferRecover(TopicTransactionBufferRecoverCallBack callBack, 
PersistentTopic topic,
+                                              TopicTransactionBuffer 
transactionBuffer) {
+            this.topic = topic;
+            this.callBack = callBack;
+            this.entryQueue = new SpscArrayQueue<>(2000);
+            this.topicTransactionBuffer = transactionBuffer;
+        }
+
+        @SneakyThrows
+        @Override
+        public void run() {
+            this.topicTransactionBuffer.changeToInitializingState();
+            
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotService()
+                    
.createReader(TopicName.get(topic.getName())).thenAcceptAsync(reader -> {
+                try {
+                    while (reader.hasMoreEvents()) {
+                        Message<TransactionBufferSnapshot> message = 
reader.readNext();
+                        TransactionBufferSnapshot transactionBufferSnapshot = 
message.getValue();
+                        if 
(topic.getName().equals(transactionBufferSnapshot.getTopicName())) {
+                            callBack.handleSnapshot(transactionBufferSnapshot);
+                            this.startReadCursorPosition = PositionImpl.get(
+                                    
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
+                                    
transactionBufferSnapshot.getMaxReadPositionEntryId());
+                        }
+                    }
+                } catch (PulsarClientException pulsarClientException) {
+                    log.error("[{}]Transaction buffer recover fail when read "
+                            + "transactionBufferSnapshot!", topic.getName(), 
pulsarClientException);
+                    reader.closeAsync().exceptionally(e -> {
+                        log.error("[{}]Transaction buffer reader close 
error!", topic.getName(), e);
+                        return null;
+                    });
+                    return;
+                }
+                reader.closeAsync().exceptionally(e -> {
+                    log.error("[{}]Transaction buffer reader close error!", 
topic.getName(), e);
+                    return null;
+                });
+
+                ManagedCursor managedCursor;
+                try {
+                    managedCursor = topic.getManagedLedger()
+                            .newNonDurableCursor(this.startReadCursorPosition, 
SUBSCRIPTION_NAME);
+                } catch (ManagedLedgerException e) {
+                    log.error("[{}]Transaction buffer recover fail when open 
cursor!", topic.getName(), e);
+                    return;
+                }
+                PositionImpl lastConfirmedEntry = (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
+                PositionImpl currentLoadPosition = (PositionImpl) 
this.startReadCursorPosition;
+                FillEntryQueueCallback fillEntryQueueCallback = new 
FillEntryQueueCallback(entryQueue, managedCursor,
+                        TopicTransactionBufferRecover.this);
+                if (lastConfirmedEntry.getEntryId() != -1) {
+                    while (lastConfirmedEntry.compareTo(currentLoadPosition) > 
0) {
+                        fillEntryQueueCallback.fillQueue();
+                        Entry entry = entryQueue.poll();
+                        if (entry != null) {
+                            try {
+                                currentLoadPosition = 
PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
+                                callBack.handleTxnEntry(entry);
+                            } finally {
+                                entry.release();
+                            }
+                        } else {
+                            if (exceptionNumber.get() > MAX_EXCEPTION_NUMBER) {
+                                log.error("[{}]Transaction buffer recover fail 
when "
+                                        + "replay message error number > {}!", 
topic.getName(), MAX_EXCEPTION_NUMBER);
+                                closeCursor(managedCursor);
+                                return;
+                            }
+                            try {
+                                Thread.sleep(1);
+                            } catch (InterruptedException e) {
+                                //no-op
+                            }
+                        }
+                    }
+                }
+
+                closeCursor(managedCursor);
+                callBack.replayComplete();
+            }).exceptionally(e -> {
+                log.error("[{}]Transaction buffer new snapshot reader fail!", 
topic.getName(), e);
+                return null;
+            });
+        }
+
+        private void closeCursor(ManagedCursor cursor) {
+            cursor.asyncClose(new AsyncCallbacks.CloseCallback() {
+                @Override
+                public void closeComplete(Object ctx) {
+                    log.info("[{}]Transaction buffer snapshot recover cursor 
close complete.", topic.getName());
+                }
+
+                @Override
+                public void closeFailed(ManagedLedgerException exception, 
Object ctx) {
+                    log.error("[{}]Transaction buffer snapshot recover cursor 
close fail.", topic.getName());
+                }
+            }, null);
+        }
+
+        private void callBackException(ManagedLedgerException e) {
+            log.error("Transaction buffer recover fail when recover 
transaction entry!", e);
+            this.exceptionNumber.getAndIncrement();
+        }
+    }
+
+    static class FillEntryQueueCallback implements 
AsyncCallbacks.ReadEntriesCallback {
+
+        private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
+
+        private final SpscArrayQueue<Entry> entryQueue;
+
+        private final ManagedCursor cursor;
+
+        private final TopicTransactionBufferRecover recover;
+
+        private FillEntryQueueCallback(SpscArrayQueue<Entry> entryQueue, 
ManagedCursor cursor,
+                                       TopicTransactionBufferRecover recover) {
+            this.entryQueue = entryQueue;
+            this.cursor = cursor;
+            this.recover = recover;
+        }
+        void fillQueue() {
+            if (entryQueue.size() < entryQueue.capacity() && 
outstandingReadsRequests.get() == 0) {

Review comment:
       Maybe we could wait until the entry queue size drops to a threshold value, 
and then read more entries to fill the entry queue.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
##########
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import org.apache.bookkeeper.mledger.Entry;
+import 
org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+
+public interface TopicTransactionBufferRecoverCallBack {
+
+    /**
+     * Topic transaction buffer recover complete.
+     */
+    void replayComplete();
+
+    /**
+     * Handle transactionBufferSnapshot.
+     *
+     * @param snapshot the transaction buffer snapshot
+     */
+    void handleSnapshot(TransactionBufferSnapshot snapshot);
+
+    /**
+     * Handle transaction entry.

Review comment:
       ```suggestion
        * Handle transaction entry beyond the snapshot.
   ```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -207,6 +371,190 @@ public void 
syncMaxReadPositionForNormalPublish(PositionImpl position) {
 
     @Override
     public PositionImpl getMaxReadPosition() {
-        return this.maxReadPosition;
+        if (checkIfReady()) {
+            return this.maxReadPosition;
+        } else {
+            return PositionImpl.earliest;
+        }
+    }
+
+    @Override
+    public void run(Timeout timeout) {
+        if (checkIfReady()) {
+            takeSnapshotByTimeout();
+            this.timer.newTimeout(this, takeSnapshotIntervalTime, 
TimeUnit.MILLISECONDS);
+        }
+    }
+
+    static class TopicTransactionBufferRecover implements Runnable {

Review comment:
       Could you add a simple description for the recovery flow?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -83,12 +164,31 @@ public void addFailed(ManagedLedgerException exception, 
Object ctx) {
         return completableFuture;
     }
 
+    private void handleTransactionMessage(TxnID txnId, Position position) {
+        if (!ongoingTxns.containsKey(txnId)) {
+            ongoingTxns.put(txnId, (PositionImpl) position);
+            PositionImpl firstPosition = 
ongoingTxns.get(ongoingTxns.firstKey());
+            //max read position is less than first ongoing transaction message 
position, so entryId -1
+            maxReadPosition = PositionImpl.get(firstPosition.getLedgerId(), 
firstPosition.getEntryId() - 1);
+        }
+    }
+
+
     @Override
     public CompletableFuture<TransactionBufferReader> 
openTransactionBufferReader(TxnID txnID, long startSequenceId) {
         return null;
     }
     @Override
     public CompletableFuture<Void> commitTxn(TxnID txnID, long lowWaterMark) {
+        if (!checkIfReady()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}]Transaction buffer not recover complete!", 
topic.getName());
+            }
+            return FutureUtil.failedFuture(
+                    new ServiceUnitNotReadyException("[{" + topic.getName()
+                            + "}]Transaction buffer not recover complete!"));
+        }
+

Review comment:
       How about extracting this code into a method? I found three places that 
use this check.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
##########
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import org.apache.bookkeeper.mledger.Entry;
+import 
org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+
+public interface TopicTransactionBufferRecoverCallBack {
+
+    /**
+     * Topic transaction buffer recover complete.
+     */
+    void replayComplete();

Review comment:
       How about changing the method name to `recoverComplete`?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -207,6 +371,190 @@ public void 
syncMaxReadPositionForNormalPublish(PositionImpl position) {
 
     @Override
     public PositionImpl getMaxReadPosition() {
-        return this.maxReadPosition;
+        if (checkIfReady()) {
+            return this.maxReadPosition;
+        } else {
+            return PositionImpl.earliest;
+        }
+    }
+
+    @Override
+    public void run(Timeout timeout) {
+        if (checkIfReady()) {
+            takeSnapshotByTimeout();
+            this.timer.newTimeout(this, takeSnapshotIntervalTime, 
TimeUnit.MILLISECONDS);
+        }
+    }
+
+    static class TopicTransactionBufferRecover implements Runnable {
+
+        private final PersistentTopic topic;
+
+        private final TopicTransactionBufferRecoverCallBack callBack;
+
+        private Position startReadCursorPosition = PositionImpl.earliest;
+
+        private final SpscArrayQueue<Entry> entryQueue;
+
+        private final AtomicLong exceptionNumber = new AtomicLong();
+
+        // TODO: MAX_EXCEPTION_NUMBER can config
+        private static final int MAX_EXCEPTION_NUMBER = 500;
+
+        public static final String SUBSCRIPTION_NAME = 
"transaction-buffer-sub";
+
+        private final TopicTransactionBuffer topicTransactionBuffer;
+
+        private 
TopicTransactionBufferRecover(TopicTransactionBufferRecoverCallBack callBack, 
PersistentTopic topic,
+                                              TopicTransactionBuffer 
transactionBuffer) {
+            this.topic = topic;
+            this.callBack = callBack;
+            this.entryQueue = new SpscArrayQueue<>(2000);
+            this.topicTransactionBuffer = transactionBuffer;
+        }
+
+        @SneakyThrows
+        @Override
+        public void run() {
+            this.topicTransactionBuffer.changeToInitializingState();
+            
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotService()
+                    
.createReader(TopicName.get(topic.getName())).thenAcceptAsync(reader -> {
+                try {
+                    while (reader.hasMoreEvents()) {
+                        Message<TransactionBufferSnapshot> message = 
reader.readNext();
+                        TransactionBufferSnapshot transactionBufferSnapshot = 
message.getValue();
+                        if 
(topic.getName().equals(transactionBufferSnapshot.getTopicName())) {
+                            callBack.handleSnapshot(transactionBufferSnapshot);
+                            this.startReadCursorPosition = PositionImpl.get(
+                                    
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
+                                    
transactionBufferSnapshot.getMaxReadPositionEntryId());
+                        }
+                    }
+                } catch (PulsarClientException pulsarClientException) {
+                    log.error("[{}]Transaction buffer recover fail when read "
+                            + "transactionBufferSnapshot!", topic.getName(), 
pulsarClientException);
+                    reader.closeAsync().exceptionally(e -> {
+                        log.error("[{}]Transaction buffer reader close 
error!", topic.getName(), e);
+                        return null;
+                    });
+                    return;
+                }
+                reader.closeAsync().exceptionally(e -> {
+                    log.error("[{}]Transaction buffer reader close error!", 
topic.getName(), e);
+                    return null;
+                });
+
+                ManagedCursor managedCursor;
+                try {
+                    managedCursor = topic.getManagedLedger()
+                            .newNonDurableCursor(this.startReadCursorPosition, 
SUBSCRIPTION_NAME);
+                } catch (ManagedLedgerException e) {
+                    log.error("[{}]Transaction buffer recover fail when open 
cursor!", topic.getName(), e);
+                    return;
+                }
+                PositionImpl lastConfirmedEntry = (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
+                PositionImpl currentLoadPosition = (PositionImpl) 
this.startReadCursorPosition;
+                FillEntryQueueCallback fillEntryQueueCallback = new 
FillEntryQueueCallback(entryQueue, managedCursor,
+                        TopicTransactionBufferRecover.this);
+                if (lastConfirmedEntry.getEntryId() != -1) {
+                    while (lastConfirmedEntry.compareTo(currentLoadPosition) > 
0) {
+                        fillEntryQueueCallback.fillQueue();
+                        Entry entry = entryQueue.poll();
+                        if (entry != null) {
+                            try {
+                                currentLoadPosition = 
PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
+                                callBack.handleTxnEntry(entry);
+                            } finally {
+                                entry.release();
+                            }
+                        } else {
+                            if (exceptionNumber.get() > MAX_EXCEPTION_NUMBER) {
+                                log.error("[{}]Transaction buffer recover fail 
when "

Review comment:
       Why could the entry be null?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -168,7 +280,55 @@ public void addFailed(ManagedLedgerException exception, 
Object ctx) {
         }
     }
 
+    private synchronized void takeSnapshotByChangeTimes() {
+        if (changeMaxReadPositionAndAddAbortTimes.get() >= 
takeSnapshotIntervalNumber) {
+            takeSnapshot();
+        }
+    }
+
+    private synchronized void takeSnapshotByTimeout() {
+        if (changeMaxReadPositionAndAddAbortTimes.get() > 0) {
+            takeSnapshot();
+        }
+        timer.newTimeout(TopicTransactionBuffer.this,
+                takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
+    }
+
+    private void takeSnapshot() {
+        changeMaxReadPositionAndAddAbortTimes.set(0);
+        takeSnapshotWriter.thenAccept(writer -> {
+            TransactionBufferSnapshot snapshot = new 
TransactionBufferSnapshot();
+            snapshot.setTopicName(topic.getName());
+            snapshot.setMaxReadPositionLedgerId(maxReadPosition.getLedgerId());
+            snapshot.setMaxReadPositionEntryId(maxReadPosition.getEntryId());
+            List<AbortTxnMetadata> list = new ArrayList<>();
+            aborts.forEach((k, v) -> {
+                AbortTxnMetadata abortTxnMetadata = new AbortTxnMetadata();
+                abortTxnMetadata.setTxnIdMostBits(k.getMostSigBits());
+                abortTxnMetadata.setTxnIdLeastBits(k.getLeastSigBits());
+                abortTxnMetadata.setLedgerId(v.getLedgerId());
+                abortTxnMetadata.setEntryId(v.getEntryId());
+                list.add(abortTxnMetadata);
+            });
+            snapshot.setAborts(list);
+            writer.writeAsync(snapshot).thenAccept((messageId) -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}]Transaction buffer take snapshot success! "
+                            + "messageId : {}", topic.getName(), messageId);
+                }
+            }).exceptionally(e -> {
+                if (log.isDebugEnabled()) {

Review comment:
       Maybe we could make this a warn-level log? If many snapshot 
operations fail, the recovery time will be too long, and a warn-level log 
could help users find out why the recovery takes so long.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to