merlimat commented on code in PR #25668:
URL: https://github.com/apache/pulsar/pull/25668#discussion_r3191876718
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java:
##########
@@ -619,10 +700,225 @@ public CompletableFuture<org.apache.pulsar.common.policies.data.ScalableTopicSta
});
}
+ // --- Sealed-segment GC ---
+
+ /**
+ * One iteration of the sealed-segment GC. For every sealed segment in the current
+ * layout whose retention window has expired, polls every known subscription's
+ * backlog on that segment; if all subscriptions are drained, prunes the segment
+ * from the DAG (CAS) and deletes its backing managed-ledger topic.
+ *
+ * <p>The retention window is resolved from topic-policies on the parent
+ * {@code topic://...} → namespace policy → broker default, the same precedence
+ * Pulsar uses for regular topics.
+ *
+ * <p>Visible for tests; in production it's invoked by the scheduled task.
+ */
+ CompletableFuture<Void> runGcTickAsync() {
+ if (!isLeader() || closed) {
+ return CompletableFuture.completedFuture(null);
+ }
+ final SegmentLayout layout = currentLayout;
+ if (layout == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ // Candidates: sealed segments past their retention horizon. We resolve
+ // retention once per tick — cheap, and avoids per-segment policy lookups.
+ return resolveRetentionMillisAsync()
+ .thenCompose(retentionMs -> {
+ if (retentionMs == null) {
+ // Negative / unset → retain forever. No GC this tick.
+ return CompletableFuture.completedFuture(null);
+ }
+ long now = clock.millis();
+ List<SegmentInfo> candidates = new ArrayList<>();
+ for (SegmentInfo seg : layout.getAllSegments().values()) {
+ if (seg.isSealed() && seg.sealedAtMs() > 0
+ && (now - seg.sealedAtMs()) >= retentionMs) {
+ candidates.add(seg);
+ }
+ }
+ if (candidates.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return pruneEligibleAsync(candidates);
+ });
+ }
+
+ /**
+ * For each candidate sealed segment, check that every existing subscription has
+ * drained it (backlog == 0); prune the ones that pass. Failures on individual
+ * segments are logged and skipped — the next tick retries.
+ */
+ private CompletableFuture<Void> pruneEligibleAsync(List<SegmentInfo> candidates) {
+ return resources.listSubscriptionsAsync(topicName)
+ .thenCompose(subs -> {
+ List<CompletableFuture<Void>> perSegment = new ArrayList<>();
+ for (SegmentInfo seg : candidates) {
+ perSegment.add(prunable(seg, subs)
+ .thenCompose(prunable -> prunable
+ ? pruneSegmentAsync(seg)
+ : CompletableFuture.completedFuture(null))
+ .exceptionally(ex -> {
+ log.warn().attr("segmentId", seg.segmentId())
+ .exceptionMessage(ex)
+ .log("GC: failed to evaluate / prune segment;"
+ + " will retry on next tick");
+ return null;
+ }));
+ }
+ return CompletableFuture.allOf(
+ perSegment.toArray(CompletableFuture[]::new));
+ });
+ }
+
+ private CompletableFuture<Boolean> prunable(SegmentInfo seg, List<String> subs) {
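The retention lookup and candidate filter in `runGcTickAsync` above can be modeled synchronously as follows. This is a minimal sketch, not the PR's code: `Segment`, `resolveRetentionMillis` and `candidates` are hypothetical names, the three policy levels are passed in as plain values rather than read from topic-policies and namespace policies, and a negative effective value is assumed to mean "retain forever" (mirroring the `retentionMs == null` branch).

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class GcCandidates {

    // Hypothetical stand-in for SegmentInfo: only the fields the filter reads.
    record Segment(long segmentId, boolean sealed, long sealedAtMs) {}

    /**
     * Topic policy wins, then namespace policy, then the broker default.
     * Assumption: a negative effective value means "retain forever" (null).
     */
    static Long resolveRetentionMillis(Optional<Long> topicPolicyMs,
                                       Optional<Long> namespacePolicyMs,
                                       long brokerDefaultMs) {
        long effective = topicPolicyMs.or(() -> namespacePolicyMs).orElse(brokerDefaultMs);
        if (effective < 0) {
            return null;
        }
        return effective;
    }

    /** Sealed segments whose retention window has fully elapsed. */
    static List<Segment> candidates(List<Segment> segments, Long retentionMs, Clock clock) {
        List<Segment> out = new ArrayList<>();
        if (retentionMs == null) {
            return out; // retain forever: nothing is eligible this tick
        }
        long now = clock.millis();
        for (Segment seg : segments) {
            // sealedAtMs() > 0 skips segments with no recorded seal time.
            if (seg.sealed() && seg.sealedAtMs() > 0
                    && (now - seg.sealedAtMs()) >= retentionMs) {
                out.add(seg);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Clock clock = Clock.fixed(Instant.ofEpochMilli(10_000), ZoneOffset.UTC);
        List<Segment> segs = List.of(
                new Segment(1, true, 2_000),  // sealed 8 s ago
                new Segment(2, true, 9_000),  // sealed 1 s ago
                new Segment(3, false, 0));    // still active
        // No topic policy, so the namespace value (5 s) applies.
        Long retention = resolveRetentionMillis(Optional.empty(), Optional.of(5_000L), 60_000L);
        System.out.println(candidates(segs, retention, clock)); // a list containing only segment 1
    }
}
```

Resolving retention once and then filtering in memory matches the hunk's stated reason for doing the lookup once per tick rather than per segment.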
Review Comment:
> Does this currently handle queue consumers too? Is it possible that a child segment gets pruned before one of its parent segments?
Yes, since it checks the subscription, which is the same for queuing and streaming.
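To make the reply concrete: the check is keyed only on subscriptions, so it does not matter whether a subscription's consumers read in queuing or streaming mode. Below is a minimal sketch of such a check, assuming a hypothetical asynchronous `(segmentId, subscription) -> backlog` lookup in place of whatever `prunable(...)` uses internally. In this sketch a segment with no subscriptions counts as drained; the hunk does not show whether the PR behaves the same way.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

public class PrunableCheck {

    /**
     * A segment is prunable only if every subscription has zero backlog on it.
     * backlogAsync is a hypothetical (segmentId, subscription) -> backlog lookup;
     * it is called once per subscription, regardless of consumer type.
     */
    static CompletableFuture<Boolean> prunable(long segmentId, List<String> subs,
            BiFunction<Long, String, CompletableFuture<Long>> backlogAsync) {
        List<CompletableFuture<Long>> backlogs = subs.stream()
                .map(sub -> backlogAsync.apply(segmentId, sub))
                .toList();
        // Wait for all lookups, then require every backlog to be exactly zero.
        return CompletableFuture.allOf(backlogs.toArray(CompletableFuture[]::new))
                .thenApply(ignored -> backlogs.stream().allMatch(f -> f.join() == 0L));
    }

    public static void main(String[] args) {
        // Hypothetical backlogs: "stream-sub" has drained segment 7, "queue-sub" has not.
        Map<String, Long> backlog = Map.of("stream-sub", 0L, "queue-sub", 3L);
        BiFunction<Long, String, CompletableFuture<Long>> lookup =
                (seg, sub) -> CompletableFuture.completedFuture(backlog.get(sub));
        System.out.println(prunable(7, List.of("stream-sub", "queue-sub"), lookup).join()); // false
        System.out.println(prunable(7, List.of("stream-sub"), lookup).join());              // true
    }
}
```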
> Is it possible that a child segment gets pruned while a checkpoint consumer is actively consuming one of the parent segments?
Good point. Yes, the segment can definitely be deleted while the checkpoint consumer is reading, since a checkpoint consumer doesn't imply any retention. The client should be able to recover and skip the now-deleted segment. I'll add that in a separate PR, since it implies more changes on the client side.
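The client-side recovery is deferred to a separate PR and is not part of this change. As a purely hypothetical sketch of what "skip the now-deleted segment" could look like: the consumer walks the segment DAG in topological order (a child only after all of its parents), and when a read reports that a segment is gone, it drops that segment and continues with its children. `SegmentReader`, `SegmentDeletedException` and `readAll` are invented for illustration and are not Pulsar client APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SkipDeletedSegments {

    /** Hypothetical signal that a sealed segment was garbage-collected under the reader. */
    static class SegmentDeletedException extends Exception {
        SegmentDeletedException(long segmentId) {
            super("segment " + segmentId + " was deleted");
        }
    }

    /** Hypothetical reader: returns a segment's messages or reports it deleted. */
    interface SegmentReader {
        List<String> read(long segmentId) throws SegmentDeletedException;
    }

    /** Reads the DAG from its root segments in topological order, skipping deleted ones. */
    static List<String> readAll(List<Long> roots, Map<Long, List<Long>> children,
                                SegmentReader reader) {
        // Count, for every reachable segment, how many parents must be processed first.
        Map<Long, Integer> remainingParents = new HashMap<>();
        Deque<Long> walk = new ArrayDeque<>(roots);
        Set<Long> seen = new HashSet<>(roots);
        while (!walk.isEmpty()) {
            for (long child : children.getOrDefault(walk.poll(), List.of())) {
                remainingParents.merge(child, 1, Integer::sum);
                if (seen.add(child)) {
                    walk.add(child);
                }
            }
        }
        // Kahn's algorithm: a segment becomes ready once all of its parents are done.
        List<String> out = new ArrayList<>();
        Deque<Long> ready = new ArrayDeque<>(roots);
        while (!ready.isEmpty()) {
            long seg = ready.poll();
            try {
                out.addAll(reader.read(seg));
            } catch (SegmentDeletedException e) {
                // GC pruned this sealed segment: skip it instead of failing the consumer.
            }
            for (long child : children.getOrDefault(seg, List.of())) {
                if (remainingParents.merge(child, -1, Integer::sum) == 0) {
                    ready.add(child);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Segment 1 split into 2 and 3, which later merged into 4; segment 2 was pruned.
        Map<Long, List<Long>> children =
                Map.of(1L, List.of(2L, 3L), 2L, List.of(4L), 3L, List.of(4L));
        Map<Long, List<String>> data =
                Map.of(1L, List.of("a"), 3L, List.of("c"), 4L, List.of("d"));
        SegmentReader reader = seg -> {
            if (!data.containsKey(seg)) {
                throw new SegmentDeletedException(seg);
            }
            return data.get(seg);
        };
        System.out.println(readAll(List.of(1L), children, reader)); // [a, c, d]
    }
}
```

Counting remaining parents keeps the merged segment 4 from being read until both 2 and 3 have been handled, even though 2 no longer exists.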
--
This is an automated message from the Apache Git Service.