lhotari commented on code in PR #25975:
URL: https://github.com/apache/pulsar/pull/25975#discussion_r3375303621


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java:
##########
@@ -140,6 +148,43 @@ public MetadataTransactionBuffer(PersistentTopic topic, TxnMetadataStore txnStor
         this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack();
         this.maxReadPosition = ledger.getLastConfirmedEntry();
         recover();
+        this.abortedGcTask = scheduleAbortedGc();
+    }
+
+    /**
+     * Schedule the periodic aborted-record GC on the broker executor. Returns {@code null} when no
+     * executor is reachable (e.g. a unit test with a mocked topic); such callers drive
+     * {@link #pruneTrimmedAbortedTxns()} directly.
+     */
+    private ScheduledFuture<?> scheduleAbortedGc() {
+        ScheduledExecutorService executor = brokerExecutor();
+        if (executor == null) {
+            return null;
+        }
+        long intervalSeconds = Math.max(1, topic.getBrokerService().getPulsar().getConfiguration()
+                .getTransactionCoordinatorScalableTopicsGcIntervalSeconds());
+        long intervalMs = TimeUnit.SECONDS.toMillis(intervalSeconds);
+        return executor.scheduleWithFixedDelay(() -> {
+            if (closed) {
+                return;
+            }
+            pruneTrimmedAbortedTxns().exceptionally(ex -> {
+                log.warn().attr("segment", segmentName).exception(ex)
+                        .log("Aborted-txn GC sweep failed; will retry next cycle");
+                return null;
+            });

Review Comment:
   Since `pruneTrimmedAbortedTxns` is asynchronous, the Runnable returns before 
the sweep completes, so a new run can be triggered while the previous one is 
still in progress. It might be useful to handle this case by either skipping a 
run that gets triggered while the previous one is still executing, or running 
`pruneTrimmedAbortedTxns` again immediately after the previous call completes.
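
   A minimal sketch of the "skip while the previous run is in flight" option, 
using only the JDK. `NonOverlappingAsyncTask` and `skippedCount` are 
hypothetical names for illustration and are not part of Pulsar; the supplier 
stands in for `pruneTrimmedAbortedTxns()`, which is assumed here to return a 
`CompletableFuture`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper (not a Pulsar class): skips a tick while the previous async run is pending. */
final class NonOverlappingAsyncTask implements Runnable {
    private final Supplier<CompletableFuture<Void>> asyncTask;
    private final AtomicBoolean inFlight = new AtomicBoolean(false);
    private final AtomicInteger skipped = new AtomicInteger();

    NonOverlappingAsyncTask(Supplier<CompletableFuture<Void>> asyncTask) {
        this.asyncTask = asyncTask;
    }

    @Override
    public void run() {
        // Only one caller wins the flag; every other tick is skipped until it is cleared.
        if (!inFlight.compareAndSet(false, true)) {
            skipped.incrementAndGet();
            return;
        }
        CompletableFuture<Void> future;
        try {
            future = asyncTask.get();
        } catch (RuntimeException | Error e) {
            // The task failed before returning a future: release the flag so later ticks can run.
            inFlight.set(false);
            throw e;
        }
        // Release the flag on both normal and exceptional completion.
        future.whenComplete((ignored, ex) -> inFlight.set(false));
    }

    /** Number of ticks skipped because a previous run was still in flight. */
    int skippedCount() {
        return skipped.get();
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        NonOverlappingAsyncTask task = new NonOverlappingAsyncTask(() -> pending);
        task.run();             // starts the sweep
        task.run();             // skipped: the first sweep has not completed yet
        pending.complete(null); // sweep finishes, flag is released
        task.run();             // runs again
        System.out.println("skipped ticks: " + task.skippedCount());
    }
}
```

   The other option (re-running immediately after completion) would replace 
the skip counter with a "rerun requested" flag that is checked in the 
`whenComplete` callback.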



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java:
##########
@@ -140,6 +148,43 @@ public MetadataTransactionBuffer(PersistentTopic topic, TxnMetadataStore txnStor
         this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack();
         this.maxReadPosition = ledger.getLastConfirmedEntry();
         recover();
+        this.abortedGcTask = scheduleAbortedGc();
+    }
+
+    /**
+     * Schedule the periodic aborted-record GC on the broker executor. Returns {@code null} when no
+     * executor is reachable (e.g. a unit test with a mocked topic); such callers drive
+     * {@link #pruneTrimmedAbortedTxns()} directly.
+     */
+    private ScheduledFuture<?> scheduleAbortedGc() {
+        ScheduledExecutorService executor = brokerExecutor();
+        if (executor == null) {
+            return null;
+        }
+        long intervalSeconds = Math.max(1, topic.getBrokerService().getPulsar().getConfiguration()
+                .getTransactionCoordinatorScalableTopicsGcIntervalSeconds());
+        long intervalMs = TimeUnit.SECONDS.toMillis(intervalSeconds);
+        return executor.scheduleWithFixedDelay(() -> {

Review Comment:
   It could be useful to wrap the Runnable with 
`org.apache.pulsar.common.util.Runnables#catchingAndLoggingThrowables` so that 
an unexpected RuntimeException doesn't stop the scheduling (once a run throws, 
`scheduleWithFixedDelay` suppresses all subsequent executions). Exceptions 
aren't expected here, but if one is thrown, `catchingAndLoggingThrowables` 
logs it and keeps the scheduler running. (Exceptions would get logged by the 
[`DefaultUncaughtExceptionHandler` set in 
`PulsarBrokerStarter`](https://github.com/apache/pulsar/blob/4998cd9e4f66ade6a7972da4afcd0f1546ddfc16/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java#L335-L340)
 in any case, so they aren't silently swallowed even without 
`catchingAndLoggingThrowables`.)
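
   To illustrate the scheduling behavior, here is a small JDK-only sketch. 
`CatchingRunnableDemo`, `catching`, and `runsWithin` are hypothetical names; 
`catching` is only a stand-in that approximates what 
`Runnables#catchingAndLoggingThrowables` is assumed to do (catch, log, and 
return normally), not Pulsar's actual implementation.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class CatchingRunnableDemo {
    /** Hypothetical stand-in for a catch-and-log wrapper; logs to stderr instead of a logger. */
    static Runnable catching(Runnable delegate) {
        return () -> {
            try {
                delegate.run();
            } catch (Throwable t) {
                System.err.println("Unexpected throwable in scheduled task: " + t);
            }
        };
    }

    /** Schedules a task that always throws and returns how many times it ran within the window. */
    static int runsWithin(boolean wrapped, long windowMillis) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        Runnable failing = () -> {
            runs.incrementAndGet();
            throw new IllegalStateException("boom");
        };
        try {
            executor.scheduleWithFixedDelay(
                    wrapped ? catching(failing) : failing, 0, 10, TimeUnit.MILLISECONDS);
            Thread.sleep(windowMillis);
        } finally {
            executor.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Unwrapped: the first exception suppresses every later execution.
        System.out.println("unwrapped runs: " + runsWithin(false, 200));
        // Wrapped: the exception is caught inside the Runnable, so scheduling continues.
        System.out.println("wrapped runs:   " + runsWithin(true, 200));
    }
}
```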



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
