This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7b3835c31dd [fix][broker] PIP-468: StreamConsumer / CheckpointConsumer DAG-replay with parent-drain ordering (#25642)
7b3835c31dd is described below
commit 7b3835c31dd84e35b571fb730a81431c13e84297
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 1 08:57:44 2026 -0700
[fix][broker] PIP-468: StreamConsumer / CheckpointConsumer DAG-replay with parent-drain ordering (#25642)
---
.../apache/pulsar/broker/admin/v2/Segments.java | 54 ++++
.../apache/pulsar/broker/service/ServerCnx.java | 4 +-
.../service/scalable/ScalableTopicController.java | 96 ++++++-
.../service/scalable/ScalableTopicService.java | 26 +-
.../service/scalable/SegmentDrainChecker.java | 50 ++++
.../service/scalable/SubscriptionCoordinator.java | 311 ++++++++++++++++++++-
.../scalable/SubscriptionCoordinatorTest.java | 184 +++++++++++-
.../api/v5/V5CheckpointConsumerDagReplayTest.java | 271 ++++++++++++++++++
.../api/v5/V5StreamConsumerDagReplayTest.java | 280 +++++++++++++++++++
.../apache/pulsar/client/admin/ScalableTopics.java | 18 ++
.../client/admin/internal/ScalableTopicsImpl.java | 11 +
11 files changed, 1290 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
index e01961f9a30..0ae8b3264a5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
+import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
@@ -174,6 +175,59 @@ public class Segments extends AdminResource {
});
}
+ @GET
+
@Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}/backlog")
+ @ApiOperation(value = "Number of unconsumed entries in the segment topic
for the "
+ + "given subscription. Super-user only.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 401, message = "This operation requires
super-user access"),
+ @ApiResponse(code = 403, message = "This operation requires
super-user access"),
+ @ApiResponse(code = 404, message = "Segment topic or subscription
not found"),
+ @ApiResponse(code = 500, message = "Internal server error")})
+ public void getSubscriptionBacklog(
+ @Suspended final AsyncResponse asyncResponse,
+ @ApiParam(value = "Specify the tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @ApiParam(value = "Specify the namespace", required = true)
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Specify the parent topic name", required = true)
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Segment descriptor (e.g. 0000-7fff-1)",
required = true)
+ @PathParam("descriptor") String descriptor,
+ @ApiParam(value = "Subscription name", required = true)
+ @PathParam("subscription") String subscription,
+ @ApiParam(value = "Whether leader broker redirected this call to
this broker.")
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
+ validateNamespaceName(tenant, namespace);
+ TopicName segmentTopic = segmentTopicName(tenant, namespace,
encodedTopic, descriptor);
+
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic,
authoritative))
+ .thenCompose(__ ->
pulsar().getBrokerService().getTopicIfExists(segmentTopic.toString()))
+ .thenAccept(optTopic -> {
+ if (optTopic.isEmpty()) {
+ // No topic loaded → no subscription cursor → no
backlog. Returning
+ // 0 here would be wrong (caller might mark the
segment drained on
+ // a topic that simply hasn't loaded yet); a 404
forces the caller
+ // to retry, which matches our drain-poll contract.
+ throw new RestException(Response.Status.NOT_FOUND,
+ "Segment topic not loaded: " + segmentTopic);
+ }
+ var sub = optTopic.get().getSubscription(subscription);
+ if (sub == null) {
+ throw new RestException(Response.Status.NOT_FOUND,
+ "Subscription not found on segment: " +
subscription);
+ }
+
asyncResponse.resume(sub.getNumberOfEntriesInBacklog(false));
+ })
+ .exceptionally(ex -> {
+ log.error().attr("clientAppId",
clientAppId()).attr("segment", segmentTopic)
+ .exception(ex).log("Failed to get segment
subscription backlog");
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+
@DELETE
@Path("/{tenant}/{namespace}/{topic}/{descriptor}")
@ApiOperation(value = "Delete a segment topic. Super-user only.")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 871231f371a..2ed913561b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -879,6 +879,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
final String subscription =
commandScalableTopicSubscribe.getSubscription();
final String consumerName =
commandScalableTopicSubscribe.getConsumerName();
final long consumerId = commandScalableTopicSubscribe.getConsumerId();
+ final org.apache.pulsar.common.api.proto.ScalableConsumerType
consumerType =
+ commandScalableTopicSubscribe.getConsumerType();
log.debug().attr("topic", topicStr).attr("subscription", subscription)
.attr("consumerName", consumerName).attr("requestId",
requestId)
@@ -920,7 +922,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
return;
}
scalableTopicService.registerConsumer(topicName,
subscription, consumerName,
- consumerId, this)
+ consumerId, consumerType, this)
.whenCompleteAsync((assignment, ex) -> {
if (ex != null) {
Throwable cause = ex.getCause() != null ?
ex.getCause() : ex;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index 8b1805cd315..7aef354a0f8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
import org.apache.pulsar.broker.resources.ScalableTopicResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.TransportCnx;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.scalable.HashRange;
import org.apache.pulsar.common.scalable.SegmentInfo;
@@ -155,7 +156,32 @@ public class ScalableTopicController {
});
}
+ /**
+ * Restore-path entry: consumer type isn't persisted in metadata yet, so
we don't
+ * know whether the original subscription was STREAM (needs parent-drain
ordering)
+ * or CHECKPOINT / QUEUE (mustn't have it — CHECKPOINT never drains
parents because
+ * it doesn't create per-segment cursors). Default to <em>no
enforcement</em>; on the
+ * first register-after-restore the controller calls
+ * {@link SubscriptionCoordinator#installDrainChecker} if the type is
STREAM.
+ */
private SubscriptionCoordinator createCoordinator(String subscription) {
+ return createCoordinator(subscription, null);
+ }
+
+ private SubscriptionCoordinator createCoordinator(String subscription,
+ ScalableConsumerType consumerType) {
+ // Parent-drain ordering matters only for STREAM consumers (Exclusive
per-segment
+ // subscription with broker-tracked cursors → preserving per-key order
across a
+ // split requires waiting for the parent to drain before handing out
children).
+ // CHECKPOINT consumers track position client-side via Checkpoints and
don't even
+ // create per-segment cursors — their parent never reports as drained,
so the
+ // ordering machinery would block their children indefinitely. QUEUE
consumers
+ // are shared and accept out-of-order delivery by design. Null type
(restore
+ // path) starts without a checker; it's installed lazily on first
STREAM
+ // register.
+ SegmentDrainChecker checker =
+ consumerType == ScalableConsumerType.STREAM ?
this::isSegmentDrained : null;
+
// Defensive: PulsarService.getConfig() is null in some unit-test
mocks. Fall
// back to the SubscriptionCoordinator's default grace period in that
case.
var config = brokerService.getPulsar().getConfig();
@@ -175,7 +201,42 @@ public class ScalableTopicController {
currentLayout,
resources,
brokerService.getPulsar().getExecutor(),
- gracePeriod);
+ gracePeriod,
+ checker,
+ SubscriptionCoordinator.DEFAULT_DRAIN_INITIAL_DELAY,
+ SubscriptionCoordinator.DEFAULT_DRAIN_MAX_DELAY);
+ }
+
+ /**
+ * Drain check used by every {@link SubscriptionCoordinator} on this
topic. Asks the
+ * segment topic's owning broker for the per-subscription backlog via the
+ * {@code /segments/.../subscription/.../backlog} admin endpoint, which
redirects to
+ * the topic owner — works whether the controller and the segment colocate
or not.
+ *
+ * <p>Returns {@code false} if the segment topic or subscription is not
yet loaded
+ * (the admin endpoint replies 404). The next poll will succeed once the
consumer's
+ * subscribe lands the topic on its owning broker.
+ */
+ private CompletableFuture<Boolean> isSegmentDrained(SegmentInfo segment,
String subscription) {
+ String segmentTopicName = toSegmentPersistentName(segment);
+ try {
+ return brokerService.getPulsar().getAdminClient()
+ .scalableTopics()
+ .getSegmentSubscriptionBacklogAsync(segmentTopicName,
subscription)
+ .thenApply(backlog -> backlog != null && backlog <= 0)
+ .exceptionally(ex -> {
+ Throwable cause =
+
org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex);
+ if (cause instanceof
org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException) {
+ // Topic or subscription not loaded yet — try
again on the
+ // next poll. The consumer's subscribe will
materialize it.
+ return false;
+ }
+ throw
org.apache.pulsar.common.util.FutureUtil.wrapToCompletionException(cause);
+ });
+ } catch (PulsarServerException e) {
+ return CompletableFuture.failedFuture(e);
+ }
}
private CompletableFuture<Void> electLeader() {
@@ -309,14 +370,42 @@ public class ScalableTopicController {
* <p>If a session with the same {@code consumerName} already exists (for
example
* because the consumer is reconnecting within the grace period), the
existing
* assignment is reused and no rebalance occurs.
+ *
+ * <p>The {@code consumerType} is used at coordinator creation time to
decide whether
+ * to enforce parent-drain ordering on assignments — see
+ * {@link SubscriptionCoordinator}. The coordinator's setting is fixed at
first
+ * registration (a subscription's type doesn't change in practice);
subsequent
+ * registers with a different type still work but won't change the
ordering policy.
+ */
+ /**
+ * @deprecated Defaults to {@link ScalableConsumerType#STREAM}
+ * for backward compatibility. New callers should pass the explicit
type.
*/
+ @Deprecated
public CompletableFuture<ConsumerAssignment> registerConsumer(String
subscription,
String
consumerName,
long
consumerId,
TransportCnx cnx) {
+ return registerConsumer(subscription, consumerName, consumerId,
+ ScalableConsumerType.STREAM, cnx);
+ }
+
+ public CompletableFuture<ConsumerAssignment> registerConsumer(String
subscription,
+ String
consumerName,
+ long
consumerId,
+
ScalableConsumerType
+
consumerType,
+
TransportCnx cnx) {
checkLeader();
SubscriptionCoordinator coordinator = subscriptions.computeIfAbsent(
- subscription, this::createCoordinator);
+ subscription, sub -> createCoordinator(sub, consumerType));
+ // The coordinator may have been created on the failover-restore path
(consumer
+ // type unknown then; we defaulted to "no parent-drain enforcement").
Now that we
+ // know the type, upgrade if it's STREAM. installDrainChecker is a
no-op if the
+ // coordinator already has a checker, so safe to call unconditionally.
+ if (consumerType == ScalableConsumerType.STREAM) {
+ coordinator.installDrainChecker(this::isSegmentDrained);
+ }
return coordinator.registerConsumer(consumerName, consumerId, cnx)
.thenApply(assignments -> {
// Look up by name since the key may have been an existing
session
@@ -525,6 +614,9 @@ public class ScalableTopicController {
public CompletableFuture<Void> close() {
closed = true;
+ // Stop each coordinator's drain poller before clearing — otherwise
the scheduler
+ // task keeps running after the controller goes away.
+ subscriptions.values().forEach(SubscriptionCoordinator::close);
subscriptions.clear();
return leaderElection.asyncClose();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
index e452d75af74..e43ca06ccbd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
import org.apache.pulsar.broker.resources.ScalableTopicResources;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ScalableTopicStats;
@@ -230,12 +231,35 @@ public class ScalableTopicService {
/**
* Register a scalable consumer with the controller leader for {@code
topic}.
* Persists a durable session and returns the consumer's segment
assignment.
+ *
+ * <p>The {@code consumerType} drives broker-side semantics that depend on
the
+ * consumer mode — most importantly whether the {@link
SubscriptionCoordinator}
+ * enforces parent-drain ordering before handing out children of a split.
STREAM
+ * consumers want it (per-key ordering); CHECKPOINT and QUEUE consumers
don't
+ * (they either track position client-side or have shared,
already-out-of-order
+ * delivery semantics).
*/
public CompletableFuture<ConsumerAssignment> registerConsumer(TopicName
topic, String subscription,
String
consumerName, long consumerId,
+
ScalableConsumerType
+
consumerType,
org.apache.pulsar.broker.service.TransportCnx cnx) {
return getOrCreateController(topic)
- .thenCompose(controller ->
controller.registerConsumer(subscription, consumerName, consumerId, cnx));
+ .thenCompose(controller ->
+ controller.registerConsumer(subscription,
consumerName, consumerId,
+ consumerType, cnx));
+ }
+
+ /**
+ * @deprecated Defaults to {@link ScalableConsumerType#STREAM}
+ * for backward compatibility. New callers should pass the explicit
consumer type.
+ */
+ @Deprecated
+ public CompletableFuture<ConsumerAssignment> registerConsumer(TopicName
topic, String subscription,
+ String
consumerName, long consumerId,
+
org.apache.pulsar.broker.service.TransportCnx cnx) {
+ return registerConsumer(topic, subscription, consumerName, consumerId,
+ ScalableConsumerType.STREAM, cnx);
}
/**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentDrainChecker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentDrainChecker.java
new file mode 100644
index 00000000000..10210d6c67c
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentDrainChecker.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+
+/**
+ * Resolves whether a (sealed) segment has been fully drained by a particular
subscription.
+ *
+ * <p>Used by {@link SubscriptionCoordinator} to decide when newly-active
children of a
+ * split / merge can be assigned to consumers: an active child is only
assignable once
+ * <em>every</em> parent has been drained for the subscription, so message
order with respect
+ * to the split point is preserved.
+ *
+ * <p>Implementations typically read the segment topic's per-subscription
backlog (the
+ * cursor on a sealed topic with {@code msgBacklog == 0} is by definition at
the end). For
+ * subscriptions started with {@code Latest}, every sealed segment's cursor is
created at
+ * the topic's end, so the drain check completes immediately.
+ */
+@FunctionalInterface
+public interface SegmentDrainChecker {
+
+ /**
+ * Returns {@code true} if the segment's cursor for {@code subscription}
has reached the
+ * end of the segment's data, {@code false} otherwise. Errors complete the
future
+ * exceptionally; callers should treat them as "not drained yet" and retry.
+ *
+ * @param segment the segment to check (only meaningful for sealed
segments — active
+ * segments still receive new messages, so they're never
drained)
+ * @param subscription the subscription whose cursor we're asking about
+ */
+ CompletableFuture<Boolean> isDrained(SegmentInfo segment, String
subscription);
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinator.java
index ff55b7854ed..ade25fb1779 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinator.java
@@ -23,6 +23,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -30,12 +31,15 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import lombok.Getter;
import org.apache.pulsar.broker.resources.ScalableTopicResources;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.scalable.SegmentInfo;
import org.apache.pulsar.common.scalable.SegmentTopicName;
+import org.apache.pulsar.common.util.Backoff;
/**
* Manages segment-to-consumer assignments within a single subscription of a
scalable topic.
@@ -65,24 +69,68 @@ public class SubscriptionCoordinator {
// TODO: make configurable via broker config (e.g.
scalableTopicConsumerSessionTimeoutSeconds)
private static final Duration DEFAULT_GRACE_PERIOD =
Duration.ofSeconds(60);
+ /**
+ * First drain-status check fires this long after a sealed segment shows
up — fast
+ * enough that a freshly subscribed EARLIEST consumer doesn't wait for
backlog
+ * unblocking on a small topic.
+ */
+ static final Duration DEFAULT_DRAIN_INITIAL_DELAY = Duration.ofSeconds(2);
+
+ /**
+ * Upper bound on the drain-poll interval. Reached by exponential backoff
while a
+ * sealed segment remains undrained — long-tail backlog (consumers stalled
for hours)
+ * shouldn't have us hammering the segment owner every couple seconds.
Reset on every
+ * progress event (drain detected / layout changed / new consumer).
+ */
+ static final Duration DEFAULT_DRAIN_MAX_DELAY = Duration.ofMinutes(15);
+
@Getter
private final String subscriptionName;
private final TopicName topicName;
private final ScalableTopicResources resources;
private final ScheduledExecutorService scheduler;
private final Duration gracePeriod;
+ private final Duration drainInitialDelay;
+ private final Duration drainMaxDelay;
/** Keyed by consumerName — the stable session identity. */
private final Map<String, ConsumerSession> sessions = new
ConcurrentHashMap<>();
private Map<Long, ConsumerSession> segmentAssignments = new
LinkedHashMap<>();
private SegmentLayout currentLayout;
+ /**
+ * Sealed segments confirmed drained for this subscription (cursor at
end). In-memory
+ * only — on controller-leader failover the new leader rediscovers drain
status by
+ * polling. {@link #computeAssignment} consults this set to decide which
active
+ * children of a split / merge are eligible to be assigned (children stay
blocked
+ * until <em>every</em> sealed parent is drained, so message order isn't
broken).
+ */
+ private final Set<Long> drainedSegmentIds = ConcurrentHashMap.newKeySet();
+
+ /**
+ * Backoff governing the drain-poll cadence. Starts at {@link
#drainInitialDelay},
+ * grows exponentially up to {@link #drainMaxDelay}, and is reset on every
progress
+ * event (a segment is observed drained, the layout changes, a new
consumer joins).
+ */
+ private final Backoff drainBackoff;
+ private ScheduledFuture<?> drainPollTask;
+ private boolean drainPollInProgress;
+ private boolean closed;
+ /**
+ * Drain checker installed on the coordinator. Mutable: starts null on the
+ * controller-leader-failover restore path (consumer type unknown until
reconnect),
+ * upgraded to a real checker on first STREAM register via {@link
#installDrainChecker}.
+ * Once non-null it stays non-null (we don't downgrade to no-ordering
mid-flight).
+ */
+ private SegmentDrainChecker drainChecker;
+
public SubscriptionCoordinator(String subscriptionName,
TopicName topicName,
SegmentLayout initialLayout,
ScalableTopicResources resources,
ScheduledExecutorService scheduler) {
- this(subscriptionName, topicName, initialLayout, resources, scheduler,
DEFAULT_GRACE_PERIOD);
+ this(subscriptionName, topicName, initialLayout, resources, scheduler,
+ DEFAULT_GRACE_PERIOD, null, DEFAULT_DRAIN_INITIAL_DELAY,
DEFAULT_DRAIN_MAX_DELAY);
}
public SubscriptionCoordinator(String subscriptionName,
@@ -91,15 +139,36 @@ public class SubscriptionCoordinator {
ScalableTopicResources resources,
ScheduledExecutorService scheduler,
Duration gracePeriod) {
+ this(subscriptionName, topicName, initialLayout, resources, scheduler,
gracePeriod,
+ null, DEFAULT_DRAIN_INITIAL_DELAY, DEFAULT_DRAIN_MAX_DELAY);
+ }
+
+ public SubscriptionCoordinator(String subscriptionName,
+ TopicName topicName,
+ SegmentLayout initialLayout,
+ ScalableTopicResources resources,
+ ScheduledExecutorService scheduler,
+ Duration gracePeriod,
+ SegmentDrainChecker drainChecker,
+ Duration drainInitialDelay,
+ Duration drainMaxDelay) {
this.subscriptionName = subscriptionName;
this.topicName = topicName;
this.currentLayout = initialLayout;
this.resources = resources;
this.scheduler = scheduler;
this.gracePeriod = gracePeriod;
+ this.drainChecker = drainChecker;
+ this.drainInitialDelay = drainInitialDelay;
+ this.drainMaxDelay = drainMaxDelay;
+ this.drainBackoff = Backoff.builder()
+ .initialDelay(drainInitialDelay)
+ .maxBackoff(drainMaxDelay)
+ .build();
this.log = LOG.with().attr("topic", topicName).attr("subscription",
subscriptionName).build();
}
+
// --- Register / unregister / reconnect ---
/**
@@ -132,7 +201,12 @@ public class SubscriptionCoordinator {
.thenApply(__ -> {
synchronized (this) {
sessions.put(consumerName, session);
- return rebalanceAndNotify();
+ Map<ConsumerSession, ConsumerAssignment> result =
rebalanceAndNotify();
+ // First consumer (or rejoining one) — kick off drain
checks at the
+ // shortest delay rather than whatever long backoff
we'd accumulated
+ // while idle.
+ resetAndRearmDrainPoll();
+ return result;
}
});
}
@@ -202,7 +276,8 @@ public class SubscriptionCoordinator {
/**
* Handle a layout change (split/merge). Recompute and push assignments to
connected
- * consumers.
+ * consumers, then make sure the drain poller is running so any new sealed
segments
+ * get noticed quickly (backoff reset → next check fires at the initial
delay).
*/
public synchronized CompletableFuture<Map<ConsumerSession,
ConsumerAssignment>> onLayoutChange(
SegmentLayout newLayout) {
@@ -211,7 +286,195 @@ public class SubscriptionCoordinator {
segmentAssignments.clear();
return CompletableFuture.completedFuture(Map.of());
}
- return CompletableFuture.completedFuture(rebalanceAndNotify());
+ Map<ConsumerSession, ConsumerAssignment> result = rebalanceAndNotify();
+ // New sealed segments may have appeared — restart drain checks from
the
+ // initial delay rather than continuing whatever long backoff we'd
settled into.
+ resetAndRearmDrainPoll();
+ return CompletableFuture.completedFuture(result);
+ }
+
+ /**
+ * Install a drain checker on a coordinator that doesn't have one yet.
Used on the
+ * first STREAM register against a coordinator that was created on the
+ * controller-failover restore path (where consumer type wasn't known and
we
+ * defaulted to "no enforcement"). No-op if a checker is already installed.
+ */
+ synchronized void installDrainChecker(SegmentDrainChecker checker) {
+ if (this.drainChecker != null || checker == null) {
+ return;
+ }
+ this.drainChecker = checker;
+ log.info().log("Drain checker installed on existing coordinator;
rebalancing");
+ if (!sessions.isEmpty()) {
+ rebalanceAndNotify();
+ }
+ resetAndRearmDrainPoll();
+ }
+
+ /**
+ * Stop the periodic drain-status poller. Called by the controller on
close. Idempotent.
+ * Also flips a {@code closed} flag so any {@link #pollDrainStatus()}
iteration that's
+ * mid-flight aborts its rearm step instead of leaking a task into the
executor.
+ */
+ public synchronized void close() {
+ closed = true;
+ if (drainPollTask != null) {
+ drainPollTask.cancel(false);
+ drainPollTask = null;
+ }
+ }
+
+ // --- Drain tracking ---
+
+ /**
+ * A segment is assignable to consumers when:
+ * <ul>
+ * <li>it's sealed — there's no harm in always handing it out (the v4
layer drains or
+ * sees {@code TopicTerminated} immediately if already drained);
or</li>
+ * <li>it's active <b>and</b> every parent in the current layout has
been drained for
+ * this subscription. Without the parent-drain check we'd hand a
consumer the
+ * child of a just-split segment immediately, breaking per-key
ordering against
+ * any unread messages still sitting in the parent.</li>
+ * </ul>
+ *
+ * <p>If no {@link SegmentDrainChecker} was configured (e.g., the simple
test
+ * constructor), the parent-drain ordering is disabled and every segment
is treated
+ * as assignable.
+ */
+ private boolean isAssignable(SegmentInfo segment, SegmentLayout layout) {
+ if (drainChecker == null || !segment.isActive()) {
+ return true;
+ }
+ for (long parentId : segment.parentIds()) {
+ // A parent that's no longer in the DAG has been pruned (its data
is gone), so
+ // treat it as drained — there's nothing to wait on.
+ if (layout.getAllSegments().containsKey(parentId)
+ && !drainedSegmentIds.contains(parentId)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Mark the given sealed segments as drained. Visible for testing —
production code
+ * goes through {@link #pollDrainStatus()}, which queries the broker.
+ */
+ synchronized void markSegmentsDrained(Set<Long> segmentIds) {
+ boolean changed = false;
+ for (long id : segmentIds) {
+ if (drainedSegmentIds.add(id)) {
+ changed = true;
+ }
+ }
+ if (changed) {
+ // Progress: a sealed segment is now drained. Reset the backoff
and re-arm so
+ // the next poll lands at the initial delay — once one parent
drains, others
+ // on the same DAG branch often follow shortly.
+ if (!sessions.isEmpty()) {
+ log.info().attr("drained", segmentIds).log("Sealed segments
drained, rebalancing");
+ rebalanceAndNotify();
+ }
+ resetAndRearmDrainPoll();
+ }
+ }
+
+ /**
+ * Cancel any pending drain-poll task, reset the backoff, and re-arm the
poller. Used
+ * on every progress event (drain detected, layout change, fresh consumer)
so the
+ * next check lands at {@link #drainInitialDelay} rather than at whatever
long backoff
+ * we'd settled into during a quiet period.
+ */
+ private synchronized void resetAndRearmDrainPoll() {
+ drainBackoff.reset();
+ if (drainPollTask != null) {
+ drainPollTask.cancel(false);
+ drainPollTask = null;
+ }
+ ensureDrainPollerRunning();
+ }
+
+ /**
+ * (Re-)arm the drain poller. No-op when no {@link SegmentDrainChecker}
was configured
+ * or when the coordinator has been {@link #close() closed}. Each iteration
+ * self-schedules the next via the {@link Backoff} so the cadence grows
exponentially
+ * while sealed segments stay undrained — short delays at first (a fresh
EARLIEST
+ * consumer typically drains the parent within seconds), capped at
+ * {@link #drainMaxDelay} for long-tail backlogs.
+ */
+ private void ensureDrainPollerRunning() {
+ if (closed || drainChecker == null) {
+ return;
+ }
+ if (drainPollTask != null && !drainPollTask.isCancelled() &&
!drainPollTask.isDone()) {
+ return;
+ }
+ long delayMs = drainBackoff.next().toMillis();
+ drainPollTask = scheduler.schedule(this::pollDrainStatus, delayMs,
TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Single poll iteration: ask the drain checker about every sealed segment
that isn't
+ * already known to be drained, collect the newly-drained ones, and
rebalance. Schedule
+ * the next iteration via the backoff (or stop, if there's nothing left to
check).
+ */
+ private void pollDrainStatus() {
+ SegmentLayout layout;
+ List<SegmentInfo> toCheck;
+ synchronized (this) {
+ drainPollTask = null;
+ if (closed || drainPollInProgress || sessions.isEmpty()) {
+ return;
+ }
+ layout = currentLayout;
+ toCheck = layout.getAllSegments().values().stream()
+ .filter(seg -> !seg.isActive() &&
!drainedSegmentIds.contains(seg.segmentId()))
+ .toList();
+ if (toCheck.isEmpty()) {
+ // Every sealed segment is already drained — no need to keep
polling. The
+ // poller re-arms when onLayoutChange / registerConsumer adds
new work.
+ drainBackoff.reset();
+ return;
+ }
+ drainPollInProgress = true;
+ }
+
+ Set<Long> newlyDrained = ConcurrentHashMap.newKeySet();
+ @SuppressWarnings("rawtypes")
+ CompletableFuture[] futures = new CompletableFuture[toCheck.size()];
+ for (int i = 0; i < toCheck.size(); i++) {
+ SegmentInfo seg = toCheck.get(i);
+ futures[i] = drainChecker.isDrained(seg, subscriptionName)
+ .handle((drained, ex) -> {
+ if (ex != null) {
+ // Log at debug — drain checks happen often and
transient errors
+ // (e.g., topic still being looked up) shouldn't
spam.
+ log.debug().attr("segmentId", seg.segmentId())
+ .exceptionMessage(ex)
+ .log("Drain check failed; will retry next
poll");
+ return null;
+ }
+ if (Boolean.TRUE.equals(drained)) {
+ newlyDrained.add(seg.segmentId());
+ }
+ return null;
+ });
+ }
+ CompletableFuture.allOf(futures).whenComplete((__, ex) -> {
+ try {
+ if (!newlyDrained.isEmpty()) {
+ // markSegmentsDrained resets the backoff for us.
+ markSegmentsDrained(new HashSet<>(newlyDrained));
+ }
+ } finally {
+ synchronized (SubscriptionCoordinator.this) {
+ drainPollInProgress = false;
+ // Re-arm: undrained sealed segments still pending → keep
polling, but
+ // with the next (longer) backoff if no progress was made
this round.
+ ensureDrainPollerRunning();
+ }
+ }
+ });
}
// --- Accessors ---
@@ -267,11 +530,29 @@ public class SubscriptionCoordinator {
}
/**
- * Compute a balanced assignment of active segments to consumers.
+ * Compute a balanced assignment of segments to consumers.
+ *
+ * <p>Strategy: sort segments by hash range, then segment id (tiebreak),
sort consumers by
+ * name, then round-robin. Deterministic: the same inputs always produce
the same output,
+ * so a new leader recomputing assignments after failover gets the same
result as the old
+ * leader.
+ *
+ * <p><b>DAG replay.</b> The assignment includes every <em>sealed</em>
segment in the
+ * DAG. A fresh EARLIEST subscription needs to read messages produced
before it joined,
+ * and those may live on segments that have since been sealed by a split /
merge.
+ *
+ * <p><b>Parent-drain ordering.</b> An <em>active</em> child segment is
only assigned
+ * once <em>every</em> parent in the DAG has been drained for this
subscription
+ * (tracked in {@link #drainedSegmentIds}). Without this guard a consumer
would be
+ * handed an active child immediately after a split and start receiving
new messages
+ * for some key while the same key's pre-split messages still sit unread
on the sealed
+ * parent — breaking per-key ordering. Initial active segments (those with
no parents
+ * in the layout) are unaffected and assigned right away.
*
- * <p>Strategy: sort segments by hash range start, sort consumers by name,
then
- * round-robin. Deterministic: the same inputs always produce the same
output, so a new
- * leader recomputing assignments after failover gets the same result as
the old leader.
+ * <p>The client side (per-segment v4 consumer) drains a
sealed-but-still-present
+ * segment naturally and closes it on {@code TopicTerminated}; a
sealed-and-already-
+ * drained segment yields {@code TopicTerminated} immediately, so the cost
of
+ * including it is one short-lived v4 subscribe.
*/
Map<ConsumerSession, ConsumerAssignment> computeAssignment(
SegmentLayout layout, Collection<ConsumerSession> consumers) {
@@ -280,8 +561,10 @@ public class SubscriptionCoordinator {
return Map.of();
}
- List<SegmentInfo> sortedSegments =
layout.getActiveSegments().values().stream()
- .sorted(Comparator.comparing(SegmentInfo::hashRange))
+ List<SegmentInfo> sortedSegments =
layout.getAllSegments().values().stream()
+ .filter(seg -> isAssignable(seg, layout))
+ .sorted(Comparator.comparing(SegmentInfo::hashRange)
+ .thenComparingLong(SegmentInfo::segmentId))
.toList();
List<ConsumerSession> sortedConsumers = consumers.stream()
@@ -339,4 +622,12 @@ public class SubscriptionCoordinator {
private Map<ConsumerSession, ConsumerAssignment> snapshotAssignments() {
return computeAssignment(currentLayout, sessions.values());
}
+
+ /**
+ * Test hook: return the assignment that would be sent right now, computed
against the
+ * current layout and connected consumers. Visible for unit tests.
+ */
+ synchronized Map<ConsumerSession, ConsumerAssignment> currentAssignment() {
+ return computeAssignment(currentLayout, sessions.values());
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinatorTest.java
index 4de8ddb0a43..d3f6e1e0bc0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinatorTest.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -141,8 +142,189 @@ public class SubscriptionCoordinatorTest {
Map<ConsumerSession, ConsumerAssignment> result =
coordinator.onLayoutChange(newLayout).get();
+ // After the split: segment 0 is sealed, two new active children take
its place,
+ // and segments 1..3 stay active. The default test coordinator runs
without a
+ // SegmentDrainChecker, so parent-drain ordering is disabled and every
segment in
+ // the DAG (active + sealed) is assigned — 5 active + 1 sealed = 6.
assertEquals(result.size(), 1);
- assertEquals(findByName(result,
"consumer-1").assignedSegments().size(), 5);
+ assertEquals(findByName(result,
"consumer-1").assignedSegments().size(), 6);
+ }
+
+ /**
+ * Coordinator running with a {@link SegmentDrainChecker} must hold back
active
+ * children until <em>every</em> sealed parent is reported as drained.
Until then
+ * only the sealed parent and the unrelated initial active segments make
it into
+ * the assignment.
+ */
+ @Test
+ public void testActiveChildrenBlockedUntilParentDrained() throws Exception
{
+ // Re-create the coordinator with a controllable drain checker. We
start with no
+ // sealed segments reported as drained.
+ Set<Long> drained = ConcurrentHashMap.newKeySet();
+ SegmentDrainChecker checker = (segment, sub) ->
+
CompletableFuture.completedFuture(drained.contains(segment.segmentId()));
+ SubscriptionCoordinator orderedCoordinator = new
SubscriptionCoordinator("test-sub",
+ topicName, initialLayout, resources, scheduler,
Duration.ofMillis(200),
+ checker, Duration.ofMillis(50), Duration.ofSeconds(5));
+ try {
+ orderedCoordinator.registerConsumer("consumer-1", 1L,
mock(TransportCnx.class)).get();
+
+ SegmentLayout afterSplit = initialLayout.splitSegment(0);
+ Map<ConsumerSession, ConsumerAssignment> result =
+ orderedCoordinator.onLayoutChange(afterSplit).get();
+
+ // Layout: segment 0 sealed (parent=∅), segments 1..3 active
(parent=∅),
+ // segments 4 + 5 active (parent=[0]). Children of 0 must be
excluded until
+ // 0 is drained.
+ ConsumerAssignment a = findByName(result, "consumer-1");
+ assertNotNull(a);
+ Set<Long> assigned = new HashSet<>(segmentIds(a));
+ assertTrue(assigned.containsAll(Set.of(0L, 1L, 2L, 3L)),
+ "sealed parent + initial active children must be assigned,
got " + assigned);
+ assertFalse(assigned.contains(4L), "child of un-drained parent
must be blocked");
+ assertFalse(assigned.contains(5L), "child of un-drained parent
must be blocked");
+
+ // Mark the parent drained — the next poll should pick it up and
the children
+ // must end up assigned.
+ drained.add(0L);
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
{
+ Set<Long> nowAssigned = new HashSet<>(segmentIds(
+ findByName(orderedCoordinator.currentAssignment(),
"consumer-1")));
+ assertTrue(nowAssigned.containsAll(Set.of(0L, 1L, 2L, 3L, 4L,
5L)),
+ "after parent drain, all 6 segments must be assigned,
got " + nowAssigned);
+ });
+ } finally {
+ orderedCoordinator.close();
+ }
+ }
+
+ /**
+ * After the drain poller has backed off, a fresh consumer registration
must cancel
+ * the long-delay scheduled task and re-arm at the initial delay —
otherwise the new
+ * consumer would wait the full backed-off delay before its first drain
check.
+ * Regression test: {@code ensureDrainPollerRunning} alone is a no-op when a task is
+ * already scheduled, so progress events have to cancel the pending task and re-arm.
+ */
+ @Test
+ public void testFreshRegisterCancelsBackedOffPollAndRearmsImmediately()
throws Exception {
+ java.util.concurrent.atomic.AtomicInteger checks = new
java.util.concurrent.atomic.AtomicInteger();
+ SegmentDrainChecker checker = (segment, sub) -> {
+ checks.incrementAndGet();
+ return CompletableFuture.completedFuture(false);
+ };
+ // Initial 50ms, max 5s — exponential, several polls happen quickly.
+ SubscriptionCoordinator c = new SubscriptionCoordinator("test-sub",
+ topicName, initialLayout.splitSegment(0), resources, scheduler,
+ Duration.ofMillis(200), checker, Duration.ofMillis(50),
Duration.ofSeconds(5));
+ try {
+ c.registerConsumer("consumer-1", 1L,
mock(TransportCnx.class)).get();
+ // Let a few polls happen so the backoff grows past the initial
delay.
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .until(() -> checks.get() >= 3);
+
+ int countBefore = checks.get();
+ // A backed-off poll task is now pending. Registering another consumer must reset
+ // the backoff, cancel that task, and re-arm a fresh check at ~initial delay (50ms).
+ c.registerConsumer("consumer-2", 2L,
mock(TransportCnx.class)).get();
+ // 250ms is well under any plausible backed-off delay; if the task
wasn't
+ // cancelled and re-armed, no new check would land in this window.
+ Thread.sleep(250);
+ assertTrue(checks.get() > countBefore,
+ "register-on-progress must cancel pending task and re-arm
at initial delay");
+ } finally {
+ c.close();
+ }
+ }
+
+ /**
+ * {@code close()} during an in-flight poll must prevent the {@code whenComplete} re-arm
+ * path from scheduling a new task — otherwise the coordinator is closed while a stale
+ * drain check is still queued for execution.
+ */
+ @Test
+ public void testCloseRaceWithInFlightPollDoesNotLeakRearm() throws
Exception {
+ // Drain checker that blocks until released — gives us a deterministic
in-flight
+ // window that overlaps with close().
+ CompletableFuture<Boolean> blocking = new CompletableFuture<>();
+ java.util.concurrent.atomic.AtomicInteger checks = new
java.util.concurrent.atomic.AtomicInteger();
+ SegmentDrainChecker checker = (segment, sub) -> {
+ checks.incrementAndGet();
+ return blocking;
+ };
+ SubscriptionCoordinator c = new SubscriptionCoordinator("test-sub",
+ topicName, initialLayout.splitSegment(0), resources, scheduler,
+ Duration.ofMillis(200), checker, Duration.ofMillis(20),
Duration.ofSeconds(1));
+ c.registerConsumer("consumer-1", 1L, mock(TransportCnx.class)).get();
+ // Wait until at least one poll has started.
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() ->
checks.get() >= 1);
+
+ c.close();
+ // Release the in-flight check after close — its whenComplete will run
with
+ // closed=true and must NOT schedule another task.
+ blocking.complete(false);
+
+ int countAfterClose = checks.get();
+ Thread.sleep(200);
+ assertEquals(checks.get(), countAfterClose,
+ "no further drain checks should fire after close()");
+ }
+
+ /**
+ * On controller-leader failover the restore path creates a coordinator
without
+ * knowing the consumer type (not yet persisted in metadata). It must
default to
+ * <em>no parent-drain enforcement</em> — picking STREAM as a
"conservative default"
+ * would deadlock CHECKPOINT subscriptions, whose parents never report
drained.
+ */
+ @Test
+ public void testRestoredCoordinatorStartsWithoutParentDrainOrdering()
throws Exception {
+ // restoreConsumers installs a session and computes assignment
immediately. With
+ // no drain checker the active children of the (sealed) split parent
are eligible
+ // right away.
+ coordinator.restoreConsumers(java.util.List.of("consumer-1"));
+ SegmentLayout afterSplit = initialLayout.splitSegment(0);
+ Map<ConsumerSession, ConsumerAssignment> result =
+ coordinator.onLayoutChange(afterSplit).get();
+ ConsumerAssignment a = findByName(result, "consumer-1");
+ assertNotNull(a);
+ Set<Long> assigned = new HashSet<>(segmentIds(a));
+ // 3 untouched initial active + 1 sealed parent + 2 split children = 6. The
+ // restore-path coordinator has no checker, so the children are NOT held back.
+ assertEquals(assigned.size(), 6,
+ "restored coordinator (no drain checker) must hand out every
DAG segment "
+ + "right away to avoid deadlocking CHECKPOINT/QUEUE
subs, got "
+ + assigned);
+ }
+
+ /**
+ * After a STREAM consumer reconnects to a restored coordinator, the
controller is
+ * expected to install a real drain checker via
+ * {@link
SubscriptionCoordinator#installDrainChecker(SegmentDrainChecker)}. Once
+ * installed, parent-drain ordering kicks in: children of an un-drained
parent get
+ * filtered out of the next assignment.
+ */
+ @Test
+ public void testInstallDrainCheckerAfterRestoreEnablesOrdering() throws
Exception {
+ coordinator.registerConsumer("consumer-1", 1L,
mock(TransportCnx.class)).get();
+ SegmentLayout afterSplit = initialLayout.splitSegment(0);
+ coordinator.onLayoutChange(afterSplit).get();
+ // No checker yet → all 6 segments assigned.
+ assertEquals(segmentIds(findByName(coordinator.currentAssignment(),
"consumer-1")).size(),
+ 6);
+
+ // Install a checker that says "nothing is drained" → children of
segment 0 must
+ // disappear from the next assignment.
+ Set<Long> drained = ConcurrentHashMap.newKeySet();
+ coordinator.installDrainChecker((segment, sub) ->
+
CompletableFuture.completedFuture(drained.contains(segment.segmentId())));
+ Set<Long> assigned = new HashSet<>(segmentIds(
+ findByName(coordinator.currentAssignment(), "consumer-1")));
+ // Parent stays (sealed always assignable); children of 0 are blocked.
+ assertTrue(assigned.contains(0L) && assigned.contains(1L) &&
assigned.contains(2L)
+ && assigned.contains(3L),
+ "sealed parent + initial active segments must remain, got " +
assigned);
+ assertEquals(assigned.size(), 4,
+ "active children of un-drained parent must be filtered after
install, got "
+ + assigned);
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerDagReplayTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerDagReplayTest.java
new file mode 100644
index 00000000000..fdebd133f9c
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerDagReplayTest.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * DAG-replay coverage for V5 {@link CheckpointConsumer} — the checkpoint
analogue of
+ * {@code V5StreamConsumerDagReplayTest}. A fresh {@link
Checkpoint#earliest()} consumer
+ * subscribed after a split must still read the messages that ended up on the
now-sealed
+ * parent segment.
+ *
+ * <p>Unmanaged {@link CheckpointConsumer} (no {@code consumerGroup(...)})
walks the DAG
+ * via {@code DagWatchClient} and subscribes to active + sealed segments
directly — it
+ * doesn't go through the broker's {@link
+ * org.apache.pulsar.broker.service.scalable.SubscriptionCoordinator}. The
managed flow
+ * (with {@code consumerGroup(...)}) does, so the parent-drain ordering
applies there;
+ * unlike a {@link StreamConsumer}, a managed {@link CheckpointConsumer}
doesn't create
+ * subscription cursors on the segment topics (it reads via Readers and tracks
position
+ * client-side as a {@link Checkpoint}), so the broker treats every sealed
segment as
+ * effectively drained and unblocks the active children right away.
+ */
+public class V5CheckpointConsumerDagReplayTest extends V5ClientBaseTest {
+
+ @Test
+ public void testEarliestUnmanagedCheckpointPostSplitReadsSealedBacklog()
throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ // Phase 1: messages land on segment 0.
+ int preSplit = 30;
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < preSplit; i++) {
+ String v = "pre-" + i;
+ producer.newMessage().key("k-" + i).value(v).send();
+ sent.add(v);
+ }
+
+ // Split: segment 0 sealed, two new active children.
+ long parentSegment = singleActiveSegmentId(topic);
+ admin.scalableTopics().splitSegment(topic, parentSegment);
+ Awaitility.await().untilAsserted(() ->
assertEquals(activeSegmentCount(topic), 2));
+
+ // Phase 2: messages land on the children.
+ int postSplit = 30;
+ for (int i = 0; i < postSplit; i++) {
+ String v = "post-" + i;
+ producer.newMessage().key("kk-" + i).value(v).send();
+ sent.add(v);
+ }
+
+ // Fresh consumer subscribes AFTER split — must see the pre-split
sealed backlog
+ // alongside the post-split active data. Unmanaged path uses
DagWatchClient and
+ // covers active + sealed segments, so this works without going through
+ // SubscriptionCoordinator at all.
+ @Cleanup
+ CheckpointConsumer<String> consumer =
v5Client.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .startPosition(Checkpoint.earliest())
+ .create();
+
+ Set<String> received = drainExactly(consumer, sent.size());
+ assertEquals(received, sent,
+ "unmanaged EARLIEST checkpoint consumer must receive both
pre-split "
+ + "(sealed) and post-split (active) messages");
+ }
+
+ @Test
+ public void testEarliestUnmanagedCheckpointAfterTwoSplitsReadsAllBacklog()
throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < 20; i++) {
+ String v = "p1-" + i;
+ producer.newMessage().key("a-" + i).value(v).send();
+ sent.add(v);
+ }
+ admin.scalableTopics().splitSegment(topic,
singleActiveSegmentId(topic));
+ Awaitility.await().untilAsserted(() ->
assertEquals(activeSegmentCount(topic), 2));
+
+ for (int i = 0; i < 20; i++) {
+ String v = "p2-" + i;
+ producer.newMessage().key("b-" + i).value(v).send();
+ sent.add(v);
+ }
+ admin.scalableTopics().splitSegment(topic, anyActiveSegmentId(topic));
+ Awaitility.await().untilAsserted(() ->
assertEquals(activeSegmentCount(topic), 3));
+
+ for (int i = 0; i < 20; i++) {
+ String v = "p3-" + i;
+ producer.newMessage().key("c-" + i).value(v).send();
+ sent.add(v);
+ }
+
+ @Cleanup
+ CheckpointConsumer<String> consumer =
v5Client.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .startPosition(Checkpoint.earliest())
+ .create();
+
+ Set<String> received = drainExactly(consumer, sent.size());
+ assertEquals(received, sent,
+ "unmanaged EARLIEST checkpoint consumer must receive every
message produced "
+ + "across the DAG, including both sealed generations");
+ }
+
+ /**
+ * Same scenario as the unmanaged version, but with {@code
consumerGroup(...)}: the
+ * broker drives the assignment via {@link
+ * org.apache.pulsar.broker.service.scalable.SubscriptionCoordinator}.
Checkpoint
+ * consumers don't create per-segment subscription cursors, so the
broker's drain
+ * check sees no subscription on the sealed parent → no backlog to worry
about → the
+ * active children are unblocked right away.
+ */
+ @Test
+ public void testEarliestManagedCheckpointPostSplitReadsSealedBacklog()
throws Exception {
+ String topic = newScalableTopic(1);
+ String group = "managed-dag-replay";
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ int preSplit = 30;
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < preSplit; i++) {
+ String v = "pre-" + i;
+ producer.newMessage().key("k-" + i).value(v).send();
+ sent.add(v);
+ }
+
+ admin.scalableTopics().splitSegment(topic,
singleActiveSegmentId(topic));
+ Awaitility.await().untilAsserted(() ->
assertEquals(activeSegmentCount(topic), 2));
+
+ int postSplit = 30;
+ for (int i = 0; i < postSplit; i++) {
+ String v = "post-" + i;
+ producer.newMessage().key("kk-" + i).value(v).send();
+ sent.add(v);
+ }
+
+ @Cleanup
+ CheckpointConsumer<String> consumer =
v5Client.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .consumerGroup(group)
+ .startPosition(Checkpoint.earliest())
+ .create();
+
+ Set<String> received = drainExactly(consumer, sent.size());
+ assertEquals(received, sent,
+ "managed EARLIEST checkpoint consumer must receive both
pre-split "
+ + "(sealed) and post-split (active) messages");
+ }
+
+ @Test
+ public void testEarliestManagedCheckpointAfterTwoSplitsReadsAllBacklog()
throws Exception {
+ String topic = newScalableTopic(1);
+ String group = "managed-dag-replay-deep";
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < 20; i++) {
+ String v = "p1-" + i;
+ producer.newMessage().key("a-" + i).value(v).send();
+ sent.add(v);
+ }
+ admin.scalableTopics().splitSegment(topic,
singleActiveSegmentId(topic));
+ Awaitility.await().untilAsserted(() ->
assertEquals(activeSegmentCount(topic), 2));
+
+ for (int i = 0; i < 20; i++) {
+ String v = "p2-" + i;
+ producer.newMessage().key("b-" + i).value(v).send();
+ sent.add(v);
+ }
+ admin.scalableTopics().splitSegment(topic, anyActiveSegmentId(topic));
+ Awaitility.await().untilAsserted(() ->
assertEquals(activeSegmentCount(topic), 3));
+
+ for (int i = 0; i < 20; i++) {
+ String v = "p3-" + i;
+ producer.newMessage().key("c-" + i).value(v).send();
+ sent.add(v);
+ }
+
+ @Cleanup
+ CheckpointConsumer<String> consumer =
v5Client.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .consumerGroup(group)
+ .startPosition(Checkpoint.earliest())
+ .create();
+
+ Set<String> received = drainExactly(consumer, sent.size());
+ assertEquals(received, sent,
+ "managed EARLIEST checkpoint consumer must receive every
message produced "
+ + "across the DAG, including both sealed generations");
+ }
+
+ // --- Helpers ---
+
+ private Set<String> drainExactly(CheckpointConsumer<String> consumer, int
expected) throws Exception {
+ Set<String> received = new HashSet<>();
+ long deadline = System.currentTimeMillis() + 30_000L;
+ while (received.size() < expected && System.currentTimeMillis() <
deadline) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(1));
+ if (msg != null) {
+ received.add(msg.value());
+ }
+ }
+ return received;
+ }
+
+ private long singleActiveSegmentId(String topic) throws Exception {
+ var meta = admin.scalableTopics().getMetadata(topic);
+ for (var seg : meta.getSegments().values()) {
+ if (seg.isActive()) {
+ return seg.getSegmentId();
+ }
+ }
+ throw new AssertionError("no active segment for " + topic);
+ }
+
+ private long anyActiveSegmentId(String topic) throws Exception {
+ return singleActiveSegmentId(topic);
+ }
+
+ private int activeSegmentCount(String topic) throws Exception {
+ int active = 0;
+ for (var seg :
admin.scalableTopics().getMetadata(topic).getSegments().values()) {
+ if (seg.isActive()) {
+ active++;
+ }
+ }
+ return active;
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5StreamConsumerDagReplayTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5StreamConsumerDagReplayTest.java
new file mode 100644
index 00000000000..d5204864a8c
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5StreamConsumerDagReplayTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * DAG-replay coverage for V5 {@link StreamConsumer}.
+ *
+ * <p>Companion to the QueueConsumer fix in #25611 ({@code subscribes to
active + sealed}).
+ * StreamConsumer assignment is controller-driven: the broker's
+ * {@link
org.apache.pulsar.broker.service.scalable.SubscriptionCoordinator}{@code
+ * #computeAssignment} now hands consumers every segment in the DAG, not just
the
+ * currently-active ones, so a fresh EARLIEST consumer can pick up backlog
that lives on
+ * already-sealed segments (i.e., messages produced before a split / merge
happened).
+ *
+ * <p>Without this, {@link #testEarliestSubscribePostSplitReadsSealedBacklog()} would only
+ * see messages produced after the split (i.e. those on the active children); the
+ * pre-split data on the sealed segment would be silently orphaned.
+ */
+public class V5StreamConsumerDagReplayTest extends V5ClientBaseTest {
+
+ /**
+ * Produce N messages into a single-segment topic (everything lands on
segment 0),
+ * split, then attach a brand-new EARLIEST StreamConsumer. Without the
controller
+ * including segment 0 in the assignment the consumer would only see
post-split sends
+ * and the original N messages would be unreachable for this subscription.
+ */
+ @Test
+ public void testEarliestSubscribePostSplitReadsSealedBacklog() throws
Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ // Phase 1: fill segment 0.
+ int preSplit = 50;
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < preSplit; i++) {
+ String v = "pre-" + i;
+ producer.newMessage().key("k-" + i).value(v).send();
+ sent.add(v);
+ }
+
+ // Split: segment 0 is sealed, two new active children take over.
+ long parentSegment = singleActiveSegmentId(topic);
+ admin.scalableTopics().splitSegment(topic, parentSegment);
+ Awaitility.await().untilAsserted(() ->
assertEquals(activeSegmentCount(topic), 2,
+ "split must produce 2 active children"));
+
+ // Phase 2: more messages, this time landing on the children.
+ int postSplit = 50;
+ for (int i = 0; i < postSplit; i++) {
+ String v = "post-" + i;
+ producer.newMessage().key("kk-" + i).value(v).send();
+ sent.add(v);
+ }
+
+ // The consumer subscribes for the first time — AFTER both batches and
the split.
+ // The controller has to include segment 0 (sealed, with backlog) in
the initial
+ // assignment for this consumer to see the pre-split messages at all.
+ @Cleanup
+ StreamConsumer<String> consumer =
v5Client.newStreamConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("dag-replay-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ // Ack cumulatively as we go — the broker uses subscription backlog ==
0 to decide
+ // a sealed segment is drained, so deferring acks until the end would
keep the
+ // pre-split parent "in backlog" forever and the active children would
never get
+ // unblocked.
+ Set<String> received = new HashSet<>();
+ long deadline = System.currentTimeMillis() + 30_000L;
+ while (received.size() < sent.size() && System.currentTimeMillis() <
deadline) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(1));
+ if (msg != null) {
+ received.add(msg.value());
+ consumer.acknowledgeCumulative(msg.id());
+ }
+ }
+ assertEquals(received, sent,
+ "fresh EARLIEST consumer must receive both pre-split (sealed) "
+ + "and post-split (active) messages");
+ }
+
+ /**
+ * Same shape as the previous test but with a chain of two splits. Both sealed
+ * generations (segment 0, then whichever of its two children gets re-split) must
+ * remain visible to a fresh EARLIEST consumer alongside the three active leaves.
+ * Exercises the {@code getAllSegments()} path the assignment now uses, not just the
+ * currently-active segments.
+ */
+ @Test
+ public void testEarliestSubscribeAfterTwoSplitsReadsAllBacklog() throws
Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ Set<String> sent = new HashSet<>();
+ // Phase 1: lands on segment 0.
+ for (int i = 0; i < 30; i++) {
+ String v = "p1-" + i;
+ producer.newMessage().key("a-" + i).value(v).send();
+ sent.add(v);
+ }
+ admin.scalableTopics().splitSegment(topic,
singleActiveSegmentId(topic));
+ Awaitility.await().untilAsserted(() ->
assertEquals(activeSegmentCount(topic), 2));
+
+ // Phase 2: lands on the two children.
+ for (int i = 0; i < 30; i++) {
+ String v = "p2-" + i;
+ producer.newMessage().key("b-" + i).value(v).send();
+ sent.add(v);
+ }
+
+ // Pick one of the two active segments and split it again — produces a
deeper DAG.
+ long childToSplit = anyActiveSegmentId(topic);
+ admin.scalableTopics().splitSegment(topic, childToSplit);
+ Awaitility.await().untilAsserted(() ->
assertEquals(activeSegmentCount(topic), 3));
+
+ // Phase 3: lands on the now-three active leaves.
+ for (int i = 0; i < 30; i++) {
+ String v = "p3-" + i;
+ producer.newMessage().key("c-" + i).value(v).send();
+ sent.add(v);
+ }
+
+ @Cleanup
+ StreamConsumer<String> consumer =
v5Client.newStreamConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("dag-replay-deep-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ // Ack cumulatively as we go — the broker uses subscription backlog ==
0 to decide
+ // a sealed segment is drained, so deferring acks until the end would
keep the
+ // pre-split parent "in backlog" forever and the active children would
never get
+ // unblocked.
+ Set<String> received = new HashSet<>();
+ long deadline = System.currentTimeMillis() + 30_000L;
+ while (received.size() < sent.size() && System.currentTimeMillis() <
deadline) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(1));
+ if (msg != null) {
+ received.add(msg.value());
+ consumer.acknowledgeCumulative(msg.id());
+ }
+ }
+ assertEquals(received, sent,
+ "fresh EARLIEST consumer must receive every message produced
across the DAG, "
+ + "including the two sealed generations");
+ }
+
+ /**
+ * After a fresh EARLIEST consumer drains and acknowledges the sealed backlog and then
+ * closes, a second consumer subscribing under the same subscription name must
+ * <em>not</em> re-receive the already-acknowledged sealed messages. The sealed segment
+ * is still in the second consumer's assignment (sealed segments are always assignable),
+ * but the subscription cursor on it is already at the end. The v4 layer fires
+ * {@code TopicTerminated} immediately, and that per-segment consumer closes.
+ */
+ @Test
+ public void testSealedBacklogNotRedeliveredAfterFirstConsumerDrains()
throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ // Pre-split batch + split.
+ int preSplit = 30;
+ for (int i = 0; i < preSplit; i++) {
+ producer.newMessage().key("k-" + i).value("pre-" + i).send();
+ }
+ admin.scalableTopics().splitSegment(topic,
singleActiveSegmentId(topic));
+ Awaitility.await().untilAsserted(() ->
assertEquals(activeSegmentCount(topic), 2));
+
+ // First consumer drains everything (incl. the sealed segment) and
acks.
+ StreamConsumer<String> first =
v5Client.newStreamConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("redelivery-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ Set<String> firstReceived = new HashSet<>();
+ MessageId last = null;
+ long deadline = System.currentTimeMillis() + 30_000L;
+ while (firstReceived.size() < preSplit && System.currentTimeMillis() <
deadline) {
+ Message<String> msg = first.receive(Duration.ofSeconds(1));
+ if (msg != null) {
+ firstReceived.add(msg.value());
+ last = msg.id();
+ }
+ }
+ assertEquals(firstReceived.size(), preSplit, "first consumer must
drain all pre-split messages");
+ assertNotNull(last);
+ first.acknowledgeCumulative(last);
+ // Give the broker a moment to persist the cumulative ack on every
segment.
+ Thread.sleep(500);
+ first.close();
+
+ // Second consumer joins the same subscription. The sealed segment is
in the
+ // assignment, but its cursor is past every message — no redelivery.
+ @Cleanup
+ StreamConsumer<String> second =
v5Client.newStreamConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("redelivery-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+ Message<String> stray = second.receive(Duration.ofSeconds(2));
+ assertTrue(stray == null,
+ "after the first consumer fully acked the sealed backlog, a
fresh "
+ + "subscriber must NOT see those messages, got: "
+ + (stray == null ? "null" : stray.value()));
+ }
+
+ // --- Helpers ---
+
+ private long singleActiveSegmentId(String topic) throws Exception {
+ var meta = admin.scalableTopics().getMetadata(topic);
+ for (var seg : meta.getSegments().values()) {
+ if (seg.isActive()) {
+ return seg.getSegmentId();
+ }
+ }
+ throw new AssertionError("no active segment for " + topic);
+ }
+
+ private long anyActiveSegmentId(String topic) throws Exception {
+ var meta = admin.scalableTopics().getMetadata(topic);
+ for (var seg : meta.getSegments().values()) {
+ if (seg.isActive()) {
+ return seg.getSegmentId();
+ }
+ }
+ throw new AssertionError("no active segment for " + topic);
+ }
+
+ private int activeSegmentCount(String topic) throws Exception {
+ int active = 0;
+ for (var seg :
admin.scalableTopics().getMetadata(topic).getSegments().values()) {
+ if (seg.isActive()) {
+ active++;
+ }
+ }
+ return active;
+ }
+}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
index d04da28e35f..dd306c5805e 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
@@ -280,4 +280,22 @@ public interface ScalableTopics {
* Delete a segment topic asynchronously.
*/
CompletableFuture<Void> deleteSegmentAsync(String segmentTopic, boolean
force);
+
+    /**
+     * Returns the number of unconsumed entries in the given subscription's cursor on the
+     * segment topic — i.e. the per-subscription backlog. The call routes to the broker
+     * that owns the segment topic, so it works whether the caller is colocated with the
+     * segment or not.
+     *
+     * <p>Used internally by the broker-side {@code SubscriptionCoordinator} to detect when a
+     * sealed parent has been drained and its children can be unblocked. Callers can also use
+     * it for diagnostics; a returned {@code 0} on a sealed segment indicates the subscription
+     * has nothing left to consume there.
+     *
+     * @param segmentTopic Full segment topic name ({@code segment://tenant/namespace/topic/descriptor})
+     * @param subscription Subscription name
+     * @return a future completing with the subscription's backlog (unconsumed entry count) on the segment
+     */
+    CompletableFuture<Long> getSegmentSubscriptionBacklogAsync(String segmentTopic,
+            String subscription);
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
index 950ae667dca..779579f1d1b 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
@@ -263,6 +263,17 @@ public class ScalableTopicsImpl extends BaseResource
implements ScalableTopics {
return asyncDeleteRequest(path);
}
+    @Override
+    public CompletableFuture<Long> getSegmentSubscriptionBacklogAsync(String segmentTopic, String subscription) {
+        // Route: <adminSegments>/{tenant}/{namespace}/{localName}/{descriptor}/subscription/{sub}/backlog
+        // NOTE(review): `subscription` is handed to path() un-encoded; confirm against the server endpoint.
+        TopicName segment = TopicName.get(segmentTopic);
+        WebTarget backlogPath = adminSegments.path(segment.getTenant()).path(segment.getNamespacePortion())
+                .path(segment.getLocalName()).path(segment.getSegmentDescriptor())
+                .path("subscription").path(subscription).path("backlog");
+        return asyncGetRequest(backlogPath, Long.class);
+    }
+
// --- Helpers ---
private static TopicName validateTopic(String topic) {