This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b3835c31dd [fix][broker] PIP-468: StreamConsumer / CheckpointConsumer 
DAG-replay with parent-drain ordering (#25642)
7b3835c31dd is described below

commit 7b3835c31dd84e35b571fb730a81431c13e84297
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 1 08:57:44 2026 -0700

    [fix][broker] PIP-468: StreamConsumer / CheckpointConsumer DAG-replay with 
parent-drain ordering (#25642)
---
 .../apache/pulsar/broker/admin/v2/Segments.java    |  54 ++++
 .../apache/pulsar/broker/service/ServerCnx.java    |   4 +-
 .../service/scalable/ScalableTopicController.java  |  96 ++++++-
 .../service/scalable/ScalableTopicService.java     |  26 +-
 .../service/scalable/SegmentDrainChecker.java      |  50 ++++
 .../service/scalable/SubscriptionCoordinator.java  | 311 ++++++++++++++++++++-
 .../scalable/SubscriptionCoordinatorTest.java      | 184 +++++++++++-
 .../api/v5/V5CheckpointConsumerDagReplayTest.java  | 271 ++++++++++++++++++
 .../api/v5/V5StreamConsumerDagReplayTest.java      | 280 +++++++++++++++++++
 .../apache/pulsar/client/admin/ScalableTopics.java |  18 ++
 .../client/admin/internal/ScalableTopicsImpl.java  |  11 +
 11 files changed, 1290 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
index e01961f9a30..0ae8b3264a5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.Encoded;
+import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
@@ -174,6 +175,59 @@ public class Segments extends AdminResource {
                 });
     }
 
+    @GET
+    
@Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}/backlog")
+    @ApiOperation(value = "Number of unconsumed entries in the segment topic 
for the "
+            + "given subscription. Super-user only.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 401, message = "This operation requires 
super-user access"),
+            @ApiResponse(code = 403, message = "This operation requires 
super-user access"),
+            @ApiResponse(code = 404, message = "Segment topic or subscription 
not found"),
+            @ApiResponse(code = 500, message = "Internal server error")})
+    public void getSubscriptionBacklog(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify the parent topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Segment descriptor (e.g. 0000-7fff-1)", 
required = true)
+            @PathParam("descriptor") String descriptor,
+            @ApiParam(value = "Subscription name", required = true)
+            @PathParam("subscription") String subscription,
+            @ApiParam(value = "Whether leader broker redirected this call to 
this broker.")
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
+        validateNamespaceName(tenant, namespace);
+        TopicName segmentTopic = segmentTopicName(tenant, namespace, 
encodedTopic, descriptor);
+
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic, 
authoritative))
+                .thenCompose(__ -> 
pulsar().getBrokerService().getTopicIfExists(segmentTopic.toString()))
+                .thenAccept(optTopic -> {
+                    if (optTopic.isEmpty()) {
+                        // No topic loaded → no subscription cursor → no 
backlog. Returning
+                        // 0 here would be wrong (caller might mark the 
segment drained on
+                        // a topic that simply hasn't loaded yet); a 404 
forces the caller
+                        // to retry, which matches our drain-poll contract.
+                        throw new RestException(Response.Status.NOT_FOUND,
+                                "Segment topic not loaded: " + segmentTopic);
+                    }
+                    var sub = optTopic.get().getSubscription(subscription);
+                    if (sub == null) {
+                        throw new RestException(Response.Status.NOT_FOUND,
+                                "Subscription not found on segment: " + 
subscription);
+                    }
+                    
asyncResponse.resume(sub.getNumberOfEntriesInBacklog(false));
+                })
+                .exceptionally(ex -> {
+                    log.error().attr("clientAppId", 
clientAppId()).attr("segment", segmentTopic)
+                            .exception(ex).log("Failed to get segment 
subscription backlog");
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
     @DELETE
     @Path("/{tenant}/{namespace}/{topic}/{descriptor}")
     @ApiOperation(value = "Delete a segment topic. Super-user only.")
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 871231f371a..2ed913561b2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -879,6 +879,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         final String subscription = 
commandScalableTopicSubscribe.getSubscription();
         final String consumerName = 
commandScalableTopicSubscribe.getConsumerName();
         final long consumerId = commandScalableTopicSubscribe.getConsumerId();
+        final org.apache.pulsar.common.api.proto.ScalableConsumerType 
consumerType =
+                commandScalableTopicSubscribe.getConsumerType();
 
         log.debug().attr("topic", topicStr).attr("subscription", subscription)
                 .attr("consumerName", consumerName).attr("requestId", 
requestId)
@@ -920,7 +922,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                         return;
                     }
                     scalableTopicService.registerConsumer(topicName, 
subscription, consumerName,
-                                    consumerId, this)
+                                    consumerId, consumerType, this)
                             .whenCompleteAsync((assignment, ex) -> {
                                 if (ex != null) {
                                     Throwable cause = ex.getCause() != null ? 
ex.getCause() : ex;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index 8b1805cd315..7aef354a0f8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -31,6 +31,7 @@ import 
org.apache.pulsar.broker.resources.ScalableTopicMetadata;
 import org.apache.pulsar.broker.resources.ScalableTopicResources;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.TransportCnx;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.scalable.HashRange;
 import org.apache.pulsar.common.scalable.SegmentInfo;
@@ -155,7 +156,32 @@ public class ScalableTopicController {
                 });
     }
 
+    /**
+     * Restore-path entry: consumer type isn't persisted in metadata yet, so 
we don't
+     * know whether the original subscription was STREAM (needs parent-drain 
ordering)
+     * or CHECKPOINT / QUEUE (mustn't have it — CHECKPOINT never drains 
parents because
+     * it doesn't create per-segment cursors). Default to <em>no 
enforcement</em>; on the
+     * first register-after-restore the controller calls
+     * {@link SubscriptionCoordinator#installDrainChecker} if the type is 
STREAM.
+     */
     private SubscriptionCoordinator createCoordinator(String subscription) {
+        return createCoordinator(subscription, null);
+    }
+
+    private SubscriptionCoordinator createCoordinator(String subscription,
+            ScalableConsumerType consumerType) {
+        // Parent-drain ordering matters only for STREAM consumers (Exclusive 
per-segment
+        // subscription with broker-tracked cursors → preserving per-key order 
across a
+        // split requires waiting for the parent to drain before handing out 
children).
+        // CHECKPOINT consumers track position client-side via Checkpoints and 
don't even
+        // create per-segment cursors — their parent never reports as drained, 
so the
+        // ordering machinery would block their children indefinitely. QUEUE 
consumers
+        // are shared and accept out-of-order delivery by design. Null type 
(restore
+        // path) starts without a checker; it's installed lazily on first 
STREAM
+        // register.
+        SegmentDrainChecker checker =
+                consumerType == ScalableConsumerType.STREAM ? 
this::isSegmentDrained : null;
+
         // Defensive: PulsarService.getConfig() is null in some unit-test 
mocks. Fall
         // back to the SubscriptionCoordinator's default grace period in that 
case.
         var config = brokerService.getPulsar().getConfig();
@@ -175,7 +201,42 @@ public class ScalableTopicController {
                 currentLayout,
                 resources,
                 brokerService.getPulsar().getExecutor(),
-                gracePeriod);
+                gracePeriod,
+                checker,
+                SubscriptionCoordinator.DEFAULT_DRAIN_INITIAL_DELAY,
+                SubscriptionCoordinator.DEFAULT_DRAIN_MAX_DELAY);
+    }
+
+    /**
+     * Drain check installed on the STREAM {@link SubscriptionCoordinator}s of this 
topic (other consumer types get no checker). Asks the
+     * segment topic's owning broker for the per-subscription backlog via the
+     * {@code /segments/.../subscription/.../backlog} admin endpoint, which 
redirects to
+     * the topic owner — works whether the controller and the segment colocate 
or not.
+     *
+     * <p>Returns {@code false} if the segment topic or subscription is not 
yet loaded
+     * (the admin endpoint replies 404). The next poll will succeed once the 
consumer's
+     * subscribe lands the topic on its owning broker.
+     */
+    private CompletableFuture<Boolean> isSegmentDrained(SegmentInfo segment, 
String subscription) {
+        String segmentTopicName = toSegmentPersistentName(segment);
+        try {
+            return brokerService.getPulsar().getAdminClient()
+                    .scalableTopics()
+                    .getSegmentSubscriptionBacklogAsync(segmentTopicName, 
subscription)
+                    .thenApply(backlog -> backlog != null && backlog <= 0)
+                    .exceptionally(ex -> {
+                        Throwable cause =
+                                
org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex);
+                        if (cause instanceof 
org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException) {
+                            // Topic or subscription not loaded yet — try 
again on the
+                            // next poll. The consumer's subscribe will 
materialize it.
+                            return false;
+                        }
+                        throw 
org.apache.pulsar.common.util.FutureUtil.wrapToCompletionException(cause);
+                    });
+        } catch (PulsarServerException e) {
+            return CompletableFuture.failedFuture(e);
+        }
     }
 
     private CompletableFuture<Void> electLeader() {
@@ -309,14 +370,42 @@ public class ScalableTopicController {
      * <p>If a session with the same {@code consumerName} already exists (for 
example
      * because the consumer is reconnecting within the grace period), the 
existing
      * assignment is reused and no rebalance occurs.
+     *
+     * <p>The {@code consumerType} is used at coordinator creation time to 
decide whether
+     * to enforce parent-drain ordering on assignments — see
+     * {@link SubscriptionCoordinator}. A coordinator that has no drain checker yet
+     * (restore path, or first registered by a non-STREAM consumer) gains one on the
+     * first STREAM register; once installed, the checker is never removed, so later
+     * registers with a different type still work but won't relax the ordering policy.
+     */
+    /**
+     * @deprecated Defaults to {@link ScalableConsumerType#STREAM}
+     *     for backward compatibility. New callers should pass the explicit 
type.
      */
+    @Deprecated
     public CompletableFuture<ConsumerAssignment> registerConsumer(String 
subscription,
                                                                    String 
consumerName,
                                                                    long 
consumerId,
                                                                    
TransportCnx cnx) {
+        return registerConsumer(subscription, consumerName, consumerId,
+                ScalableConsumerType.STREAM, cnx);
+    }
+
+    public CompletableFuture<ConsumerAssignment> registerConsumer(String 
subscription,
+                                                                   String 
consumerName,
+                                                                   long 
consumerId,
+                                                                   
ScalableConsumerType
+                                                                           
consumerType,
+                                                                   
TransportCnx cnx) {
         checkLeader();
         SubscriptionCoordinator coordinator = subscriptions.computeIfAbsent(
-                subscription, this::createCoordinator);
+                subscription, sub -> createCoordinator(sub, consumerType));
+        // The coordinator may have been created on the failover-restore path 
(consumer
+        // type unknown then; we defaulted to "no parent-drain enforcement"). 
Now that we
+        // know the type, upgrade if it's STREAM. installDrainChecker is a 
no-op if the
+        // coordinator already has a checker, so safe to call unconditionally.
+        if (consumerType == ScalableConsumerType.STREAM) {
+            coordinator.installDrainChecker(this::isSegmentDrained);
+        }
         return coordinator.registerConsumer(consumerName, consumerId, cnx)
                 .thenApply(assignments -> {
                     // Look up by name since the key may have been an existing 
session
@@ -525,6 +614,9 @@ public class ScalableTopicController {
 
     public CompletableFuture<Void> close() {
         closed = true;
+        // Stop each coordinator's drain poller before clearing — otherwise 
the scheduler
+        // task keeps running after the controller goes away.
+        subscriptions.values().forEach(SubscriptionCoordinator::close);
         subscriptions.clear();
         return leaderElection.asyncClose();
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
index e452d75af74..e43ca06ccbd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
 import org.apache.pulsar.broker.resources.ScalableTopicResources;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ScalableTopicStats;
@@ -230,12 +231,35 @@ public class ScalableTopicService {
     /**
      * Register a scalable consumer with the controller leader for {@code 
topic}.
      * Persists a durable session and returns the consumer's segment 
assignment.
+     *
+     * <p>The {@code consumerType} drives broker-side semantics that depend on 
the
+     * consumer mode — most importantly whether the {@link 
SubscriptionCoordinator}
+     * enforces parent-drain ordering before handing out children of a split. 
STREAM
+     * consumers want it (per-key ordering); CHECKPOINT and QUEUE consumers 
don't
+     * (they either track position client-side or have shared, 
already-out-of-order
+     * delivery semantics).
      */
     public CompletableFuture<ConsumerAssignment> registerConsumer(TopicName 
topic, String subscription,
                                                                    String 
consumerName, long consumerId,
+                                                                   
ScalableConsumerType
+                                                                           
consumerType,
                                                                    
org.apache.pulsar.broker.service.TransportCnx cnx) {
         return getOrCreateController(topic)
-                .thenCompose(controller -> 
controller.registerConsumer(subscription, consumerName, consumerId, cnx));
+                .thenCompose(controller ->
+                        controller.registerConsumer(subscription, 
consumerName, consumerId,
+                                consumerType, cnx));
+    }
+
+    /**
+     * @deprecated Defaults to {@link ScalableConsumerType#STREAM}
+     *     for backward compatibility. New callers should pass the explicit 
consumer type.
+     */
+    @Deprecated
+    public CompletableFuture<ConsumerAssignment> registerConsumer(TopicName 
topic, String subscription,
+                                                                   String 
consumerName, long consumerId,
+                                                                   
org.apache.pulsar.broker.service.TransportCnx cnx) {
+        return registerConsumer(topic, subscription, consumerName, consumerId,
+                ScalableConsumerType.STREAM, cnx);
     }
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentDrainChecker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentDrainChecker.java
new file mode 100644
index 00000000000..10210d6c67c
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SegmentDrainChecker.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+
+/**
+ * Resolves whether a (sealed) segment has been fully drained by a particular 
subscription.
+ *
+ * <p>Used by {@link SubscriptionCoordinator} to decide when newly-active 
children of a
+ * split / merge can be assigned to consumers: an active child is only 
assignable once
+ * <em>every</em> parent has been drained for the subscription, so message 
order with respect
+ * to the split point is preserved.
+ *
+ * <p>Implementations typically read the segment topic's per-subscription 
backlog (the
+ * cursor on a sealed topic with {@code msgBacklog == 0} is by definition at 
the end). For
+ * subscriptions started with {@code Latest}, every sealed segment's cursor is 
created at
+ * the topic's end, so the drain check completes immediately.
+ */
+@FunctionalInterface
+public interface SegmentDrainChecker {
+
+    /**
+     * Returns {@code true} if the segment's cursor for {@code subscription} 
has reached the
+     * end of the segment's data, {@code false} otherwise. Errors complete the 
future
+     * exceptionally; callers should treat them as "not drained yet" and retry.
+     *
+     * @param segment the segment to check (only meaningful for sealed 
segments — active
+     *                segments still receive new messages, so they're never 
drained)
+     * @param subscription the subscription whose cursor we're asking about
+     */
+    CompletableFuture<Boolean> isDrained(SegmentInfo segment, String 
subscription);
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinator.java
index ff55b7854ed..ade25fb1779 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinator.java
@@ -23,6 +23,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -30,12 +31,15 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import lombok.Getter;
 import org.apache.pulsar.broker.resources.ScalableTopicResources;
 import org.apache.pulsar.broker.service.TransportCnx;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.scalable.SegmentInfo;
 import org.apache.pulsar.common.scalable.SegmentTopicName;
+import org.apache.pulsar.common.util.Backoff;
 
 /**
  * Manages segment-to-consumer assignments within a single subscription of a 
scalable topic.
@@ -65,24 +69,68 @@ public class SubscriptionCoordinator {
     // TODO: make configurable via broker config (e.g. 
scalableTopicConsumerSessionTimeoutSeconds)
     private static final Duration DEFAULT_GRACE_PERIOD = 
Duration.ofSeconds(60);
 
+    /**
+     * First drain-status check fires this long after a sealed segment shows 
up — fast
+     * enough that a freshly subscribed EARLIEST consumer doesn't wait for 
backlog
+     * unblocking on a small topic.
+     */
+    static final Duration DEFAULT_DRAIN_INITIAL_DELAY = Duration.ofSeconds(2);
+
+    /**
+     * Upper bound on the drain-poll interval. Reached by exponential backoff 
while a
+     * sealed segment remains undrained — long-tail backlog (consumers stalled 
for hours)
+     * shouldn't have us hammering the segment owner every couple seconds. 
Reset on every
+     * progress event (drain detected / layout changed / new consumer).
+     */
+    static final Duration DEFAULT_DRAIN_MAX_DELAY = Duration.ofMinutes(15);
+
     @Getter
     private final String subscriptionName;
     private final TopicName topicName;
     private final ScalableTopicResources resources;
     private final ScheduledExecutorService scheduler;
     private final Duration gracePeriod;
+    private final Duration drainInitialDelay;
+    private final Duration drainMaxDelay;
 
     /** Keyed by consumerName — the stable session identity. */
     private final Map<String, ConsumerSession> sessions = new 
ConcurrentHashMap<>();
     private Map<Long, ConsumerSession> segmentAssignments = new 
LinkedHashMap<>();
     private SegmentLayout currentLayout;
 
+    /**
+     * Sealed segments confirmed drained for this subscription (cursor at 
end). In-memory
+     * only — on controller-leader failover the new leader rediscovers drain 
status by
+     * polling. {@link #computeAssignment} consults this set to decide which 
active
+     * children of a split / merge are eligible to be assigned (children stay 
blocked
+     * until <em>every</em> sealed parent is drained, so message order isn't 
broken).
+     */
+    private final Set<Long> drainedSegmentIds = ConcurrentHashMap.newKeySet();
+
+    /**
+     * Backoff governing the drain-poll cadence. Starts at {@link 
#drainInitialDelay},
+     * grows exponentially up to {@link #drainMaxDelay}, and is reset on every 
progress
+     * event (a segment is observed drained, the layout changes, a new 
consumer joins).
+     */
+    private final Backoff drainBackoff;
+    private ScheduledFuture<?> drainPollTask;
+    private boolean drainPollInProgress;
+    private boolean closed;
+    /**
+     * Drain checker installed on the coordinator. Mutable: starts null on the
+     * controller-leader-failover restore path (consumer type unknown until 
reconnect),
+     * upgraded to a real checker on first STREAM register via {@link 
#installDrainChecker}.
+     * Once non-null it stays non-null (we don't downgrade to no-ordering 
mid-flight).
+     */
+    private SegmentDrainChecker drainChecker;
+
     public SubscriptionCoordinator(String subscriptionName,
                                    TopicName topicName,
                                    SegmentLayout initialLayout,
                                    ScalableTopicResources resources,
                                    ScheduledExecutorService scheduler) {
-        this(subscriptionName, topicName, initialLayout, resources, scheduler, 
DEFAULT_GRACE_PERIOD);
+        this(subscriptionName, topicName, initialLayout, resources, scheduler,
+                DEFAULT_GRACE_PERIOD, null, DEFAULT_DRAIN_INITIAL_DELAY, 
DEFAULT_DRAIN_MAX_DELAY);
     }
 
     public SubscriptionCoordinator(String subscriptionName,
@@ -91,15 +139,36 @@ public class SubscriptionCoordinator {
                                    ScalableTopicResources resources,
                                    ScheduledExecutorService scheduler,
                                    Duration gracePeriod) {
+        this(subscriptionName, topicName, initialLayout, resources, scheduler, 
gracePeriod,
+                null, DEFAULT_DRAIN_INITIAL_DELAY, DEFAULT_DRAIN_MAX_DELAY);
+    }
+
+    public SubscriptionCoordinator(String subscriptionName,
+                                   TopicName topicName,
+                                   SegmentLayout initialLayout,
+                                   ScalableTopicResources resources,
+                                   ScheduledExecutorService scheduler,
+                                   Duration gracePeriod,
+                                   SegmentDrainChecker drainChecker,
+                                   Duration drainInitialDelay,
+                                   Duration drainMaxDelay) {
         this.subscriptionName = subscriptionName;
         this.topicName = topicName;
         this.currentLayout = initialLayout;
         this.resources = resources;
         this.scheduler = scheduler;
         this.gracePeriod = gracePeriod;
+        this.drainChecker = drainChecker;
+        this.drainInitialDelay = drainInitialDelay;
+        this.drainMaxDelay = drainMaxDelay;
+        this.drainBackoff = Backoff.builder()
+                .initialDelay(drainInitialDelay)
+                .maxBackoff(drainMaxDelay)
+                .build();
         this.log = LOG.with().attr("topic", topicName).attr("subscription", 
subscriptionName).build();
     }
 
+
     // --- Register / unregister / reconnect ---
 
     /**
@@ -132,7 +201,12 @@ public class SubscriptionCoordinator {
                 .thenApply(__ -> {
                     synchronized (this) {
                         sessions.put(consumerName, session);
-                        return rebalanceAndNotify();
+                        Map<ConsumerSession, ConsumerAssignment> result = 
rebalanceAndNotify();
+                        // First consumer (or rejoining one) — kick off drain 
checks at the
+                        // shortest delay rather than whatever long backoff 
we'd accumulated
+                        // while idle.
+                        resetAndRearmDrainPoll();
+                        return result;
                     }
                 });
     }
@@ -202,7 +276,8 @@ public class SubscriptionCoordinator {
 
     /**
      * Handle a layout change (split/merge). Recompute and push assignments to 
connected
-     * consumers.
+     * consumers, then make sure the drain poller is running so any new sealed 
segments
+     * get noticed quickly (backoff reset → next check fires at the initial 
delay).
      */
     public synchronized CompletableFuture<Map<ConsumerSession, 
ConsumerAssignment>> onLayoutChange(
             SegmentLayout newLayout) {
@@ -211,7 +286,195 @@ public class SubscriptionCoordinator {
             segmentAssignments.clear();
             return CompletableFuture.completedFuture(Map.of());
         }
-        return CompletableFuture.completedFuture(rebalanceAndNotify());
+        Map<ConsumerSession, ConsumerAssignment> result = rebalanceAndNotify();
+        // New sealed segments may have appeared — restart drain checks from 
the
+        // initial delay rather than continuing whatever long backoff we'd 
settled into.
+        resetAndRearmDrainPoll();
+        return CompletableFuture.completedFuture(result);
+    }
+
+    /**
+     * Attach a {@link SegmentDrainChecker} to a coordinator that is running without one.
+     *
+     * <p>Intended for the first STREAM register against a coordinator that was created on
+     * the controller-failover restore path, where the consumer type wasn't known and the
+     * coordinator defaulted to "no enforcement". Does nothing when a checker is already
+     * installed or when {@code checker} is {@code null}.
+     *
+     * @param checker the drain checker to install; ignored if {@code null}
+     */
+    synchronized void installDrainChecker(SegmentDrainChecker checker) {
+        boolean alreadyInstalled = this.drainChecker != null;
+        if (alreadyInstalled || checker == null) {
+            return;
+        }
+        this.drainChecker = checker;
+        log.info().log("Drain checker installed on existing coordinator; rebalancing");
+        // Recompute assignments only when at least one session is registered.
+        boolean hasConsumers = !sessions.isEmpty();
+        if (hasConsumers) {
+            rebalanceAndNotify();
+        }
+        // Now that a checker exists, (re)start polling from the initial delay.
+        resetAndRearmDrainPoll();
+    }
+
+    /**
+     * Stop the periodic drain-status poller. Called by the controller on 
close. Idempotent.
+     * Also flips a {@code closed} flag so any {@link #pollDrainStatus()} 
iteration that's
+     * mid-flight aborts its rearm step instead of leaking a task into the 
executor.
+     */
+    public synchronized void close() {
+        closed = true;
+        if (drainPollTask != null) {
+            drainPollTask.cancel(false);
+            drainPollTask = null;
+        }
+    }
+
+    // --- Drain tracking ---
+
+    /**
+     * Decide whether {@code segment} may be handed to consumers right now.
+     *
+     * <ul>
+     *   <li>Without a {@link SegmentDrainChecker} (e.g., the simple test constructor)
+     *       parent-drain ordering is disabled and every segment is assignable.</li>
+     *   <li>A sealed segment is always assignable — there's no harm in handing it out
+     *       (the v4 layer drains it, or sees {@code TopicTerminated} immediately if it
+     *       is already drained).</li>
+     *   <li>An active segment is assignable only once every parent that is still present
+     *       in {@code layout} has been drained for this subscription. Otherwise a consumer
+     *       would get the child of a just-split segment immediately, breaking per-key
+     *       ordering against unread messages still sitting in the parent.</li>
+     * </ul>
+     *
+     * @param segment the segment being considered
+     * @param layout  the layout whose segment map is used to tell whether a parent still exists
+     * @return {@code true} if the segment can be included in an assignment
+     */
+    private boolean isAssignable(SegmentInfo segment, SegmentLayout layout) {
+        if (drainChecker == null) {
+            return true;
+        }
+        if (!segment.isActive()) {
+            return true;
+        }
+        var knownSegments = layout.getAllSegments();
+        for (long parentId : segment.parentIds()) {
+            // A parent that's no longer in the DAG has been pruned (its data is gone), so
+            // it counts as drained — there's nothing to wait on.
+            boolean stillInDag = knownSegments.containsKey(parentId);
+            if (stillInDag && !drainedSegmentIds.contains(parentId)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Record the given sealed segments as drained. Visible for testing — production code
+     * reaches this through {@link #pollDrainStatus()}, which queries the broker.
+     *
+     * <p>If at least one id was not previously recorded, connected consumers are
+     * rebalanced and the drain poller is restarted from the initial delay.
+     *
+     * @param segmentIds ids of the segments to mark as drained
+     */
+    synchronized void markSegmentsDrained(Set<Long> segmentIds) {
+        boolean anyNew = false;
+        for (long id : segmentIds) {
+            // Non-short-circuit on purpose: every id must be added.
+            anyNew |= drainedSegmentIds.add(id);
+        }
+        if (!anyNew) {
+            return;
+        }
+        // Progress: a sealed segment is now drained. Reset the backoff and re-arm so the
+        // next poll lands at the initial delay — once one parent drains, others on the
+        // same DAG branch often follow shortly.
+        if (!sessions.isEmpty()) {
+            log.info().attr("drained", segmentIds).log("Sealed segments drained, rebalancing");
+            rebalanceAndNotify();
+        }
+        resetAndRearmDrainPoll();
+    }
+
+    /**
+     * Reset the backoff, drop any pending drain-poll task and arm a fresh one.
+     *
+     * <p>Called on every progress event (drain detected, layout change, fresh consumer) so
+     * that the next check lands at {@link #drainInitialDelay} instead of at whatever long
+     * backoff had built up during a quiet period.
+     */
+    private synchronized void resetAndRearmDrainPoll() {
+        drainBackoff.reset();
+        var pending = drainPollTask;
+        drainPollTask = null;
+        if (pending != null) {
+            pending.cancel(false);
+        }
+        ensureDrainPollerRunning();
+    }
+
+    /**
+     * (Re-)arm the drain poller unless one is already pending.
+     *
+     * <p>Does nothing when the coordinator has been {@link #close() closed}, when no
+     * {@link SegmentDrainChecker} is configured, or when a previously scheduled task is
+     * still waiting to run. Otherwise schedules a single {@link #pollDrainStatus()} run
+     * after the next {@link Backoff} delay. Each iteration re-arms itself, so the cadence
+     * grows while sealed segments stay undrained — short delays at first (a fresh EARLIEST
+     * consumer typically drains the parent within seconds), capped at
+     * {@link #drainMaxDelay} for long-tail backlogs.
+     */
+    private void ensureDrainPollerRunning() {
+        boolean pollingDisabled = closed || drainChecker == null;
+        if (pollingDisabled) {
+            return;
+        }
+        boolean taskPending = drainPollTask != null
+                && !(drainPollTask.isCancelled() || drainPollTask.isDone());
+        if (taskPending) {
+            return;
+        }
+        long delayMs = drainBackoff.next().toMillis();
+        drainPollTask = scheduler.schedule(this::pollDrainStatus, delayMs, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Single poll iteration: ask the drain checker about every sealed segment that isn't
+     * already known to be drained, collect the newly-drained ones, and rebalance. Schedule
+     * the next iteration via the backoff (or stop, if there's nothing left to check).
+     *
+     * <p>The iteration is skipped when the coordinator is closed, when another iteration
+     * is still in flight, or when no consumer is connected. A drain check that fails —
+     * whether through a failed future, a synchronous exception, or a {@code null} future —
+     * is treated as "not drained yet" and retried on the next poll. This guarantees that
+     * {@code drainPollInProgress} is always cleared again; otherwise a single throwing
+     * checker call would leave the flag set and silently stop all future polling.
+     */
+    private void pollDrainStatus() {
+        SegmentLayout layout;
+        List<SegmentInfo> toCheck;
+        synchronized (this) {
+            drainPollTask = null;
+            if (closed || drainPollInProgress || sessions.isEmpty()) {
+                return;
+            }
+            layout = currentLayout;
+            toCheck = layout.getAllSegments().values().stream()
+                    .filter(seg -> !seg.isActive() && !drainedSegmentIds.contains(seg.segmentId()))
+                    .toList();
+            if (toCheck.isEmpty()) {
+                // Every sealed segment is already drained — no need to keep polling. The
+                // poller re-arms when onLayoutChange / registerConsumer adds new work.
+                drainBackoff.reset();
+                return;
+            }
+            drainPollInProgress = true;
+        }
+
+        Set<Long> newlyDrained = ConcurrentHashMap.newKeySet();
+        @SuppressWarnings("rawtypes")
+        CompletableFuture[] futures = new CompletableFuture[toCheck.size()];
+        for (int i = 0; i < toCheck.size(); i++) {
+            SegmentInfo seg = toCheck.get(i);
+            try {
+                futures[i] = drainChecker.isDrained(seg, subscriptionName)
+                        .handle((drained, ex) -> {
+                            if (ex != null) {
+                                // Log at debug — drain checks happen often and transient errors
+                                // (e.g., topic still being looked up) shouldn't spam.
+                                log.debug().attr("segmentId", seg.segmentId())
+                                        .exceptionMessage(ex)
+                                        .log("Drain check failed; will retry next poll");
+                                return null;
+                            }
+                            if (Boolean.TRUE.equals(drained)) {
+                                newlyDrained.add(seg.segmentId());
+                            }
+                            return null;
+                        });
+            } catch (RuntimeException e) {
+                // The checker threw (or returned null) instead of handing back a future.
+                // Treat it like an asynchronous failure so the completion callback below
+                // still runs, clears drainPollInProgress and re-arms the poller.
+                log.debug().attr("segmentId", seg.segmentId())
+                        .exceptionMessage(e)
+                        .log("Drain check failed; will retry next poll");
+                futures[i] = CompletableFuture.completedFuture(null);
+            }
+        }
+        CompletableFuture.allOf(futures).whenComplete((__, ex) -> {
+            try {
+                if (!newlyDrained.isEmpty()) {
+                    // markSegmentsDrained resets the backoff for us.
+                    markSegmentsDrained(new HashSet<>(newlyDrained));
+                }
+            } finally {
+                synchronized (SubscriptionCoordinator.this) {
+                    drainPollInProgress = false;
+                    // Re-arm: undrained sealed segments still pending → keep polling, but
+                    // with the next (longer) backoff if no progress was made this round.
+                    ensureDrainPollerRunning();
+                }
+            }
+        });
     }
 
     // --- Accessors ---
@@ -267,11 +530,29 @@ public class SubscriptionCoordinator {
     }
 
     /**
-     * Compute a balanced assignment of active segments to consumers.
+     * Compute a balanced assignment of segments to consumers.
+     *
+     * <p>Strategy: sort segments by hash range, then segment id (tiebreak), 
sort consumers by
+     * name, then round-robin. Deterministic: the same inputs always produce 
the same output,
+     * so a new leader recomputing assignments after failover gets the same 
result as the old
+     * leader.
+     *
+     * <p><b>DAG replay.</b> The assignment includes every <em>sealed</em> 
segment in the
+     * DAG. A fresh EARLIEST subscription needs to read messages produced 
before it joined,
+     * and those may live on segments that have since been sealed by a split / 
merge.
+     *
+     * <p><b>Parent-drain ordering.</b> An <em>active</em> child segment is 
only assigned
+     * once <em>every</em> parent in the DAG has been drained for this 
subscription
+     * (tracked in {@link #drainedSegmentIds}). Without this guard a consumer 
would be
+     * handed an active child immediately after a split and start receiving 
new messages
+     * for some key while the same key's pre-split messages still sit unread 
on the sealed
+     * parent — breaking per-key ordering. Initial active segments (those with 
no parents
+     * in the layout) are unaffected and assigned right away.
      *
-     * <p>Strategy: sort segments by hash range start, sort consumers by name, 
then
-     * round-robin. Deterministic: the same inputs always produce the same 
output, so a new
-     * leader recomputing assignments after failover gets the same result as 
the old leader.
+     * <p>The client side (per-segment v4 consumer) drains a 
sealed-but-still-present
+     * segment naturally and closes it on {@code TopicTerminated}; a 
sealed-and-already-
+     * drained segment yields {@code TopicTerminated} immediately, so the cost 
of
+     * including it is one short-lived v4 subscribe.
      */
     Map<ConsumerSession, ConsumerAssignment> computeAssignment(
             SegmentLayout layout, Collection<ConsumerSession> consumers) {
@@ -280,8 +561,10 @@ public class SubscriptionCoordinator {
             return Map.of();
         }
 
-        List<SegmentInfo> sortedSegments = 
layout.getActiveSegments().values().stream()
-                .sorted(Comparator.comparing(SegmentInfo::hashRange))
+        List<SegmentInfo> sortedSegments = 
layout.getAllSegments().values().stream()
+                .filter(seg -> isAssignable(seg, layout))
+                .sorted(Comparator.comparing(SegmentInfo::hashRange)
+                        .thenComparingLong(SegmentInfo::segmentId))
                 .toList();
 
         List<ConsumerSession> sortedConsumers = consumers.stream()
@@ -339,4 +622,12 @@ public class SubscriptionCoordinator {
     private Map<ConsumerSession, ConsumerAssignment> snapshotAssignments() {
         return computeAssignment(currentLayout, sessions.values());
     }
+
+    /**
+     * Test hook: compute the assignment that would be sent right now, from the current
+     * layout and the currently registered sessions. Visible for unit tests.
+     *
+     * @return the assignment per consumer session, as produced by {@code computeAssignment}
+     */
+    synchronized Map<ConsumerSession, ConsumerAssignment> currentAssignment() {
+        var connected = sessions.values();
+        return computeAssignment(currentLayout, connected);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinatorTest.java
index 4de8ddb0a43..d3f6e1e0bc0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinatorTest.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -141,8 +142,189 @@ public class SubscriptionCoordinatorTest {
         Map<ConsumerSession, ConsumerAssignment> result =
                 coordinator.onLayoutChange(newLayout).get();
 
+        // After the split: segment 0 is sealed, two new active children take 
its place,
+        // and segments 1..3 stay active. The default test coordinator runs 
without a
+        // SegmentDrainChecker, so parent-drain ordering is disabled and every 
segment in
+        // the DAG (active + sealed) is assigned — 5 active + 1 sealed = 6.
         assertEquals(result.size(), 1);
-        assertEquals(findByName(result, 
"consumer-1").assignedSegments().size(), 5);
+        assertEquals(findByName(result, 
"consumer-1").assignedSegments().size(), 6);
+    }
+
+    /**
+     * With a {@link SegmentDrainChecker} installed, active children must stay out of the
+     * assignment until their sealed parent is reported as drained. Before that, only the
+     * sealed parent and the unrelated initial active segments are assigned.
+     */
+    @Test
+    public void testActiveChildrenBlockedUntilParentDrained() throws Exception {
+        // Controllable drain checker: a segment counts as drained once its id is in the set.
+        // Nothing is drained to begin with.
+        Set<Long> drainedIds = ConcurrentHashMap.newKeySet();
+        SegmentDrainChecker drainProbe = (segment, sub) ->
+                CompletableFuture.completedFuture(drainedIds.contains(segment.segmentId()));
+        SubscriptionCoordinator ordered = new SubscriptionCoordinator("test-sub",
+                topicName, initialLayout, resources, scheduler, Duration.ofMillis(200),
+                drainProbe, Duration.ofMillis(50), Duration.ofSeconds(5));
+        try {
+            ordered.registerConsumer("consumer-1", 1L, mock(TransportCnx.class)).get();
+
+            // Layout after the split: segment 0 sealed (parent=∅), segments 1..3 active
+            // (parent=∅), segments 4 + 5 active (parent=[0]). The children of 0 must be
+            // excluded until 0 is drained.
+            SegmentLayout splitLayout = initialLayout.splitSegment(0);
+            Map<ConsumerSession, ConsumerAssignment> afterSplit =
+                    ordered.onLayoutChange(splitLayout).get();
+
+            ConsumerAssignment assignment = findByName(afterSplit, "consumer-1");
+            assertNotNull(assignment);
+            Set<Long> assigned = new HashSet<>(segmentIds(assignment));
+            assertTrue(assigned.containsAll(Set.of(0L, 1L, 2L, 3L)),
+                    "sealed parent + initial active children must be assigned, got " + assigned);
+            assertFalse(assigned.contains(4L), "child of un-drained parent must be blocked");
+            assertFalse(assigned.contains(5L), "child of un-drained parent must be blocked");
+
+            // Report the parent as drained — the next poll should notice and the children
+            // must then show up in the assignment.
+            drainedIds.add(0L);
+            Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+                ConsumerAssignment latest = findByName(ordered.currentAssignment(), "consumer-1");
+                Set<Long> nowAssigned = new HashSet<>(segmentIds(latest));
+                assertTrue(nowAssigned.containsAll(Set.of(0L, 1L, 2L, 3L, 4L, 5L)),
+                        "after parent drain, all 6 segments must be assigned, got " + nowAssigned);
+            });
+        } finally {
+            ordered.close();
+        }
+    }
+
+    /**
+     * Once the drain poller has backed off, registering a fresh consumer must cancel the
+     * long-delay scheduled task and re-arm at the initial delay — otherwise the new
+     * consumer would wait out the full backed-off delay before its first drain check.
+     * Regression for the review note: {@code ensureDrainPollerRunning} alone is a no-op
+     * when a task is already scheduled, so progress events have to cancel-and-rearm.
+     */
+    @Test
+    public void testFreshRegisterCancelsBackedOffPollAndRearmsImmediately() throws Exception {
+        var drainChecks = new java.util.concurrent.atomic.AtomicInteger();
+        // Counts every drain check and always answers "not drained".
+        SegmentDrainChecker neverDrained = (segment, sub) -> {
+            drainChecks.incrementAndGet();
+            return CompletableFuture.completedFuture(false);
+        };
+        // Initial 50ms, max 5s — exponential, several polls happen quickly.
+        SegmentLayout splitLayout = initialLayout.splitSegment(0);
+        SubscriptionCoordinator coord = new SubscriptionCoordinator("test-sub",
+                topicName, splitLayout, resources, scheduler,
+                Duration.ofMillis(200), neverDrained, Duration.ofMillis(50), Duration.ofSeconds(5));
+        try {
+            coord.registerConsumer("consumer-1", 1L, mock(TransportCnx.class)).get();
+            // Let a few polls happen so the backoff grows past the initial delay.
+            Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                    .until(() -> drainChecks.get() >= 3);
+
+            int before = drainChecks.get();
+            // Registering another consumer is the progress event: the reset+rearm helper
+            // must fire a fresh check at ~initial delay (50ms).
+            coord.registerConsumer("consumer-2", 2L, mock(TransportCnx.class)).get();
+            // 250ms is well under any plausible backed-off delay; if the task wasn't
+            // cancelled and re-armed, no new check would land in this window.
+            Thread.sleep(250);
+            int after = drainChecks.get();
+            assertTrue(after > before,
+                    "register-on-progress must cancel pending task and re-arm at initial delay");
+        } finally {
+            coord.close();
+        }
+    }
+
+    /**
+     * close() during an in-flight poll must prevent the {@code whenComplete} rearm path
+     * from scheduling a new task — otherwise the controller goes away while a stale
+     * drain check is queued for execution.
+     */
+    @Test
+    public void testCloseRaceWithInFlightPollDoesNotLeakRearm() throws Exception {
+        // Drain checker that blocks until released — gives us a deterministic in-flight
+        // window that overlaps with close().
+        CompletableFuture<Boolean> blocking = new CompletableFuture<>();
+        java.util.concurrent.atomic.AtomicInteger checks = new java.util.concurrent.atomic.AtomicInteger();
+        SegmentDrainChecker checker = (segment, sub) -> {
+            checks.incrementAndGet();
+            return blocking;
+        };
+        SubscriptionCoordinator c = new SubscriptionCoordinator("test-sub",
+                topicName, initialLayout.splitSegment(0), resources, scheduler,
+                Duration.ofMillis(200), checker, Duration.ofMillis(20), Duration.ofSeconds(1));
+        try {
+            c.registerConsumer("consumer-1", 1L, mock(TransportCnx.class)).get();
+            // Wait until at least one poll has started.
+            Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> checks.get() >= 1);
+
+            c.close();
+            // Release the in-flight check after close — its whenComplete will run with
+            // closed=true and must NOT schedule another task.
+            blocking.complete(false);
+
+            int countAfterClose = checks.get();
+            Thread.sleep(200);
+            assertEquals(checks.get(), countAfterClose,
+                    "no further drain checks should fire after close()");
+        } finally {
+            // If an earlier step failed (e.g., the Awaitility timeout) the coordinator would
+            // otherwise stay open with its poller scheduled on the shared scheduler. Both
+            // calls are harmless when they already ran above: close() is idempotent and
+            // completing an already-completed future is a no-op.
+            c.close();
+            blocking.complete(false);
+        }
+    }
+
+    /**
+     * On controller-leader failover the restore path creates a coordinator 
without
+     * knowing the consumer type (not yet persisted in metadata). It must 
default to
+     * <em>no parent-drain enforcement</em> — picking STREAM as a 
"conservative default"
+     * would deadlock CHECKPOINT subscriptions, whose parents never report 
drained.
+     */
+    @Test
+    public void testRestoredCoordinatorStartsWithoutParentDrainOrdering() 
throws Exception {
+        // restoreConsumers installs a session and computes assignment 
immediately. With
+        // no drain checker the active children of the (sealed) split parent 
are eligible
+        // right away.
+        coordinator.restoreConsumers(java.util.List.of("consumer-1"));
+        SegmentLayout afterSplit = initialLayout.splitSegment(0);
+        Map<ConsumerSession, ConsumerAssignment> result =
+                coordinator.onLayoutChange(afterSplit).get();
+        ConsumerAssignment a = findByName(result, "consumer-1");
+        assertNotNull(a);
+        Set<Long> assigned = new HashSet<>(segmentIds(a));
+        // 3 untouched active + 1 sealed parent + 2 split children = 6 — the restore-path
+        // coordinator has no checker, so the children are NOT held back.
+        assertEquals(assigned.size(), 6,
+                "restored coordinator (no drain checker) must hand out every 
DAG segment "
+                        + "right away to avoid deadlocking CHECKPOINT/QUEUE 
subs, got "
+                        + assigned);
+    }
+
+    /**
+     * After a STREAM consumer reconnects to a restored coordinator, the controller is
+     * expected to install a real drain checker via
+     * {@link SubscriptionCoordinator#installDrainChecker(SegmentDrainChecker)}. From that
+     * point on parent-drain ordering applies: children of an un-drained parent are
+     * filtered out of the next assignment.
+     */
+    @Test
+    public void testInstallDrainCheckerAfterRestoreEnablesOrdering() throws Exception {
+        coordinator.registerConsumer("consumer-1", 1L, mock(TransportCnx.class)).get();
+        coordinator.onLayoutChange(initialLayout.splitSegment(0)).get();
+        // No checker yet → all 6 segments assigned.
+        ConsumerAssignment unordered = findByName(coordinator.currentAssignment(), "consumer-1");
+        assertEquals(segmentIds(unordered).size(), 6);
+
+        // Install a checker that says "nothing is drained" → children of segment 0 must
+        // disappear from the next assignment.
+        Set<Long> drainedIds = ConcurrentHashMap.newKeySet();
+        SegmentDrainChecker nothingDrained = (segment, sub) ->
+                CompletableFuture.completedFuture(drainedIds.contains(segment.segmentId()));
+        coordinator.installDrainChecker(nothingDrained);
+
+        ConsumerAssignment ordered = findByName(coordinator.currentAssignment(), "consumer-1");
+        Set<Long> remaining = new HashSet<>(segmentIds(ordered));
+        // Parent stays (sealed always assignable); children of 0 are blocked.
+        assertTrue(remaining.containsAll(Set.of(0L, 1L, 2L, 3L)),
+                "sealed parent + initial active segments must remain, got " + remaining);
+        assertEquals(remaining.size(), 4,
+                "active children of un-drained parent must be filtered after install, got "
+                        + remaining);
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerDagReplayTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerDagReplayTest.java
new file mode 100644
index 00000000000..fdebd133f9c
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerDagReplayTest.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * DAG-replay coverage for V5 {@link CheckpointConsumer} — the checkpoint analogue of
+ * {@code V5StreamConsumerDagReplayTest}. A fresh {@link Checkpoint#earliest()} consumer
+ * that subscribes after a split must still read the messages that ended up on the
+ * now-sealed parent segment.
+ *
+ * <p>An unmanaged {@link CheckpointConsumer} (no {@code consumerGroup(...)}) walks the DAG
+ * via {@code DagWatchClient} and subscribes to active + sealed segments directly — it
+ * doesn't go through the broker's {@link
+ * org.apache.pulsar.broker.service.scalable.SubscriptionCoordinator}. The managed flow
+ * (with {@code consumerGroup(...)}) does, so parent-drain ordering applies there. Unlike a
+ * {@link StreamConsumer}, a managed {@link CheckpointConsumer} doesn't create subscription
+ * cursors on the segment topics (it reads via Readers and tracks position client-side as
+ * a {@link Checkpoint}), so the broker treats every sealed segment as effectively drained
+ * and unblocks the active children right away.
+ */
+public class V5CheckpointConsumerDagReplayTest extends V5ClientBaseTest {
+
+    @Test
+    public void testEarliestUnmanagedCheckpointPostSplitReadsSealedBacklog() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        Set<String> sent = new HashSet<>();
+        // Phase 1: messages land on segment 0.
+        publish(producer, "k-", "pre-", 30, sent);
+
+        // Split: segment 0 sealed, two new active children.
+        splitAndAwait(topic, singleActiveSegmentId(topic), 2);
+
+        // Phase 2: messages land on the children.
+        publish(producer, "kk-", "post-", 30, sent);
+
+        // Fresh consumer subscribes AFTER the split — it must see the pre-split sealed
+        // backlog alongside the post-split active data. The unmanaged path uses
+        // DagWatchClient and covers active + sealed segments, so this works without going
+        // through SubscriptionCoordinator at all.
+        @Cleanup
+        CheckpointConsumer<String> consumer = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .startPosition(Checkpoint.earliest())
+                .create();
+
+        Set<String> received = drainExactly(consumer, sent.size());
+        assertEquals(received, sent,
+                "unmanaged EARLIEST checkpoint consumer must receive both pre-split "
+                        + "(sealed) and post-split (active) messages");
+    }
+
+    @Test
+    public void testEarliestUnmanagedCheckpointAfterTwoSplitsReadsAllBacklog() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        Set<String> sent = new HashSet<>();
+        publish(producer, "a-", "p1-", 20, sent);
+        splitAndAwait(topic, singleActiveSegmentId(topic), 2);
+
+        publish(producer, "b-", "p2-", 20, sent);
+        splitAndAwait(topic, anyActiveSegmentId(topic), 3);
+
+        publish(producer, "c-", "p3-", 20, sent);
+
+        @Cleanup
+        CheckpointConsumer<String> consumer = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .startPosition(Checkpoint.earliest())
+                .create();
+
+        Set<String> received = drainExactly(consumer, sent.size());
+        assertEquals(received, sent,
+                "unmanaged EARLIEST checkpoint consumer must receive every message produced "
+                        + "across the DAG, including both sealed generations");
+    }
+
+    /**
+     * Same scenario as the unmanaged version, but with {@code consumerGroup(...)}: the
+     * broker drives the assignment via {@link
+     * org.apache.pulsar.broker.service.scalable.SubscriptionCoordinator}. Checkpoint
+     * consumers don't create per-segment subscription cursors, so the broker's drain
+     * check sees no subscription on the sealed parent → no backlog to worry about → the
+     * active children are unblocked right away.
+     */
+    @Test
+    public void testEarliestManagedCheckpointPostSplitReadsSealedBacklog() throws Exception {
+        String topic = newScalableTopic(1);
+        String group = "managed-dag-replay";
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        Set<String> sent = new HashSet<>();
+        publish(producer, "k-", "pre-", 30, sent);
+        splitAndAwait(topic, singleActiveSegmentId(topic), 2);
+        publish(producer, "kk-", "post-", 30, sent);
+
+        @Cleanup
+        CheckpointConsumer<String> consumer = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .consumerGroup(group)
+                .startPosition(Checkpoint.earliest())
+                .create();
+
+        Set<String> received = drainExactly(consumer, sent.size());
+        assertEquals(received, sent,
+                "managed EARLIEST checkpoint consumer must receive both pre-split "
+                        + "(sealed) and post-split (active) messages");
+    }
+
+    @Test
+    public void testEarliestManagedCheckpointAfterTwoSplitsReadsAllBacklog() throws Exception {
+        String topic = newScalableTopic(1);
+        String group = "managed-dag-replay-deep";
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        Set<String> sent = new HashSet<>();
+        publish(producer, "a-", "p1-", 20, sent);
+        splitAndAwait(topic, singleActiveSegmentId(topic), 2);
+
+        publish(producer, "b-", "p2-", 20, sent);
+        splitAndAwait(topic, anyActiveSegmentId(topic), 3);
+
+        publish(producer, "c-", "p3-", 20, sent);
+
+        @Cleanup
+        CheckpointConsumer<String> consumer = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .consumerGroup(group)
+                .startPosition(Checkpoint.earliest())
+                .create();
+
+        Set<String> received = drainExactly(consumer, sent.size());
+        assertEquals(received, sent,
+                "managed EARLIEST checkpoint consumer must receive every message produced "
+                        + "across the DAG, including both sealed generations");
+    }
+
+    // --- Helpers ---
+
+    /**
+     * Send {@code count} messages with keys {@code keyPrefix + i} and values
+     * {@code valuePrefix + i} (i = 0..count-1), adding each value to {@code sent}.
+     */
+    private void publish(Producer<String> producer, String keyPrefix, String valuePrefix,
+                         int count, Set<String> sent) throws Exception {
+        for (int i = 0; i < count; i++) {
+            String value = valuePrefix + i;
+            producer.newMessage().key(keyPrefix + i).value(value).send();
+            sent.add(value);
+        }
+    }
+
+    /**
+     * Split {@code segmentId} and wait until the topic reports {@code expectedActive}
+     * active segments.
+     */
+    private void splitAndAwait(String topic, long segmentId, int expectedActive) throws Exception {
+        admin.scalableTopics().splitSegment(topic, segmentId);
+        Awaitility.await().untilAsserted(() -> assertEquals(activeSegmentCount(topic), expectedActive));
+    }
+
+    /**
+     * Receive until {@code expected} distinct values have been collected or 30 seconds
+     * have passed, whichever comes first, and return what was collected.
+     */
+    private Set<String> drainExactly(CheckpointConsumer<String> consumer, int expected) throws Exception {
+        Set<String> collected = new HashSet<>();
+        long deadline = System.currentTimeMillis() + 30_000L;
+        while (collected.size() < expected && System.currentTimeMillis() < deadline) {
+            Message<String> next = consumer.receive(Duration.ofSeconds(1));
+            if (next == null) {
+                // Receive timed out with nothing delivered — keep trying until the deadline.
+                continue;
+            }
+            collected.add(next.value());
+        }
+        return collected;
+    }
+
+    /** Return the id of the first active segment found in the topic metadata. */
+    private long singleActiveSegmentId(String topic) throws Exception {
+        var segments = admin.scalableTopics().getMetadata(topic).getSegments().values();
+        for (var candidate : segments) {
+            if (candidate.isActive()) {
+                return candidate.getSegmentId();
+            }
+        }
+        throw new AssertionError("no active segment for " + topic);
+    }
+
+    /** Alias of {@link #singleActiveSegmentId(String)} for topics with several active segments. */
+    private long anyActiveSegmentId(String topic) throws Exception {
+        return singleActiveSegmentId(topic);
+    }
+
+    /** Count the segments the topic metadata currently reports as active. */
+    private int activeSegmentCount(String topic) throws Exception {
+        int count = 0;
+        var segments = admin.scalableTopics().getMetadata(topic).getSegments().values();
+        for (var candidate : segments) {
+            count += candidate.isActive() ? 1 : 0;
+        }
+        return count;
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5StreamConsumerDagReplayTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5StreamConsumerDagReplayTest.java
new file mode 100644
index 00000000000..d5204864a8c
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5StreamConsumerDagReplayTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * DAG-replay coverage for V5 {@link StreamConsumer}.
+ *
+ * <p>Companion to the QueueConsumer fix in #25611 ({@code subscribes to 
active + sealed}).
+ * StreamConsumer assignment is controller-driven: the broker's
+ * {@link 
org.apache.pulsar.broker.service.scalable.SubscriptionCoordinator}{@code
+ * #computeAssignment} now hands consumers every segment in the DAG, not just 
the
+ * currently-active ones, so a fresh EARLIEST consumer can pick up backlog 
that lives on
+ * already-sealed segments (i.e., messages produced before a split / merge 
happened).
+ *
+ * <p>Without this, {@link 
#testEarliestSubscribePostSplitReadsSealedBacklog()} would only
+ * see messages produced after the consumer joined; sealed-segment data would 
be silently
+ * orphaned.
+ */
+public class V5StreamConsumerDagReplayTest extends V5ClientBaseTest {
+
+    /** How long a receive loop keeps polling before giving up, in milliseconds. */
+    private static final long DRAIN_TIMEOUT_MS = 30_000L;
+
+    /**
+     * Produce N messages into a single-segment topic (everything lands on segment 0),
+     * split, then attach a brand-new EARLIEST StreamConsumer. Without the controller
+     * including segment 0 in the assignment the consumer would only see post-split sends
+     * and the original N messages would be unreachable for this subscription.
+     */
+    @Test
+    public void testEarliestSubscribePostSplitReadsSealedBacklog() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        // Phase 1: fill segment 0.
+        int preSplit = 50;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < preSplit; i++) {
+            String v = "pre-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Split: segment 0 is sealed, two new active children take over.
+        long parentSegment = singleActiveSegmentId(topic);
+        admin.scalableTopics().splitSegment(topic, parentSegment);
+        Awaitility.await().untilAsserted(() -> assertEquals(activeSegmentCount(topic), 2,
+                "split must produce 2 active children"));
+
+        // Phase 2: more messages, this time landing on the children.
+        int postSplit = 50;
+        for (int i = 0; i < postSplit; i++) {
+            String v = "post-" + i;
+            producer.newMessage().key("kk-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // The consumer subscribes for the first time — AFTER both batches and the split.
+        // The controller has to include segment 0 (sealed, with backlog) in the initial
+        // assignment for this consumer to see the pre-split messages at all.
+        @Cleanup
+        StreamConsumer<String> consumer = v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("dag-replay-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        Set<String> received = drainAndAck(consumer, sent.size());
+        assertEquals(received, sent,
+                "fresh EARLIEST consumer must receive both pre-split (sealed) "
+                        + "and post-split (active) messages");
+    }
+
+    /**
+     * Same shape as the previous test but with a chain of two splits. Sealed-on-sealed
+     * (segment 0 → segments 1+2 → segments 3+4 from re-splitting segment 1) must all
+     * remain visible to a fresh EARLIEST consumer. Exercises the
+     * {@code getAllSegments()} path the assignment now uses, not just direct active
+     * descendants.
+     */
+    @Test
+    public void testEarliestSubscribeAfterTwoSplitsReadsAllBacklog() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        Set<String> sent = new HashSet<>();
+        // Phase 1: lands on segment 0.
+        for (int i = 0; i < 30; i++) {
+            String v = "p1-" + i;
+            producer.newMessage().key("a-" + i).value(v).send();
+            sent.add(v);
+        }
+        admin.scalableTopics().splitSegment(topic, singleActiveSegmentId(topic));
+        Awaitility.await().untilAsserted(() -> assertEquals(activeSegmentCount(topic), 2));
+
+        // Phase 2: lands on the two children.
+        for (int i = 0; i < 30; i++) {
+            String v = "p2-" + i;
+            producer.newMessage().key("b-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Pick one of the two active segments and split it again — produces a deeper DAG.
+        long childToSplit = anyActiveSegmentId(topic);
+        admin.scalableTopics().splitSegment(topic, childToSplit);
+        Awaitility.await().untilAsserted(() -> assertEquals(activeSegmentCount(topic), 3));
+
+        // Phase 3: lands on the now-three active leaves.
+        for (int i = 0; i < 30; i++) {
+            String v = "p3-" + i;
+            producer.newMessage().key("c-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        @Cleanup
+        StreamConsumer<String> consumer = v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("dag-replay-deep-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        Set<String> received = drainAndAck(consumer, sent.size());
+        assertEquals(received, sent,
+                "fresh EARLIEST consumer must receive every message produced across the DAG, "
+                        + "including the two sealed generations");
+    }
+
+    /**
+     * After a fresh EARLIEST consumer drains the sealed backlog, a second consumer joining
+     * the same subscription must <em>not</em> re-receive the already-acknowledged sealed
+     * messages. The assignment still hands the second consumer some sealed segments
+     * (rebalance is by hash range, ties broken by id), but the cursor on those segments
+     * is at the end — the v4 layer fires {@code TopicTerminated} immediately and the
+     * receive loop closes.
+     */
+    @Test
+    public void testSealedBacklogNotRedeliveredAfterFirstConsumerDrains() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        // Pre-split batch + split.
+        int preSplit = 30;
+        for (int i = 0; i < preSplit; i++) {
+            producer.newMessage().key("k-" + i).value("pre-" + i).send();
+        }
+        admin.scalableTopics().splitSegment(topic, singleActiveSegmentId(topic));
+        Awaitility.await().untilAsserted(() -> assertEquals(activeSegmentCount(topic), 2));
+
+        // First consumer drains everything (incl. the sealed segment) and acks.
+        StreamConsumer<String> first = v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("redelivery-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+        try {
+            Set<String> firstReceived = new HashSet<>();
+            MessageId last = null;
+            long deadline = System.currentTimeMillis() + DRAIN_TIMEOUT_MS;
+            while (firstReceived.size() < preSplit && System.currentTimeMillis() < deadline) {
+                Message<String> msg = first.receive(Duration.ofSeconds(1));
+                if (msg != null) {
+                    firstReceived.add(msg.value());
+                    last = msg.id();
+                }
+            }
+            assertEquals(firstReceived.size(), preSplit,
+                    "first consumer must drain all pre-split messages");
+            assertNotNull(last);
+            first.acknowledgeCumulative(last);
+            // Give the broker a moment to persist the cumulative ack on every segment.
+            Thread.sleep(500);
+        } finally {
+            // The first consumer has to be closed before the second one subscribes, so it
+            // cannot simply be @Cleanup-managed; closing it here also covers the case where
+            // one of the assertions above fails, which previously leaked the consumer.
+            first.close();
+        }
+
+        // Second consumer joins the same subscription. The sealed segment is in the
+        // assignment, but its cursor is past every message — no redelivery.
+        @Cleanup
+        StreamConsumer<String> second = v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("redelivery-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+        Message<String> stray = second.receive(Duration.ofSeconds(2));
+        assertTrue(stray == null,
+                "after the first consumer fully acked the sealed backlog, a fresh "
+                        + "subscriber must NOT see those messages, got: "
+                        + (stray == null ? "null" : stray.value()));
+    }
+
+    // --- Helpers ---
+
+    /**
+     * Receives from {@code consumer} until {@code expected} distinct values have been
+     * collected or {@link #DRAIN_TIMEOUT_MS} has elapsed, cumulatively acknowledging each
+     * message as soon as it is received.
+     *
+     * <p>Acks are sent as we go on purpose: the broker uses subscription backlog == 0 to
+     * decide a sealed segment is drained, so deferring acks until the end would keep the
+     * pre-split parent "in backlog" forever and the active children would never get
+     * unblocked.
+     *
+     * @param consumer consumer to receive from; each receive call waits up to one second
+     * @param expected number of distinct values to wait for
+     * @return the distinct values received; may hold fewer than {@code expected} on timeout
+     */
+    private Set<String> drainAndAck(StreamConsumer<String> consumer, int expected) throws Exception {
+        Set<String> received = new HashSet<>();
+        long deadline = System.currentTimeMillis() + DRAIN_TIMEOUT_MS;
+        while (received.size() < expected && System.currentTimeMillis() < deadline) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(1));
+            if (msg != null) {
+                received.add(msg.value());
+                consumer.acknowledgeCumulative(msg.id());
+            }
+        }
+        return received;
+    }
+
+    /**
+     * Returns the id of the first segment, in metadata iteration order, that reports itself
+     * as active.
+     *
+     * @throws AssertionError if the topic has no active segment
+     */
+    private long singleActiveSegmentId(String topic) throws Exception {
+        var meta = admin.scalableTopics().getMetadata(topic);
+        for (var seg : meta.getSegments().values()) {
+            if (seg.isActive()) {
+                return seg.getSegmentId();
+            }
+        }
+        throw new AssertionError("no active segment for " + topic);
+    }
+
+    /**
+     * Returns the id of some active segment; which one is unspecified. Delegates to
+     * {@link #singleActiveSegmentId(String)} rather than duplicating the lookup.
+     */
+    private long anyActiveSegmentId(String topic) throws Exception {
+        return singleActiveSegmentId(topic);
+    }
+
+    /** Counts the segments in the topic's current metadata that report themselves as active. */
+    private int activeSegmentCount(String topic) throws Exception {
+        int active = 0;
+        for (var seg : admin.scalableTopics().getMetadata(topic).getSegments().values()) {
+            if (seg.isActive()) {
+                active++;
+            }
+        }
+        return active;
+    }
+}
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
index d04da28e35f..dd306c5805e 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
@@ -280,4 +280,22 @@ public interface ScalableTopics {
      * Delete a segment topic asynchronously.
      */
     CompletableFuture<Void> deleteSegmentAsync(String segmentTopic, boolean 
force);
+
+    /**
+     * Returns the number of unconsumed entries in the given subscription's cursor on the
+     * segment topic — i.e. the per-subscription backlog. The call routes to the broker
+     * that owns the segment topic, so it works whether the caller is colocated with the
+     * segment or not.
+     *
+     * <p>Used internally by
+     * {@code org.apache.pulsar.broker.service.scalable.SubscriptionCoordinator} to detect when a
+     * sealed parent has been drained and its children can be unblocked. Callers can also use it
+     * for diagnostics; a returned {@code 0} on a sealed segment indicates the subscription has
+     * nothing left to consume there.
+     *
+     * @param segmentTopic Full segment topic name ({@code segment://tenant/namespace/topic/descriptor})
+     * @param subscription Subscription name
+     * @return a future that completes with the subscription's backlog on the segment topic
+     */
+    CompletableFuture<Long> getSegmentSubscriptionBacklogAsync(String 
segmentTopic,
+                                                                String 
subscription);
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
index 950ae667dca..779579f1d1b 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
@@ -263,6 +263,17 @@ public class ScalableTopicsImpl extends BaseResource 
implements ScalableTopics {
         return asyncDeleteRequest(path);
     }
 
+    /**
+     * Builds the request path
+     * {@code <tenant>/<namespace>/<topic>/<descriptor>/subscription/<subscription>/backlog}
+     * under {@code adminSegments} from the parsed segment topic name and hands it to
+     * {@code asyncGetRequest}, asking for the response as a {@code Long}.
+     */
+    @Override
+    public CompletableFuture<Long> getSegmentSubscriptionBacklogAsync(String segmentTopic,
+                                                                      String subscription) {
+        final TopicName segment = TopicName.get(segmentTopic);
+        // Locate the segment: tenant / namespace / topic / descriptor.
+        WebTarget target = adminSegments.path(segment.getTenant());
+        target = target.path(segment.getNamespacePortion());
+        target = target.path(segment.getLocalName());
+        target = target.path(segment.getSegmentDescriptor());
+        // Then address the subscription's backlog resource on that segment.
+        target = target.path("subscription");
+        target = target.path(subscription);
+        target = target.path("backlog");
+        return asyncGetRequest(target, Long.class);
+    }
+
     // --- Helpers ---
 
     private static TopicName validateTopic(String topic) {

Reply via email to