lhotari commented on code in PR #25975:
URL: https://github.com/apache/pulsar/pull/25975#discussion_r3375303621
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java:
##########
@@ -140,6 +148,43 @@ public MetadataTransactionBuffer(PersistentTopic topic, TxnMetadataStore txnStor
this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack();
this.maxReadPosition = ledger.getLastConfirmedEntry();
recover();
+ this.abortedGcTask = scheduleAbortedGc();
+ }
+
+ /**
+ * Schedule the periodic aborted-record GC on the broker executor. Returns {@code null} when no
+ * executor is reachable (e.g. a unit test with a mocked topic); such callers drive
+ * {@link #pruneTrimmedAbortedTxns()} directly.
+ */
+ private ScheduledFuture<?> scheduleAbortedGc() {
+ ScheduledExecutorService executor = brokerExecutor();
+ if (executor == null) {
+ return null;
+ }
+ long intervalSeconds = Math.max(1, topic.getBrokerService().getPulsar().getConfiguration()
+ .getTransactionCoordinatorScalableTopicsGcIntervalSeconds());
+ long intervalMs = TimeUnit.SECONDS.toMillis(intervalSeconds);
+ return executor.scheduleWithFixedDelay(() -> {
+ if (closed) {
+ return;
+ }
+ pruneTrimmedAbortedTxns().exceptionally(ex -> {
+ log.warn().attr("segment", segmentName).exception(ex)
+ .log("Aborted-txn GC sweep failed; will retry next cycle");
+ return null;
+ });
Review Comment:
Since `pruneTrimmedAbortedTxns` is asynchronous, a previous sweep may still be
running when the next one is triggered. It might be useful to handle this case,
either by skipping a sweep that is triggered while the previous one is still
executing, or by running `pruneTrimmedAbortedTxns` again immediately after the
previous call completes.
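
A minimal sketch of the first option (skipping a trigger while the previous
sweep is still in flight), assuming the sweep returns a `CompletableFuture`.
`NonOverlappingAsyncTask` and `tryRun` are hypothetical names used only for
illustration; they are not part of this PR or of Pulsar:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical helper: skips a trigger while the previous async run is still in flight. */
final class NonOverlappingAsyncTask implements Runnable {
    private final Supplier<CompletableFuture<Void>> task;
    private final AtomicBoolean running = new AtomicBoolean(false);

    NonOverlappingAsyncTask(Supplier<CompletableFuture<Void>> task) {
        this.task = task;
    }

    @Override
    public void run() {
        tryRun();
    }

    /** Returns true if the async task was started, false if it was skipped. */
    boolean tryRun() {
        // Only one caller at a time can flip the flag from false to true.
        if (!running.compareAndSet(false, true)) {
            return false;
        }
        boolean started = false;
        try {
            // Release the flag once the async work finishes, whether it succeeded or failed.
            task.get().whenComplete((ignored, ex) -> running.set(false));
            started = true;
        } finally {
            // If the supplier threw (or returned null), release the flag so later runs are not blocked.
            if (!started) {
                running.set(false);
            }
        }
        return true;
    }
}
```

Under these assumptions the scheduled lambda could delegate to something like
`new NonOverlappingAsyncTask(() -> pruneTrimmedAbortedTxns())`, provided the
sweep's return type is adapted to `CompletableFuture<Void>`. The second option
(re-running right after completion) would need an extra "pending" flag that is
checked inside the `whenComplete` callback.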
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java:
##########
@@ -140,6 +148,43 @@ public MetadataTransactionBuffer(PersistentTopic topic, TxnMetadataStore txnStor
this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack();
this.maxReadPosition = ledger.getLastConfirmedEntry();
recover();
+ this.abortedGcTask = scheduleAbortedGc();
+ }
+
+ /**
+ * Schedule the periodic aborted-record GC on the broker executor. Returns {@code null} when no
+ * executor is reachable (e.g. a unit test with a mocked topic); such callers drive
+ * {@link #pruneTrimmedAbortedTxns()} directly.
+ */
+ private ScheduledFuture<?> scheduleAbortedGc() {
+ ScheduledExecutorService executor = brokerExecutor();
+ if (executor == null) {
+ return null;
+ }
+ long intervalSeconds = Math.max(1, topic.getBrokerService().getPulsar().getConfiguration()
+ .getTransactionCoordinatorScalableTopicsGcIntervalSeconds());
+ long intervalMs = TimeUnit.SECONDS.toMillis(intervalSeconds);
+ return executor.scheduleWithFixedDelay(() -> {
Review Comment:
It could be useful to wrap the Runnable with
`org.apache.pulsar.common.util.Runnables#catchingAndLoggingThrowables` so that
an unexpected RuntimeException doesn't discontinue the scheduling
(`scheduleWithFixedDelay` suppresses all subsequent executions once a run
throws an exception). Exceptions aren't expected here, but if one does occur,
`Runnables.catchingAndLoggingThrowables` will log it and keep the scheduler
running.
(Exceptions would get logged by [`DefaultUncaughtExceptionHandler` set in
PulsarBrokerStarter](https://github.com/apache/pulsar/blob/4998cd9e4f66ade6a7972da4afcd0f1546ddfc16/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java#L335-L340)
in any case, so they aren't silently swallowed even without
`catchingAndLoggingThrowables`.)
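
To illustrate the idea, here is a rough, self-contained sketch of such a
wrapper. `SafeRunnables.catchingAndLogging` is a hypothetical stand-in written
with `java.util.logging`; it is not the actual implementation of Pulsar's
`Runnables.catchingAndLoggingThrowables`:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for a "catch and log" Runnable wrapper. */
final class SafeRunnables {
    private static final Logger LOG = Logger.getLogger(SafeRunnables.class.getName());

    private SafeRunnables() {
    }

    /**
     * Wraps the delegate so that any Throwable is logged instead of propagating.
     * A periodic task that never throws is never cancelled by the scheduler.
     */
    static Runnable catchingAndLogging(Runnable delegate) {
        return () -> {
            try {
                delegate.run();
            } catch (Throwable t) {
                LOG.log(Level.WARNING, "Unexpected throwable in scheduled task", t);
            }
        };
    }
}
```

With a wrapper like this, the call would look roughly like
`executor.scheduleWithFixedDelay(SafeRunnables.catchingAndLogging(task), intervalMs, intervalMs, TimeUnit.MILLISECONDS)`,
where `task` is the existing lambda.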