lhotari commented on code in PR #25884:
URL: https://github.com/apache/pulsar/pull/25884#discussion_r3317185317


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java:
##########
@@ -232,19 +301,159 @@ public void onCompleted() {
             }
         }).thenCompose(__ -> {
             TxnEvent event = new TxnEvent(txnIdKey, decision);
-            CompletableFuture<?>[] publishes = new CompletableFuture<?>[
-                    writeSegments.size() + ackParticipants.size()];
-            int i = 0;
+            List<CompletableFuture<Void>> publishes = new ArrayList<>(
+                    writeSegments.size() + ackParticipants.size());
             for (String segment : writeSegments) {
-                publishes[i++] = txnStore.publishSegmentEvent(segment, event);
+                publishes.add(txnStore.publishSegmentEvent(segment, 
event).thenApply(s -> null));
             }
             for (AckParticipant p : ackParticipants) {
-                publishes[i++] = 
txnStore.publishSubscriptionEvent(p.segment(), p.subscription(), event);
+                publishes.add(txnStore.publishSubscriptionEvent(p.segment(), 
p.subscription(), event)
+                        .thenApply(s -> null));
             }
-            return CompletableFuture.allOf(publishes);
+            return FutureUtil.waitForAll(publishes);
         });
     }
 
+    // ---- Sweeps -----------------------------------------------------------
+
+    /**
+     * Abort transactions whose deadline has passed. Scans the by-deadline 
index up to "now" and
+     * drives each through {@link #endTransaction} with {@code ABORT}, which 
re-reads and CAS-guards
+     * the header — so a txn the client commits in the same window is left 
alone (the CAS loses and
+     * the resulting InvalidTxnStatus / BadVersion is treated as a benign 
race).
+     */
+    CompletableFuture<Void> sweepTimeouts() {
+        return ifElectedSweeper(() -> {
+            long now = System.currentTimeMillis();
+            List<TxnID> expired = Collections.synchronizedList(new 
ArrayList<>());
+            return txnStore.listOpenByDeadlineRange(null, now, new 
ScanConsumer() {
+                @Override
+                public void onNext(GetResult r) {
+                    String txnIdKey = 
TxnPaths.txnIdFromHeaderPath(r.getStat().getPath());
+                    if (txnIdKey != null) {
+                        expired.add(TxnIds.fromKey(txnIdKey));
+                    }
+                }
+
+                @Override
+                public void onError(Throwable throwable) {
+                    log.warn().exception(throwable).log("Timeout-sweep 
deadline scan errored");
+                }
+
+                @Override
+                public void onCompleted() {
+                }
+            }).thenCompose(__ -> {
+                List<CompletableFuture<Void>> aborts = new 
ArrayList<>(expired.size());
+                for (TxnID txnId : expired) {
+                    aborts.add(endTransaction(txnId, TxnAction.ABORT_VALUE)
+                            .exceptionally(ex -> {
+                                // Benign: the client may have 
committed/aborted it between the scan
+                                // and our CAS, or another sweeper got there 
first.
+                                log.debug().attr("txnId", txnId).exception(ex)
+                                        .log("Timeout-sweep abort skipped");
+                                return null;
+                            }));
+                }
+                return FutureUtil.waitForAll(aborts);
+            });
+        });
+    }
+
+    /**
+     * Garbage-collect finalized transactions whose retention window has 
elapsed. For each terminal
+     * state, scans the by-final-state index up to {@code now - retention} and 
applies
+     * {@link #gcOneTxn}.
+     */
+    CompletableFuture<Void> sweepGc() {
+        return ifElectedSweeper(() -> {
+            long cutoff = System.currentTimeMillis() - gcRetentionMs;
+            return gcFinalized(TxnState.COMMITTED, cutoff)
+                    .thenCompose(__ -> gcFinalized(TxnState.ABORTED, cutoff));
+        });
+    }
+
+    private CompletableFuture<Void> gcFinalized(TxnState state, long cutoffMs) 
{
+        List<Versioned<String>> candidates = Collections.synchronizedList(new 
ArrayList<>());
+        return txnStore.listFinalizedByStateAndTimeRange(state, null, 
cutoffMs, new ScanConsumer() {
+            @Override
+            public void onNext(GetResult r) {
+                String txnIdKey = 
TxnPaths.txnIdFromHeaderPath(r.getStat().getPath());
+                if (txnIdKey != null) {
+                    candidates.add(new Versioned<>(txnIdKey, 
r.getStat().getVersion()));
+                }
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                log.warn().attr("state", 
state).exception(throwable).log("GC-sweep scan errored");
+            }
+
+            @Override
+            public void onCompleted() {
+            }
+        }).thenCompose(__ -> {
+            List<CompletableFuture<Void>> gcs = new 
ArrayList<>(candidates.size());
+            for (Versioned<String> c : candidates) {
+                gcs.add(gcOneTxn(c.value(), c.version(), state));
+            }
+            return FutureUtil.waitForAll(gcs);
+        });
+    }
+
+    /**
+     * GC one finalized txn. If it still has {@code /txn/op} records, some 
participant hasn't applied
+     * the outcome yet — or never received the event (e.g. the TC crashed 
between the header CAS and
+     * the fan-out). Re-drive the fan-out and leave the header in place so the 
participant re-reads
+     * the true outcome; it removes its op records once it applies them, and a 
later GC pass — seeing
+     * no op records — deletes the header. We never delete a header while a 
participant might still
+     * re-read it, so a committed txn's data is never stranded as "unknown".
+     */
+    private CompletableFuture<Void> gcOneTxn(String txnIdKey, long version, 
TxnState state) {
+        TxnID txnId = TxnIds.fromKey(txnIdKey);
+        boolean[] hasOps = {false};
+        return txnStore.listOpsByTxn(txnIdKey, new ScanConsumer() {
+            @Override
+            public void onNext(GetResult r) {
+                hasOps[0] = true;
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                // Treat a scan error as "ops may exist" — safer to retry the 
repair than to delete.
+                hasOps[0] = true;
+                log.warn().attr("txnId", 
txnId).exception(throwable).log("GC-sweep op scan errored");
+            }
+
+            @Override
+            public void onCompleted() {
+            }
+        }).thenCompose(__ -> {
+            if (hasOps[0]) {
+                return fanOutEvents(txnId, txnIdKey, state);
+            }
+            return txnStore.deleteHeader(txnIdKey, version).exceptionally(ex 
-> {
+                // Benign: header changed or was already deleted since the 
scan.
+                log.debug().attr("txnId", txnId).exception(ex).log("GC-sweep 
header delete skipped");
+                return null;
+            });
+        });
+    }
+
+    /**
+     * Run {@code action} only on the elected sweeper — the broker that owns 
partition 0 of
+     * {@code transaction_coordinator_assign}. Not owning it (or any error 
checking ownership) means
+     * "skip this cycle". Correctness doesn't depend on the election: every 
transition is a header
+     * CAS, so a stale owner sweeping concurrently is harmless.
+     */
+    private CompletableFuture<Void> 
ifElectedSweeper(Supplier<CompletableFuture<Void>> action) {

Review Comment:
   Is there a need to add a check for `closed` (e.g. in `ifElectedSweeper`) so that the 
timeout and GC sweeps don't continue running after the coordinator has shut down?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to