This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c1a73470d1e [feat] PIP-468: sealed-segment retention GC for scalable
topics (#25668)
c1a73470d1e is described below
commit c1a73470d1e4e30dfb30aebd785d6e57ce6482df
Author: Matteo Merli <[email protected]>
AuthorDate: Tue May 5 15:50:44 2026 -0700
[feat] PIP-468: sealed-segment retention GC for scalable topics (#25668)
---
.../broker/service/persistent/PersistentTopic.java | 7 +
.../service/scalable/ScalableTopicController.java | 349 ++++++++++++++++++++-
.../scalable/ScalableTopicControllerTest.java | 142 +++++++++
3 files changed, 496 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f98da6a5a89..bf795e85c3d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3550,6 +3550,13 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
@Override
public void checkGC() {
+ if (TopicName.get(topic).isSegment()) {
+ // Segment-backing topics are owned by ScalableTopicController; its GC tick
+ // decides when a sealed segment is drained + retention-expired and deletes
+ // the topic. Letting v4 inactive-topic-GC race against that would risk the
+ // broker quietly tearing down a segment the controller still considers live.
+ return;
+ }
if (!isDeleteWhileInactive()) {
// This topic is not included in GC
return;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index 4a7b15a1e15..552ca321d84 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -19,20 +19,30 @@
package org.apache.pulsar.broker.service.scalable;
import io.github.merlimat.slog.Logger;
+import java.time.Clock;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import lombok.Getter;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
import org.apache.pulsar.broker.resources.ScalableTopicResources;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.common.api.proto.ScalableConsumerType;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.scalable.HashRange;
import org.apache.pulsar.common.scalable.SegmentInfo;
import org.apache.pulsar.common.scalable.SegmentTopicName;
@@ -53,17 +63,27 @@ public class ScalableTopicController {
private static final Logger LOG =
Logger.get(ScalableTopicController.class);
private final Logger log;
+ /** Default cadence for the sealed-segment GC tick on the leader. */
+ static final Duration DEFAULT_GC_INTERVAL = Duration.ofSeconds(60);
+
@Getter
private final TopicName topicName;
private final ScalableTopicResources resources;
private final BrokerService brokerService;
private final LeaderElection<String> leaderElection;
+ /** Wall-clock source used for sealed-segment retention math. Tests override. */
+ private final Clock clock;
+ /** Cadence of the GC tick. Tests override to a small value. */
+ private final Duration gcInterval;
private volatile SegmentLayout currentLayout;
/** Per-subscription consumer tracking. */
private final ConcurrentHashMap<String, SubscriptionCoordinator>
subscriptions = new ConcurrentHashMap<>();
+ /** Sealed-segment GC scheduled task. Non-null only while this broker is leader. */
+ private volatile ScheduledFuture<?> gcTask;
+
@Getter
private volatile LeaderElectionState leaderState =
LeaderElectionState.NoLeader;
@@ -73,9 +93,24 @@ public class ScalableTopicController {
ScalableTopicResources resources,
BrokerService brokerService,
CoordinationService coordinationService) {
+ this(topicName, resources, brokerService, coordinationService,
+ Clock.systemUTC(), DEFAULT_GC_INTERVAL);
+ }
+
+ /**
+ * Test constructor: overrides the wall-clock source and the GC tick cadence.
+ */
+ ScalableTopicController(TopicName topicName,
+ ScalableTopicResources resources,
+ BrokerService brokerService,
+ CoordinationService coordinationService,
+ Clock clock,
+ Duration gcInterval) {
this.topicName = topicName;
this.resources = resources;
this.brokerService = brokerService;
+ this.clock = clock;
+ this.gcInterval = gcInterval;
this.log = LOG.with().attr("topic", topicName).build();
this.leaderElection = coordinationService.getLeaderElection(
String.class,
@@ -90,6 +125,12 @@ public class ScalableTopicController {
*/
private void onLeaderStateChange(LeaderElectionState state) {
log.info().attr("state", state).log("Leader state change for scalable
topic");
+ if (state != LeaderElectionState.Leading) {
+ // Stepped down (or never was leader). Stop the GC tick so the deposed leader
+ // doesn't race the new one on layout writes / backing-topic deletes. The new
+ // leader's initialize() will reschedule.
+ cancelGcTask();
+ }
if (state == LeaderElectionState.NoLeader && !closed) {
initialize().exceptionally(ex -> {
log.warn().exceptionMessage(ex).log("Failed to re-elect after
NoLeader");
@@ -119,12 +160,52 @@ public class ScalableTopicController {
})
.thenCompose(__ -> {
if (isLeader()) {
+ scheduleGcTask();
return restoreSessionsFromStore();
}
return CompletableFuture.completedFuture(null);
});
}
+    /**
+     * Starts the recurring sealed-segment GC tick on the executor returned by
+     * {@link #scheduler()}.
+     *
+     * <p>Safe to call repeatedly: nothing happens when the controller is closed or a
+     * tick is already scheduled. The first tick fires one {@code gcInterval} after
+     * scheduling and then repeats at that fixed rate, each time running
+     * {@link #runGcTickSafely()}.
+     */
+    private synchronized void scheduleGcTask() {
+        if (!closed && gcTask == null) {
+            final ScheduledExecutorService executor = scheduler();
+            final long periodMs = gcInterval.toMillis();
+            gcTask = executor.scheduleAtFixedRate(
+                    this::runGcTickSafely, periodMs, periodMs, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Cancels the scheduled GC tick, if there is one, and clears the handle so that
+     * {@link #scheduleGcTask()} can schedule a fresh one later. A tick that is already
+     * running is not interrupted ({@code cancel(false)}).
+     */
+    private synchronized void cancelGcTask() {
+        final ScheduledFuture<?> scheduled = gcTask;
+        if (scheduled == null) {
+            return;
+        }
+        scheduled.cancel(false);
+        gcTask = null;
+    }
+
+    /** Returns the executor obtained from {@code brokerService.getPulsar()}; the GC tick runs on it. */
+    private ScheduledExecutorService scheduler() {
+        var pulsar = brokerService.getPulsar();
+        return pulsar.getExecutor();
+    }
+
+    /**
+     * Scheduler entry point for a single GC tick.
+     *
+     * <p>Returns immediately when this controller is not the leader or has been closed.
+     * Otherwise it starts {@link #runGcTickAsync()} and logs (at WARN) both asynchronous
+     * failures of the returned future and anything thrown synchronously; nothing is
+     * allowed to propagate to the scheduler.
+     */
+    private void runGcTickSafely() {
+        final boolean active = isLeader() && !closed;
+        if (!active) {
+            return;
+        }
+        try {
+            final CompletableFuture<Void> tick = runGcTickAsync();
+            tick.exceptionally(failure -> {
+                log.warn().exceptionMessage(failure).log("Scalable-topic GC tick failed");
+                return null;
+            });
+        } catch (Throwable thrown) {
+            // A periodic task that throws has its subsequent executions suppressed by
+            // scheduleAtFixedRate, so swallow and log instead of letting it escape.
+            log.warn().exception(thrown).log("Scalable-topic GC tick threw");
+        }
+    }
+
/**
* Load persisted subscriptions and consumer registrations from the
metadata store and
* install them into per-subscription {@link SubscriptionCoordinator}
instances. Called
@@ -288,7 +369,7 @@ public class ScalableTopicController {
// Single timestamp shared by the local preview and the CAS-retried
metadata update,
// so the children's createdAtMs and the parent's sealedAtMs always
agree even if the
// CAS retries due to concurrent writers.
- final long nowMs = System.currentTimeMillis();
+ final long nowMs = clock.millis();
// Compute the new layout locally to derive child segment info
SegmentLayout newLayout = currentLayout.splitSegment(segmentId, nowMs);
@@ -337,7 +418,7 @@ public class ScalableTopicController {
// Single timestamp shared by the local preview and the CAS-retried
metadata
// update — see splitSegment for the rationale.
- final long nowMs = System.currentTimeMillis();
+ final long nowMs = clock.millis();
// Compute the new layout locally to derive merged segment info
SegmentLayout newLayout = currentLayout.mergeSegments(segmentId1,
segmentId2, nowMs);
@@ -619,10 +700,274 @@ public class ScalableTopicController {
});
}
+ // --- Sealed-segment GC ---
+
+    /**
+     * Runs one pass of the sealed-segment GC.
+     *
+     * <p>The pass is a no-op when this controller is not the leader, is closed, has no
+     * layout loaded yet, or when the resolved retention is {@code null} (keep forever).
+     * Otherwise every segment in the current layout that is sealed, carries a positive
+     * {@code sealedAtMs}, and was sealed at least {@code retentionMs} ago (measured with
+     * the injected {@code clock}) becomes a candidate and is handed to
+     * {@link #pruneEligibleAsync(List)}, which checks subscription drain state before
+     * pruning.
+     *
+     * <p>Retention is resolved once per tick via {@link #resolveRetentionMillisAsync()}
+     * (topic policy → namespace policy → broker default) rather than per segment.
+     *
+     * <p>Package-private so tests can drive ticks directly; in production the scheduled
+     * task invokes it.
+     *
+     * @return a future that completes when the pass has finished
+     */
+    CompletableFuture<Void> runGcTickAsync() {
+        final CompletableFuture<Void> nothingToDo = CompletableFuture.completedFuture(null);
+        if (!isLeader() || closed) {
+            return nothingToDo;
+        }
+        final SegmentLayout snapshot = currentLayout;
+        if (snapshot == null) {
+            return nothingToDo;
+        }
+
+        return resolveRetentionMillisAsync().thenCompose(retentionMs -> {
+            if (retentionMs == null) {
+                // Negative / unset retention means "retain forever": skip this tick.
+                return nothingToDo;
+            }
+            final long nowMs = clock.millis();
+            final List<SegmentInfo> expired = new ArrayList<>();
+            for (SegmentInfo segment : snapshot.getAllSegments().values()) {
+                if (!segment.isSealed()) {
+                    continue;
+                }
+                final long sealedAt = segment.sealedAtMs();
+                if (sealedAt <= 0) {
+                    continue;
+                }
+                if (nowMs - sealedAt < retentionMs) {
+                    continue;
+                }
+                expired.add(segment);
+            }
+            if (expired.isEmpty()) {
+                return nothingToDo;
+            }
+            return pruneEligibleAsync(expired);
+        });
+    }
+
+    /**
+     * Filters the retention-expired candidates down to those every subscription has
+     * drained, then prunes the survivors in one step.
+     *
+     * <p>The subscription list is fetched once. Each candidate is then evaluated with
+     * {@link #prunable(SegmentInfo, List)}; the evaluations run concurrently. A candidate
+     * whose evaluation fails is logged and dropped for this tick (it is still in the
+     * layout, so a later tick evaluates it again). The surviving segments are passed
+     * together to {@link #pruneAllAsync(List)} so the layout is mutated by a single CAS
+     * write instead of one write per segment.
+     *
+     * <p><b>Subscription-type behaviour</b> (as described by the author; the drain check
+     * itself lives in {@code isSegmentDrained}, not shown here): the check uses the
+     * per-segment backlog admin endpoint, which behaves the same for STREAM (Exclusive)
+     * and QUEUE (Shared) subscriptions — a sealed segment whose cursor is at the end
+     * reports backlog 0. CHECKPOINT subscriptions have no broker-side cursor, the
+     * endpoint returns {@code NotFoundException}, and the segment is reported as not
+     * drained, so it is never pruned while a CHECKPOINT subscription is registered.
+     *
+     * <p><b>Parent-vs-child ordering.</b> Sealed segments form a DAG; pruning is allowed
+     * in any order because the active leaves always cover the full hash range and each
+     * segment's managed-ledger storage is independent.
+     * {@link SegmentLayout#pruneSegment} rewrites the parent/child edges, so consumers on
+     * the post-prune layout see the pruned segment as no longer present — equivalent to
+     * "drained" for parent-drain ordering.
+     *
+     * @param candidates sealed segments already past their retention horizon
+     */
+    private CompletableFuture<Void> pruneEligibleAsync(List<SegmentInfo> candidates) {
+        return resources.listSubscriptionsAsync(topicName)
+                .thenCompose(subs -> {
+                    // One verdict per candidate: the segment itself if prunable, else null.
+                    final List<CompletableFuture<SegmentInfo>> verdicts =
+                            new ArrayList<>(candidates.size());
+                    candidates.forEach(candidate -> verdicts.add(
+                            prunable(candidate, subs)
+                                    .thenApply(isDrained -> isDrained ? candidate : null)
+                                    .exceptionally(failure -> {
+                                        log.warn().attr("segmentId", candidate.segmentId())
+                                                .exceptionMessage(failure)
+                                                .log("GC: failed to evaluate prunability;"
+                                                        + " will retry on next tick");
+                                        return null;
+                                    })));
+                    final CompletableFuture<Void> allEvaluated =
+                            CompletableFuture.allOf(verdicts.toArray(CompletableFuture[]::new));
+                    return allEvaluated.thenApply(ignored -> {
+                        final List<SegmentInfo> survivors = new ArrayList<>();
+                        for (CompletableFuture<SegmentInfo> verdict : verdicts) {
+                            final SegmentInfo survivor = verdict.join();
+                            if (survivor != null) {
+                                survivors.add(survivor);
+                            }
+                        }
+                        return survivors;
+                    });
+                })
+                .thenCompose(this::pruneAllAsync);
+    }
+
+    /**
+     * Applies the actual state change for a GC tick: removes all drained segments from
+     * the layout with one metadata update, refreshes the local layout and notifies the
+     * subscriptions, then deletes each segment's backing topic.
+     *
+     * <p>Leadership (and the closed flag) is checked again before writing, because the
+     * preceding drain checks are asynchronous and this broker may have been deposed in
+     * the meantime. Inside the update function each segment is re-checked against the
+     * metadata being updated, so a segment that is already gone is skipped; if nothing
+     * changes the metadata is returned untouched.
+     *
+     * @param drained segments that are past retention and drained by every subscription
+     * @return a future completing after the layout update, the subscription
+     *         notification and all backing-topic deletes have finished
+     */
+    private CompletableFuture<Void> pruneAllAsync(List<SegmentInfo> drained) {
+        if (drained.isEmpty() || !isLeader() || closed) {
+            return CompletableFuture.completedFuture(null);
+        }
+        drained.forEach(segment -> log.info()
+                .attr("segmentId", segment.segmentId())
+                .attr("sealedAtMs", segment.sealedAtMs())
+                .log("GC: pruning sealed segment past retention"));
+
+        return resources.updateScalableTopicAsync(topicName, md -> {
+                    final SegmentLayout before = SegmentLayout.fromMetadata(md);
+                    SegmentLayout after = before;
+                    for (SegmentInfo segment : drained) {
+                        // Another writer (or an earlier, partially failed tick) may have
+                        // removed this segment already.
+                        if (!after.getAllSegments().containsKey(segment.segmentId())) {
+                            continue;
+                        }
+                        after = after.pruneSegment(segment.segmentId());
+                    }
+                    if (after == before) {
+                        return md;
+                    }
+                    return after.toMetadata(md.getProperties());
+                })
+                .thenCompose(ignored -> resources.getScalableTopicMetadataAsync(topicName, true))
+                .thenCompose(optMd -> {
+                    currentLayout = SegmentLayout.fromMetadata(optMd.orElseThrow());
+                    return notifySubscriptions(currentLayout);
+                })
+                .thenCompose(ignored -> {
+                    final CompletableFuture<?>[] deletions = new CompletableFuture<?>[drained.size()];
+                    int slot = 0;
+                    for (SegmentInfo segment : drained) {
+                        deletions[slot++] = deleteSegmentBackingTopic(segment);
+                    }
+                    return CompletableFuture.allOf(deletions);
+                })
+                .thenAccept(ignored -> drained.forEach(segment -> log.info()
+                        .attr("segmentId", segment.segmentId())
+                        .log("GC: segment pruned + backing topic deleted")));
+    }
+
+    /**
+     * Decides whether a sealed segment may be pruned: true when there are no
+     * subscriptions at all, or when {@code isSegmentDrained} reports true for every
+     * subscription. The per-subscription checks are started together and the result is
+     * produced once all of them have completed; if any check fails, the returned future
+     * fails too.
+     *
+     * @param seg  the sealed segment under consideration
+     * @param subs names of all subscriptions on the scalable topic
+     */
+    private CompletableFuture<Boolean> prunable(SegmentInfo seg, List<String> subs) {
+        if (subs.isEmpty()) {
+            // No subscribers ever attached / all unsubscribed → nothing left to drain.
+            return CompletableFuture.completedFuture(true);
+        }
+        final List<CompletableFuture<Boolean>> drainChecks = new ArrayList<>(subs.size());
+        for (String subscription : subs) {
+            drainChecks.add(isSegmentDrained(seg, subscription));
+        }
+        return CompletableFuture.allOf(drainChecks.toArray(CompletableFuture[]::new))
+                .thenApply(ignored -> {
+                    boolean everyoneDrained = true;
+                    for (CompletableFuture<Boolean> check : drainChecks) {
+                        if (!check.join()) {
+                            everyoneDrained = false;
+                            break;
+                        }
+                    }
+                    return everyoneDrained;
+                });
+    }
+
+    /**
+     * Deletes the segment's backing storage through the {@code scalableTopics} admin
+     * client (force delete), addressing the segment by the name produced by
+     * {@code toSegmentPersistentName}.
+     *
+     * <p>This is strictly best-effort: the returned future never completes
+     * exceptionally. A {@code NotFoundException} means the topic is already gone and is
+     * treated as success; any other failure — including failing to obtain the admin
+     * client — is logged at WARN and swallowed, so one failed delete cannot fail the
+     * deletes of the other segments pruned in the same tick.
+     *
+     * <p>Note that the GC tick calls this only after the segment has been pruned from
+     * the layout, and the tick selects candidates solely from segments still present in
+     * the layout. A failed delete is therefore <em>not</em> revisited by a later tick;
+     * the backing storage stays behind until it is removed by some other means.
+     *
+     * @param seg the segment whose backing topic should be deleted
+     * @return a future that completes (always normally) when the delete attempt is done
+     */
+    private CompletableFuture<Void> deleteSegmentBackingTopic(SegmentInfo seg) {
+        final String name = toSegmentPersistentName(seg);
+        try {
+            return brokerService.getPulsar().getAdminClient()
+                    .scalableTopics().deleteSegmentAsync(name, /* force */ true)
+                    .exceptionally(ex -> {
+                        Throwable cause =
+                                org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex);
+                        if (cause instanceof org.apache.pulsar.client.admin.PulsarAdminException
+                                .NotFoundException) {
+                            // Already gone — fine.
+                            return null;
+                        }
+                        log.warn().attr("segment", name).exceptionMessage(cause)
+                                .log("GC: failed to delete backing segment topic;"
+                                        + " backing storage may be leaked");
+                        return null;
+                    });
+        } catch (PulsarServerException e) {
+            // Keep the synchronous failure path consistent with the asynchronous one:
+            // log and complete normally instead of failing the caller's allOf().
+            log.warn().attr("segment", name).exceptionMessage(e)
+                    .log("GC: failed to obtain admin client to delete backing segment topic;"
+                            + " backing storage may be leaked");
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    /**
+     * Resolves the effective retention time, in milliseconds, for this scalable topic.
+     *
+     * <p>Lookup order: the topic's own (local-only) topic policies first; if those carry
+     * no retention policy, the namespace policies; if those carry none either, the
+     * broker configuration default.
+     *
+     * @return a future holding the retention in milliseconds, or {@code null} when the
+     *         winning layer specifies a negative time or no value is available — the GC
+     *         tick treats {@code null} as "keep forever" and skips
+     */
+    private CompletableFuture<Long> resolveRetentionMillisAsync() {
+        final TopicPoliciesService policiesService =
+                brokerService.getPulsar().getTopicPoliciesService();
+        return policiesService
+                .getTopicPoliciesAsync(topicName, TopicPoliciesService.GetType.LOCAL_ONLY)
+                .thenCompose(topicPolicies -> {
+                    // Topic-level (override) layer. Optional.map already yields empty when
+                    // the policies object has no retention set.
+                    final RetentionPolicies topicLevel = topicPolicies
+                            .map(TopicPolicies::getRetentionPolicies)
+                            .orElse(null);
+                    if (topicLevel != null) {
+                        return CompletableFuture.completedFuture(toRetentionMillis(topicLevel));
+                    }
+                    // Namespace layer, falling back to the broker default.
+                    final NamespaceName namespace = topicName.getNamespaceObject();
+                    return brokerService.getPulsar().getPulsarResources()
+                            .getNamespaceResources()
+                            .getPoliciesAsync(namespace)
+                            .thenApply(namespacePolicies -> {
+                                final RetentionPolicies namespaceLevel = namespacePolicies
+                                        .map(policies -> policies.retention_policies)
+                                        .orElse(null);
+                                if (namespaceLevel == null) {
+                                    return defaultRetentionMillisFromBrokerConfig();
+                                }
+                                return toRetentionMillis(namespaceLevel);
+                            });
+                });
+    }
+
+    /**
+     * Converts a retention policy's time component from minutes to milliseconds.
+     *
+     * @return the retention in milliseconds, or {@code null} when the configured time is
+     *         negative (keep forever)
+     */
+    private static Long toRetentionMillis(RetentionPolicies rp) {
+        final long minutes = rp.getRetentionTimeInMinutes();
+        if (minutes >= 0) {
+            return TimeUnit.MINUTES.toMillis(minutes);
+        }
+        return null; // keep forever
+    }
+
+    /**
+     * Reads the broker-wide default retention time and converts it to milliseconds.
+     *
+     * @return the default retention in milliseconds, or {@code null} when no broker
+     *         configuration is available or the configured value is negative
+     */
+    private Long defaultRetentionMillisFromBrokerConfig() {
+        var brokerConfig = brokerService.getPulsar().getConfig();
+        if (brokerConfig == null) {
+            return null;
+        }
+        final long minutes = brokerConfig.getDefaultRetentionTimeInMinutes();
+        if (minutes < 0) {
+            return null;
+        }
+        return TimeUnit.MINUTES.toMillis(minutes);
+    }
+
+    /**
+     * Test hook: number of sealed segments in the current layout, or 0 when no layout
+     * has been loaded yet.
+     */
+    int sealedSegmentCount() {
+        final SegmentLayout snapshot = currentLayout;
+        if (snapshot == null) {
+            return 0;
+        }
+        return (int) snapshot.getAllSegments().values().stream()
+                .filter(SegmentInfo::isSealed)
+                .count();
+    }
+
// --- Lifecycle ---
public CompletableFuture<Void> close() {
closed = true;
+ cancelGcTask();
// Stop each coordinator's drain poller before clearing — otherwise
the scheduler
// task keeps running after the controller goes away.
subscriptions.values().forEach(SubscriptionCoordinator::close);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
index 1ebd3f13ed0..c00b1923504 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
@@ -148,6 +148,13 @@ public class ScalableTopicControllerTest {
return new ScalableTopicController(tn, resources, brokerService,
coordinationService);
}
+    /**
+     * Builds a controller through the test constructor so the test controls both the
+     * wall-clock source and the GC tick interval.
+     */
+    private ScalableTopicController newControllerWithClock(TopicName tn,
+                                                           java.time.Clock clock,
+                                                           java.time.Duration gcInterval) {
+        ScalableTopicController created = new ScalableTopicController(
+                tn, resources, brokerService, coordinationService, clock, gcInterval);
+        return created;
+    }
+
// --- initialize() ---
@Test
@@ -449,6 +456,141 @@ public class ScalableTopicControllerTest {
}
}
+ // --- Sealed-segment GC ---
+
+    /**
+     * Splitting segment 0 seals it. With no subscriptions and a one-minute namespace
+     * retention, a GC tick at the seal time must leave the segment alone, while a tick
+     * after the retention window must remove it from the layout and issue the
+     * backing-topic delete.
+     */
+    @Test
+    public void testGcTickPrunesDrainedSealedSegmentPastRetention() throws Exception {
+        // Topic-policies, namespace-policies and delete mocks used by the GC path.
+        installGcMocks(/* nsRetentionMinutes */ 1);
+
+        // Settable clock: lets the test jump past the retention window.
+        final long sealTimeMs = 1_700_000_000_000L;
+        final AdjustableClock testClock = new AdjustableClock(sealTimeMs);
+        if (controller != null) {
+            controller.close().join();
+        }
+        // The GC interval does not matter here — ticks are driven by hand.
+        controller = newControllerWithClock(topicName, testClock, java.time.Duration.ofHours(1));
+        controller.initialize().get();
+
+        final int sealedBaseline = controller.sealedSegmentCount();
+        // Split segment 0: it is sealed at sealTimeMs and its children are created then.
+        controller.splitSegment(0).get();
+        assertEquals(controller.sealedSegmentCount(), sealedBaseline + 1);
+
+        // Tick exactly at the seal time: retention has not elapsed, nothing is pruned.
+        controller.runGcTickAsync().get();
+        assertTrue(controller.getLayout().get().getAllSegments().containsKey(0L),
+                "tick within retention window must not prune");
+
+        // Move one minute and one second ahead: the next tick must prune segment 0.
+        final long pastRetentionMs =
+                sealTimeMs + java.util.concurrent.TimeUnit.MINUTES.toMillis(1) + 1_000L;
+        testClock.set(pastRetentionMs);
+        controller.runGcTickAsync().get();
+        assertFalse(controller.getLayout().get().getAllSegments().containsKey(0L),
+                "tick past retention must prune the sealed segment");
+        // The delete went through the segment-aware admin call.
+        verify(scalableTopics).deleteSegmentAsync(anyString(), anyBoolean());
+    }
+
+    /**
+     * A negative ("keep forever") namespace retention must turn the GC tick into a
+     * no-op, even for a sealed segment with no subscriptions and a clock set a year
+     * ahead.
+     */
+    @Test
+    public void testGcTickRespectsKeepForeverRetention() throws Exception {
+        installGcMocks(/* nsRetentionMinutes */ -1);
+
+        final long baseMs = 1_700_000_000_000L;
+        if (controller != null) {
+            controller.close().join();
+        }
+        final java.time.Instant oneYearLater =
+                java.time.Instant.ofEpochMilli(baseMs + 365L * 86_400_000L);
+        final java.time.Clock frozenClock =
+                java.time.Clock.fixed(oneYearLater, java.time.ZoneOffset.UTC);
+        controller = newControllerWithClock(topicName, frozenClock, java.time.Duration.ofHours(1));
+        controller.initialize().get();
+        controller.splitSegment(0).get();
+
+        controller.runGcTickAsync().get();
+        assertTrue(controller.getLayout().get().getAllSegments().containsKey(0L),
+                "negative retention must keep sealed segments forever");
+    }
+
+    /**
+     * A {@link java.time.Clock} whose current time is whatever the test last set, in
+     * epoch milliseconds. Always reports UTC; {@code withZone} returns the same instance.
+     */
+    private static final class AdjustableClock extends java.time.Clock {
+        /** Current time in epoch milliseconds; volatile so a set() is visible across threads. */
+        private volatile long currentMillis;
+
+        AdjustableClock(long initialMs) {
+            currentMillis = initialMs;
+        }
+
+        /** Moves the clock to the given epoch-millisecond instant. */
+        void set(long nowMs) {
+            currentMillis = nowMs;
+        }
+
+        @Override
+        public long millis() {
+            return currentMillis;
+        }
+
+        @Override
+        public java.time.Instant instant() {
+            return java.time.Instant.ofEpochMilli(currentMillis);
+        }
+
+        @Override
+        public java.time.ZoneId getZone() {
+            return java.time.ZoneOffset.UTC;
+        }
+
+        @Override
+        public java.time.Clock withZone(java.time.ZoneId zone) {
+            return this; // tests don't care about zone
+        }
+    }
+
+    /**
+     * Installs the mocks the GC tick depends on:
+     * <ul>
+     *   <li>a topic-policies service that returns no policies, so retention falls
+     *       through to the namespace layer;</li>
+     *   <li>namespace policies whose retention time is {@code nsRetentionMinutes}
+     *       (size limit -1);</li>
+     *   <li>a segment delete admin call that completes successfully.</li>
+     * </ul>
+     * No subscription is set up: per the original note, the in-memory store's
+     * {@code listSubscriptionsAsync} returns an empty list by default, which makes a
+     * sealed segment prunable as soon as retention expires.
+     *
+     * @param nsRetentionMinutes namespace-level retention time in minutes; negative
+     *                           means keep forever
+     */
+    @SuppressWarnings("unchecked")
+    private void installGcMocks(int nsRetentionMinutes) {
+        // Topic-policies service: nothing configured on the topic itself.
+        var topicPoliciesService =
+                mock(org.apache.pulsar.broker.service.TopicPoliciesService.class);
+        when(pulsar.getTopicPoliciesService()).thenReturn(topicPoliciesService);
+        when(topicPoliciesService.getTopicPoliciesAsync(any(),
+                any(org.apache.pulsar.broker.service.TopicPoliciesService.GetType.class)))
+                .thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+
+        // Namespace policies carrying the requested retention, reached via PulsarResources.
+        var pulsarResourcesMock = mock(org.apache.pulsar.broker.resources.PulsarResources.class);
+        var namespaceResourcesMock =
+                mock(org.apache.pulsar.broker.resources.NamespaceResources.class);
+        when(pulsar.getPulsarResources()).thenReturn(pulsarResourcesMock);
+        when(pulsarResourcesMock.getNamespaceResources()).thenReturn(namespaceResourcesMock);
+        org.apache.pulsar.common.policies.data.Policies policiesWithRetention =
+                new org.apache.pulsar.common.policies.data.Policies();
+        policiesWithRetention.retention_policies =
+                new org.apache.pulsar.common.policies.data.RetentionPolicies(
+                        nsRetentionMinutes, /* sizeMB */ -1);
+        when(namespaceResourcesMock.getPoliciesAsync(any()))
+                .thenReturn(CompletableFuture.completedFuture(Optional.of(policiesWithRetention)));
+
+        // Backing-topic delete (segment-aware admin call) succeeds.
+        when(scalableTopics.deleteSegmentAsync(anyString(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+    }
+
// --- ConsumerRegistration record sanity ---
@Test