This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 35aa105ec31 [feat][broker] PIP-468: scalable-topic seek +
clear-backlog admin API (#25696)
35aa105ec31 is described below
commit 35aa105ec31c6f25432deaa8c330d28217689c8a
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 6 18:15:43 2026 -0700
[feat][broker] PIP-468: scalable-topic seek + clear-backlog admin API
(#25696)
---
.../pulsar/broker/admin/v2/ScalableTopics.java | 86 ++++++++
.../apache/pulsar/broker/admin/v2/Segments.java | 130 ++++++++++++
.../service/scalable/ScalableTopicController.java | 107 ++++++++++
.../service/scalable/ScalableTopicService.java | 21 ++
.../scalable/ScalableTopicControllerTest.java | 158 +++++++++++++++
.../pulsar/client/api/v5/V5AsyncApisTest.java | 24 +--
.../api/v5/V5CheckpointConsumerBasicTest.java | 38 +---
.../client/api/v5/V5SeekSubscriptionTest.java | 218 +++++++++++++++++++++
.../apache/pulsar/client/admin/ScalableTopics.java | 59 ++++++
.../client/admin/internal/ScalableTopicsImpl.java | 51 +++++
.../apache/pulsar/client/api/v5/Checkpoint.java | 28 +--
.../pulsar/client/api/v5/CheckpointConsumer.java | 11 --
.../client/api/v5/CheckpointConsumerBuilder.java | 5 +-
.../client/api/v5/StreamConsumerBuilder.java | 20 --
.../api/v5/async/AsyncCheckpointConsumer.java | 9 -
.../api/v5/internal/PulsarClientProvider.java | 3 -
.../org/apache/pulsar/client/api/v5/Examples.java | 20 --
.../apache/pulsar/admin/cli/CmdScalableTopics.java | 45 +++++
.../client/impl/v5/AsyncCheckpointConsumerV5.java | 5 -
.../apache/pulsar/client/impl/v5/CheckpointV5.java | 58 +-----
.../client/impl/v5/PulsarClientProviderV5.java | 6 -
.../client/impl/v5/ScalableCheckpointConsumer.java | 51 +----
.../client/impl/v5/StreamConsumerBuilderV5.java | 14 --
.../pulsar/client/impl/v5/CheckpointV5Test.java | 31 +--
24 files changed, 901 insertions(+), 297 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
index 01b2320f99a..32468c48892 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java
@@ -404,6 +404,92 @@ public class ScalableTopics extends AdminResource {
});
}
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/subscriptions/{subscription}/seek")
+ @ApiOperation(value = "Reset a subscription's cursor on every segment to
the given"
+ + " wall-clock timestamp. The controller uses each segment's
recorded sealed-time"
+ + " window to dispatch the cheapest per-segment op.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 204, message = "Cursor reset successfully on
all segments"),
+ @ApiResponse(code = 401, message = "Don't have permission to
administrate resources on this tenant"),
+ @ApiResponse(code = 403, message = "Don't have admin permission on
the namespace"),
+ @ApiResponse(code = 404, message = "Scalable topic or subscription
doesn't exist"),
+ @ApiResponse(code = 500, message = "Internal server error")})
+ public void seekSubscription(
+ @Suspended final AsyncResponse asyncResponse,
+ @ApiParam(value = "Specify the tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @ApiParam(value = "Specify the namespace", required = true)
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Specify topic name", required = true)
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Subscription name", required = true)
+ @PathParam("subscription") String subscription,
+ @ApiParam(value = "Wall-clock millis since the unix epoch",
required = true)
+ @QueryParam("timestamp") long timestampMs) {
+ validateNamespaceName(tenant, namespace);
+ TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName,
encodedTopic);
+
+ validateTopicOperationAsync(tn, TopicOperation.RESET_CURSOR,
subscription)
+ .thenCompose(__ -> onControllerLeader(tn,
+ svc -> svc.seekSubscription(tn, subscription,
timestampMs)))
+ .thenAccept(__ -> {
+ log.info().attr("clientAppId", clientAppId())
+ .attr("subscription", subscription).attr("topic",
tn)
+ .attr("timestampMs", timestampMs)
+ .log("Sought subscription on scalable topic");
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+ log.error().attr("clientAppId", clientAppId())
+ .attr("subscription", subscription).attr("topic",
tn)
+ .exception(ex).log("Failed to seek subscription");
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+
+ @POST
+
@Path("/{tenant}/{namespace}/{topic}/subscriptions/{subscription}/skip-all")
+ @ApiOperation(value = "Skip every undelivered message on the subscription,
across every"
+ + " segment in the DAG (advance each per-segment cursor to the
end).")
+ @ApiResponses(value = {
+ @ApiResponse(code = 204, message = "Backlog cleared successfully
on all segments"),
+ @ApiResponse(code = 401, message = "Don't have permission to
administrate resources on this tenant"),
+ @ApiResponse(code = 403, message = "Don't have admin permission on
the namespace"),
+ @ApiResponse(code = 404, message = "Scalable topic or subscription
doesn't exist"),
+ @ApiResponse(code = 500, message = "Internal server error")})
+ public void clearBacklog(
+ @Suspended final AsyncResponse asyncResponse,
+ @ApiParam(value = "Specify the tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @ApiParam(value = "Specify the namespace", required = true)
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Specify topic name", required = true)
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Subscription name", required = true)
+ @PathParam("subscription") String subscription) {
+ validateNamespaceName(tenant, namespace);
+ TopicName tn = TopicName.get(TopicDomain.topic.value(), namespaceName,
encodedTopic);
+
+ validateTopicOperationAsync(tn, TopicOperation.SKIP, subscription)
+ .thenCompose(__ -> onControllerLeader(tn,
+ svc -> svc.clearBacklog(tn, subscription)))
+ .thenAccept(__ -> {
+ log.info().attr("clientAppId", clientAppId())
+ .attr("subscription", subscription).attr("topic",
tn)
+ .log("Cleared backlog on scalable topic");
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+ log.error().attr("clientAppId", clientAppId())
+ .attr("subscription", subscription).attr("topic",
tn)
+ .exception(ex).log("Failed to clear subscription
backlog");
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+
// --- Segment operations ---
@POST
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
index 0ae8b3264a5..507119518b0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java
@@ -228,6 +228,136 @@ public class Segments extends AdminResource {
});
}
+ @POST
+
@Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}/seek")
+ @ApiOperation(value = "Reset the segment topic's subscription cursor to
the given timestamp."
+ + " Super-user only.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 204, message = "Cursor reset successfully"),
+ @ApiResponse(code = 401, message = "This operation requires
super-user access"),
+ @ApiResponse(code = 403, message = "This operation requires
super-user access"),
+ @ApiResponse(code = 404, message = "Segment topic or subscription
not found"),
+ @ApiResponse(code = 500, message = "Internal server error")})
+ public void seekSubscription(
+ @Suspended final AsyncResponse asyncResponse,
+ @ApiParam(value = "Specify the tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @ApiParam(value = "Specify the namespace", required = true)
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Specify the parent topic name", required = true)
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Segment descriptor (e.g. 0000-7fff-1)",
required = true)
+ @PathParam("descriptor") String descriptor,
+ @ApiParam(value = "Subscription name", required = true)
+ @PathParam("subscription") String subscription,
+ @ApiParam(value = "Wall-clock millis since the unix epoch",
required = true)
+ @QueryParam("timestamp") long timestampMs,
+ @ApiParam(value = "Whether leader broker redirected this call to
this broker.")
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
+ validateNamespaceName(tenant, namespace);
+ TopicName segmentTopic = segmentTopicName(tenant, namespace,
encodedTopic, descriptor);
+
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic,
authoritative))
+ .thenCompose(__ ->
pulsar().getBrokerService().getTopicIfExists(segmentTopic.toString()))
+ .thenCompose(optTopic -> {
+ if (optTopic.isEmpty()) {
+ // Segment topic not loaded on this owner — could be
ownership
+ // churn or a transient unload. 503 so the caller
retries (and so
+ // the parent-topic seek can't conflate this with the
+ // subscription-not-found case below, which is
tolerated).
+ throw new
RestException(Response.Status.SERVICE_UNAVAILABLE,
+ "Segment topic not loaded: " + segmentTopic);
+ }
+ var sub = optTopic.get().getSubscription(subscription);
+ if (sub == null) {
+ throw new RestException(Response.Status.NOT_FOUND,
+ "Subscription not found on segment: " +
subscription);
+ }
+ return sub.resetCursor(timestampMs)
+ .exceptionally(ex -> {
+ Throwable cause = ex instanceof
java.util.concurrent.CompletionException
+ ? ex.getCause() : ex;
+ if (cause instanceof
org.apache.pulsar.broker.service.BrokerServiceException
+ .SubscriptionInvalidCursorPosition) {
+ // Empty managed ledger — no entries to
seek to. The
+ // cursor is already at the only valid
position, so this
+ // is a no-op (e.g. a freshly-split active
child segment
+ // that hasn't received any messages yet).
+ log.debug().attr("segment", segmentTopic)
+ .attr("subscription", subscription)
+ .log("Empty segment, treating seek
as no-op");
+ return null;
+ }
+ throw org.apache.pulsar.common.util.FutureUtil
+ .wrapToCompletionException(cause);
+ });
+ })
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error().attr("clientAppId",
clientAppId()).attr("segment", segmentTopic)
+ .attr("subscription",
subscription).attr("timestampMs", timestampMs)
+ .exception(ex).log("Failed to seek segment
subscription");
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+
+ @POST
+
@Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}/skip-all")
+ @ApiOperation(value = "Skip every undelivered message on the segment
topic's subscription —"
+ + " advance the cursor to the end. Super-user only.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 204, message = "Backlog cleared successfully"),
+ @ApiResponse(code = 401, message = "This operation requires
super-user access"),
+ @ApiResponse(code = 403, message = "This operation requires
super-user access"),
+ @ApiResponse(code = 404, message = "Segment topic or subscription
not found"),
+ @ApiResponse(code = 500, message = "Internal server error")})
+ public void clearSubscriptionBacklog(
+ @Suspended final AsyncResponse asyncResponse,
+ @ApiParam(value = "Specify the tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @ApiParam(value = "Specify the namespace", required = true)
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Specify the parent topic name", required = true)
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Segment descriptor (e.g. 0000-7fff-1)",
required = true)
+ @PathParam("descriptor") String descriptor,
+ @ApiParam(value = "Subscription name", required = true)
+ @PathParam("subscription") String subscription,
+ @ApiParam(value = "Whether leader broker redirected this call to
this broker.")
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
+ validateNamespaceName(tenant, namespace);
+ TopicName segmentTopic = segmentTopicName(tenant, namespace,
encodedTopic, descriptor);
+
+ validateSuperUserAccessAsync()
+ .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic,
authoritative))
+ .thenCompose(__ ->
pulsar().getBrokerService().getTopicIfExists(segmentTopic.toString()))
+ .thenCompose(optTopic -> {
+ if (optTopic.isEmpty()) {
+ // 503 vs 404 — see the rationale on the seek endpoint
above. The
+ // distinction lets the parent-topic clear-backlog
tolerate
+ // subscription-not-found while still surfacing
transient unloads.
+ throw new
RestException(Response.Status.SERVICE_UNAVAILABLE,
+ "Segment topic not loaded: " + segmentTopic);
+ }
+ var sub = optTopic.get().getSubscription(subscription);
+ if (sub == null) {
+ throw new RestException(Response.Status.NOT_FOUND,
+ "Subscription not found on segment: " +
subscription);
+ }
+ return sub.clearBacklog();
+ })
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error().attr("clientAppId",
clientAppId()).attr("segment", segmentTopic)
+ .attr("subscription", subscription)
+ .exception(ex).log("Failed to clear segment
subscription backlog");
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+
@DELETE
@Path("/{tenant}/{namespace}/{topic}/{descriptor}")
@ApiOperation(value = "Delete a segment topic. Super-user only.")
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index 552ca321d84..3bd562a48e1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -603,6 +603,113 @@ public class ScalableTopicController {
return CompletableFuture.allOf(futures);
}
+ /**
+ * Reset a subscription's cursor across every segment to the given
wall-clock
+ * timestamp. We use each segment's recorded {@code [createdAtMs,
sealedAtMs)}
+ * window to dispatch the cheapest possible per-segment op:
+ *
+ * <ul>
+ * <li>Segment was sealed before {@code timestampMs} — all of its data
is from
+ * earlier; cursor → end of segment (skip-all).</li>
+ * <li>Segment was created at-or-after {@code timestampMs} — all of its
data is
+ * from at-or-after; cursor → earliest (seek to {@code
timestamp=0}).</li>
+ * <li>Segment is alive at {@code timestampMs} (active or straddling
sealed) —
+ * cursor seeks to {@code timestamp}.</li>
+ * </ul>
+ *
+ * <p>Per-segment failures are surfaced (the call fails-fast). The only
tolerated
+ * outcome is {@code 404 Not Found} from the segment endpoint, which the
segment
+ * REST resource emits exclusively for "subscription not present on this
segment"
+ * (e.g. the cursor hasn't been materialised yet — it will propagate
lazily and
+ * the next seek will land it). Transient unloads / ownership churn
surface as
+ * {@code 503} from the segment endpoint and propagate to the caller, who
can
+ * retry the parent-level operation.
+ */
+ public CompletableFuture<Void> seekSubscription(String subscription, long
timestampMs) {
+ checkLeader();
+ SegmentLayout layout = this.currentLayout;
+ CompletableFuture<?>[] futures =
layout.getAllSegments().values().stream()
+ .map(segment -> seekSubscriptionOnSegment(segment,
subscription, timestampMs))
+ .toArray(CompletableFuture[]::new);
+ return CompletableFuture.allOf(futures);
+ }
+
+ /**
+ * Skip every undelivered message on the subscription, across every
segment in the
+ * DAG. Equivalent to advancing each per-segment cursor to the end.
+ */
+ public CompletableFuture<Void> clearBacklog(String subscription) {
+ checkLeader();
+ SegmentLayout layout = this.currentLayout;
+ CompletableFuture<?>[] futures =
layout.getAllSegments().values().stream()
+ .map(segment -> clearSubscriptionBacklogOnSegment(segment,
subscription))
+ .toArray(CompletableFuture[]::new);
+ return CompletableFuture.allOf(futures);
+ }
+
+ private CompletableFuture<Void> seekSubscriptionOnSegment(SegmentInfo
segment,
+ String
subscription,
+ long
timestampMs) {
+ // Classify the segment relative to the requested timestamp using the
recorded
+ // sealed-time / created-time. This is what makes the parent-level
seek O(N segments)
+ // worth of cheap RPCs rather than O(N) timestamp-based scans of every
segment's
+ // managed ledger.
+ if (segment.isSealed() && segment.sealedAtMs() > 0
+ && segment.sealedAtMs() <= timestampMs) {
+ // Segment fully predates timestamp → skip everything on this
segment.
+ return clearSubscriptionBacklogOnSegment(segment, subscription);
+ }
+ long effective = timestampMs;
+ if (segment.createdAtMs() > 0 && segment.createdAtMs() >= timestampMs)
{
+ // Segment fully postdates timestamp → seek to start (timestamp=0
== earliest
+ // for managed-ledger reset-cursor-by-timestamp semantics).
+ effective = 0L;
+ }
+ String segmentName = toSegmentPersistentName(segment);
+ try {
+ return brokerService.getPulsar().getAdminClient()
+
.scalableTopics().seekSegmentSubscriptionAsync(segmentName, subscription,
effective)
+ .exceptionally(ex -> {
+ Throwable cause =
+
org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex);
+ if (cause instanceof
org.apache.pulsar.client.admin.PulsarAdminException
+ .NotFoundException) {
+ // 404 from the segment endpoint == "subscription
not present
+ // on this segment" (the segment endpoint uses 503
for
+ // "topic not loaded"). The cursor will propagate
lazily;
+ // tolerated.
+ return null;
+ }
+ throw
org.apache.pulsar.common.util.FutureUtil.wrapToCompletionException(cause);
+ });
+ } catch (PulsarServerException e) {
+ return CompletableFuture.failedFuture(e);
+ }
+ }
+
+ private CompletableFuture<Void>
clearSubscriptionBacklogOnSegment(SegmentInfo segment,
+ String
subscription) {
+ String segmentName = toSegmentPersistentName(segment);
+ try {
+ return brokerService.getPulsar().getAdminClient()
+ .scalableTopics()
+ .clearSegmentSubscriptionBacklogAsync(segmentName,
subscription)
+ .exceptionally(ex -> {
+ Throwable cause =
+
org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex);
+ if (cause instanceof
org.apache.pulsar.client.admin.PulsarAdminException
+ .NotFoundException) {
+ // Subscription not present on this segment —
tolerated.
+ // (See seek path for the 404-vs-503 contract.)
+ return null;
+ }
+ throw
org.apache.pulsar.common.util.FutureUtil.wrapToCompletionException(cause);
+ });
+ } catch (PulsarServerException e) {
+ return CompletableFuture.failedFuture(e);
+ }
+ }
+
private CompletableFuture<Void> createSubscriptionOnSegment(SegmentInfo
segment, String subscription) {
String persistentName = toSegmentUnderlyingPersistentName(segment);
try {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
index e43ca06ccbd..12c7064e844 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
@@ -196,6 +196,27 @@ public class ScalableTopicService {
.thenCompose(controller ->
controller.deleteSubscription(subscription));
}
+ /**
+ * Reset a subscription's cursor across every segment to the given
wall-clock
+ * timestamp. Delegates to the controller leader; the controller uses each
+ * segment's recorded {@code [createdAtMs, sealedAtMs)} window to dispatch
the
+ * cheapest per-segment op (skip-all, seek-by-timestamp, or
seek-to-earliest).
+ */
+ public CompletableFuture<Void> seekSubscription(TopicName topic, String
subscription,
+ long timestampMs) {
+ return getOrCreateController(topic)
+ .thenCompose(controller ->
controller.seekSubscription(subscription, timestampMs));
+ }
+
+ /**
+ * Skip every undelivered message on the subscription, across every
segment in
+ * the DAG. Delegates to the controller leader.
+ */
+ public CompletableFuture<Void> clearBacklog(TopicName topic, String
subscription) {
+ return getOrCreateController(topic)
+ .thenCompose(controller ->
controller.clearBacklog(subscription));
+ }
+
/**
* Get aggregated stats for a scalable topic. Read-only: does not require
leadership.
* Returns segment-DAG counts and per-subscription consumer counts, read
from the
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
index c00b1923504..8c75823ebef 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service.scalable;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -456,6 +457,163 @@ public class ScalableTopicControllerTest {
}
}
+ // --- Seek subscription / clear backlog ---
+
+ /**
+ * The seek path classifies each segment by its {@code [createdAtMs,
sealedAtMs)} window
+ * against the requested timestamp. With 3 segments — one sealed entirely
before the
+ * timestamp, one straddling, and one created entirely after — we expect:
+ * <ul>
+ * <li>Sealed-and-old segment: skip-all admin call (cursor → end).</li>
+ * <li>Straddling segment: seek admin call with the requested
timestamp.</li>
+ * <li>Created-after segment: seek admin call with timestamp 0 (i.e.
earliest).</li>
+ * </ul>
+ */
+ @Test
+ public void testSeekSubscriptionDispatchesPerSegmentByTimestamp() throws
Exception {
+ // Build a custom metadata with three segments at specific timestamps.
+ long t0 = 1_000_000L;
+ long t1 = 2_000_000L;
+ long t2 = 3_000_000L;
+ long t3 = 4_000_000L;
+ // Segment 0: created at t0, sealed at t1 (entirely before tSeek).
+ // Segment 1: created at t1, still active (straddles tSeek).
+ // Segment 2: created at t3, still active (entirely after tSeek).
+ long tSeek = t2;
+
+ org.apache.pulsar.common.scalable.SegmentInfo seg0 = new
org.apache.pulsar.common.scalable.SegmentInfo(
+ 0L,
+ org.apache.pulsar.common.scalable.HashRange.of(0x0000, 0x3FFF),
+ org.apache.pulsar.common.scalable.SegmentState.SEALED,
+ java.util.List.of(), java.util.List.of(3L),
+ /*createdAtEpoch*/ 0, /*sealedAtEpoch*/ 1,
+ /*createdAtMs*/ t0, /*sealedAtMs*/ t1);
+ org.apache.pulsar.common.scalable.SegmentInfo seg1 = new
org.apache.pulsar.common.scalable.SegmentInfo(
+ 1L,
+ org.apache.pulsar.common.scalable.HashRange.of(0x4000, 0x7FFF),
+ org.apache.pulsar.common.scalable.SegmentState.ACTIVE,
+ java.util.List.of(), java.util.List.of(),
+ 0, -1, t1, -1);
+ org.apache.pulsar.common.scalable.SegmentInfo seg2 = new
org.apache.pulsar.common.scalable.SegmentInfo(
+ 2L,
+ org.apache.pulsar.common.scalable.HashRange.of(0x8000, 0xFFFF),
+ org.apache.pulsar.common.scalable.SegmentState.ACTIVE,
+ java.util.List.of(), java.util.List.of(),
+ 0, -1, t3, -1);
+
+ TopicName seekTopic = TopicName.get("topic://tenant/ns/seek-topic");
+ ScalableTopicMetadata md = ScalableTopicMetadata.builder()
+ .epoch(2).nextSegmentId(4)
+ .segments(java.util.Map.of(0L, seg0, 1L, seg1, 2L, seg2))
+ .properties(java.util.Map.of())
+ .build();
+ resources.createScalableTopicAsync(seekTopic, md).get();
+ ScalableTopicController c = new ScalableTopicController(
+ seekTopic, resources, brokerService, coordinationService);
+ try {
+ // Stub the segment-aware admin calls.
+ when(scalableTopics.seekSegmentSubscriptionAsync(anyString(),
anyString(), anyLong()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+
when(scalableTopics.clearSegmentSubscriptionBacklogAsync(anyString(),
anyString()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+
+ c.initialize().get();
+ c.seekSubscription("sub-a", tSeek).get();
+
+ // Sealed-old segment: skip-all admin call once.
+ verify(scalableTopics, org.mockito.Mockito.times(1))
+ .clearSegmentSubscriptionBacklogAsync(anyString(),
org.mockito.ArgumentMatchers.eq("sub-a"));
+
+ // Two seek calls: one for the straddling segment (t == tSeek),
one for the
+ // created-after segment (t == 0 because seg2.createdAtMs >=
tSeek).
+ org.mockito.ArgumentCaptor<Long> tsCaptor =
org.mockito.ArgumentCaptor.forClass(Long.class);
+ verify(scalableTopics, org.mockito.Mockito.times(2))
+ .seekSegmentSubscriptionAsync(anyString(),
+ org.mockito.ArgumentMatchers.eq("sub-a"),
+ tsCaptor.capture());
+ java.util.List<Long> sentTs = tsCaptor.getAllValues();
+ assertTrue(sentTs.contains(tSeek), "expected straddling segment to
receive tSeek");
+ assertTrue(sentTs.contains(0L), "expected created-after segment to
receive 0");
+ } finally {
+ c.close().join();
+ }
+ }
+
+ /** Clear-backlog dispatches skip-all to every segment in the DAG. */
+ @Test
+ public void testClearBacklogDispatchesSkipAllToEverySegment() throws
Exception {
+ when(scalableTopics.clearSegmentSubscriptionBacklogAsync(anyString(),
anyString()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ controller.initialize().get();
+
+ controller.clearBacklog("sub-a").get();
+ // INITIAL_SEGMENTS active segments, no sealed ones in the baseline →
exactly N calls.
+ verify(scalableTopics, org.mockito.Mockito.times(INITIAL_SEGMENTS))
+ .clearSegmentSubscriptionBacklogAsync(anyString(),
+ org.mockito.ArgumentMatchers.eq("sub-a"));
+ }
+
+ /**
+ * 404 from a per-segment seek means "subscription not present on that
segment" —
+ * the controller tolerates this as success (cursor will materialise
lazily).
+ */
+ @Test
+ public void testSeekTolerates404SubscriptionNotFound() throws Exception {
+ when(scalableTopics.seekSegmentSubscriptionAsync(anyString(),
anyString(), anyLong()))
+ .thenReturn(CompletableFuture.failedFuture(
+ new
org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException(
+ new RuntimeException("Subscription not found"),
+ "Subscription not found", 404)));
+ controller.initialize().get();
+
+ // Should not throw — every segment's 404 is swallowed.
+ controller.seekSubscription("sub-a", System.currentTimeMillis()).get();
+ }
+
+ /**
+ * 503 from the segment endpoint == "topic not loaded yet". This is a
transient
+ * unload (e.g. ownership churn); the controller MUST surface it so the
caller can
+ * retry the parent-level operation, instead of silently skipping segments.
+ */
+ @Test
+ public void testSeekPropagates503TransientUnload() throws Exception {
+ org.apache.pulsar.client.admin.PulsarAdminException unavailable =
+ new org.apache.pulsar.client.admin.PulsarAdminException(
+ new RuntimeException("Service Unavailable"),
+ "Segment topic not loaded", 503);
+ when(scalableTopics.seekSegmentSubscriptionAsync(anyString(),
anyString(), anyLong()))
+ .thenReturn(CompletableFuture.failedFuture(unavailable));
+ controller.initialize().get();
+
+ java.util.concurrent.ExecutionException ex =
+
org.testng.Assert.expectThrows(java.util.concurrent.ExecutionException.class,
+ () -> controller.seekSubscription("sub-a",
System.currentTimeMillis()).get());
+ Throwable cause = ex.getCause() instanceof
java.util.concurrent.CompletionException
+ ? ex.getCause().getCause() : ex.getCause();
+ assertTrue(cause instanceof
org.apache.pulsar.client.admin.PulsarAdminException,
+ "expected 503 PulsarAdminException to propagate, got " +
cause);
+ }
+
+ /** Same contract as seek: 503 from a per-segment clear-backlog must
propagate. */
+ @Test
+ public void testClearBacklogPropagates503TransientUnload() throws
Exception {
+ org.apache.pulsar.client.admin.PulsarAdminException unavailable =
+ new org.apache.pulsar.client.admin.PulsarAdminException(
+ new RuntimeException("Service Unavailable"),
+ "Segment topic not loaded", 503);
+ when(scalableTopics.clearSegmentSubscriptionBacklogAsync(anyString(),
anyString()))
+ .thenReturn(CompletableFuture.failedFuture(unavailable));
+ controller.initialize().get();
+
+ java.util.concurrent.ExecutionException ex =
+
org.testng.Assert.expectThrows(java.util.concurrent.ExecutionException.class,
+ () -> controller.clearBacklog("sub-a").get());
+ Throwable cause = ex.getCause() instanceof
java.util.concurrent.CompletionException
+ ? ex.getCause().getCause() : ex.getCause();
+ assertTrue(cause instanceof
org.apache.pulsar.client.admin.PulsarAdminException,
+ "expected 503 PulsarAdminException to propagate, got " +
cause);
+ }
+
// --- Sealed-segment GC ---
/**
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AsyncApisTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AsyncApisTest.java
index 54164b23339..28ba3bc1a7e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AsyncApisTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AsyncApisTest.java
@@ -24,9 +24,7 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
@@ -176,14 +174,16 @@ public class V5AsyncApisTest extends V5ClientBaseTest {
}
@Test
- public void testAsyncCheckpointConsumerCheckpointAndSeek() throws
Exception {
+ public void testAsyncCheckpointConsumerCheckpoint() throws Exception {
+ // Verifies that the async view of CheckpointConsumer surfaces the same
+ // checkpoint as the sync API and that it completes asynchronously.
String topic = newScalableTopic(1);
@Cleanup
Producer<String> producer = v5Client.newProducer(Schema.string())
.topic(topic)
.create();
- for (int i = 0; i < 6; i++) {
+ for (int i = 0; i < 3; i++) {
producer.newMessage().value("v-" + i).send();
}
@@ -194,28 +194,12 @@ public class V5AsyncApisTest extends V5ClientBaseTest {
.create();
AsyncCheckpointConsumer<String> async = consumer.async();
-
- // Read 3, snapshot via async, read 3 more, then async-seek back.
for (int i = 0; i < 3; i++) {
Message<String> msg = async.receive().get(AWAIT.toMillis(),
TimeUnit.MILLISECONDS);
assertEquals(msg.value(), "v-" + i);
}
Checkpoint mark = async.checkpoint().get(AWAIT.toMillis(),
TimeUnit.MILLISECONDS);
assertNotNull(mark, "async checkpoint must complete with a non-null
position");
-
- for (int i = 3; i < 6; i++) {
- Message<String> msg = async.receive().get(AWAIT.toMillis(),
TimeUnit.MILLISECONDS);
- assertEquals(msg.value(), "v-" + i);
- }
-
- async.seek(mark).get(AWAIT.toMillis(), TimeUnit.MILLISECONDS);
- Set<String> redelivered = new HashSet<>();
- for (int i = 0; i < 3; i++) {
- Message<String> msg = async.receive().get(AWAIT.toMillis(),
TimeUnit.MILLISECONDS);
- redelivered.add(msg.value());
- }
- assertEquals(redelivered, Set.of("v-3", "v-4", "v-5"),
- "async seek did not redeliver the post-checkpoint window");
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerBasicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerBasicTest.java
index 0cd3b78c8a4..697491ad74a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerBasicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerBasicTest.java
@@ -32,7 +32,7 @@ import org.testng.annotations.Test;
* Basic end-to-end coverage for {@link CheckpointConsumer}: the unmanaged
reader-style
* API used by connector frameworks (Flink, Spark) — specifically the
start-position
* sentinels (earliest / latest), checkpoint roundtrip via {@link
Checkpoint#toByteArray()},
- * resume from a saved checkpoint, and {@link
CheckpointConsumer#seek(Checkpoint)}.
+ * and resume from a saved checkpoint.
*
* <p>All scenarios use a single-segment scalable topic to keep the focus on
the
* consumer surface itself; cross-segment position-vector behavior lives in the
@@ -189,42 +189,6 @@ public class V5CheckpointConsumerBasicTest extends
V5ClientBaseTest {
assertEquals(received, List.of("v-3", "v-4", "v-5"));
}
- @Test
- public void testSeekRewindsToEarlierCheckpoint() throws Exception {
- String topic = newScalableTopic(1);
-
- @Cleanup
- Producer<String> producer = v5Client.newProducer(Schema.string())
- .topic(topic)
- .create();
- for (int i = 0; i < 6; i++) {
- producer.newMessage().value("v-" + i).send();
- }
-
- @Cleanup
- CheckpointConsumer<String> consumer =
v5Client.newCheckpointConsumer(Schema.string())
- .topic(topic)
- .startPosition(Checkpoint.earliest())
- .create();
-
- // Read 3, snapshot, read 3 more, then seek back to the snapshot —
should
- // re-deliver the second batch.
- for (int i = 0; i < 3; i++) {
- assertEquals(consumer.receive(Duration.ofSeconds(5)).value(), "v-"
+ i);
- }
- Checkpoint mark = consumer.checkpoint();
- for (int i = 3; i < 6; i++) {
- assertEquals(consumer.receive(Duration.ofSeconds(5)).value(), "v-"
+ i);
- }
-
- consumer.seek(mark);
- for (int i = 3; i < 6; i++) {
- Message<String> msg = consumer.receive(Duration.ofSeconds(5));
- assertNotNull(msg, "seek did not redeliver message v-" + i);
- assertEquals(msg.value(), "v-" + i);
- }
- }
-
@Test
public void testReceiveTimeoutReturnsNullWhenNoMessages() throws Exception
{
String topic = newScalableTopic(1);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SeekSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SeekSubscriptionTest.java
new file mode 100644
index 00000000000..a5e3a3d1952
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SeekSubscriptionTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end coverage for the parent-topic admin seek + clear-backlog
operations on a
+ * scalable-topic subscription. Exercises the real per-segment cursor-reset
path
+ * (managed-ledger {@code resetCursor(timestamp)} / {@code clearBacklog}),
which the
+ * controller-level mock-based tests do not cover.
+ */
+public class V5SeekSubscriptionTest extends V5ClientBaseTest {
+
+ @Test
+ public void testSeekRewindsCursorAcrossSingleSegment() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+
.batchingPolicy(org.apache.pulsar.client.api.v5.config.BatchingPolicy.ofDisabled())
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("seek-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ // Produce 5 messages, consume + ack — drains the cursor so the rewind
is observable.
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage().value("v-" + i).send();
+ }
+ for (int i = 0; i < 5; i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg);
+ consumer.acknowledge(msg.id());
+ }
+
+ // Snapshot the rewind target — anything published before this should
be redelivered.
+ long mark = System.currentTimeMillis();
+ // Slack for the broker's wall-clock vs the test's: producing
immediately at `mark`
+ // can land at `mark` or `mark+1`. Sleep a tick so post-mark messages
have a strictly
+ // later publish time.
+ Thread.sleep(10);
+
+ for (int i = 5; i < 10; i++) {
+ producer.newMessage().value("v-" + i).send();
+ }
+ for (int i = 5; i < 10; i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg);
+ consumer.acknowledge(msg.id());
+ }
+
+ // Drained — confirm.
+ assertNull(consumer.receive(Duration.ofMillis(500)));
+
+ // Admin seek back to the mark — post-mark messages must be
redelivered.
+ admin.scalableTopics().seekSubscription(topic, "seek-sub", mark);
+
+ Set<String> redelivered = new HashSet<>();
+ for (int i = 0; i < 5; i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+ assertNotNull(msg, "expected redelivery of post-mark message #" +
i);
+ redelivered.add(msg.value());
+ consumer.acknowledge(msg.id());
+ }
+ assertEquals(redelivered, Set.of("v-5", "v-6", "v-7", "v-8", "v-9"),
+ "seek must redeliver exactly the post-mark window");
+ }
+
+ /**
+ * Regression: a freshly-split active child segment that has received no
messages
+ * straddles {@code timestampMs} but has an empty managed ledger; the
per-segment
+ * {@code resetCursor} on it would have failed with {@code
SubscriptionInvalidCursorPosition},
+ * which used to bring the entire parent-level seek down. The fix treats
empty
+ * segments as a no-op.
+ */
+ @Test
+ public void testSeekToleratesEmptyChildSegmentsAfterSplit() throws
Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+
.batchingPolicy(org.apache.pulsar.client.api.v5.config.BatchingPolicy.ofDisabled())
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("split-seek-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ // Populate the initial segment.
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage().value("pre-" + i).send();
+ }
+
+ // Drain so we don't redeliver them later.
+ for (int i = 0; i < 5; i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg);
+ consumer.acknowledge(msg.id());
+ }
+
+ // Split — children are empty; parent is sealed with all 5 messages.
+ long activeSegmentId = -1;
+ var meta = admin.scalableTopics().getMetadata(topic);
+ for (var seg : meta.getSegments().values()) {
+ if (seg.isActive()) {
+ activeSegmentId = seg.getSegmentId();
+ break;
+ }
+ }
+ assertTrue(activeSegmentId >= 0);
+ admin.scalableTopics().splitSegment(topic, activeSegmentId);
+
+ // Wait for the split to be visible.
+ Awaitility.await().untilAsserted(() -> {
+ int active = 0;
+ var m = admin.scalableTopics().getMetadata(topic);
+ for (var seg : m.getSegments().values()) {
+ if (seg.isActive()) {
+ active++;
+ }
+ }
+ assertEquals(active, 2, "split must produce 2 active children");
+ });
+
+ // Seek to "now" — this exercises the bug. The two empty active
children
+ // straddle the timestamp; without the fix, the controller's allOf
would fail
+ // because resetCursor on an empty managed ledger throws
+ // SubscriptionInvalidCursorPosition.
+ long now = System.currentTimeMillis();
+ admin.scalableTopics().seekSubscription(topic, "split-seek-sub", now);
+
+ // After seek, no backlog (the sealed parent's data is all from before
`now`,
+ // and the children have no data). So receive must time out.
+ assertNull(consumer.receive(Duration.ofSeconds(2)),
+ "seek across empty children must succeed and leave no
undelivered messages");
+ }
+
+ @Test
+ public void testClearBacklogDropsAllUndeliveredMessages() throws Exception
{
+ String topic = newScalableTopic(2);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+
.batchingPolicy(org.apache.pulsar.client.api.v5.config.BatchingPolicy.ofDisabled())
+ .create();
+
+ // Establish the subscription via a short-lived consumer — and then
close it. The
+ // V5 receive-queue would otherwise prefetch the produced messages
into its
+ // client-side buffer, masking the broker-side cursor advance from
clearBacklog.
+ // Closing also releases the segment-cursor fences so clearBacklog
itself can
+ // fence them cleanly.
+ QueueConsumer<String> bootstrap =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("clear-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+ bootstrap.close();
+
+ for (int i = 0; i < 20; i++) {
+ producer.newMessage().key("k-" + i).value("v-" + i).send();
+ }
+
+ admin.scalableTopics().clearBacklog(topic, "clear-sub");
+
+ // Re-attach the consumer — every cursor is at the end, so no backlog
remains.
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("clear-sub")
+ .subscribe();
+
+ assertNull(consumer.receive(Duration.ofSeconds(2)),
+ "clear-backlog must skip every undelivered message");
+
+ // Subsequent messages still flow through.
+ producer.newMessage().value("after-clear").send();
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg);
+ assertEquals(msg.value(), "after-clear");
+ consumer.acknowledge(msg.id());
+ }
+}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
index dd306c5805e..10da169d14e 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java
@@ -206,6 +206,38 @@ public interface ScalableTopics {
*/
CompletableFuture<Void> deleteSubscriptionAsync(String topic, String
subscription);
+ /**
+ * Reset a subscription's cursor across every segment to the given
wall-clock
+ * timestamp. The controller uses each segment's recorded sealed-time
window to
+ * dispatch the cheapest per-segment op.
+ *
+ * @param topic Topic name in the format "tenant/namespace/topic"
+ * @param subscription Subscription name
+ * @param timestampMs Wall-clock millis since the unix epoch
+ */
+ void seekSubscription(String topic, String subscription, long timestampMs)
+ throws PulsarAdminException;
+
+ /**
+ * Reset a subscription's cursor across every segment, asynchronously.
+ */
+ CompletableFuture<Void> seekSubscriptionAsync(String topic, String
subscription,
+ long timestampMs);
+
+ /**
+ * Skip every undelivered message on the subscription, across every
segment in the
+ * DAG (advance each per-segment cursor to the end).
+ *
+ * @param topic Topic name in the format "tenant/namespace/topic"
+ * @param subscription Subscription name
+ */
+ void clearBacklog(String topic, String subscription) throws
PulsarAdminException;
+
+ /**
+ * Skip every undelivered message on the subscription, asynchronously.
+ */
+ CompletableFuture<Void> clearBacklogAsync(String topic, String
subscription);
+
/**
* Split a segment into two halves.
*
@@ -298,4 +330,31 @@ public interface ScalableTopics {
*/
CompletableFuture<Long> getSegmentSubscriptionBacklogAsync(String
segmentTopic,
String
subscription);
+
+ /**
+ * Reset the segment topic's subscription cursor to the given wall-clock
timestamp.
+ * Routes to the broker that owns the segment topic.
+ *
+ * <p>Used internally by the parent-topic seek operation in
+ * {@link org.apache.pulsar.broker.service.scalable.ScalableTopicController
+ * ScalableTopicController}: the controller classifies each segment by its
+ * {@code [createdAtMs, sealedAtMs)} window against the requested
timestamp and
+ * dispatches per-segment seek / skip-all calls.
+ *
+ * @param segmentTopic Full segment topic name ({@code
segment://tenant/namespace/topic/descriptor})
+ * @param subscription Subscription name
+ * @param timestampMs Wall-clock millis since the unix epoch
+ */
+ CompletableFuture<Void> seekSegmentSubscriptionAsync(String segmentTopic,
String subscription,
+ long timestampMs);
+
+ /**
+ * Skip every undelivered message on the segment topic's subscription —
advance the
+ * cursor to the end of the segment.
+ *
+ * @param segmentTopic Full segment topic name ({@code
segment://tenant/namespace/topic/descriptor})
+ * @param subscription Subscription name
+ */
+ CompletableFuture<Void> clearSegmentSubscriptionBacklogAsync(String
segmentTopic,
+ String
subscription);
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
index 779579f1d1b..5a040123d2e 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java
@@ -184,6 +184,33 @@ public class ScalableTopicsImpl extends BaseResource
implements ScalableTopics {
return asyncDeleteRequest(path);
}
+ @Override
+ public void seekSubscription(String topic, String subscription, long
timestampMs)
+ throws PulsarAdminException {
+ sync(() -> seekSubscriptionAsync(topic, subscription, timestampMs));
+ }
+
+ @Override
+ public CompletableFuture<Void> seekSubscriptionAsync(String topic, String
subscription,
+ long timestampMs) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path =
topicPath(tn).path("subscriptions").path(subscription).path("seek")
+ .queryParam("timestamp", timestampMs);
+ return asyncPostRequest(path, Entity.entity("",
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public void clearBacklog(String topic, String subscription) throws
PulsarAdminException {
+ sync(() -> clearBacklogAsync(topic, subscription));
+ }
+
+ @Override
+ public CompletableFuture<Void> clearBacklogAsync(String topic, String
subscription) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path =
topicPath(tn).path("subscriptions").path(subscription).path("skip-all");
+ return asyncPostRequest(path, Entity.entity("",
MediaType.APPLICATION_JSON));
+ }
+
// --- Split ---
@Override
@@ -274,6 +301,30 @@ public class ScalableTopicsImpl extends BaseResource
implements ScalableTopics {
return asyncGetRequest(path, Long.class);
}
+ @Override
+ public CompletableFuture<Void> seekSegmentSubscriptionAsync(String
segmentTopic,
+ String
subscription,
+ long
timestampMs) {
+ TopicName tn = TopicName.get(segmentTopic);
+ WebTarget path = adminSegments
+ .path(tn.getTenant()).path(tn.getNamespacePortion())
+ .path(tn.getLocalName()).path(tn.getSegmentDescriptor())
+ .path("subscription").path(subscription).path("seek")
+ .queryParam("timestamp", timestampMs);
+ return asyncPostRequest(path, Entity.entity("",
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public CompletableFuture<Void> clearSegmentSubscriptionBacklogAsync(String
segmentTopic,
+ String
subscription) {
+ TopicName tn = TopicName.get(segmentTopic);
+ WebTarget path = adminSegments
+ .path(tn.getTenant()).path(tn.getNamespacePortion())
+ .path(tn.getLocalName()).path(tn.getSegmentDescriptor())
+ .path("subscription").path(subscription).path("skip-all");
+ return asyncPostRequest(path, Entity.entity("",
MediaType.APPLICATION_JSON));
+ }
+
// --- Helpers ---
private static TopicName validateTopic(String topic) {
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/Checkpoint.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/Checkpoint.java
index 6b24fe8943d..e872bb17158 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/Checkpoint.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/Checkpoint.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.client.api.v5;
import java.io.IOException;
-import java.time.Instant;
import org.apache.pulsar.client.api.v5.internal.PulsarClientProvider;
/**
@@ -30,9 +29,12 @@ import
org.apache.pulsar.client.api.v5.internal.PulsarClientProvider;
* serialized for external storage (e.g. Flink state, S3) using {@link
#toByteArray()}.
*
* <p>This is the sole position type used with {@link CheckpointConsumer} —
for initial
- * positioning use the static factories {@link #earliest()}, {@link #latest()},
- * {@link #atTimestamp(Instant)}, or {@link #fromByteArray(byte[])} to restore
from
- * a previously saved checkpoint.
+ * positioning use the static factories {@link #earliest()}, {@link
#latest()}, or
+ * {@link #fromByteArray(byte[])} to restore from a previously saved
checkpoint.
+ *
+ * <p>For timestamp-based positioning, use the {@code scalable-topics seek}
admin
+ * operation on the subscription instead — see
+ * {@link org.apache.pulsar.client.admin.ScalableTopics#seekSubscription}.
*/
public interface Checkpoint {
@@ -44,13 +46,6 @@ public interface Checkpoint {
*/
byte[] toByteArray();
- /**
- * The time at which this checkpoint was created.
- *
- * @return the creation timestamp of this checkpoint as an {@link Instant}
- */
- Instant creationTime();
-
// --- Static factories ---
/**
@@ -71,17 +66,6 @@ public interface Checkpoint {
return PulsarClientProvider.get().latestCheckpoint();
}
- /**
- * A checkpoint that positions at the first message published at or after
the given timestamp.
- *
- * @param timestamp the timestamp to position at
- * @return a {@link Checkpoint} that will start consuming from the first
message at or after
- * the given timestamp
- */
- static Checkpoint atTimestamp(Instant timestamp) {
- return PulsarClientProvider.get().checkpointAtTimestamp(timestamp);
- }
-
/**
* Deserialize a checkpoint from a byte array previously obtained via
{@link #toByteArray()}.
*
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumer.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumer.java
index 0d278fb3eb5..40c4d8b3af2 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumer.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumer.java
@@ -91,17 +91,6 @@ public interface CheckpointConsumer<T> extends Closeable {
*/
Checkpoint checkpoint();
- // --- Seek ---
-
- /**
- * Seek to a previously saved checkpoint, or to a sentinel position such as
- * {@link Checkpoint#earliest()} or {@link Checkpoint#latest()}.
- *
- * @param checkpoint the checkpoint to seek to
- * @throws PulsarClientException if the seek fails or a connection error
occurs
- */
- void seek(Checkpoint checkpoint) throws PulsarClientException;
-
// --- Async ---
/**
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java
index 1a67dbcfe84..9f917e9ff7b 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java
@@ -62,9 +62,8 @@ public interface CheckpointConsumerBuilder<T> {
/**
* Set the initial position for this consumer.
*
- * <p>Use {@link Checkpoint#earliest()}, {@link Checkpoint#latest()},
- * {@link Checkpoint#atTimestamp}, or {@link Checkpoint#fromByteArray} to
- * create the appropriate starting position.
+ * <p>Use {@link Checkpoint#earliest()}, {@link Checkpoint#latest()}, or
+ * {@link Checkpoint#fromByteArray} to create the appropriate starting
position.
*
* <p>Defaults to {@link Checkpoint#latest()} if not specified.
*
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java
index ab813842b4f..5f111246471 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.client.api.v5;
import java.time.Duration;
-import java.time.Instant;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
@@ -88,25 +87,6 @@ public interface StreamConsumerBuilder<T> {
*/
StreamConsumerBuilder<T> subscriptionName(String subscriptionName);
- // --- Seek (initial position override) ---
-
- /**
- * Reset the subscription to a specific message ID.
- *
- * @param messageId the message ID to seek to
- * @return this builder instance for chaining
- */
- StreamConsumerBuilder<T> seek(MessageId messageId);
-
- /**
- * Reset the subscription to a specific timestamp. The subscription
- * will be positioned at the first message published at or after this
timestamp.
- *
- * @param timestamp the timestamp to seek to
- * @return this builder instance for chaining
- */
- StreamConsumerBuilder<T> seek(Instant timestamp);
-
// --- Optional ---
/**
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/async/AsyncCheckpointConsumer.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/async/AsyncCheckpointConsumer.java
index f7bb04bfaba..b663499e175 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/async/AsyncCheckpointConsumer.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/async/AsyncCheckpointConsumer.java
@@ -69,15 +69,6 @@ public interface AsyncCheckpointConsumer<T> {
*/
CompletableFuture<Checkpoint> checkpoint();
- /**
- * Seek to a checkpoint asynchronously.
- *
- * @param checkpoint the checkpoint to seek to
- * @return a {@link CompletableFuture} that completes when the consumer
has been repositioned
- * to the given checkpoint
- */
- CompletableFuture<Void> seek(Checkpoint checkpoint);
-
/**
* Close this consumer asynchronously.
*
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java
index faff56205d6..e27a02302bc 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.client.api.v5.internal;
import java.io.IOException;
-import java.time.Instant;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.function.Supplier;
@@ -87,8 +86,6 @@ public interface PulsarClientProvider {
Checkpoint latestCheckpoint();
- Checkpoint checkpointAtTimestamp(Instant timestamp);
-
// --- Authentication ---
Authentication authenticationToken(String token);
diff --git
a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
index b3439d31c0a..1218c8ca27c 100644
---
a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
+++
b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
@@ -543,26 +543,6 @@ public class Examples {
}
}
- /** Time-travel seek — rewind to a specific timestamp. */
- void checkpointConsumerSeek(PulsarClient client) throws Exception {
- try (var consumer = client.newCheckpointConsumer(Schema.string())
- .topic("events")
- .startPosition(Checkpoint.latest())
- .create()) {
-
- // Seek back to replay from a specific time
-
consumer.seek(Checkpoint.atTimestamp(Instant.parse("2025-12-01T00:00:00Z")));
-
- while (true) {
- Message<String> msg = consumer.receive(Duration.ofSeconds(5));
- if (msg == null) {
- break;
- }
- System.out.printf("[%s] %s%n", msg.publishTime(), msg.value());
- }
- }
- }
-
//
==================================================================================
// Helper types for the examples
//
==================================================================================
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
index c4a308cbdce..0391653198f 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdScalableTopics.java
@@ -137,6 +137,49 @@ public class CmdScalableTopics extends CmdBase {
}
}
+ @Command(description = "Reset a subscription's cursor on every segment to
a given"
+ + " point in time. Pass --time as a relative offset (e.g. 1h, 5d)
— the cursor"
+ + " is reset to (now - offset).")
+ private class SeekSubscriptionCmd extends CliCommand {
+ @Parameters(description = "tenant/namespace/topic", arity = "1")
+ private String topic;
+
+ @Option(names = {"-s", "--subscription"},
+ description = "Subscription name", required = true)
+ private String subscription;
+
+ @Option(names = {"-t", "--time"},
+ description = "Relative offset in the past to seek to (e.g.
1h, 5d, 30m)",
+ required = true,
+ converter =
org.apache.pulsar.cli.converters.picocli.TimeUnitToMillisConverter.class)
+ private Long offsetMillis;
+
+ @Override
+ void run() throws Exception {
+ long target = System.currentTimeMillis() - offsetMillis;
+ scalableTopics().seekSubscription(topic, subscription, target);
+ print("Reset subscription " + subscription + " on topic " + topic
+ + " to timestamp " + target + " (" + offsetMillis + "ms
ago)");
+ }
+ }
+
+ @Command(description = "Skip every undelivered message on the
subscription, across every"
+ + " segment in the DAG.")
+ private class ClearBacklogCmd extends CliCommand {
+ @Parameters(description = "tenant/namespace/topic", arity = "1")
+ private String topic;
+
+ @Option(names = {"-s", "--subscription"},
+ description = "Subscription name", required = true)
+ private String subscription;
+
+ @Override
+ void run() throws Exception {
+ scalableTopics().clearBacklog(topic, subscription);
+ print("Cleared backlog of subscription " + subscription + " on
topic " + topic);
+ }
+ }
+
@Command(description = "Merge two adjacent segments into one")
private class MergeSegmentsCmd extends CliCommand {
@Parameters(description = "tenant/namespace/topic", arity = "1")
@@ -166,5 +209,7 @@ public class CmdScalableTopics extends CmdBase {
addCommand("delete", new DeleteCmd());
addCommand("split-segment", new SplitSegmentCmd());
addCommand("merge-segments", new MergeSegmentsCmd());
+ addCommand("seek", new SeekSubscriptionCmd());
+ addCommand("clear-backlog", new ClearBacklogCmd());
}
}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncCheckpointConsumerV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncCheckpointConsumerV5.java
index c4a7bf8d380..ace455e7ba9 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncCheckpointConsumerV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncCheckpointConsumerV5.java
@@ -65,11 +65,6 @@ final class AsyncCheckpointConsumerV5<T> implements
AsyncCheckpointConsumer<T> {
return consumer.checkpointAsync();
}
- @Override
- public CompletableFuture<Void> seek(Checkpoint checkpoint) {
- return consumer.seekAsync(checkpoint);
- }
-
@Override
public CompletableFuture<Void> close() {
return consumer.closeAsync();
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointV5.java
index 566209a2b32..b9ae8c0a777 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointV5.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl.v5;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.client.api.v5.Checkpoint;
@@ -35,17 +34,14 @@ final class CheckpointV5 implements Checkpoint {
private static final byte TYPE_REGULAR = 0;
private static final byte TYPE_EARLIEST = 1;
private static final byte TYPE_LATEST = 2;
- private static final byte TYPE_TIMESTAMP = 3;
- static final Checkpoint EARLIEST = new SentinelCheckpoint(TYPE_EARLIEST,
Instant.EPOCH);
- static final Checkpoint LATEST = new SentinelCheckpoint(TYPE_LATEST,
Instant.EPOCH);
+ static final Checkpoint EARLIEST = new SentinelCheckpoint(TYPE_EARLIEST);
+ static final Checkpoint LATEST = new SentinelCheckpoint(TYPE_LATEST);
private final Map<Long, org.apache.pulsar.client.api.MessageId>
segmentPositions;
- private final Instant creationTime;
- CheckpointV5(Map<Long, org.apache.pulsar.client.api.MessageId>
segmentPositions, Instant creationTime) {
+ CheckpointV5(Map<Long, org.apache.pulsar.client.api.MessageId>
segmentPositions) {
this.segmentPositions = Map.copyOf(segmentPositions);
- this.creationTime = creationTime;
}
/**
@@ -55,16 +51,11 @@ final class CheckpointV5 implements Checkpoint {
return segmentPositions;
}
- @Override
- public Instant creationTime() {
- return creationTime;
- }
-
@Override
public byte[] toByteArray() {
- // Format: [1 byte type] [8 bytes creationTimeMillis] [4 bytes
numEntries]
+ // Format: [1 byte type] [4 bytes numEntries]
// [for each entry: [8 bytes segmentId] [4 bytes msgIdLen]
[msgIdBytes]]
- int totalSize = 1 + 8 + 4;
+ int totalSize = 1 + 4;
Map<Long, byte[]> serializedIds = new HashMap<>();
for (var entry : segmentPositions.entrySet()) {
byte[] idBytes = entry.getValue().toByteArray();
@@ -74,7 +65,6 @@ final class CheckpointV5 implements Checkpoint {
ByteBuffer buf = ByteBuffer.allocate(totalSize);
buf.put(TYPE_REGULAR);
- buf.putLong(creationTime.toEpochMilli());
buf.putInt(segmentPositions.size());
for (var entry : serializedIds.entrySet()) {
buf.putLong(entry.getKey());
@@ -95,12 +85,7 @@ final class CheckpointV5 implements Checkpoint {
return switch (type) {
case TYPE_EARLIEST -> EARLIEST;
case TYPE_LATEST -> LATEST;
- case TYPE_TIMESTAMP -> {
- long millis = buf.getLong();
- yield new TimestampCheckpoint(Instant.ofEpochMilli(millis));
- }
case TYPE_REGULAR -> {
- long creationMillis = buf.getLong();
int numEntries = buf.getInt();
Map<Long, org.apache.pulsar.client.api.MessageId> positions =
new HashMap<>();
for (int i = 0; i < numEntries; i++) {
@@ -111,48 +96,21 @@ final class CheckpointV5 implements Checkpoint {
positions.put(segmentId,
org.apache.pulsar.client.api.MessageId.fromByteArray(msgIdBytes));
}
- yield new CheckpointV5(positions,
Instant.ofEpochMilli(creationMillis));
+ yield new CheckpointV5(positions);
}
default -> throw new IOException("Unknown checkpoint type: " +
type);
};
}
- static Checkpoint atTimestamp(Instant timestamp) {
- return new TimestampCheckpoint(timestamp);
- }
-
/**
- * Sentinel checkpoint for earliest/latest positions.
+ * Sentinel checkpoint for earliest/latest positions. Encoded as a single
type byte.
*/
- private record SentinelCheckpoint(byte type, Instant creation) implements
Checkpoint {
+ private record SentinelCheckpoint(byte type) implements Checkpoint {
@Override
public byte[] toByteArray() {
ByteBuffer buf = ByteBuffer.allocate(1);
buf.put(type);
return buf.array();
}
-
- @Override
- public Instant creationTime() {
- return creation;
- }
- }
-
- /**
- * Checkpoint that positions at a specific timestamp.
- */
- private record TimestampCheckpoint(Instant timestamp) implements
Checkpoint {
- @Override
- public byte[] toByteArray() {
- ByteBuffer buf = ByteBuffer.allocate(1 + 8);
- buf.put(TYPE_TIMESTAMP);
- buf.putLong(timestamp.toEpochMilli());
- return buf.array();
- }
-
- @Override
- public Instant creationTime() {
- return timestamp;
- }
}
}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java
index 2c53f0bcc48..7e8971a4ff9 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.client.impl.v5;
import java.io.IOException;
-import java.time.Instant;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.v5.Checkpoint;
@@ -142,11 +141,6 @@ public final class PulsarClientProviderV5 implements
PulsarClientProvider {
return CheckpointV5.LATEST;
}
- @Override
- public Checkpoint checkpointAtTimestamp(Instant timestamp) {
- return CheckpointV5.atTimestamp(timestamp);
- }
-
// --- Authentication ---
@Override
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
index 9288780f39b..1eedbb14566 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl.v5;
import io.github.merlimat.slog.Logger;
import java.time.Duration;
-import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -221,8 +220,7 @@ final class ScalableCheckpointConsumer<T> implements
CheckpointConsumer<T> {
* Update the checkpoint position for the segment this message belongs to.
Called as
* messages cross the boundary from the wire-buffer to the application —
that's the
* point at which a subsequent {@link #checkpoint()} should reflect "I
have processed
- * this message", so a {@link #seek(Checkpoint)} back to that checkpoint
redelivers
- * everything after it.
+ * this message".
*
* <p>{@code msg} may be null (timeout or interrupt path); returns it
unchanged so the
* caller can pass through the receive result without an extra null-check.
@@ -237,22 +235,7 @@ final class ScalableCheckpointConsumer<T> implements
CheckpointConsumer<T> {
@Override
public Checkpoint checkpoint() {
Map<Long, org.apache.pulsar.client.api.MessageId> positions = new
HashMap<>(lastReceivedPositions);
- return new CheckpointV5(positions, Instant.now());
- }
-
- @Override
- public void seek(Checkpoint checkpoint) throws PulsarClientException {
- try {
- seekAsync(checkpoint).get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new PulsarClientException("Seek interrupted", e);
- } catch (ExecutionException e) {
- if (e.getCause() instanceof PulsarClientException pce) {
- throw pce;
- }
- throw new PulsarClientException(e.getCause());
- }
+ return new CheckpointV5(positions);
}
@Override
@@ -298,36 +281,6 @@ final class ScalableCheckpointConsumer<T> implements
CheckpointConsumer<T> {
return CompletableFuture.completedFuture(checkpoint());
}
- CompletableFuture<Void> seekAsync(Checkpoint checkpoint) {
- if (checkpoint instanceof CheckpointV5 cp) {
- List<CompletableFuture<Void>> futures = new ArrayList<>();
- for (var entry : cp.segmentPositions().entrySet()) {
- var readerFuture = segmentReaders.get(entry.getKey());
- if (readerFuture != null) {
- futures.add(readerFuture.thenCompose(r ->
r.seekAsync(entry.getValue())));
- }
- }
- return
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new))
- .whenComplete((__, ___) -> messageQueue.clear());
- } else if (checkpoint == CheckpointV5.EARLIEST) {
- return
seekAllAsync(org.apache.pulsar.client.api.MessageId.earliest);
- } else if (checkpoint == CheckpointV5.LATEST) {
- return seekAllAsync(org.apache.pulsar.client.api.MessageId.latest);
- } else {
- return CompletableFuture.failedFuture(
- new PulsarClientException("Unsupported checkpoint type: "
+ checkpoint.getClass()));
- }
- }
-
- private CompletableFuture<Void>
seekAllAsync(org.apache.pulsar.client.api.MessageId position) {
- List<CompletableFuture<Void>> futures = new ArrayList<>();
- for (var readerFuture : segmentReaders.values()) {
- futures.add(readerFuture.thenCompose(r -> r.seekAsync(position)));
- }
- return
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new))
- .whenComplete((__, ___) -> messageQueue.clear());
- }
-
CompletableFuture<Void> closeAsync() {
closed = true;
try {
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
index 24cf63dacbc..d18b0351f00 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
@@ -19,11 +19,9 @@
package org.apache.pulsar.client.impl.v5;
import java.time.Duration;
-import java.time.Instant;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.client.api.v5.MessageId;
import org.apache.pulsar.client.api.v5.PulsarClientException;
import org.apache.pulsar.client.api.v5.StreamConsumer;
import org.apache.pulsar.client.api.v5.StreamConsumerBuilder;
@@ -126,18 +124,6 @@ final class StreamConsumerBuilderV5<T> implements
StreamConsumerBuilder<T> {
return this;
}
- @Override
- public StreamConsumerBuilderV5<T> seek(MessageId messageId) {
- // Seek will be applied after consumer creation
- // Store for later use
- return this;
- }
-
- @Override
- public StreamConsumerBuilderV5<T> seek(Instant timestamp) {
- return this;
- }
-
@Override
public StreamConsumerBuilderV5<T> subscriptionProperties(Map<String,
String> properties) {
conf.setSubscriptionProperties(properties);
diff --git
a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/CheckpointV5Test.java
b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/CheckpointV5Test.java
index bcbe08d7e7c..c11381e4e9b 100644
---
a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/CheckpointV5Test.java
+++
b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/CheckpointV5Test.java
@@ -22,7 +22,6 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
-import java.time.Instant;
import java.util.Map;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.v5.Checkpoint;
@@ -39,12 +38,11 @@ public class CheckpointV5Test {
@Test
public void testRegularCheckpointRoundtrip() throws Exception {
- Instant created = Instant.ofEpochMilli(1_700_000_000_000L);
Map<Long, MessageId> positions = Map.of(
0L, v4(10, 20),
1L, v4(30, 40),
2L, v4(50, 60));
- CheckpointV5 original = new CheckpointV5(positions, created);
+ CheckpointV5 original = new CheckpointV5(positions);
byte[] bytes = original.toByteArray();
Checkpoint decoded = CheckpointV5.fromByteArray(bytes);
@@ -52,7 +50,6 @@ public class CheckpointV5Test {
assertTrue(decoded instanceof CheckpointV5,
"expected CheckpointV5, got " + decoded.getClass());
CheckpointV5 decodedV5 = (CheckpointV5) decoded;
- assertEquals(decodedV5.creationTime(), created);
assertEquals(decodedV5.segmentPositions().size(), 3);
assertEquals(decodedV5.segmentPositions().get(0L), v4(10, 20));
assertEquals(decodedV5.segmentPositions().get(1L), v4(30, 40));
@@ -61,17 +58,15 @@ public class CheckpointV5Test {
@Test
public void testRegularCheckpointWithEmptyPositions() throws Exception {
- Instant created = Instant.ofEpochMilli(42L);
- CheckpointV5 original = new CheckpointV5(Map.of(), created);
+ CheckpointV5 original = new CheckpointV5(Map.of());
CheckpointV5 decoded = (CheckpointV5)
CheckpointV5.fromByteArray(original.toByteArray());
- assertEquals(decoded.creationTime(), created);
assertEquals(decoded.segmentPositions().size(), 0);
}
@Test
public void testSegmentPositionsIsImmutable() {
- CheckpointV5 cp = new CheckpointV5(Map.of(0L, v4(1, 2)),
Instant.EPOCH);
+ CheckpointV5 cp = new CheckpointV5(Map.of(0L, v4(1, 2)));
assertThrows(UnsupportedOperationException.class,
() -> cp.segmentPositions().put(1L, v4(5, 6)));
}
@@ -100,26 +95,6 @@ public class CheckpointV5Test {
assertEquals(CheckpointV5.LATEST.toByteArray().length, 1);
}
- // --- Timestamp checkpoint ---
-
- @Test
- public void testTimestampCheckpointRoundtrip() throws Exception {
- Instant ts = Instant.ofEpochMilli(1_234_567_890L);
- Checkpoint cp = CheckpointV5.atTimestamp(ts);
-
- assertEquals(cp.creationTime(), ts);
-
- Checkpoint decoded = CheckpointV5.fromByteArray(cp.toByteArray());
- assertEquals(decoded.creationTime(), ts);
- }
-
- @Test
- public void testTimestampCheckpointEncodesTypeAndMillis() {
- Instant ts = Instant.ofEpochMilli(555L);
- byte[] bytes = CheckpointV5.atTimestamp(ts).toByteArray();
- assertEquals(bytes.length, 1 + 8, "timestamp wire: [type][millis]");
- }
-
// --- Error handling ---
@Test