merlimat commented on code in PR #25668:
URL: https://github.com/apache/pulsar/pull/25668#discussion_r3191862218


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java:
##########
@@ -619,10 +700,225 @@ public 
CompletableFuture<org.apache.pulsar.common.policies.data.ScalableTopicSta
                 });
     }
 
+    // --- Sealed-segment GC ---
+
+    /**
+     * Runs a single pass of the sealed-segment garbage collector.
+     *
+     * <p>The pass is skipped unless this controller is the leader, is not closed, and has a
+     * layout loaded. Otherwise the retention window is resolved once, every sealed segment of
+     * the current layout whose {@code sealedAtMs} is set and lies at least that window in the
+     * past becomes a candidate, and the candidates are handed to {@code pruneEligibleAsync},
+     * which prunes only those that every subscription has drained.
+     *
+     * <p>The retention window is resolved from topic-policies on the parent
+     * {@code topic://...} → namespace policy → broker default, the same precedence
+     * Pulsar uses for regular topics.
+     *
+     * <p>Package-private so tests can drive a tick directly; in production the scheduled
+     * task invokes it.
+     *
+     * @return a future that completes once the pass has finished or was skipped
+     */
+    CompletableFuture<Void> runGcTickAsync() {
+        if (!isLeader() || closed) {
+            return CompletableFuture.completedFuture(null);
+        }
+        final SegmentLayout snapshot = currentLayout;
+        if (snapshot == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        // A single policy lookup per tick serves every segment examined below.
+        return resolveRetentionMillisAsync().thenCompose(retentionMs -> {
+            if (retentionMs == null) {
+                // Negative / unset retention arrives here as null: retain forever, so
+                // there is nothing to collect on this tick.
+                return CompletableFuture.completedFuture(null);
+            }
+            final long nowMs = clock.millis();
+            final List<SegmentInfo> expired = new ArrayList<>();
+            for (SegmentInfo segment : snapshot.getAllSegments().values()) {
+                // Only sealed segments carrying a seal timestamp can age out.
+                if (!segment.isSealed() || segment.sealedAtMs() <= 0) {
+                    continue;
+                }
+                if (nowMs - segment.sealedAtMs() >= retentionMs) {
+                    expired.add(segment);
+                }
+            }
+            if (expired.isEmpty()) {
+                return CompletableFuture.completedFuture(null);
+            }
+            return pruneEligibleAsync(expired);
+        });
+    }
+
+    /**
+     * For each candidate sealed segment, check that every existing 
subscription has
+     * drained it (backlog == 0); prune the ones that pass. Failures on 
individual
+     * segments are logged and skipped — the next tick retries.
+     */
+    private CompletableFuture<Void> pruneEligibleAsync(List<SegmentInfo> 
candidates) {
+        return resources.listSubscriptionsAsync(topicName)
+                .thenCompose(subs -> {
+                    List<CompletableFuture<Void>> perSegment = new 
ArrayList<>();
+                    for (SegmentInfo seg : candidates) {
+                        perSegment.add(prunable(seg, subs)
+                                .thenCompose(prunable -> prunable
+                                        ? pruneSegmentAsync(seg)
+                                        : 
CompletableFuture.completedFuture(null))
+                                .exceptionally(ex -> {
+                                    log.warn().attr("segmentId", 
seg.segmentId())
+                                            .exceptionMessage(ex)
+                                            .log("GC: failed to evaluate / 
prune segment;"
+                                                    + " will retry on next 
tick");
+                                    return null;
+                                }));
+                    }
+                    return CompletableFuture.allOf(
+                            perSegment.toArray(CompletableFuture[]::new));
+                });
+    }
+
+    private CompletableFuture<Boolean> prunable(SegmentInfo seg, List<String> 
subs) {
+        if (subs.isEmpty()) {
+            // No subscribers ever attached / all unsubscribed → nothing left 
to drain.
+            return CompletableFuture.completedFuture(true);
+        }
+        CompletableFuture<Boolean>[] checks = subs.stream()
+                .map(sub -> isSegmentDrained(seg, sub))
+                .toArray(CompletableFuture[]::new);
+        return CompletableFuture.allOf(checks)
+                .thenApply(__ -> {
+                    for (CompletableFuture<Boolean> c : checks) {
+                        if (!c.join()) {
+                            return false;
+                        }
+                    }
+                    return true;
+                });
+    }
+
+    /**
+     * Apply a single segment's prune: CAS the layout to drop it, reload the layout and
+     * notify subscriptions, then delete the backing managed-ledger topic. Layout-prune is
+     * the point of no return — once it lands, no consumer will ever subscribe to the
+     * segment again.
+     *
+     * <p>A failed backing-topic delete is logged and does not fail the returned future.
+     * The success message is logged only when the backing topic is actually gone.
+     *
+     * <p>NOTE(review): the GC tick selects candidates from the current layout, and a pruned
+     * segment is no longer part of it, so a failed backing-topic delete does not appear to
+     * be retried by a later tick — confirm whether another sweep reclaims such topics.
+     *
+     * @param seg the sealed segment to prune
+     * @return a future that completes once the layout is updated, subscriptions are
+     *         notified, and the backing-topic delete has been attempted
+     */
+    private CompletableFuture<Void> pruneSegmentAsync(SegmentInfo seg) {
+        long segmentId = seg.segmentId();
+        log.info().attr("segmentId", segmentId)
+                .attr("sealedAtMs", seg.sealedAtMs())
+                .log("GC: pruning sealed segment past retention");
+
+        return resources.updateScalableTopicAsync(topicName, md -> {
+            SegmentLayout latest = SegmentLayout.fromMetadata(md);
+            // Re-validate against the latest layout: another writer may have already
+            // pruned this segment, or replaced it via a follow-up split / merge. If
+            // it's gone, leave the metadata untouched (no-op CAS).
+            if (!latest.getAllSegments().containsKey(segmentId)) {
+                return md;
+            }
+            SegmentLayout updated = latest.pruneSegment(segmentId);
+            return updated.toMetadata(md.getProperties());
+        }).thenCompose(__ -> resources.getScalableTopicMetadataAsync(topicName, true))
+          .thenCompose(optMd -> {
+              // Publish and notify with the same instance: re-reading the field could
+              // observe a layout written concurrently by another prune.
+              SegmentLayout refreshed = SegmentLayout.fromMetadata(optMd.orElseThrow());
+              currentLayout = refreshed;
+              return notifySubscriptions(refreshed);
+          })
+          .thenCompose(__ -> tryDeleteSegmentBackingTopic(seg))
+          .thenAccept(deleted -> {
+              // On failure the helper has already logged a warning; do not claim success.
+              if (deleted) {
+                  log.info().attr("segmentId", segmentId)
+                          .log("GC: segment pruned + backing topic deleted");
+              }
+          });
+    }
+
+    /**
+     * Deletes the persistent topic backing {@code seg}, treating "not found" as success.
+     * Asynchronous delete failures are logged and swallowed. Kept with its original
+     * signature for any caller that does not need the outcome.
+     *
+     * @param seg the segment whose backing topic should be deleted
+     * @return a future that completes normally after the delete attempt, or exceptionally
+     *         if the admin client could not be obtained
+     */
+    private CompletableFuture<Void> deleteSegmentBackingTopic(SegmentInfo seg) {
+        return tryDeleteSegmentBackingTopic(seg).thenAccept(deleted -> { });
+    }
+
+    /**
+     * Force-deletes the persistent topic backing {@code seg} and reports the outcome.
+     *
+     * @param seg the segment whose backing topic should be deleted
+     * @return a future yielding {@code true} when the topic was deleted or was already
+     *         absent, {@code false} when the delete failed (the failure is logged); it
+     *         completes exceptionally if the admin client could not be obtained
+     */
+    private CompletableFuture<Boolean> tryDeleteSegmentBackingTopic(SegmentInfo seg) {
+        String name = toSegmentUnderlyingPersistentName(seg);
+        try {
+            return brokerService.getPulsar().getAdminClient()
+                    .topics().deleteAsync(name, /* force */ true)
+                    .thenApply(__ -> true)
+                    .exceptionally(ex -> {
+                        Throwable cause =
+                                org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex);
+                        if (cause instanceof org.apache.pulsar.client.admin.PulsarAdminException
+                                .NotFoundException) {
+                            // Already gone — fine.
+                            return true;
+                        }
+                        // NOTE(review): the retry promised below is not visible in the GC
+                        // tick once the segment has left the layout — confirm.
+                        log.warn().attr("segment", name).exceptionMessage(cause)
+                                .log("GC: failed to delete backing segment topic;"
+                                        + " will retry on next tick");
+                        return false;
+                    });
+        } catch (PulsarServerException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to