merlimat commented on code in PR #25668:
URL: https://github.com/apache/pulsar/pull/25668#discussion_r3191833464
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java:
##########
@@ -619,10 +700,225 @@ public
CompletableFuture<org.apache.pulsar.common.policies.data.ScalableTopicSta
});
}
+ // --- Sealed-segment GC ---
+
+ /**
+ * One iteration of the sealed-segment GC. For every sealed segment in the
current
+ * layout whose retention window has expired, polls every known
subscription's
+ * backlog on that segment; if all subscriptions are drained, prunes the
segment
+ * from the DAG (CAS) and deletes its backing managed-ledger topic.
+ *
+ * <p>The retention window is resolved from topic-policies on the parent
+ * {@code topic://...} → namespace policy → broker default, the same
precedence
+ * Pulsar uses for regular topics.
+ *
+ * <p>Visible for tests; in production it's invoked by the scheduled task.
+ */
+ CompletableFuture<Void> runGcTickAsync() {
+ if (!isLeader() || closed) {
+ return CompletableFuture.completedFuture(null);
+ }
+ final SegmentLayout layout = currentLayout;
+ if (layout == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ // Candidates: sealed segments past their retention horizon. We resolve
+ // retention once per tick — cheap, and avoids per-segment policy
lookups.
+ return resolveRetentionMillisAsync()
+ .thenCompose(retentionMs -> {
+ if (retentionMs == null) {
+ // Negative / unset → retain forever. No GC this tick.
+ return CompletableFuture.completedFuture(null);
+ }
+ long now = clock.millis();
+ List<SegmentInfo> candidates = new ArrayList<>();
+ for (SegmentInfo seg : layout.getAllSegments().values()) {
+ if (seg.isSealed() && seg.sealedAtMs() > 0
+ && (now - seg.sealedAtMs()) >= retentionMs) {
+ candidates.add(seg);
+ }
+ }
+ if (candidates.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return pruneEligibleAsync(candidates);
+ });
+ }
+
+ /**
+ * For each candidate sealed segment, check that every existing
subscription has
+ * drained it (backlog == 0); prune the ones that pass. Failures on
individual
+ * segments are logged and skipped — the next tick retries.
+ */
+ private CompletableFuture<Void> pruneEligibleAsync(List<SegmentInfo>
candidates) {
+ return resources.listSubscriptionsAsync(topicName)
+ .thenCompose(subs -> {
+ List<CompletableFuture<Void>> perSegment = new
ArrayList<>();
+ for (SegmentInfo seg : candidates) {
+ perSegment.add(prunable(seg, subs)
+ .thenCompose(prunable -> prunable
+ ? pruneSegmentAsync(seg)
+ :
CompletableFuture.completedFuture(null))
+ .exceptionally(ex -> {
+ log.warn().attr("segmentId",
seg.segmentId())
+ .exceptionMessage(ex)
+ .log("GC: failed to evaluate /
prune segment;"
+ + " will retry on next
tick");
+ return null;
+ }));
+ }
+ return CompletableFuture.allOf(
+ perSegment.toArray(CompletableFuture[]::new));
+ });
+ }
+
+ private CompletableFuture<Boolean> prunable(SegmentInfo seg, List<String>
subs) {
+ if (subs.isEmpty()) {
+ // No subscribers ever attached / all unsubscribed → nothing left
to drain.
+ return CompletableFuture.completedFuture(true);
+ }
+ CompletableFuture<Boolean>[] checks = subs.stream()
+ .map(sub -> isSegmentDrained(seg, sub))
+ .toArray(CompletableFuture[]::new);
+ return CompletableFuture.allOf(checks)
+ .thenApply(__ -> {
+ for (CompletableFuture<Boolean> c : checks) {
+ if (!c.join()) {
+ return false;
+ }
+ }
+ return true;
+ });
+ }
+
+ /**
+ * Apply a single segment's prune: CAS the layout to drop it, then delete
the
+ * backing managed-ledger topic. Layout-prune is the point of no return —
once it
+ * lands, no consumer will ever subscribe to the segment again.
Backing-topic
+ * delete failures are logged and retried on subsequent ticks.
+ */
+ private CompletableFuture<Void> pruneSegmentAsync(SegmentInfo seg) {
+ long segmentId = seg.segmentId();
+ log.info().attr("segmentId", segmentId)
+ .attr("sealedAtMs", seg.sealedAtMs())
+ .log("GC: pruning sealed segment past retention");
+
+ return resources.updateScalableTopicAsync(topicName, md -> {
+ SegmentLayout latest = SegmentLayout.fromMetadata(md);
+ // Re-validate against the latest layout: another writer may have
already
+ // pruned this segment, or replaced it via a follow-up split /
merge. If
+ // it's gone, leave the metadata untouched (no-op CAS).
+ if (!latest.getAllSegments().containsKey(segmentId)) {
+ return md;
+ }
+ SegmentLayout updated = latest.pruneSegment(segmentId);
+ return updated.toMetadata(md.getProperties());
+ }).thenCompose(__ ->
resources.getScalableTopicMetadataAsync(topicName, true))
+ .thenCompose(optMd -> {
+ currentLayout = SegmentLayout.fromMetadata(optMd.orElseThrow());
+ return notifySubscriptions(currentLayout);
+ })
+ .thenCompose(__ -> deleteSegmentBackingTopic(seg))
+ .thenAccept(__ -> log.info().attr("segmentId", segmentId)
+ .log("GC: segment pruned + backing topic deleted"));
+ }
+
+ private CompletableFuture<Void> deleteSegmentBackingTopic(SegmentInfo seg)
{
+ String name = toSegmentUnderlyingPersistentName(seg);
+ try {
+ return brokerService.getPulsar().getAdminClient()
+ .topics().deleteAsync(name, /* force */ true)
+ .exceptionally(ex -> {
+ Throwable cause =
+
org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex);
+ if (cause instanceof
org.apache.pulsar.client.admin.PulsarAdminException
+ .NotFoundException) {
+ // Already gone — fine.
+ return null;
+ }
+ log.warn().attr("segment",
name).exceptionMessage(cause)
+ .log("GC: failed to delete backing segment
topic;"
+ + " will retry on next tick");
+ return null;
+ });
+ } catch (PulsarServerException e) {
+ return CompletableFuture.failedFuture(e);
+ }
+ }
Review Comment:
Done. This now uses
`scalableTopics().deleteSegmentAsync(toSegmentPersistentName(seg), true)`, the
same primitive that `ScalableTopicService.deleteUnderlyingSegmentTopic` uses.
As a side benefit, it drops the hand-built `persistent://` URL, which was
missing the segment descriptor and would have been buggy at delete time.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java:
##########
@@ -619,10 +700,225 @@ public
CompletableFuture<org.apache.pulsar.common.policies.data.ScalableTopicSta
});
}
+ // --- Sealed-segment GC ---
+
+ /**
+ * One iteration of the sealed-segment GC. For every sealed segment in the
current
+ * layout whose retention window has expired, polls every known
subscription's
+ * backlog on that segment; if all subscriptions are drained, prunes the
segment
+ * from the DAG (CAS) and deletes its backing managed-ledger topic.
+ *
+ * <p>The retention window is resolved from topic-policies on the parent
+ * {@code topic://...} → namespace policy → broker default, the same
precedence
+ * Pulsar uses for regular topics.
+ *
+ * <p>Visible for tests; in production it's invoked by the scheduled task.
+ */
+ CompletableFuture<Void> runGcTickAsync() {
+ if (!isLeader() || closed) {
+ return CompletableFuture.completedFuture(null);
+ }
+ final SegmentLayout layout = currentLayout;
+ if (layout == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ // Candidates: sealed segments past their retention horizon. We resolve
+ // retention once per tick — cheap, and avoids per-segment policy
lookups.
+ return resolveRetentionMillisAsync()
+ .thenCompose(retentionMs -> {
+ if (retentionMs == null) {
+ // Negative / unset → retain forever. No GC this tick.
+ return CompletableFuture.completedFuture(null);
+ }
+ long now = clock.millis();
+ List<SegmentInfo> candidates = new ArrayList<>();
+ for (SegmentInfo seg : layout.getAllSegments().values()) {
+ if (seg.isSealed() && seg.sealedAtMs() > 0
+ && (now - seg.sealedAtMs()) >= retentionMs) {
+ candidates.add(seg);
+ }
+ }
+ if (candidates.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return pruneEligibleAsync(candidates);
+ });
+ }
+
+ /**
+ * For each candidate sealed segment, check that every existing
subscription has
+ * drained it (backlog == 0); prune the ones that pass. Failures on
individual
+ * segments are logged and skipped — the next tick retries.
+ */
+ private CompletableFuture<Void> pruneEligibleAsync(List<SegmentInfo>
candidates) {
+ return resources.listSubscriptionsAsync(topicName)
+ .thenCompose(subs -> {
+ List<CompletableFuture<Void>> perSegment = new
ArrayList<>();
+ for (SegmentInfo seg : candidates) {
+ perSegment.add(prunable(seg, subs)
+ .thenCompose(prunable -> prunable
+ ? pruneSegmentAsync(seg)
+ :
CompletableFuture.completedFuture(null))
+ .exceptionally(ex -> {
+ log.warn().attr("segmentId",
seg.segmentId())
+ .exceptionMessage(ex)
+ .log("GC: failed to evaluate /
prune segment;"
+ + " will retry on next
tick");
+ return null;
+ }));
+ }
+ return CompletableFuture.allOf(
+ perSegment.toArray(CompletableFuture[]::new));
+ });
+ }
+
+ private CompletableFuture<Boolean> prunable(SegmentInfo seg, List<String>
subs) {
+ if (subs.isEmpty()) {
+ // No subscribers ever attached / all unsubscribed → nothing left
to drain.
+ return CompletableFuture.completedFuture(true);
+ }
+ CompletableFuture<Boolean>[] checks = subs.stream()
+ .map(sub -> isSegmentDrained(seg, sub))
+ .toArray(CompletableFuture[]::new);
+ return CompletableFuture.allOf(checks)
+ .thenApply(__ -> {
+ for (CompletableFuture<Boolean> c : checks) {
+ if (!c.join()) {
+ return false;
+ }
+ }
+ return true;
+ });
+ }
+
+ /**
+ * Apply a single segment's prune: CAS the layout to drop it, then delete
the
+ * backing managed-ledger topic. Layout-prune is the point of no return —
once it
+ * lands, no consumer will ever subscribe to the segment again.
Backing-topic
+ * delete failures are logged and retried on subsequent ticks.
+ */
+ private CompletableFuture<Void> pruneSegmentAsync(SegmentInfo seg) {
+ long segmentId = seg.segmentId();
+ log.info().attr("segmentId", segmentId)
+ .attr("sealedAtMs", seg.sealedAtMs())
+ .log("GC: pruning sealed segment past retention");
+
+ return resources.updateScalableTopicAsync(topicName, md -> {
+ SegmentLayout latest = SegmentLayout.fromMetadata(md);
+ // Re-validate against the latest layout: another writer may have
already
+ // pruned this segment, or replaced it via a follow-up split /
merge. If
+ // it's gone, leave the metadata untouched (no-op CAS).
+ if (!latest.getAllSegments().containsKey(segmentId)) {
+ return md;
+ }
+ SegmentLayout updated = latest.pruneSegment(segmentId);
+ return updated.toMetadata(md.getProperties());
Review Comment:
Good catch — fixed in 3b1dc1a. Drain checks still fan out concurrently, but
the layout mutation is now a single CAS that prunes every eligible segment in
one shot, followed by a single metadata reload + `notifySubscriptions(...)` and
parallel backing-topic deletes. No more siblings competing on the same znode.
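
For readers following along, the shape of the coalesced flow can be sketched
roughly as below. This is not the code from 3b1dc1a: the class
`CoalescedPruneSketch`, the `casPrune` helper, and the in-memory
`AtomicReference` standing in for the metadata store are all hypothetical and
exist only to illustrate "concurrent drain checks, then one CAS for the whole
batch".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

class CoalescedPruneSketch {
    // Hypothetical stand-in for the layout held in the metadata store.
    final AtomicReference<Set<Long>> layout;
    // Counts layout writes so the "one CAS per tick" property is observable.
    final AtomicInteger layoutWrites = new AtomicInteger();

    CoalescedPruneSketch(Set<Long> initialSegments) {
        this.layout = new AtomicReference<>(new TreeSet<>(initialSegments));
    }

    CompletableFuture<Set<Long>> pruneAll(List<Long> candidates,
                                          Function<Long, CompletableFuture<Boolean>> isDrained) {
        // Fan out the drain checks; a failed check counts as "not drained".
        List<CompletableFuture<Boolean>> checks = new ArrayList<>();
        for (Long id : candidates) {
            checks.add(isDrained.apply(id).exceptionally(ex -> false));
        }
        return CompletableFuture.allOf(checks.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> {
                    Set<Long> eligible = new TreeSet<>();
                    for (int i = 0; i < candidates.size(); i++) {
                        if (checks.get(i).join()) {
                            eligible.add(candidates.get(i));
                        }
                    }
                    return casPrune(eligible);
                });
    }

    // Removes every eligible segment in one compare-and-set; returns what was removed.
    private Set<Long> casPrune(Set<Long> eligible) {
        while (true) {
            Set<Long> current = layout.get();
            Set<Long> removed = new TreeSet<>(eligible);
            removed.retainAll(current);
            if (removed.isEmpty()) {
                return removed; // nothing left to prune: skip the write entirely
            }
            Set<Long> next = new TreeSet<>(current);
            next.removeAll(removed);
            if (layout.compareAndSet(current, next)) {
                layoutWrites.incrementAndGet();
                return removed;
            }
        }
    }
}
```

With candidates 1, 2, 3 where only 2 still has backlog, the sketch removes 1
and 3 with a single layout write instead of two competing ones.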
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java:
##########
@@ -619,10 +700,225 @@ public
CompletableFuture<org.apache.pulsar.common.policies.data.ScalableTopicSta
});
}
+ // --- Sealed-segment GC ---
+
+ /**
+ * One iteration of the sealed-segment GC. For every sealed segment in the
current
+ * layout whose retention window has expired, polls every known
subscription's
+ * backlog on that segment; if all subscriptions are drained, prunes the
segment
+ * from the DAG (CAS) and deletes its backing managed-ledger topic.
+ *
+ * <p>The retention window is resolved from topic-policies on the parent
+ * {@code topic://...} → namespace policy → broker default, the same
precedence
+ * Pulsar uses for regular topics.
+ *
+ * <p>Visible for tests; in production it's invoked by the scheduled task.
+ */
+ CompletableFuture<Void> runGcTickAsync() {
+ if (!isLeader() || closed) {
+ return CompletableFuture.completedFuture(null);
+ }
+ final SegmentLayout layout = currentLayout;
+ if (layout == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ // Candidates: sealed segments past their retention horizon. We resolve
+ // retention once per tick — cheap, and avoids per-segment policy
lookups.
+ return resolveRetentionMillisAsync()
+ .thenCompose(retentionMs -> {
+ if (retentionMs == null) {
+ // Negative / unset → retain forever. No GC this tick.
+ return CompletableFuture.completedFuture(null);
+ }
+ long now = clock.millis();
+ List<SegmentInfo> candidates = new ArrayList<>();
+ for (SegmentInfo seg : layout.getAllSegments().values()) {
+ if (seg.isSealed() && seg.sealedAtMs() > 0
+ && (now - seg.sealedAtMs()) >= retentionMs) {
+ candidates.add(seg);
+ }
+ }
+ if (candidates.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return pruneEligibleAsync(candidates);
+ });
+ }
+
+ /**
+ * For each candidate sealed segment, check that every existing
subscription has
+ * drained it (backlog == 0); prune the ones that pass. Failures on
individual
+ * segments are logged and skipped — the next tick retries.
+ */
+ private CompletableFuture<Void> pruneEligibleAsync(List<SegmentInfo>
candidates) {
+ return resources.listSubscriptionsAsync(topicName)
+ .thenCompose(subs -> {
+ List<CompletableFuture<Void>> perSegment = new
ArrayList<>();
+ for (SegmentInfo seg : candidates) {
+ perSegment.add(prunable(seg, subs)
+ .thenCompose(prunable -> prunable
+ ? pruneSegmentAsync(seg)
Review Comment:
Renamed: the inner lambda variable is now `ok` (and the surrounding code was
rewritten as part of the coalescing fix). The method name `prunable` stays.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java:
##########
@@ -619,10 +700,225 @@ public
CompletableFuture<org.apache.pulsar.common.policies.data.ScalableTopicSta
});
}
+ // --- Sealed-segment GC ---
+
+ /**
+ * One iteration of the sealed-segment GC. For every sealed segment in the
current
+ * layout whose retention window has expired, polls every known
subscription's
+ * backlog on that segment; if all subscriptions are drained, prunes the
segment
+ * from the DAG (CAS) and deletes its backing managed-ledger topic.
+ *
+ * <p>The retention window is resolved from topic-policies on the parent
+ * {@code topic://...} → namespace policy → broker default, the same
precedence
+ * Pulsar uses for regular topics.
+ *
+ * <p>Visible for tests; in production it's invoked by the scheduled task.
+ */
+ CompletableFuture<Void> runGcTickAsync() {
+ if (!isLeader() || closed) {
+ return CompletableFuture.completedFuture(null);
+ }
+ final SegmentLayout layout = currentLayout;
+ if (layout == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ // Candidates: sealed segments past their retention horizon. We resolve
+ // retention once per tick — cheap, and avoids per-segment policy
lookups.
+ return resolveRetentionMillisAsync()
+ .thenCompose(retentionMs -> {
+ if (retentionMs == null) {
+ // Negative / unset → retain forever. No GC this tick.
+ return CompletableFuture.completedFuture(null);
+ }
+ long now = clock.millis();
+ List<SegmentInfo> candidates = new ArrayList<>();
+ for (SegmentInfo seg : layout.getAllSegments().values()) {
+ if (seg.isSealed() && seg.sealedAtMs() > 0
+ && (now - seg.sealedAtMs()) >= retentionMs) {
+ candidates.add(seg);
+ }
+ }
+ if (candidates.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return pruneEligibleAsync(candidates);
+ });
+ }
+
+ /**
+ * For each candidate sealed segment, check that every existing
subscription has
+ * drained it (backlog == 0); prune the ones that pass. Failures on
individual
+ * segments are logged and skipped — the next tick retries.
+ */
+ private CompletableFuture<Void> pruneEligibleAsync(List<SegmentInfo>
candidates) {
+ return resources.listSubscriptionsAsync(topicName)
+ .thenCompose(subs -> {
+ List<CompletableFuture<Void>> perSegment = new
ArrayList<>();
+ for (SegmentInfo seg : candidates) {
+ perSegment.add(prunable(seg, subs)
+ .thenCompose(prunable -> prunable
+ ? pruneSegmentAsync(seg)
+ :
CompletableFuture.completedFuture(null))
+ .exceptionally(ex -> {
+ log.warn().attr("segmentId",
seg.segmentId())
+ .exceptionMessage(ex)
+ .log("GC: failed to evaluate /
prune segment;"
+ + " will retry on next
tick");
+ return null;
+ }));
+ }
+ return CompletableFuture.allOf(
+ perSegment.toArray(CompletableFuture[]::new));
+ });
+ }
+
+ private CompletableFuture<Boolean> prunable(SegmentInfo seg, List<String>
subs) {
+ if (subs.isEmpty()) {
+ // No subscribers ever attached / all unsubscribed → nothing left
to drain.
+ return CompletableFuture.completedFuture(true);
+ }
+ CompletableFuture<Boolean>[] checks = subs.stream()
+ .map(sub -> isSegmentDrained(seg, sub))
+ .toArray(CompletableFuture[]::new);
+ return CompletableFuture.allOf(checks)
+ .thenApply(__ -> {
+ for (CompletableFuture<Boolean> c : checks) {
+ if (!c.join()) {
+ return false;
+ }
+ }
+ return true;
+ });
+ }
+
+ /**
+ * Apply a single segment's prune: CAS the layout to drop it, then delete
the
+ * backing managed-ledger topic. Layout-prune is the point of no return —
once it
+ * lands, no consumer will ever subscribe to the segment again.
Backing-topic
+ * delete failures are logged and retried on subsequent ticks.
+ */
+ private CompletableFuture<Void> pruneSegmentAsync(SegmentInfo seg) {
+ long segmentId = seg.segmentId();
+ log.info().attr("segmentId", segmentId)
+ .attr("sealedAtMs", seg.sealedAtMs())
+ .log("GC: pruning sealed segment past retention");
+
+ return resources.updateScalableTopicAsync(topicName, md -> {
+ SegmentLayout latest = SegmentLayout.fromMetadata(md);
+ // Re-validate against the latest layout: another writer may have
already
+ // pruned this segment, or replaced it via a follow-up split /
merge. If
+ // it's gone, leave the metadata untouched (no-op CAS).
+ if (!latest.getAllSegments().containsKey(segmentId)) {
+ return md;
+ }
+ SegmentLayout updated = latest.pruneSegment(segmentId);
Review Comment:
Added: re-check `isLeader()` (and `closed`) at the top of `pruneAllAsync`
just before the CAS, since drain checks can take seconds and leadership may
flip meanwhile. The CAS itself would still protect the metadata, but bailing
early avoids a deposed leader writing layout updates and racing the new one on
backing-topic deletes.
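
A minimal sketch of that guard, assuming the drain checks are represented by a
single future and the CAS plus deletes by a counter. `LeaderGuardSketch`,
`pruneAfter`, and the two flags are hypothetical names for illustration, not
the controller's actual fields.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class LeaderGuardSketch {
    final AtomicBoolean leader = new AtomicBoolean(true);
    final AtomicBoolean closed = new AtomicBoolean(false);
    // Stand-in for "CAS the layout, then delete backing topics".
    final AtomicInteger layoutWrites = new AtomicInteger();

    // Returns true if the prune ran, false if the guard bailed out.
    CompletableFuture<Boolean> pruneAfter(CompletableFuture<Void> drainChecks) {
        return drainChecks.thenApply(ignored -> {
            // Re-check here, not only at tick start: the drain checks above
            // may have taken seconds and leadership can change meanwhile.
            if (!leader.get() || closed.get()) {
                return false;
            }
            layoutWrites.incrementAndGet();
            return true;
        });
    }
}
```

If leadership flips while `drainChecks` is still pending, the continuation
returns `false` and no write is attempted.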
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java:
##########
@@ -619,10 +700,225 @@ public
CompletableFuture<org.apache.pulsar.common.policies.data.ScalableTopicSta
});
}
+ // --- Sealed-segment GC ---
+
+ /**
+ * One iteration of the sealed-segment GC. For every sealed segment in the
current
+ * layout whose retention window has expired, polls every known
subscription's
+ * backlog on that segment; if all subscriptions are drained, prunes the
segment
+ * from the DAG (CAS) and deletes its backing managed-ledger topic.
+ *
+ * <p>The retention window is resolved from topic-policies on the parent
+ * {@code topic://...} → namespace policy → broker default, the same
precedence
+ * Pulsar uses for regular topics.
+ *
+ * <p>Visible for tests; in production it's invoked by the scheduled task.
+ */
+ CompletableFuture<Void> runGcTickAsync() {
+ if (!isLeader() || closed) {
+ return CompletableFuture.completedFuture(null);
+ }
+ final SegmentLayout layout = currentLayout;
+ if (layout == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ // Candidates: sealed segments past their retention horizon. We resolve
+ // retention once per tick — cheap, and avoids per-segment policy
lookups.
+ return resolveRetentionMillisAsync()
+ .thenCompose(retentionMs -> {
+ if (retentionMs == null) {
+ // Negative / unset → retain forever. No GC this tick.
+ return CompletableFuture.completedFuture(null);
+ }
+ long now = clock.millis();
+ List<SegmentInfo> candidates = new ArrayList<>();
+ for (SegmentInfo seg : layout.getAllSegments().values()) {
+ if (seg.isSealed() && seg.sealedAtMs() > 0
+ && (now - seg.sealedAtMs()) >= retentionMs) {
+ candidates.add(seg);
+ }
+ }
+ if (candidates.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return pruneEligibleAsync(candidates);
+ });
+ }
+
+ /**
+ * For each candidate sealed segment, check that every existing
subscription has
+ * drained it (backlog == 0); prune the ones that pass. Failures on
individual
+ * segments are logged and skipped — the next tick retries.
+ */
+ private CompletableFuture<Void> pruneEligibleAsync(List<SegmentInfo>
candidates) {
+ return resources.listSubscriptionsAsync(topicName)
+ .thenCompose(subs -> {
+ List<CompletableFuture<Void>> perSegment = new
ArrayList<>();
+ for (SegmentInfo seg : candidates) {
+ perSegment.add(prunable(seg, subs)
+ .thenCompose(prunable -> prunable
+ ? pruneSegmentAsync(seg)
+ :
CompletableFuture.completedFuture(null))
+ .exceptionally(ex -> {
+ log.warn().attr("segmentId",
seg.segmentId())
+ .exceptionMessage(ex)
+ .log("GC: failed to evaluate /
prune segment;"
+ + " will retry on next
tick");
+ return null;
+ }));
+ }
+ return CompletableFuture.allOf(
+ perSegment.toArray(CompletableFuture[]::new));
+ });
+ }
+
+ private CompletableFuture<Boolean> prunable(SegmentInfo seg, List<String>
subs) {
Review Comment:
Added a docstring on `pruneEligibleAsync` covering all three cases:
- **STREAM / QUEUE**: the drain check is the per-segment backlog admin
endpoint, which works the same way for Exclusive (STREAM) and Shared (QUEUE)
cursors — both report 0 backlog when the cursor is at the end of a sealed
segment.
- **CHECKPOINT**: no broker-side cursor exists, so the backlog endpoint
returns `NotFoundException`. `isSegmentDrained` (the existing helper) maps that
to `false`, so a segment with any registered CHECKPOINT subscription is treated
as "still in use" and never pruned. Conservative; safe default.
- **Parent-vs-child ordering**: a child can be pruned before its parent.
Sealed segments form a DAG, and `pruneSegment` rewrites parent/child edges; the
active leaves always cover the full hash range. Consumers using the post-prune
layout see the missing segment as "no longer present" — equivalent to "drained"
for STREAM parent-drain ordering.
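
The STREAM / QUEUE and CHECKPOINT cases can be illustrated with the sketch
below. It is a simplified model, not the real helper: `DrainCheckSketch`,
`CursorNotFoundException`, and the in-memory backlog map are hypothetical
stand-ins for the admin backlog endpoint and its `NotFoundException`, and the
choice to propagate other errors is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class DrainCheckSketch {
    /** Hypothetical stand-in for the admin client's not-found error. */
    static class CursorNotFoundException extends RuntimeException {
    }

    // Subscription name -> backlog on the segment. A missing key models a
    // subscription with no broker-side cursor (the CHECKPOINT case).
    final Map<String, Long> backlogBySub;

    DrainCheckSketch(Map<String, Long> backlogBySub) {
        this.backlogBySub = backlogBySub;
    }

    CompletableFuture<Long> backlog(String sub) {
        Long b = backlogBySub.get(sub);
        if (b == null) {
            return CompletableFuture.failedFuture(new CursorNotFoundException());
        }
        return CompletableFuture.completedFuture(b);
    }

    CompletableFuture<Boolean> isDrained(String sub) {
        return backlog(sub)
                .thenApply(b -> b == 0L)
                .exceptionally(ex -> {
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    if (cause instanceof CursorNotFoundException) {
                        return false; // no cursor: treat the segment as still in use
                    }
                    throw new CompletionException(cause); // assumption: other errors propagate
                });
    }

    // A segment is prunable only when every subscription reports it drained.
    CompletableFuture<Boolean> prunable(List<String> subs) {
        List<CompletableFuture<Boolean>> checks = new ArrayList<>();
        for (String sub : subs) {
            checks.add(isDrained(sub));
        }
        return CompletableFuture.allOf(checks.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> checks.stream().allMatch(CompletableFuture::join));
    }
}
```

In this model a single subscription without a cursor is enough to keep the
segment, which mirrors the conservative default described above.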