congbobo184 commented on a change in pull request #9490:
URL: https://github.com/apache/pulsar/pull/9490#discussion_r580841429
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -207,6 +369,192 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
@Override
public PositionImpl getMaxReadPosition() {
- return this.maxReadPosition;
+ if (checkIfReady()) {
+ return this.maxReadPosition;
+ } else {
+ return PositionImpl.earliest;
+ }
+ }
+
+ @Override
+ public void run(Timeout timeout) {
+ if (checkIfReady()) {
+ takeSnapshotByTimeout();
+ this.timer.newTimeout(this, takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ // we store the maxReadPosition from snapshot then open the non-durable cursor by this topic's manageLedger.
+ // the non-durable cursor will read to lastConfirmedEntry.
+ static class TopicTransactionBufferRecover implements Runnable {
+
+ private final PersistentTopic topic;
+
+ private final TopicTransactionBufferRecoverCallBack callBack;
+
+ private Position startReadCursorPosition = PositionImpl.earliest;
+
+ private final SpscArrayQueue<Entry> entryQueue;
+
+ private final AtomicLong exceptionNumber = new AtomicLong();
+
+ // TODO: MAX_EXCEPTION_NUMBER can config
+ private static final int MAX_EXCEPTION_NUMBER = 500;
Review comment:
It won't lose transaction state: when a read fails, the read position
does not change, so the cursor always reads from the correct position
on the next attempt.
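
To illustrate the point, here is a minimal sketch of the retry behaviour, not the code in this PR. `FlakyCursor`, `recover`, and `RecoverReadSketch` are hypothetical names standing in for the real cursor and recovery loop; the only assumption is that a failed read leaves the read position where it was, so the same entry is read again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; none of these types are Pulsar APIs.
class RecoverReadSketch {

    /** Simulated cursor over an in-memory log; every n-th read attempt fails. */
    static final class FlakyCursor {
        private final List<String> entries;
        private final int failEvery;   // 0 disables simulated failures
        private int readPosition = 0;  // advances only on a successful read
        private int attempts = 0;

        FlakyCursor(List<String> entries, int failEvery) {
            this.entries = entries;
            this.failEvery = failEvery;
        }

        boolean hasMore() {
            return readPosition < entries.size();
        }

        String readNext() {
            attempts++;
            if (failEvery > 0 && attempts % failEvery == 0) {
                // Failure happens before the position moves, so nothing is skipped.
                throw new IllegalStateException("simulated read failure");
            }
            return entries.get(readPosition++);
        }
    }

    /** Replays all entries, tolerating up to maxExceptions read failures. */
    static List<String> recover(FlakyCursor cursor, int maxExceptions) {
        List<String> replayed = new ArrayList<>();
        int exceptions = 0;
        while (cursor.hasMore()) {
            try {
                replayed.add(cursor.readNext());
            } catch (IllegalStateException e) {
                // The position is unchanged, so the next loop iteration retries the same entry.
                if (++exceptions > maxExceptions) {
                    throw new IllegalStateException("too many read failures", e);
                }
            }
        }
        return replayed;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("txn1-commit", "txn2-abort", "txn3-commit");
        System.out.println(recover(new FlakyCursor(log, 2), 500));
    }
}
```

Every entry is replayed exactly once even though every second read attempt fails; the exception counter only bounds how long recovery keeps retrying.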