eolivelli commented on a change in pull request #9490:
URL: https://github.com/apache/pulsar/pull/9490#discussion_r570998498
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -48,8 +68,71 @@
private final LinkedMap<TxnID, PositionImpl> aborts = new LinkedMap<>();
+ private final
CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>>
takeSnapshotWriter;
+
+ // when add abort or change max read position, the count will +1. Take
snapshot will set 0 into it.
+ private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new
AtomicLong();
+
+ private final Timer timer;
+
+ private final int takeSnapshotIntervalNumber;
+
+ private final int takeSnapshotIntervalTime;
+
public TopicTransactionBuffer(PersistentTopic topic) {
+ super(State.None);
this.topic = topic;
+ this.changeToInitializingState();
Review comment:
what about moving this initialisation procedure in a separate method ?
executing non trivial code in the constructor is problematic because in
case of problems you are not able to dispose correctly the object, because the
caller does not receive a reference to the new object but we are still leaking
references to the object.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -2529,6 +2534,19 @@ public void publishTxnMessage(TxnID txnID, ByteBuf
headersAndPayload, PublishCon
return;
}
+ // need to check transaction buffer ready before before check the
message deduplication
+ if (transactionBuffer instanceof TopicTransactionBuffer) {
+ if (!((TopicTransactionBuffer) transactionBuffer).checkIfReady()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}]Transaction buffer not recover complete!",
topic);
+ }
+ publishContext.completed(new BrokerServiceException
+ .ServiceUnitNotReadyException("[{" + topic +
"}]Transaction buffer not "
Review comment:
what about: "Transaction buffer recovery is still running" ?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -168,7 +277,53 @@ public void addFailed(ManagedLedgerException exception,
Object ctx) {
}
}
+ private synchronized void takeSnapshotBuyChangeTimes() {
Review comment:
typo: "buy" -> "by"
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]