merlimat opened a new pull request, #25696:
URL: https://github.com/apache/pulsar/pull/25696

   ## Summary
   
   Two new operational primitives on a scalable-topic subscription, exposed as 
admin REST endpoints, admin client methods, and `pulsar-admin` CLI subcommands.
   
   ### `seek` by wall-clock timestamp
   
   Reset every per-segment cursor to a point in time. The controller uses each 
segment's recorded `[createdAtMs, sealedAtMs)` window to dispatch the cheapest 
per-segment op:
   
   | Segment relative to `t` | Per-segment op |
   | --- | --- |
   | Sealed entirely **before** `t` | skip-all (cursor → end) |
   | Created entirely **after** `t` | seek to `timestamp=0` (earliest) |
   | Alive at `t` | seek to `timestamp=t` |
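   
   The dispatch table above can be sketched as a small decision function. This is an illustrative sketch, not the controller's code: `SeekPlanSketch`, `SegmentWindow`, `SegmentOp`, and `planSeek` are hypothetical names. It assumes a still-open segment reports `sealedAtMs = Long.MAX_VALUE`, and that `sealedAtMs == t` counts as sealed before `t` because the window is half-open.

```java
// Illustrative sketch only; every name in this block is hypothetical.
final class SeekPlanSketch {

    /** Per-segment operation chosen for a seek to timestamp t. */
    enum SegmentOp { SKIP_ALL, SEEK_EARLIEST, SEEK_TO_TIMESTAMP }

    /** Segment lifetime as the half-open window [createdAtMs, sealedAtMs). */
    static final class SegmentWindow {
        final long createdAtMs;
        final long sealedAtMs; // assumption: Long.MAX_VALUE while the segment is still open

        SegmentWindow(long createdAtMs, long sealedAtMs) {
            this.createdAtMs = createdAtMs;
            this.sealedAtMs = sealedAtMs;
        }
    }

    /** Picks the per-segment op for a seek to timestamp t (epoch millis). */
    static SegmentOp planSeek(SegmentWindow w, long t) {
        if (w.sealedAtMs <= t) {
            // Sealed entirely before t: nothing at or after t, move the cursor to the end.
            return SegmentOp.SKIP_ALL;
        }
        if (w.createdAtMs > t) {
            // Created entirely after t: every entry is newer than t, seek to timestamp=0.
            return SegmentOp.SEEK_EARLIEST;
        }
        // Alive at t: the segment must resolve the position itself, seek to timestamp=t.
        return SegmentOp.SEEK_TO_TIMESTAMP;
    }
}
```

   With `t = 200`, a segment with window `[0, 100)` maps to `SKIP_ALL`, `[100, 300)` maps to `SEEK_TO_TIMESTAMP`, and `[300, ∞)` maps to `SEEK_EARLIEST`.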
   
   ### `clear-backlog`
   
   Dispatch skip-all on the subscription across every segment in the DAG.
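   
   A minimal sketch of that fan-out, assuming the per-segment skip-all call is asynchronous and returns a `CompletableFuture<Void>`. `ClearBacklogSketch`, the `skipAll` function, and the string segment identifiers are hypothetical stand-ins, not the controller's actual types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Illustrative sketch only; every name in this block is hypothetical.
final class ClearBacklogSketch {

    /**
     * Issues one skip-all call per segment and returns a future that
     * completes once all of them have completed.
     */
    static CompletableFuture<Void> clearBacklog(
            List<String> segments,
            Function<String, CompletableFuture<Void>> skipAll) {
        List<CompletableFuture<Void>> calls = new ArrayList<>();
        for (String segment : segments) {
            calls.add(skipAll.apply(segment));
        }
        // allOf completes exceptionally if any per-segment call fails.
        return CompletableFuture.allOf(calls.toArray(new CompletableFuture[0]));
    }
}
```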
   
   ### Plumbing
   
   - **Per-segment endpoints** under `/segments/.../subscription/{sub}/seek` 
and `.../skip-all` (super-user, routed to the segment owner). These call 
`Subscription.resetCursor` / `clearBacklog` under the hood.
   - **Parent-topic endpoints** under `/scalable/.../subscriptions/{sub}/seek` 
and `.../skip-all`, gated on `RESET_CURSOR` / `SKIP` authz, routed to the 
controller leader.
   - **Admin client** interface + impl pairs (sync/async): segment-level 
(`seekSegmentSubscription`, `clearSegmentSubscriptionBacklog`) and parent-level 
(`seekSubscription`, `clearBacklog`).
   - **CLI**:
     - `pulsar-admin scalable-topics seek <topic> --subscription <s> --time 1h`. 
Here `--time` is a relative offset; the absolute timestamp passed to the broker is 
`now - offset`. The offset is parsed by the standard time-unit converter (`1s`, `5m`, `1h`, `5d`, …).
     - `pulsar-admin scalable-topics clear-backlog <topic> --subscription <s>`.
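   
   The `--time` handling described above amounts to `now - offset`. The sketch below is a hand-rolled illustration of that arithmetic, not the converter the CLI actually uses: `RelativeSeekTimeSketch`, `parseOffsetMillis`, and `absoluteTimestampMillis` are hypothetical names, and only the `s`, `m`, `h`, and `d` suffixes shown in the description are handled.

```java
import java.time.Clock;
import java.util.Locale;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; every name in this block is hypothetical.
final class RelativeSeekTimeSketch {

    /** Parses a value such as "1s", "5m", "1h" or "5d" into milliseconds. */
    static long parseOffsetMillis(String value) {
        String v = value.trim().toLowerCase(Locale.ROOT);
        if (v.length() < 2) {
            throw new IllegalArgumentException("Expected <number><unit>, got: " + value);
        }
        char unit = v.charAt(v.length() - 1);
        long amount = Long.parseLong(v.substring(0, v.length() - 1));
        switch (unit) {
            case 's': return TimeUnit.SECONDS.toMillis(amount);
            case 'm': return TimeUnit.MINUTES.toMillis(amount);
            case 'h': return TimeUnit.HOURS.toMillis(amount);
            case 'd': return TimeUnit.DAYS.toMillis(amount);
            default:
                throw new IllegalArgumentException("Unsupported time unit in: " + value);
        }
    }

    /** Absolute timestamp (epoch millis) sent to the broker: now - offset. */
    static long absoluteTimestampMillis(String offset, Clock clock) {
        return clock.millis() - parseOffsetMillis(offset);
    }
}
```

   Passing a `Clock` rather than calling `System.currentTimeMillis()` directly keeps the computation testable with a fixed clock; in normal use `Clock.systemUTC()` would be supplied.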
   
   ### Removals
   
   Subscription seek is now an **admin** operation, not a consumer operation. 
The following V5 client surface goes away:
   
   - `StreamConsumerBuilder.seek(MessageId)` and `seek(Instant)` — were 
placeholder no-ops. Initial position is set via 
`subscriptionInitialPosition(EARLIEST/LATEST)`; timestamp seek is the new admin 
call.
   - `CheckpointConsumer.seek(Checkpoint)` and the async counterpart. Connector 
frameworks restore from a saved checkpoint via 
`CheckpointConsumerBuilder.startPosition(Checkpoint)`.
   - `Checkpoint.atTimestamp(Instant)` factory and the underlying 
`TimestampCheckpoint` type — timestamp positioning is the admin surface, not a 
checkpoint kind.
   - `Checkpoint.creationTime()` — was just metadata, not part of the position 
vector. Wire format simplifies accordingly. Connector frameworks that need 
timing can record it themselves.
   
   ## Test plan
   
   - [x] `ScalableTopicControllerTest`:
     - `testSeekSubscriptionDispatchesPerSegmentByTimestamp` — three segments 
at hand-picked timestamps (one fully before `t`, one straddling, one fully 
after); asserts the right per-segment admin call is issued for each.
     - `testClearBacklogDispatchesSkipAllToEverySegment` — N skip-all calls for 
N segments.
   - [x] V5 checkpoint suites updated and green: 
`V5CheckpointConsumerBasicTest`, `V5CheckpointConsumerDagReplayTest`, 
`V5CheckpointConsumerGroupTest`, `V5AsyncApisTest`, `CheckpointV5Test`.
   - [x] Checkstyle clean (`pulsar-broker`, `pulsar-client-admin-api`, 
`pulsar-client-admin`, `pulsar-client-api-v5`, `pulsar-client-v5`, 
`pulsar-client-tools`).

