This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 35aa105ec31 [feat][broker] PIP-468: scalable-topic seek + 
clear-backlog admin API (#25696)
35aa105ec31 is described below

commit 35aa105ec31c6f25432deaa8c330d28217689c8a
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 6 18:15:43 2026 -0700

    [feat][broker] PIP-468: scalable-topic seek + clear-backlog admin API 
(#25696)
---
 .../pulsar/broker/admin/v2/ScalableTopics.java     |  86 ++++++++
 .../apache/pulsar/broker/admin/v2/Segments.java    | 130 ++++++++++++
 .../service/scalable/ScalableTopicController.java  | 107 ++++++++++
 .../service/scalable/ScalableTopicService.java     |  21 ++
 .../scalable/ScalableTopicControllerTest.java      | 158 +++++++++++++++
 .../pulsar/client/api/v5/V5AsyncApisTest.java      |  24 +--
 .../api/v5/V5CheckpointConsumerBasicTest.java      |  38 +---
 .../client/api/v5/V5SeekSubscriptionTest.java      | 218 +++++++++++++++++++++
 .../apache/pulsar/client/admin/ScalableTopics.java |  59 ++++++
 .../client/admin/internal/ScalableTopicsImpl.java  |  51 +++++
 .../apache/pulsar/client/api/v5/Checkpoint.java    |  28 +--
 .../pulsar/client/api/v5/CheckpointConsumer.java   |  11 --
 .../client/api/v5/CheckpointConsumerBuilder.java   |   5 +-
 .../client/api/v5/StreamConsumerBuilder.java       |  20 --
 .../api/v5/async/AsyncCheckpointConsumer.java      |   9 -
 .../api/v5/internal/PulsarClientProvider.java      |   3 -
 .../org/apache/pulsar/client/api/v5/Examples.java  |  20 --
 .../apache/pulsar/admin/cli/CmdScalableTopics.java |  45 +++++
 .../client/impl/v5/AsyncCheckpointConsumerV5.java  |   5 -
 .../apache/pulsar/client/impl/v5/CheckpointV5.java |  58 +-----
 .../client/impl/v5/PulsarClientProviderV5.java     |   6 -
 .../client/impl/v5/ScalableCheckpointConsumer.java |  51 +----
 .../client/impl/v5/StreamConsumerBuilderV5.java    |  14 --
 .../pulsar/client/impl/v5/CheckpointV5Test.java    |  31 +--
 24 files changed, 901 insertions(+), 297 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
index 01b2320f99a..32468c48892 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
@@ -404,6 +404,92 @@ public class ScalableTopics extends AdminResource {
                 });
     }
 
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/subscriptions/{subscription}/seek")
+    @ApiOperation(value = "Reset a subscription's cursor on every segment to 
the given"
+            + " wall-clock timestamp. The controller uses each segment's 
recorded sealed-time"
+            + " window to dispatch the cheapest per-segment op.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 204, message = "Cursor reset successfully on 
all segments"),
+            @ApiResponse(code = 401, message = "Don't have permission to 
administrate resources on this tenant"),
+            @ApiResponse(code = 403, message = "Don't have admin permission on 
the namespace"),
+            @ApiResponse(code = 404, message = "Scalable topic or subscription 
doesn't exist"),
+            @ApiResponse(code = 500, message = "Internal server error")})
+    public void seekSubscription(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Subscription name", required = true)
+            @PathParam("subscription") String subscription,
+            @ApiParam(value = "Wall-clock millis since the unix epoch", 
required = true)
+            @QueryParam("timestamp") long timestampMs) {
+        validateNamespaceName(tenant, namespace);
+        TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, 
encodedTopic);
+
+        validateTopicOperationAsync(tn, TopicOperation.RESET_CURSOR, 
subscription)
+                .thenCompose(__ -> onControllerLeader(tn,
+                        svc -> svc.seekSubscription(tn, subscription, 
timestampMs)))
+                .thenAccept(__ -> {
+                    log.info().attr("clientAppId", clientAppId())
+                            .attr("subscription", subscription).attr("topic", 
tn)
+                            .attr("timestampMs", timestampMs)
+                            .log("Sought subscription on scalable topic");
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+                    log.error().attr("clientAppId", clientAppId())
+                            .attr("subscription", subscription).attr("topic", 
tn)
+                            .exception(ex).log("Failed to seek subscription");
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
+    @POST
+    
@Path("/{tenant}/{namespace}/{topic}/subscriptions/{subscription}/skip-all")
+    @ApiOperation(value = "Skip every undelivered message on the subscription, 
across every"
+            + " segment in the DAG (advance each per-segment cursor to the 
end).")
+    @ApiResponses(value = {
+            @ApiResponse(code = 204, message = "Backlog cleared successfully 
on all segments"),
+            @ApiResponse(code = 401, message = "Don't have permission to 
administrate resources on this tenant"),
+            @ApiResponse(code = 403, message = "Don't have admin permission on 
the namespace"),
+            @ApiResponse(code = 404, message = "Scalable topic or subscription 
doesn't exist"),
+            @ApiResponse(code = 500, message = "Internal server error")})
+    public void clearBacklog(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Subscription name", required = true)
+            @PathParam("subscription") String subscription) {
+        validateNamespaceName(tenant, namespace);
+        TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName, 
encodedTopic);
+
+        validateTopicOperationAsync(tn, TopicOperation.SKIP, subscription)
+                .thenCompose(__ -> onControllerLeader(tn,
+                        svc -> svc.clearBacklog(tn, subscription)))
+                .thenAccept(__ -> {
+                    log.info().attr("clientAppId", clientAppId())
+                            .attr("subscription", subscription).attr("topic", 
tn)
+                            .log("Cleared backlog on scalable topic");
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+                    log.error().attr("clientAppId", clientAppId())
+                            .attr("subscription", subscription).attr("topic", 
tn)
+                            .exception(ex).log("Failed to clear subscription 
backlog");
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
     // --- Segment operations ---
 
     @POST
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
index 0ae8b3264a5..507119518b0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
@@ -228,6 +228,136 @@ public class Segments extends AdminResource {
                 });
     }
 
+    @POST
+    
@Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}/seek")
+    @ApiOperation(value = "Reset the segment topic's subscription cursor to 
the given timestamp."
+            + " Super-user only.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 204, message = "Cursor reset successfully"),
+            @ApiResponse(code = 401, message = "This operation requires 
super-user access"),
+            @ApiResponse(code = 403, message = "This operation requires 
super-user access"),
+            @ApiResponse(code = 404, message = "Segment topic or subscription 
not found"),
+            @ApiResponse(code = 500, message = "Internal server error")})
+    public void seekSubscription(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify the parent topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Segment descriptor (e.g. 0000-7fff-1)", 
required = true)
+            @PathParam("descriptor") String descriptor,
+            @ApiParam(value = "Subscription name", required = true)
+            @PathParam("subscription") String subscription,
+            @ApiParam(value = "Wall-clock millis since the unix epoch", 
required = true)
+            @QueryParam("timestamp") long timestampMs,
+            @ApiParam(value = "Whether leader broker redirected this call to 
this broker.")
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
+        validateNamespaceName(tenant, namespace);
+        TopicName segmentTopic = segmentTopicName(tenant, namespace, 
encodedTopic, descriptor);
+
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic, 
authoritative))
+                .thenCompose(__ -> 
pulsar().getBrokerService().getTopicIfExists(segmentTopic.toString()))
+                .thenCompose(optTopic -> {
+                    if (optTopic.isEmpty()) {
+                        // Segment topic not loaded on this owner — could be 
ownership
+                        // churn or a transient unload. 503 so the caller 
retries (and so
+                        // the parent-topic seek can't conflate this with the
+                        // subscription-not-found case below, which is 
tolerated).
+                        throw new 
RestException(Response.Status.SERVICE_UNAVAILABLE,
+                                "Segment topic not loaded: " + segmentTopic);
+                    }
+                    var sub = optTopic.get().getSubscription(subscription);
+                    if (sub == null) {
+                        throw new RestException(Response.Status.NOT_FOUND,
+                                "Subscription not found on segment: " + 
subscription);
+                    }
+                    return sub.resetCursor(timestampMs)
+                            .exceptionally(ex -> {
+                                Throwable cause = ex instanceof 
java.util.concurrent.CompletionException
+                                        ? ex.getCause() : ex;
+                                if (cause instanceof 
org.apache.pulsar.broker.service.BrokerServiceException
+                                        .SubscriptionInvalidCursorPosition) {
+                                    // Empty managed ledger — no entries to 
seek to. The
+                                    // cursor is already at the only valid 
position, so this
+                                    // is a no-op (e.g. a freshly-split active 
child segment
+                                    // that hasn't received any messages yet).
+                                    log.debug().attr("segment", segmentTopic)
+                                            .attr("subscription", subscription)
+                                            .log("Empty segment, treating seek 
as no-op");
+                                    return null;
+                                }
+                                throw org.apache.pulsar.common.util.FutureUtil
+                                        .wrapToCompletionException(cause);
+                            });
+                })
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error().attr("clientAppId", 
clientAppId()).attr("segment", segmentTopic)
+                            .attr("subscription", 
subscription).attr("timestampMs", timestampMs)
+                            .exception(ex).log("Failed to seek segment 
subscription");
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
+    @POST
+    
@Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}/skip-all")
+    @ApiOperation(value = "Skip every undelivered message on the segment 
topic's subscription —"
+            + " advance the cursor to the end. Super-user only.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 204, message = "Backlog cleared successfully"),
+            @ApiResponse(code = 401, message = "This operation requires 
super-user access"),
+            @ApiResponse(code = 403, message = "This operation requires 
super-user access"),
+            @ApiResponse(code = 404, message = "Segment topic or subscription 
not found"),
+            @ApiResponse(code = 500, message = "Internal server error")})
+    public void clearSubscriptionBacklog(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify the parent topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Segment descriptor (e.g. 0000-7fff-1)", 
required = true)
+            @PathParam("descriptor") String descriptor,
+            @ApiParam(value = "Subscription name", required = true)
+            @PathParam("subscription") String subscription,
+            @ApiParam(value = "Whether leader broker redirected this call to 
this broker.")
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
+        validateNamespaceName(tenant, namespace);
+        TopicName segmentTopic = segmentTopicName(tenant, namespace, 
encodedTopic, descriptor);
+
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic, 
authoritative))
+                .thenCompose(__ -> 
pulsar().getBrokerService().getTopicIfExists(segmentTopic.toString()))
+                .thenCompose(optTopic -> {
+                    if (optTopic.isEmpty()) {
+                        // 503 vs 404 — see the rationale on the seek endpoint 
above. The
+                        // distinction lets the parent-topic clear-backlog 
tolerate
+                        // subscription-not-found while still surfacing 
transient unloads.
+                        throw new 
RestException(Response.Status.SERVICE_UNAVAILABLE,
+                                "Segment topic not loaded: " + segmentTopic);
+                    }
+                    var sub = optTopic.get().getSubscription(subscription);
+                    if (sub == null) {
+                        throw new RestException(Response.Status.NOT_FOUND,
+                                "Subscription not found on segment: " + 
subscription);
+                    }
+                    return sub.clearBacklog();
+                })
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error().attr("clientAppId", 
clientAppId()).attr("segment", segmentTopic)
+                            .attr("subscription", subscription)
+                            .exception(ex).log("Failed to clear segment 
subscription backlog");
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
     @DELETE
     @Path("/{tenant}/{namespace}/{topic}/{descriptor}")
     @ApiOperation(value = "Delete a segment topic. Super-user only.")
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index 552ca321d84..3bd562a48e1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -603,6 +603,113 @@ public class ScalableTopicController {
         return CompletableFuture.allOf(futures);
     }
 
+    /**
+     * Reset a subscription's cursor across every segment to the given 
wall-clock
+     * timestamp. We use each segment's recorded {@code [createdAtMs, 
sealedAtMs)}
+     * window to dispatch the cheapest possible per-segment op:
+     *
+     * <ul>
+     *   <li>Segment was sealed at-or-before {@code timestampMs} — all of its data is from
+     *       earlier; cursor → end of segment (skip-all).</li>
+     *   <li>Segment was created at-or-after {@code timestampMs} — all of its data is
+     *       from at-or-after; cursor → earliest (seek to {@code timestamp=0}).</li>
+     *   <li>Segment is alive at {@code timestampMs} (active or straddling sealed), or its created/sealed
+     *       time is non-positive and so cannot be used to classify it — cursor seeks to {@code timestampMs}.</li>
+     * </ul>
+     *
+     * <p>Per-segment failures are surfaced, not swallowed: the returned future completes exceptionally if
+     * any per-segment call fails, but only after every per-segment call has finished, so other segments may
+     * already have been repositioned. The only tolerated outcome is {@code 404 Not Found} from the segment
+     * endpoint, which the segment REST resource emits exclusively for "subscription not present on this
+     * segment" (e.g. the cursor hasn't been materialised yet — it will propagate lazily and the next seek
+     * will land it). Transient unloads / ownership churn surface as {@code 503} from the segment endpoint
+     * and propagate to the caller, who can retry the parent-level operation.
+     */
+    public CompletableFuture<Void> seekSubscription(String subscription, long 
timestampMs) {
+        checkLeader();
+        SegmentLayout layout = this.currentLayout;
+        CompletableFuture<?>[] futures = 
layout.getAllSegments().values().stream()
+                .map(segment -> seekSubscriptionOnSegment(segment, 
subscription, timestampMs))
+                .toArray(CompletableFuture[]::new);
+        return CompletableFuture.allOf(futures);
+    }
+
+    /**
+     * Skip every undelivered message on the subscription, across every 
segment in the
+     * DAG. Equivalent to advancing each per-segment cursor to the end.
+     */
+    public CompletableFuture<Void> clearBacklog(String subscription) {
+        checkLeader();
+        SegmentLayout layout = this.currentLayout;
+        CompletableFuture<?>[] futures = 
layout.getAllSegments().values().stream()
+                .map(segment -> clearSubscriptionBacklogOnSegment(segment, 
subscription))
+                .toArray(CompletableFuture[]::new);
+        return CompletableFuture.allOf(futures);
+    }
+
+    private CompletableFuture<Void> seekSubscriptionOnSegment(SegmentInfo 
segment,
+                                                              String 
subscription,
+                                                              long 
timestampMs) {
+        // Classify the segment relative to the requested timestamp using the 
recorded
+        // sealed-time / created-time. This is what makes the parent-level 
seek O(N segments)
+        // worth of cheap RPCs rather than O(N) timestamp-based scans of every 
segment's
+        // managed ledger.
+        if (segment.isSealed() && segment.sealedAtMs() > 0
+                && segment.sealedAtMs() <= timestampMs) {
+            // Segment fully predates timestamp → skip everything on this 
segment.
+            return clearSubscriptionBacklogOnSegment(segment, subscription);
+        }
+        long effective = timestampMs;
+        if (segment.createdAtMs() > 0 && segment.createdAtMs() >= timestampMs) 
{
+            // Segment fully postdates timestamp → seek to start (timestamp=0 
== earliest
+            // for managed-ledger reset-cursor-by-timestamp semantics).
+            effective = 0L;
+        }
+        String segmentName = toSegmentPersistentName(segment);
+        try {
+            return brokerService.getPulsar().getAdminClient()
+                    
.scalableTopics().seekSegmentSubscriptionAsync(segmentName, subscription, 
effective)
+                    .exceptionally(ex -> {
+                        Throwable cause =
+                                
org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex);
+                        if (cause instanceof 
org.apache.pulsar.client.admin.PulsarAdminException
+                                .NotFoundException) {
+                            // 404 from the segment endpoint == "subscription 
not present
+                            // on this segment" (the segment endpoint uses 503 
for
+                            // "topic not loaded"). The cursor will propagate 
lazily;
+                            // tolerated.
+                            return null;
+                        }
+                        throw 
org.apache.pulsar.common.util.FutureUtil.wrapToCompletionException(cause);
+                    });
+        } catch (PulsarServerException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
+    private CompletableFuture<Void> 
clearSubscriptionBacklogOnSegment(SegmentInfo segment,
+                                                                       String 
subscription) {
+        String segmentName = toSegmentPersistentName(segment);
+        try {
+            return brokerService.getPulsar().getAdminClient()
+                    .scalableTopics()
+                    .clearSegmentSubscriptionBacklogAsync(segmentName, 
subscription)
+                    .exceptionally(ex -> {
+                        Throwable cause =
+                                
org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex);
+                        if (cause instanceof 
org.apache.pulsar.client.admin.PulsarAdminException
+                                .NotFoundException) {
+                            // Subscription not present on this segment — 
tolerated.
+                            // (See seek path for the 404-vs-503 contract.)
+                            return null;
+                        }
+                        throw 
org.apache.pulsar.common.util.FutureUtil.wrapToCompletionException(cause);
+                    });
+        } catch (PulsarServerException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
     private CompletableFuture<Void> createSubscriptionOnSegment(SegmentInfo 
segment, String subscription) {
         String persistentName = toSegmentUnderlyingPersistentName(segment);
         try {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
index e43ca06ccbd..12c7064e844 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
@@ -196,6 +196,27 @@ public class ScalableTopicService {
                 .thenCompose(controller -> 
controller.deleteSubscription(subscription));
     }
 
+    /**
+     * Reset a subscription's cursor across every segment to the given 
wall-clock
+     * timestamp. Delegates to the controller leader; the controller uses each
+     * segment's recorded {@code [createdAtMs, sealedAtMs)} window to dispatch 
the
+     * cheapest per-segment op (skip-all, seek-by-timestamp, or 
seek-to-earliest).
+     */
+    public CompletableFuture<Void> seekSubscription(TopicName topic, String 
subscription,
+                                                     long timestampMs) {
+        return getOrCreateController(topic)
+                .thenCompose(controller -> 
controller.seekSubscription(subscription, timestampMs));
+    }
+
+    /**
+     * Skip every undelivered message on the subscription, across every 
segment in
+     * the DAG. Delegates to the controller leader.
+     */
+    public CompletableFuture<Void> clearBacklog(TopicName topic, String 
subscription) {
+        return getOrCreateController(topic)
+                .thenCompose(controller -> 
controller.clearBacklog(subscription));
+    }
+
     /**
      * Get aggregated stats for a scalable topic. Read-only: does not require 
leadership.
      * Returns segment-DAG counts and per-subscription consumer counts, read 
from the
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
index c00b1923504..8c75823ebef 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service.scalable;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -456,6 +457,163 @@ public class ScalableTopicControllerTest {
         }
     }
 
+    // --- Seek subscription / clear backlog ---
+
+    /**
+     * The seek path classifies each segment by its {@code [createdAtMs, 
sealedAtMs)} window
+     * against the requested timestamp. With 3 segments — one sealed entirely 
before the
+     * timestamp, one straddling, and one created entirely after — we expect:
+     * <ul>
+     *   <li>Sealed-and-old segment: skip-all admin call (cursor → end).</li>
+     *   <li>Straddling segment: seek admin call with the requested 
timestamp.</li>
+     *   <li>Created-after segment: seek admin call with timestamp 0 (i.e. 
earliest).</li>
+     * </ul>
+     */
+    @Test
+    public void testSeekSubscriptionDispatchesPerSegmentByTimestamp() throws 
Exception {
+        // Build a custom metadata with three segments at specific timestamps.
+        long t0 = 1_000_000L;
+        long t1 = 2_000_000L;
+        long t2 = 3_000_000L;
+        long t3 = 4_000_000L;
+        // Segment 0: created at t0, sealed at t1 (entirely before tSeek).
+        // Segment 1: created at t1, still active (straddles tSeek).
+        // Segment 2: created at t3, still active (entirely after tSeek).
+        long tSeek = t2;
+
+        org.apache.pulsar.common.scalable.SegmentInfo seg0 = new 
org.apache.pulsar.common.scalable.SegmentInfo(
+                0L,
+                org.apache.pulsar.common.scalable.HashRange.of(0x0000, 0x3FFF),
+                org.apache.pulsar.common.scalable.SegmentState.SEALED,
+                java.util.List.of(), java.util.List.of(3L),
+                /*createdAtEpoch*/ 0, /*sealedAtEpoch*/ 1,
+                /*createdAtMs*/ t0, /*sealedAtMs*/ t1);
+        org.apache.pulsar.common.scalable.SegmentInfo seg1 = new 
org.apache.pulsar.common.scalable.SegmentInfo(
+                1L,
+                org.apache.pulsar.common.scalable.HashRange.of(0x4000, 0x7FFF),
+                org.apache.pulsar.common.scalable.SegmentState.ACTIVE,
+                java.util.List.of(), java.util.List.of(),
+                0, -1, t1, -1);
+        org.apache.pulsar.common.scalable.SegmentInfo seg2 = new 
org.apache.pulsar.common.scalable.SegmentInfo(
+                2L,
+                org.apache.pulsar.common.scalable.HashRange.of(0x8000, 0xFFFF),
+                org.apache.pulsar.common.scalable.SegmentState.ACTIVE,
+                java.util.List.of(), java.util.List.of(),
+                0, -1, t3, -1);
+
+        TopicName seekTopic = TopicName.get("topic://tenant/ns/seek-topic");
+        ScalableTopicMetadata md = ScalableTopicMetadata.builder()
+                .epoch(2).nextSegmentId(4)
+                .segments(java.util.Map.of(0L, seg0, 1L, seg1, 2L, seg2))
+                .properties(java.util.Map.of())
+                .build();
+        resources.createScalableTopicAsync(seekTopic, md).get();
+        ScalableTopicController c = new ScalableTopicController(
+                seekTopic, resources, brokerService, coordinationService);
+        try {
+            // Stub the segment-aware admin calls.
+            when(scalableTopics.seekSegmentSubscriptionAsync(anyString(), 
anyString(), anyLong()))
+                    .thenReturn(CompletableFuture.completedFuture(null));
+            
when(scalableTopics.clearSegmentSubscriptionBacklogAsync(anyString(), 
anyString()))
+                    .thenReturn(CompletableFuture.completedFuture(null));
+
+            c.initialize().get();
+            c.seekSubscription("sub-a", tSeek).get();
+
+            // Sealed-old segment: skip-all admin call once.
+            verify(scalableTopics, org.mockito.Mockito.times(1))
+                    .clearSegmentSubscriptionBacklogAsync(anyString(), 
org.mockito.ArgumentMatchers.eq("sub-a"));
+
+            // Two seek calls: one for the straddling segment (t == tSeek), 
one for the
+            // created-after segment (t == 0 because seg2.createdAtMs >= 
tSeek).
+            org.mockito.ArgumentCaptor<Long> tsCaptor = 
org.mockito.ArgumentCaptor.forClass(Long.class);
+            verify(scalableTopics, org.mockito.Mockito.times(2))
+                    .seekSegmentSubscriptionAsync(anyString(),
+                            org.mockito.ArgumentMatchers.eq("sub-a"),
+                            tsCaptor.capture());
+            java.util.List<Long> sentTs = tsCaptor.getAllValues();
+            assertTrue(sentTs.contains(tSeek), "expected straddling segment to 
receive tSeek");
+            assertTrue(sentTs.contains(0L), "expected created-after segment to 
receive 0");
+        } finally {
+            c.close().join();
+        }
+    }
+
+    /** Clear-backlog dispatches skip-all to every segment in the DAG. */
+    @Test
+    public void testClearBacklogDispatchesSkipAllToEverySegment() throws 
Exception {
+        when(scalableTopics.clearSegmentSubscriptionBacklogAsync(anyString(), 
anyString()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        controller.initialize().get();
+
+        controller.clearBacklog("sub-a").get();
+        // INITIAL_SEGMENTS active segments, no sealed ones in the baseline → 
exactly N calls.
+        verify(scalableTopics, org.mockito.Mockito.times(INITIAL_SEGMENTS))
+                .clearSegmentSubscriptionBacklogAsync(anyString(),
+                        org.mockito.ArgumentMatchers.eq("sub-a"));
+    }
+
+    /**
+     * 404 from a per-segment seek means "subscription not present on that 
segment" —
+     * the controller tolerates this as success (cursor will materialise 
lazily).
+     */
+    @Test
+    public void testSeekTolerates404SubscriptionNotFound() throws Exception {
+        when(scalableTopics.seekSegmentSubscriptionAsync(anyString(), 
anyString(), anyLong()))
+                .thenReturn(CompletableFuture.failedFuture(
+                        new 
org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException(
+                                new RuntimeException("Subscription not found"),
+                                "Subscription not found", 404)));
+        controller.initialize().get();
+
+        // Should not throw — every segment's 404 is swallowed.
+        controller.seekSubscription("sub-a", System.currentTimeMillis()).get();
+    }
+
+    /**
+     * 503 from the segment endpoint == "topic not loaded yet". This is a 
transient
+     * unload (e.g. ownership churn); the controller MUST surface it so the 
caller can
+     * retry the parent-level operation, instead of silently skipping segments.
+     */
+    @Test
+    public void testSeekPropagates503TransientUnload() throws Exception {
+        org.apache.pulsar.client.admin.PulsarAdminException unavailable =
+                new org.apache.pulsar.client.admin.PulsarAdminException(
+                        new RuntimeException("Service Unavailable"),
+                        "Segment topic not loaded", 503);
+        when(scalableTopics.seekSegmentSubscriptionAsync(anyString(), 
anyString(), anyLong()))
+                .thenReturn(CompletableFuture.failedFuture(unavailable));
+        controller.initialize().get();
+
+        java.util.concurrent.ExecutionException ex =
+                
org.testng.Assert.expectThrows(java.util.concurrent.ExecutionException.class,
+                        () -> controller.seekSubscription("sub-a", 
System.currentTimeMillis()).get());
+        Throwable cause = ex.getCause() instanceof 
java.util.concurrent.CompletionException
+                ? ex.getCause().getCause() : ex.getCause();
+        assertTrue(cause instanceof 
org.apache.pulsar.client.admin.PulsarAdminException,
+                "expected 503 PulsarAdminException to propagate, got " + 
cause);
+    }
+
+    /** Same contract as seek: 503 from a per-segment clear-backlog must 
propagate. */
+    @Test
+    public void testClearBacklogPropagates503TransientUnload() throws 
Exception {
+        org.apache.pulsar.client.admin.PulsarAdminException unavailable =
+                new org.apache.pulsar.client.admin.PulsarAdminException(
+                        new RuntimeException("Service Unavailable"),
+                        "Segment topic not loaded", 503);
+        when(scalableTopics.clearSegmentSubscriptionBacklogAsync(anyString(), 
anyString()))
+                .thenReturn(CompletableFuture.failedFuture(unavailable));
+        controller.initialize().get();
+
+        java.util.concurrent.ExecutionException ex =
+                
org.testng.Assert.expectThrows(java.util.concurrent.ExecutionException.class,
+                        () -> controller.clearBacklog("sub-a").get());
+        Throwable cause = ex.getCause() instanceof 
java.util.concurrent.CompletionException
+                ? ex.getCause().getCause() : ex.getCause();
+        assertTrue(cause instanceof 
org.apache.pulsar.client.admin.PulsarAdminException,
+                "expected 503 PulsarAdminException to propagate, got " + 
cause);
+    }
+
     // --- Sealed-segment GC ---
 
     /**
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AsyncApisTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AsyncApisTest.java
index 54164b23339..28ba3bc1a7e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AsyncApisTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AsyncApisTest.java
@@ -24,9 +24,7 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
@@ -176,14 +174,16 @@ public class V5AsyncApisTest extends V5ClientBaseTest {
     }
 
     @Test
-    public void testAsyncCheckpointConsumerCheckpointAndSeek() throws 
Exception {
+    public void testAsyncCheckpointConsumerCheckpoint() throws Exception {
+        // Verifies that the async view of CheckpointConsumer delivers messages
+        // in order and that checkpoint() completes with a non-null position.
         String topic = newScalableTopic(1);
 
         @Cleanup
         Producer<String> producer = v5Client.newProducer(Schema.string())
                 .topic(topic)
                 .create();
-        for (int i = 0; i < 6; i++) {
+        for (int i = 0; i < 3; i++) {
             producer.newMessage().value("v-" + i).send();
         }
 
@@ -194,28 +194,12 @@ public class V5AsyncApisTest extends V5ClientBaseTest {
                 .create();
 
         AsyncCheckpointConsumer<String> async = consumer.async();
-
-        // Read 3, snapshot via async, read 3 more, then async-seek back.
         for (int i = 0; i < 3; i++) {
             Message<String> msg = async.receive().get(AWAIT.toMillis(), 
TimeUnit.MILLISECONDS);
             assertEquals(msg.value(), "v-" + i);
         }
         Checkpoint mark = async.checkpoint().get(AWAIT.toMillis(), 
TimeUnit.MILLISECONDS);
         assertNotNull(mark, "async checkpoint must complete with a non-null 
position");
-
-        for (int i = 3; i < 6; i++) {
-            Message<String> msg = async.receive().get(AWAIT.toMillis(), 
TimeUnit.MILLISECONDS);
-            assertEquals(msg.value(), "v-" + i);
-        }
-
-        async.seek(mark).get(AWAIT.toMillis(), TimeUnit.MILLISECONDS);
-        Set<String> redelivered = new HashSet<>();
-        for (int i = 0; i < 3; i++) {
-            Message<String> msg = async.receive().get(AWAIT.toMillis(), 
TimeUnit.MILLISECONDS);
-            redelivered.add(msg.value());
-        }
-        assertEquals(redelivered, Set.of("v-3", "v-4", "v-5"),
-                "async seek did not redeliver the post-checkpoint window");
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerBasicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerBasicTest.java
index 0cd3b78c8a4..697491ad74a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerBasicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerBasicTest.java
@@ -32,7 +32,7 @@ import org.testng.annotations.Test;
  * Basic end-to-end coverage for {@link CheckpointConsumer}: the unmanaged 
reader-style
  * API used by connector frameworks (Flink, Spark) — specifically the 
start-position
  * sentinels (earliest / latest), checkpoint roundtrip via {@link 
Checkpoint#toByteArray()},
- * resume from a saved checkpoint, and {@link 
CheckpointConsumer#seek(Checkpoint)}.
+ * and resume from a saved checkpoint.
  *
  * <p>All scenarios use a single-segment scalable topic to keep the focus on 
the
  * consumer surface itself; cross-segment position-vector behavior lives in the
@@ -189,42 +189,6 @@ public class V5CheckpointConsumerBasicTest extends 
V5ClientBaseTest {
         assertEquals(received, List.of("v-3", "v-4", "v-5"));
     }
 
-    @Test
-    public void testSeekRewindsToEarlierCheckpoint() throws Exception {
-        String topic = newScalableTopic(1);
-
-        @Cleanup
-        Producer<String> producer = v5Client.newProducer(Schema.string())
-                .topic(topic)
-                .create();
-        for (int i = 0; i < 6; i++) {
-            producer.newMessage().value("v-" + i).send();
-        }
-
-        @Cleanup
-        CheckpointConsumer<String> consumer = 
v5Client.newCheckpointConsumer(Schema.string())
-                .topic(topic)
-                .startPosition(Checkpoint.earliest())
-                .create();
-
-        // Read 3, snapshot, read 3 more, then seek back to the snapshot — 
should
-        // re-deliver the second batch.
-        for (int i = 0; i < 3; i++) {
-            assertEquals(consumer.receive(Duration.ofSeconds(5)).value(), "v-" 
+ i);
-        }
-        Checkpoint mark = consumer.checkpoint();
-        for (int i = 3; i < 6; i++) {
-            assertEquals(consumer.receive(Duration.ofSeconds(5)).value(), "v-" 
+ i);
-        }
-
-        consumer.seek(mark);
-        for (int i = 3; i < 6; i++) {
-            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
-            assertNotNull(msg, "seek did not redeliver message v-" + i);
-            assertEquals(msg.value(), "v-" + i);
-        }
-    }
-
     @Test
     public void testReceiveTimeoutReturnsNullWhenNoMessages() throws Exception 
{
         String topic = newScalableTopic(1);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SeekSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SeekSubscriptionTest.java
new file mode 100644
index 00000000000..a5e3a3d1952
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SeekSubscriptionTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end coverage for the parent-topic admin seek + clear-backlog 
operations on a
+ * scalable-topic subscription. Exercises the real per-segment cursor-reset 
path
+ * (managed-ledger {@code resetCursor(timestamp)} / {@code clearBacklog}), 
which the
+ * controller-level mock-based tests do not cover.
+ */
+public class V5SeekSubscriptionTest extends V5ClientBaseTest {
+
+    @Test
+    public void testSeekRewindsCursorAcrossSingleSegment() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                
.batchingPolicy(org.apache.pulsar.client.api.v5.config.BatchingPolicy.ofDisabled())
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("seek-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        // Produce 5 messages, consume + ack — drains the cursor so the rewind 
is observable.
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().value("v-" + i).send();
+        }
+        for (int i = 0; i < 5; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg);
+            consumer.acknowledge(msg.id());
+        }
+
+        // Snapshot the rewind target — anything published after this should 
be redelivered.
+        long mark = System.currentTimeMillis();
+        // Slack for the broker's wall-clock vs the test's: producing 
immediately at `mark`
+        // can land at `mark` or `mark+1`. Sleep a tick so post-mark messages 
have a strictly
+        // later publish time.
+        Thread.sleep(10);
+
+        for (int i = 5; i < 10; i++) {
+            producer.newMessage().value("v-" + i).send();
+        }
+        for (int i = 5; i < 10; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg);
+            consumer.acknowledge(msg.id());
+        }
+
+        // Drained — confirm.
+        assertNull(consumer.receive(Duration.ofMillis(500)));
+
+        // Admin seek back to the mark — post-mark messages must be 
redelivered.
+        admin.scalableTopics().seekSubscription(topic, "seek-sub", mark);
+
+        Set<String> redelivered = new HashSet<>();
+        for (int i = 0; i < 5; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+            assertNotNull(msg, "expected redelivery of post-mark message #" + 
i);
+            redelivered.add(msg.value());
+            consumer.acknowledge(msg.id());
+        }
+        assertEquals(redelivered, Set.of("v-5", "v-6", "v-7", "v-8", "v-9"),
+                "seek must redeliver exactly the post-mark window");
+    }
+
+    /**
+     * Regression: a freshly-split active child segment that has received no 
messages
+     * straddles {@code timestampMs} but has an empty managed ledger; the 
per-segment
+     * {@code resetCursor} on it would have failed with {@code 
SubscriptionInvalidCursorPosition},
+     * which used to bring the entire parent-level seek down. The fix treats 
empty
+     * segments as a no-op.
+     */
+    @Test
+    public void testSeekToleratesEmptyChildSegmentsAfterSplit() throws 
Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                
.batchingPolicy(org.apache.pulsar.client.api.v5.config.BatchingPolicy.ofDisabled())
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("split-seek-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        // Populate the initial segment.
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().value("pre-" + i).send();
+        }
+
+        // Drain so we don't redeliver them later.
+        for (int i = 0; i < 5; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg);
+            consumer.acknowledge(msg.id());
+        }
+
+        // Split — children are empty; parent is sealed with all 5 messages.
+        long activeSegmentId = -1;
+        var meta = admin.scalableTopics().getMetadata(topic);
+        for (var seg : meta.getSegments().values()) {
+            if (seg.isActive()) {
+                activeSegmentId = seg.getSegmentId();
+                break;
+            }
+        }
+        assertTrue(activeSegmentId >= 0);
+        admin.scalableTopics().splitSegment(topic, activeSegmentId);
+
+        // Wait for the split to be visible.
+        Awaitility.await().untilAsserted(() -> {
+            int active = 0;
+            var m = admin.scalableTopics().getMetadata(topic);
+            for (var seg : m.getSegments().values()) {
+                if (seg.isActive()) {
+                    active++;
+                }
+            }
+            assertEquals(active, 2, "split must produce 2 active children");
+        });
+
+        // Seek to "now" — this exercises the bug. The two empty active 
children
+        // straddle the timestamp; without the fix, the controller's allOf 
would fail
+        // because resetCursor on an empty managed ledger throws
+        // SubscriptionInvalidCursorPosition.
+        long now = System.currentTimeMillis();
+        admin.scalableTopics().seekSubscription(topic, "split-seek-sub", now);
+
+        // After seek, no backlog (the sealed parent's data is all from before 
`now`,
+        // and the children have no data). So receive must time out.
+        assertNull(consumer.receive(Duration.ofSeconds(2)),
+                "seek across empty children must succeed and leave no 
undelivered messages");
+    }
+
+    @Test
+    public void testClearBacklogDropsAllUndeliveredMessages() throws Exception 
{
+        String topic = newScalableTopic(2);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                
.batchingPolicy(org.apache.pulsar.client.api.v5.config.BatchingPolicy.ofDisabled())
+                .create();
+
+        // Establish the subscription via a short-lived consumer — and then 
close it. The
+        // V5 receive-queue would otherwise prefetch the produced messages 
into its
+        // client-side buffer, masking the broker-side cursor advance from 
clearBacklog.
+        // Closing also releases the segment-cursor fences so clearBacklog 
itself can
+        // fence them cleanly.
+        QueueConsumer<String> bootstrap = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("clear-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+        bootstrap.close();
+
+        for (int i = 0; i < 20; i++) {
+            producer.newMessage().key("k-" + i).value("v-" + i).send();
+        }
+
+        admin.scalableTopics().clearBacklog(topic, "clear-sub");
+
+        // Re-attach the consumer — every cursor is at the end, so no backlog 
remains.
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("clear-sub")
+                .subscribe();
+
+        assertNull(consumer.receive(Duration.ofSeconds(2)),
+                "clear-backlog must skip every undelivered message");
+
+        // Subsequent messages still flow through.
+        producer.newMessage().value("after-clear").send();
+        Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+        assertNotNull(msg);
+        assertEquals(msg.value(), "after-clear");
+        consumer.acknowledge(msg.id());
+    }
+}
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
index dd306c5805e..10da169d14e 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
@@ -206,6 +206,38 @@ public interface ScalableTopics {
      */
     CompletableFuture<Void> deleteSubscriptionAsync(String topic, String 
subscription);
 
+    /**
+     * Reset a subscription's cursor across every segment to the given 
wall-clock
+     * timestamp. The controller compares each segment's recorded
+     * {@code [createdAtMs, sealedAtMs)} window against that timestamp to
+     * dispatch the cheapest per-segment op (seek or skip-all).
+     *
+     * @param topic        Topic name in the format "tenant/namespace/topic"
+     * @param subscription Subscription name
+     * @param timestampMs  Wall-clock millis since the unix epoch
+     */
+    void seekSubscription(String topic, String subscription, long timestampMs)
+            throws PulsarAdminException;
+
+    /**
+     * Reset a subscription's cursor across every segment, asynchronously.
+     */
+    CompletableFuture<Void> seekSubscriptionAsync(String topic, String 
subscription,
+                                                   long timestampMs);
+
+    /**
+     * Skip every undelivered message on the subscription, across every 
segment in the
+     * DAG (advance each per-segment cursor to the end).
+     *
+     * @param topic        Topic name in the format "tenant/namespace/topic"
+     * @param subscription Subscription name
+     */
+    void clearBacklog(String topic, String subscription) throws 
PulsarAdminException;
+
+    /**
+     * Skip every undelivered message on the subscription, asynchronously.
+     */
+    CompletableFuture<Void> clearBacklogAsync(String topic, String 
subscription);
+
     /**
      * Split a segment into two halves.
      *
@@ -298,4 +330,31 @@ public interface ScalableTopics {
      */
     CompletableFuture<Long> getSegmentSubscriptionBacklogAsync(String 
segmentTopic,
                                                                 String 
subscription);
+
+    /**
+     * Reset the segment topic's subscription cursor to the given wall-clock 
timestamp.
+     * Routes to the broker that owns the segment topic.
+     *
+     * <p>Used internally by the parent-topic seek operation in
+     * {@code org.apache.pulsar.broker.service.scalable.ScalableTopicController}:
+     * the controller classifies each segment by its
+     * {@code [createdAtMs, sealedAtMs)} window against the requested 
timestamp and
+     * dispatches per-segment seek / skip-all calls.
+     *
+     * @param segmentTopic Full segment topic name ({@code 
segment://tenant/namespace/topic/descriptor})
+     * @param subscription Subscription name
+     * @param timestampMs Wall-clock millis since the unix epoch
+     */
+    CompletableFuture<Void> seekSegmentSubscriptionAsync(String segmentTopic, 
String subscription,
+                                                          long timestampMs);
+
+    /**
+     * Skip every undelivered message on the segment topic's subscription — 
advance the
+     * cursor to the end of the segment.
+     *
+     * @param segmentTopic Full segment topic name ({@code 
segment://tenant/namespace/topic/descriptor})
+     * @param subscription Subscription name
+     */
+    CompletableFuture<Void> clearSegmentSubscriptionBacklogAsync(String 
segmentTopic,
+                                                                  String 
subscription);
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
index 779579f1d1b..5a040123d2e 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
@@ -184,6 +184,33 @@ public class ScalableTopicsImpl extends BaseResource 
implements ScalableTopics {
         return asyncDeleteRequest(path);
     }
 
+    @Override
+    public void seekSubscription(String topic, String subscription, long 
timestampMs)
+            throws PulsarAdminException {
+        sync(() -> seekSubscriptionAsync(topic, subscription, timestampMs));
+    }
+
+    @Override
+    public CompletableFuture<Void> seekSubscriptionAsync(String topic, String 
subscription,
+                                                          long timestampMs) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = 
topicPath(tn).path("subscriptions").path(subscription).path("seek")
+                .queryParam("timestamp", timestampMs);
+        return asyncPostRequest(path, Entity.entity("", 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void clearBacklog(String topic, String subscription) throws 
PulsarAdminException {
+        sync(() -> clearBacklogAsync(topic, subscription));
+    }
+
+    @Override
+    public CompletableFuture<Void> clearBacklogAsync(String topic, String 
subscription) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = 
topicPath(tn).path("subscriptions").path(subscription).path("skip-all");
+        return asyncPostRequest(path, Entity.entity("", 
MediaType.APPLICATION_JSON));
+    }
+
     // --- Split ---
 
     @Override
@@ -274,6 +301,30 @@ public class ScalableTopicsImpl extends BaseResource 
implements ScalableTopics {
         return asyncGetRequest(path, Long.class);
     }
 
+    @Override
+    public CompletableFuture<Void> seekSegmentSubscriptionAsync(String 
segmentTopic,
+                                                                 String 
subscription,
+                                                                 long 
timestampMs) {
+        TopicName tn = TopicName.get(segmentTopic);
+        WebTarget path = adminSegments
+                .path(tn.getTenant()).path(tn.getNamespacePortion())
+                .path(tn.getLocalName()).path(tn.getSegmentDescriptor())
+                .path("subscription").path(subscription).path("seek")
+                .queryParam("timestamp", timestampMs);
+        return asyncPostRequest(path, Entity.entity("", 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public CompletableFuture<Void> clearSegmentSubscriptionBacklogAsync(String 
segmentTopic,
+                                                                        String 
subscription) {
+        TopicName tn = TopicName.get(segmentTopic);
+        WebTarget path = adminSegments
+                .path(tn.getTenant()).path(tn.getNamespacePortion())
+                .path(tn.getLocalName()).path(tn.getSegmentDescriptor())
+                .path("subscription").path(subscription).path("skip-all");
+        return asyncPostRequest(path, Entity.entity("", 
MediaType.APPLICATION_JSON));
+    }
+
     // --- Helpers ---
 
     private static TopicName validateTopic(String topic) {
diff --git 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/Checkpoint.java
 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/Checkpoint.java
index 6b24fe8943d..e872bb17158 100644
--- 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/Checkpoint.java
+++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/Checkpoint.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.client.api.v5;
 
 import java.io.IOException;
-import java.time.Instant;
 import org.apache.pulsar.client.api.v5.internal.PulsarClientProvider;
 
 /**
@@ -30,9 +29,12 @@ import 
org.apache.pulsar.client.api.v5.internal.PulsarClientProvider;
  * serialized for external storage (e.g. Flink state, S3) using {@link 
#toByteArray()}.
  *
  * <p>This is the sole position type used with {@link CheckpointConsumer} — 
for initial
- * positioning use the static factories {@link #earliest()}, {@link #latest()},
- * {@link #atTimestamp(Instant)}, or {@link #fromByteArray(byte[])} to restore 
from
- * a previously saved checkpoint.
+ * positioning use the static factories {@link #earliest()}, {@link 
#latest()}, or
+ * {@link #fromByteArray(byte[])} to restore from a previously saved 
checkpoint.
+ *
+ * <p>For timestamp-based positioning, use the {@code scalable-topics seek} 
admin
+ * operation on the subscription instead — see
+ * {@link org.apache.pulsar.client.admin.ScalableTopics#seekSubscription}.
  */
 public interface Checkpoint {
 
@@ -44,13 +46,6 @@ public interface Checkpoint {
      */
     byte[] toByteArray();
 
-    /**
-     * The time at which this checkpoint was created.
-     *
-     * @return the creation timestamp of this checkpoint as an {@link Instant}
-     */
-    Instant creationTime();
-
     // --- Static factories ---
 
     /**
@@ -71,17 +66,6 @@ public interface Checkpoint {
         return PulsarClientProvider.get().latestCheckpoint();
     }
 
-    /**
-     * A checkpoint that positions at the first message published at or after 
the given timestamp.
-     *
-     * @param timestamp the timestamp to position at
-     * @return a {@link Checkpoint} that will start consuming from the first 
message at or after
-     *         the given timestamp
-     */
-    static Checkpoint atTimestamp(Instant timestamp) {
-        return PulsarClientProvider.get().checkpointAtTimestamp(timestamp);
-    }
-
     /**
      * Deserialize a checkpoint from a byte array previously obtained via 
{@link #toByteArray()}.
      *
diff --git 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumer.java
 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumer.java
index 0d278fb3eb5..40c4d8b3af2 100644
--- 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumer.java
+++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumer.java
@@ -91,17 +91,6 @@ public interface CheckpointConsumer<T> extends Closeable {
      */
     Checkpoint checkpoint();
 
-    // --- Seek ---
-
-    /**
-     * Seek to a previously saved checkpoint, or to a sentinel position such as
-     * {@link Checkpoint#earliest()} or {@link Checkpoint#latest()}.
-     *
-     * @param checkpoint the checkpoint to seek to
-     * @throws PulsarClientException if the seek fails or a connection error 
occurs
-     */
-    void seek(Checkpoint checkpoint) throws PulsarClientException;
-
     // --- Async ---
 
     /**
diff --git 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java
 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java
index 1a67dbcfe84..9f917e9ff7b 100644
--- 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java
+++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java
@@ -62,9 +62,8 @@ public interface CheckpointConsumerBuilder<T> {
     /**
      * Set the initial position for this consumer.
      *
-     * <p>Use {@link Checkpoint#earliest()}, {@link Checkpoint#latest()},
-     * {@link Checkpoint#atTimestamp}, or {@link Checkpoint#fromByteArray} to
-     * create the appropriate starting position.
+     * <p>Use {@link Checkpoint#earliest()}, {@link Checkpoint#latest()}, or
+     * {@link Checkpoint#fromByteArray} to create the appropriate starting 
position.
      *
      * <p>Defaults to {@link Checkpoint#latest()} if not specified.
      *
diff --git 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java
 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java
index ab813842b4f..5f111246471 100644
--- 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java
+++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.client.api.v5;
 
 import java.time.Duration;
-import java.time.Instant;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
@@ -88,25 +87,6 @@ public interface StreamConsumerBuilder<T> {
      */
     StreamConsumerBuilder<T> subscriptionName(String subscriptionName);
 
-    // --- Seek (initial position override) ---
-
-    /**
-     * Reset the subscription to a specific message ID.
-     *
-     * @param messageId the message ID to seek to
-     * @return this builder instance for chaining
-     */
-    StreamConsumerBuilder<T> seek(MessageId messageId);
-
-    /**
-     * Reset the subscription to a specific timestamp. The subscription
-     * will be positioned at the first message published at or after this timestamp.
-     *
-     * @param timestamp the timestamp to seek to
-     * @return this builder instance for chaining
-     */
-    StreamConsumerBuilder<T> seek(Instant timestamp);
-
     // --- Optional ---
 
     /**
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/async/AsyncCheckpointConsumer.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/async/AsyncCheckpointConsumer.java
index f7bb04bfaba..b663499e175 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/async/AsyncCheckpointConsumer.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/async/AsyncCheckpointConsumer.java
@@ -69,15 +69,6 @@ public interface AsyncCheckpointConsumer<T> {
      */
     CompletableFuture<Checkpoint> checkpoint();
 
-    /**
-     * Seek to a checkpoint asynchronously.
-     *
-     * @param checkpoint the checkpoint to seek to
-     * @return a {@link CompletableFuture} that completes when the consumer has been repositioned
-     *         to the given checkpoint
-     */
-    CompletableFuture<Void> seek(Checkpoint checkpoint);
-
     /**
      * Close this consumer asynchronously.
      *
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java
index faff56205d6..e27a02302bc 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.client.api.v5.internal;
 
 import java.io.IOException;
-import java.time.Instant;
 import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.function.Supplier;
@@ -87,8 +86,6 @@ public interface PulsarClientProvider {
 
     Checkpoint latestCheckpoint();
 
-    Checkpoint checkpointAtTimestamp(Instant timestamp);
-
     // --- Authentication ---
 
     Authentication authenticationToken(String token);
diff --git a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
index b3439d31c0a..1218c8ca27c 100644
--- a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
+++ b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
@@ -543,26 +543,6 @@ public class Examples {
         }
     }
 
-    /** Time-travel seek — rewind to a specific timestamp. */
-    void checkpointConsumerSeek(PulsarClient client) throws Exception {
-        try (var consumer = client.newCheckpointConsumer(Schema.string())
-                .topic("events")
-                .startPosition(Checkpoint.latest())
-                .create()) {
-
-            // Seek back to replay from a specific time
-            consumer.seek(Checkpoint.atTimestamp(Instant.parse("2025-12-01T00:00:00Z")));
-
-            while (true) {
-                Message<String> msg = consumer.receive(Duration.ofSeconds(5));
-                if (msg == null) {
-                    break;
-                }
-                System.out.printf("[%s] %s%n", msg.publishTime(), msg.value());
-            }
-        }
-    }
-
     // ==================================================================================
     // Helper types for the examples
     // ==================================================================================
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
index c4a308cbdce..0391653198f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
@@ -137,6 +137,49 @@ public class CmdScalableTopics extends CmdBase {
         }
     }
 
+    @Command(description = "Reset a subscription's cursor on every segment to a given"
+            + " point in time. Pass --time as a relative offset (e.g. 1h, 5d) — the cursor"
+            + " is reset to (now - offset).")
+    private class SeekSubscriptionCmd extends CliCommand {
+        @Parameters(description = "tenant/namespace/topic", arity = "1")
+        private String topic;
+
+        @Option(names = {"-s", "--subscription"},
+                description = "Subscription name", required = true)
+        private String subscription;
+
+        @Option(names = {"-t", "--time"},
+                description = "Relative offset in the past to seek to (e.g. 1h, 5d, 30m)",
+                required = true,
+                converter = org.apache.pulsar.cli.converters.picocli.TimeUnitToMillisConverter.class)
+        private Long offsetMillis;
+
+        @Override
+        void run() throws Exception {
+            long target = System.currentTimeMillis() - offsetMillis;
+            scalableTopics().seekSubscription(topic, subscription, target);
+            print("Reset subscription " + subscription + " on topic " + topic
+                    + " to timestamp " + target + " (" + offsetMillis + "ms ago)");
+        }
+    }
+
+    @Command(description = "Skip every undelivered message on the subscription, across every"
+            + " segment in the DAG.")
+    private class ClearBacklogCmd extends CliCommand {
+        @Parameters(description = "tenant/namespace/topic", arity = "1")
+        private String topic;
+
+        @Option(names = {"-s", "--subscription"},
+                description = "Subscription name", required = true)
+        private String subscription;
+
+        @Override
+        void run() throws Exception {
+            scalableTopics().clearBacklog(topic, subscription);
+            print("Cleared backlog of subscription " + subscription + " on topic " + topic);
+        }
+    }
+
     @Command(description = "Merge two adjacent segments into one")
     private class MergeSegmentsCmd extends CliCommand {
         @Parameters(description = "tenant/namespace/topic", arity = "1")
@@ -166,5 +209,7 @@ public class CmdScalableTopics extends CmdBase {
         addCommand("delete", new DeleteCmd());
         addCommand("split-segment", new SplitSegmentCmd());
         addCommand("merge-segments", new MergeSegmentsCmd());
+        addCommand("seek", new SeekSubscriptionCmd());
+        addCommand("clear-backlog", new ClearBacklogCmd());
     }
 }
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncCheckpointConsumerV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncCheckpointConsumerV5.java
index c4a7bf8d380..ace455e7ba9 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncCheckpointConsumerV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncCheckpointConsumerV5.java
@@ -65,11 +65,6 @@ final class AsyncCheckpointConsumerV5<T> implements AsyncCheckpointConsumer<T> {
         return consumer.checkpointAsync();
     }
 
-    @Override
-    public CompletableFuture<Void> seek(Checkpoint checkpoint) {
-        return consumer.seekAsync(checkpoint);
-    }
-
     @Override
     public CompletableFuture<Void> close() {
         return consumer.closeAsync();
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointV5.java
index 566209a2b32..b9ae8c0a777 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointV5.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl.v5;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.pulsar.client.api.v5.Checkpoint;
@@ -35,17 +34,14 @@ final class CheckpointV5 implements Checkpoint {
     private static final byte TYPE_REGULAR = 0;
     private static final byte TYPE_EARLIEST = 1;
     private static final byte TYPE_LATEST = 2;
-    private static final byte TYPE_TIMESTAMP = 3;
 
-    static final Checkpoint EARLIEST = new SentinelCheckpoint(TYPE_EARLIEST, Instant.EPOCH);
-    static final Checkpoint LATEST = new SentinelCheckpoint(TYPE_LATEST, Instant.EPOCH);
+    static final Checkpoint EARLIEST = new SentinelCheckpoint(TYPE_EARLIEST);
+    static final Checkpoint LATEST = new SentinelCheckpoint(TYPE_LATEST);
 
     private final Map<Long, org.apache.pulsar.client.api.MessageId> segmentPositions;
-    private final Instant creationTime;
 
-    CheckpointV5(Map<Long, org.apache.pulsar.client.api.MessageId> segmentPositions, Instant creationTime) {
+    CheckpointV5(Map<Long, org.apache.pulsar.client.api.MessageId> segmentPositions) {
         this.segmentPositions = Map.copyOf(segmentPositions);
-        this.creationTime = creationTime;
     }
 
     /**
@@ -55,16 +51,11 @@ final class CheckpointV5 implements Checkpoint {
         return segmentPositions;
     }
 
-    @Override
-    public Instant creationTime() {
-        return creationTime;
-    }
-
     @Override
     public byte[] toByteArray() {
-        // Format: [1 byte type] [8 bytes creationTimeMillis] [4 bytes numEntries]
+        // Format: [1 byte type] [4 bytes numEntries]
         //         [for each entry: [8 bytes segmentId] [4 bytes msgIdLen] [msgIdBytes]]
-        int totalSize = 1 + 8 + 4;
+        int totalSize = 1 + 4;
         Map<Long, byte[]> serializedIds = new HashMap<>();
         for (var entry : segmentPositions.entrySet()) {
             byte[] idBytes = entry.getValue().toByteArray();
@@ -74,7 +65,6 @@ final class CheckpointV5 implements Checkpoint {
 
         ByteBuffer buf = ByteBuffer.allocate(totalSize);
         buf.put(TYPE_REGULAR);
-        buf.putLong(creationTime.toEpochMilli());
         buf.putInt(segmentPositions.size());
         for (var entry : serializedIds.entrySet()) {
             buf.putLong(entry.getKey());
@@ -95,12 +85,7 @@ final class CheckpointV5 implements Checkpoint {
         return switch (type) {
             case TYPE_EARLIEST -> EARLIEST;
             case TYPE_LATEST -> LATEST;
-            case TYPE_TIMESTAMP -> {
-                long millis = buf.getLong();
-                yield new TimestampCheckpoint(Instant.ofEpochMilli(millis));
-            }
             case TYPE_REGULAR -> {
-                long creationMillis = buf.getLong();
                 int numEntries = buf.getInt();
                 Map<Long, org.apache.pulsar.client.api.MessageId> positions = new HashMap<>();
                 for (int i = 0; i < numEntries; i++) {
@@ -111,48 +96,21 @@ final class CheckpointV5 implements Checkpoint {
                     positions.put(segmentId,
                             org.apache.pulsar.client.api.MessageId.fromByteArray(msgIdBytes));
                 }
-                yield new CheckpointV5(positions, Instant.ofEpochMilli(creationMillis));
+                yield new CheckpointV5(positions);
             }
             default -> throw new IOException("Unknown checkpoint type: " + type);
         };
     }
 
-    static Checkpoint atTimestamp(Instant timestamp) {
-        return new TimestampCheckpoint(timestamp);
-    }
-
     /**
-     * Sentinel checkpoint for earliest/latest positions.
+     * Sentinel checkpoint for earliest/latest positions. Encoded as a single type byte.
      */
-    private record SentinelCheckpoint(byte type, Instant creation) implements Checkpoint {
+    private record SentinelCheckpoint(byte type) implements Checkpoint {
         @Override
         public byte[] toByteArray() {
             ByteBuffer buf = ByteBuffer.allocate(1);
             buf.put(type);
             return buf.array();
         }
-
-        @Override
-        public Instant creationTime() {
-            return creation;
-        }
-    }
-
-    /**
-     * Checkpoint that positions at a specific timestamp.
-     */
-    private record TimestampCheckpoint(Instant timestamp) implements Checkpoint {
-        @Override
-        public byte[] toByteArray() {
-            ByteBuffer buf = ByteBuffer.allocate(1 + 8);
-            buf.put(TYPE_TIMESTAMP);
-            buf.putLong(timestamp.toEpochMilli());
-            return buf.array();
-        }
-
-        @Override
-        public Instant creationTime() {
-            return timestamp;
-        }
     }
 }
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java
index 2c53f0bcc48..7e8971a4ff9 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.client.impl.v5;
 
 import java.io.IOException;
-import java.time.Instant;
 import java.util.Map;
 import java.util.function.Supplier;
 import org.apache.pulsar.client.api.v5.Checkpoint;
@@ -142,11 +141,6 @@ public final class PulsarClientProviderV5 implements PulsarClientProvider {
         return CheckpointV5.LATEST;
     }
 
-    @Override
-    public Checkpoint checkpointAtTimestamp(Instant timestamp) {
-        return CheckpointV5.atTimestamp(timestamp);
-    }
-
     // --- Authentication ---
 
     @Override
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
index 9288780f39b..1eedbb14566 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl.v5;
 
 import io.github.merlimat.slog.Logger;
 import java.time.Duration;
-import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -221,8 +220,7 @@ final class ScalableCheckpointConsumer<T> implements CheckpointConsumer<T> {
      * Update the checkpoint position for the segment this message belongs to. Called as
      * messages cross the boundary from the wire-buffer to the application — that's the
      * point at which a subsequent {@link #checkpoint()} should reflect "I have processed
-     * this message", so a {@link #seek(Checkpoint)} back to that checkpoint redelivers
-     * everything after it.
+     * this message".
      *
      * <p>{@code msg} may be null (timeout or interrupt path); returns it unchanged so the
      * caller can pass through the receive result without an extra null-check.
@@ -237,22 +235,7 @@ final class ScalableCheckpointConsumer<T> implements CheckpointConsumer<T> {
     @Override
     public Checkpoint checkpoint() {
         Map<Long, org.apache.pulsar.client.api.MessageId> positions = new HashMap<>(lastReceivedPositions);
-        return new CheckpointV5(positions, Instant.now());
-    }
-
-    @Override
-    public void seek(Checkpoint checkpoint) throws PulsarClientException {
-        try {
-            seekAsync(checkpoint).get();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarClientException("Seek interrupted", e);
-        } catch (ExecutionException e) {
-            if (e.getCause() instanceof PulsarClientException pce) {
-                throw pce;
-            }
-            throw new PulsarClientException(e.getCause());
-        }
+        return new CheckpointV5(positions);
     }
 
     @Override
@@ -298,36 +281,6 @@ final class ScalableCheckpointConsumer<T> implements CheckpointConsumer<T> {
         return CompletableFuture.completedFuture(checkpoint());
     }
 
-    CompletableFuture<Void> seekAsync(Checkpoint checkpoint) {
-        if (checkpoint instanceof CheckpointV5 cp) {
-            List<CompletableFuture<Void>> futures = new ArrayList<>();
-            for (var entry : cp.segmentPositions().entrySet()) {
-                var readerFuture = segmentReaders.get(entry.getKey());
-                if (readerFuture != null) {
-                    futures.add(readerFuture.thenCompose(r -> r.seekAsync(entry.getValue())));
-                }
-            }
-            return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new))
-                    .whenComplete((__, ___) -> messageQueue.clear());
-        } else if (checkpoint == CheckpointV5.EARLIEST) {
-            return seekAllAsync(org.apache.pulsar.client.api.MessageId.earliest);
-        } else if (checkpoint == CheckpointV5.LATEST) {
-            return seekAllAsync(org.apache.pulsar.client.api.MessageId.latest);
-        } else {
-            return CompletableFuture.failedFuture(
-                    new PulsarClientException("Unsupported checkpoint type: " + checkpoint.getClass()));
-        }
-    }
-
-    private CompletableFuture<Void> seekAllAsync(org.apache.pulsar.client.api.MessageId position) {
-        List<CompletableFuture<Void>> futures = new ArrayList<>();
-        for (var readerFuture : segmentReaders.values()) {
-            futures.add(readerFuture.thenCompose(r -> r.seekAsync(position)));
-        }
-        return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new))
-                .whenComplete((__, ___) -> messageQueue.clear());
-    }
-
     CompletableFuture<Void> closeAsync() {
         closed = true;
         try {
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
index 24cf63dacbc..d18b0351f00 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
@@ -19,11 +19,9 @@
 package org.apache.pulsar.client.impl.v5;
 
 import java.time.Duration;
-import java.time.Instant;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.client.api.v5.MessageId;
 import org.apache.pulsar.client.api.v5.PulsarClientException;
 import org.apache.pulsar.client.api.v5.StreamConsumer;
 import org.apache.pulsar.client.api.v5.StreamConsumerBuilder;
@@ -126,18 +124,6 @@ final class StreamConsumerBuilderV5<T> implements StreamConsumerBuilder<T> {
         return this;
     }
 
-    @Override
-    public StreamConsumerBuilderV5<T> seek(MessageId messageId) {
-        // Seek will be applied after consumer creation
-        // Store for later use
-        return this;
-    }
-
-    @Override
-    public StreamConsumerBuilderV5<T> seek(Instant timestamp) {
-        return this;
-    }
-
     @Override
     public StreamConsumerBuilderV5<T> subscriptionProperties(Map<String, String> properties) {
         conf.setSubscriptionProperties(properties);
diff --git a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/CheckpointV5Test.java b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/CheckpointV5Test.java
index bcbe08d7e7c..c11381e4e9b 100644
--- a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/CheckpointV5Test.java
+++ b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/CheckpointV5Test.java
@@ -22,7 +22,6 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
-import java.time.Instant;
 import java.util.Map;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.v5.Checkpoint;
@@ -39,12 +38,11 @@ public class CheckpointV5Test {
 
     @Test
     public void testRegularCheckpointRoundtrip() throws Exception {
-        Instant created = Instant.ofEpochMilli(1_700_000_000_000L);
         Map<Long, MessageId> positions = Map.of(
                 0L, v4(10, 20),
                 1L, v4(30, 40),
                 2L, v4(50, 60));
-        CheckpointV5 original = new CheckpointV5(positions, created);
+        CheckpointV5 original = new CheckpointV5(positions);
 
         byte[] bytes = original.toByteArray();
         Checkpoint decoded = CheckpointV5.fromByteArray(bytes);
@@ -52,7 +50,6 @@ public class CheckpointV5Test {
         assertTrue(decoded instanceof CheckpointV5,
                 "expected CheckpointV5, got " + decoded.getClass());
         CheckpointV5 decodedV5 = (CheckpointV5) decoded;
-        assertEquals(decodedV5.creationTime(), created);
         assertEquals(decodedV5.segmentPositions().size(), 3);
         assertEquals(decodedV5.segmentPositions().get(0L), v4(10, 20));
         assertEquals(decodedV5.segmentPositions().get(1L), v4(30, 40));
@@ -61,17 +58,15 @@ public class CheckpointV5Test {
 
     @Test
     public void testRegularCheckpointWithEmptyPositions() throws Exception {
-        Instant created = Instant.ofEpochMilli(42L);
-        CheckpointV5 original = new CheckpointV5(Map.of(), created);
+        CheckpointV5 original = new CheckpointV5(Map.of());
 
         CheckpointV5 decoded = (CheckpointV5) CheckpointV5.fromByteArray(original.toByteArray());
-        assertEquals(decoded.creationTime(), created);
         assertEquals(decoded.segmentPositions().size(), 0);
     }
 
     @Test
     public void testSegmentPositionsIsImmutable() {
-        CheckpointV5 cp = new CheckpointV5(Map.of(0L, v4(1, 2)), Instant.EPOCH);
+        CheckpointV5 cp = new CheckpointV5(Map.of(0L, v4(1, 2)));
         assertThrows(UnsupportedOperationException.class,
                 () -> cp.segmentPositions().put(1L, v4(5, 6)));
     }
@@ -100,26 +95,6 @@ public class CheckpointV5Test {
         assertEquals(CheckpointV5.LATEST.toByteArray().length, 1);
     }
 
-    // --- Timestamp checkpoint ---
-
-    @Test
-    public void testTimestampCheckpointRoundtrip() throws Exception {
-        Instant ts = Instant.ofEpochMilli(1_234_567_890L);
-        Checkpoint cp = CheckpointV5.atTimestamp(ts);
-
-        assertEquals(cp.creationTime(), ts);
-
-        Checkpoint decoded = CheckpointV5.fromByteArray(cp.toByteArray());
-        assertEquals(decoded.creationTime(), ts);
-    }
-
-    @Test
-    public void testTimestampCheckpointEncodesTypeAndMillis() {
-        Instant ts = Instant.ofEpochMilli(555L);
-        byte[] bytes = CheckpointV5.atTimestamp(ts).toByteArray();
-        assertEquals(bytes.length, 1 + 8, "timestamp wire: [type][millis]");
-    }
-
     // --- Error handling ---
 
     @Test

Reply via email to