+1 (binding)

Thanks,
xiangying
On Tue, Apr 7, 2026 at 7:52 PM Lari Hotari <[email protected]> wrote:
>
> +1 (binding)
>
> -Lari
>
> On 2026/04/06 16:17:01 Matteo Merli wrote:
> > Matteo Merli <[email protected]>
> > Wed, Apr 1, 9:45 AM (5 days ago)
> > to Dev
> > https://github.com/apache/pulsar/pull/25455
> >
> > # PIP-466: New Java Client API (V5) with Scalable Topic Support
> >
> > # Background Knowledge
> >
> > Apache Pulsar's Java client API (`pulsar-client-api`) has been the primary
> > interface for Java applications since Pulsar's inception. The API surface
> > has grown organically over the years to support partitioned topics,
> > multiple subscription types, transactions, schema evolution, and more.
> >
> > **API versioning precedent.** Pulsar already went through a breaking API
> > redesign in the 2.0 release, where the old API was moved into a separate
> > compatibility module (`pulsar-client-1x-base`) and the main module was
> > replaced with the new API. This PIP takes a less disruptive approach: the
> > existing `pulsar-client-api` and `pulsar-client` modules stay completely
> > unchanged, and the new API is introduced in an additional
> > `pulsar-client-v5` module. Existing applications are unaffected; new
> > applications can opt in to the V5 API.
> >
> > **Partitioned topics** are Pulsar's current mechanism for topic-level
> > parallelism. A partitioned topic is a collection of N independent internal
> > topics (partitions), each backed by a separate managed ledger. The client
> > is responsible for routing messages to partitions (via `MessageRouter`)
> > and is exposed to partition-level details through `TopicMessageId`,
> > `getPartitionsForTopic()`, and partition-specific consumers.
> >
> > **Subscription types** control how messages are distributed to consumers.
> > Pulsar supports four types — Exclusive, Failover, Shared, and Key_Shared —
> > all accessed through a single `Consumer` interface.
> > The interface exposes all operations regardless of which subscription type
> > is in use, even though some operations (e.g., `acknowledgeCumulative()` on
> > Shared) throw at runtime.
> >
> > **Scalable topics**
> > ([PIP-460](https://github.com/apache/pulsar/blob/master/pip/pip-460.md))
> > are a new server-side mechanism where a topic is composed of a DAG of
> > hash-range segments that can be dynamically split and merged by the
> > broker. Unlike partitioned topics, the number of segments is invisible to
> > the client and can change at runtime without application awareness. The
> > client receives segment layout updates via a dedicated protocol command
> > (DAG watch session) and routes messages based on hash-range matching. This
> > PIP defines the client API designed to work with scalable topics; the
> > broker-side design is covered by PIP-460.
> >
> >
> > # Motivation
> >
> > ## 1. Remove partitioning from the client API
> >
> > The current API leaks partitioning everywhere: `TopicMessageId` carries
> > partition indexes, `getPartitionsForTopic()` exposes the count,
> > `MessageRouter` forces the application to make routing decisions, and
> > consumers can be bound to specific partitions. This forces application
> > code to deal with what is fundamentally a server-side scalability concern.
> >
> > With scalable topics, parallelism is achieved via hash-range segments
> > managed entirely by the broker. The client should treat topics as opaque
> > endpoints — per-key ordering is guaranteed when a key is specified, but
> > the underlying parallelism (how many segments, which broker owns each) is
> > invisible and dynamic.
> >
> > The current API cannot cleanly support this model because partitioning is
> > baked into the type system (`TopicMessageId`), the builder API
> > (`MessageRouter`, `MessageRoutingMode`), and the consumer model
> > (partition-specific consumers, `getPartitionsForTopic()`).
> >
> > ## 2. Simplify an oversized API
> >
> > After years of organic growth, the API surface has accumulated significant
> > baggage:
> >
> > - `Consumer` has 60+ methods mixing unrelated concerns (ack, nack, seek,
> >   pause, unsubscribe, stats, get topic name, is connected, etc.)
> > - `ConsumerBuilder` has 40+ configuration methods with overlapping
> >   semantics
> > - Timeouts use `(long, TimeUnit)` in some places and `long` millis in
> >   others
> > - Nullable returns vs empty — inconsistent across the API
> > - `loadConf(Map)`, `clone()`, `Serializable` on builders — rarely used,
> >   clutters the API
> > - SPI via reflection hack (`DefaultImplementation`) instead of standard
> >   `ServiceLoader`
> >
> > A new module can start with a clean, minimal surface using modern Java
> > idioms.
> >
> > ## 3. Separate streaming vs queuing consumption
> >
> > The current `Consumer` mixes all four subscription types behind a single
> > interface:
> >
> > - `acknowledgeCumulative()` is available but throws at runtime for Shared
> >   subscriptions
> > - `negativeAcknowledge()` semantics differ between modes
> > - `seek()` behavior varies depending on subscription type
> > - Dead-letter policy only applies to Shared/Key_Shared
> >
> > This design means the compiler cannot help you — you discover misuse at
> > runtime. Splitting into purpose-built consumer types where each exposes
> > only the operations that make sense for its model improves both usability
> > and correctness.
> >
> > ## 4. Native support for connector frameworks
> >
> > Connector frameworks like Apache Flink and Apache Spark need to manage
> > their own offsets across all segments of a topic, take atomic snapshots,
> > and seek back to a checkpoint on recovery. The current API has no
> > first-class support for this — connectors resort to low-level `Reader`
> > plus manual partition tracking plus brittle offset management.
> >
> > A dedicated `CheckpointConsumer` with opaque, serializable `Checkpoint`
> > objects provides a clean integration point.
> >
> >
> > ## Relationship to PIP-460 and long-term vision
> >
> > This PIP is a companion to
> > [PIP-460: Scalable Topics](https://github.com/apache/pulsar/blob/master/pip/pip-460.md),
> > which defines the broker-side segment management, metadata storage, and
> > admin APIs. The V5 client API is the primary interface for applications to
> > use scalable topics — while the protocol commands and segment routing
> > could theoretically be added to the v4 client, the V5 API was designed
> > from the ground up to support the opaque, dynamically-segmented topic
> > model that scalable topics provide.
> >
> > The V5 API is designed to support all use cases currently supported by the
> > existing API: producing messages, consuming with
> > ordered/shared/key-shared semantics, transactions, schema evolution, and
> > end-to-end encryption. It is not a subset — it is a full replacement API.
> > It also works with existing partitioned and non-partitioned topics, so
> > applications can adopt the new API without changing their topic
> > infrastructure.
> >
> > The long-term vision is for scalable topics and the V5 API to become the
> > primary model, eventually deprecating partitioned/non-partitioned topics
> > and the v4 API. However, this deprecation is explicitly **not** planned
> > for the 5.0 release. The 5.0 release will ship both APIs side by side,
> > with the V5 API recommended for new applications. A subsequent PIP will
> > detail the migration path and deprecation timeline.
> >
> > While this PIP covers the Java client, the same API model (purpose-built
> > consumer types, opaque topics, checkpoint-based connector support) will
> > also be introduced in non-Java client SDKs (Python, Go, C++, Node.js)
> > with language-appropriate idioms.
> > Each SDK will mirror the same concepts and follow the same approach of
> > supporting both old and new topic types side by side. The non-Java SDKs
> > will be covered by separate PIPs.
> >
> >
> > # Goals
> >
> > ## In Scope
> >
> > - A new `pulsar-client-api-v5` module with new Java interfaces for
> >   Producer, StreamConsumer, QueueConsumer, CheckpointConsumer, and
> >   PulsarClient
> > - A new `pulsar-client-v5` implementation module that wraps the existing
> >   v4 client transport and adds scalable topic routing
> > - Support for all use cases currently supported by the existing API
> >   (produce, consume with ordered/shared/key-shared semantics,
> >   transactions, schema, encryption)
> > - Purpose-built consumer types that separate streaming (ordered,
> >   cumulative ack) from queuing (parallel, individual ack) from checkpoint
> >   (unmanaged, framework-driven)
> > - Opaque topic model where partition/segment details are hidden from the
> >   application
> > - Modern Java API conventions: `Duration`, `Instant`, `Optional`, records,
> >   `ServiceLoader`
> > - First-class transaction support in the main package
> > - DAG watch protocol integration for live segment layout updates
> >
> > ## Out of Scope
> >
> > - Changes to the existing `pulsar-client-api` — it remains fully supported
> >   and unchanged
> > - Changes to the wire protocol beyond what is needed for scalable topic
> >   DAG watch
> > - Broker-side scalable topic management (split/merge algorithms, load
> >   balancing) — covered by
> >   [PIP-460](https://github.com/apache/pulsar/blob/master/pip/pip-460.md)
> >   and subsequent more specific PIPs
> > - Migration path from v4 to v5 API — will be detailed in a subsequent PIP
> > - Implementation details — this PIP focuses on the public API surface
> > - Deprecation of the existing API or partitioned/non-partitioned topic
> >   types
> > - TableView equivalent in v5 — may be added in a follow-up PIP
> >
> >
> > # High Level Design
> >
> > The V5 client API is
> > shipped as two new modules alongside the existing client:
> >
> > ```
> > pulsar-client-api-v5   (interfaces and value types only — no implementation)
> > pulsar-client-v5       (implementation, depends on pulsar-client for transport)
> > ```
> >
> > The existing `pulsar-client-api` and `pulsar-client` modules are
> > unchanged. Applications can use v4 and v5 in the same JVM.
> >
> > ## Entry point
> >
> > ```java
> > PulsarClient client = PulsarClient.builder()
> >         .serviceUrl("pulsar://localhost:6650")
> >         .build();
> > ```
> >
> > The `PulsarClient` interface provides builder methods for all
> > producer/consumer types:
> >
> > ```
> > PulsarClient
> >   .newProducer(schema)            -> ProducerBuilder            -> Producer<T>
> >   .newStreamConsumer(schema)      -> StreamConsumerBuilder      -> StreamConsumer<T>
> >   .newQueueConsumer(schema)       -> QueueConsumerBuilder       -> QueueConsumer<T>
> >   .newCheckpointConsumer(schema)  -> CheckpointConsumerBuilder  -> CheckpointConsumer<T>
> >   .newTransaction()               -> Transaction
> > ```
> >
> > ## Consumer types
> >
> > Instead of a single `Consumer` with a `SubscriptionType` enum, the V5 API
> > provides three distinct consumer types:
> >
> > ```mermaid
> > graph TD
> >     A[PulsarClient] -->|newStreamConsumer| B[StreamConsumer]
> >     A -->|newQueueConsumer| C[QueueConsumer]
> >     A -->|newCheckpointConsumer| D[CheckpointConsumer]
> >
> >     B -->|"Ordered, cumulative ack"| E[Event sourcing, CDC, ordered pipelines]
> >     C -->|"Parallel, individual ack"| F[Work queues, task processing]
> >     D -->|"Unmanaged, checkpoint/seek"| G[Flink, Spark connectors]
> > ```
> >
> > **StreamConsumer** — Ordered consumption with cumulative acknowledgment.
> > Maps to Exclusive/Failover subscription semantics. Messages are delivered
> > in order (per-key when keyed).
> >
> > **QueueConsumer** — Unordered parallel consumption with individual
> > acknowledgment. Maps to Shared/Key_Shared subscription semantics.
> > Includes dead-letter policy, ack timeout, and redelivery backoff.
> >
> > **CheckpointConsumer** — Unmanaged consumption for connector frameworks.
> > No subscription, no ack — position tracking is entirely external. Provides
> > `checkpoint()` for atomic position snapshots and `seek(Checkpoint)` for
> > recovery.
> >
> > ## Scalable topic integration
> >
> > When a V5 client connects to a `topic://` domain topic, it establishes a
> > DAG watch session with the broker. The broker sends the current segment
> > layout (which segments exist, their hash ranges, and which broker owns
> > each) and pushes updates when the layout changes (splits, merges).
> >
> > ```mermaid
> > sequenceDiagram
> >     participant Client as V5 Client
> >     participant Broker as Broker
> >     participant Meta as Metadata Store
> >
> >     Client->>Broker: ScalableTopicLookup(topic)
> >     Broker->>Meta: Watch topic DAG
> >     Broker-->>Client: ScalableTopicUpdate(DAG)
> >     Note over Client: Create per-segment producers/consumers
> >
> >     Meta-->>Broker: Segment split notification
> >     Broker-->>Client: ScalableTopicUpdate(new DAG)
> >     Note over Client: Add new segments, drain old
> > ```
> >
> > The `Producer` hashes message keys to determine which segment to send to,
> > maintaining one internal producer per active segment. When segments split
> > or merge, the client transparently creates new internal producers and
> > drains old ones.
> >
> > ## Sync/async model
> >
> > All types are sync-first with an `.async()` accessor:
> >
> > ```java
> > Producer<T>            -> producer.async()  -> AsyncProducer<T>
> > StreamConsumer<T>      -> consumer.async()  -> AsyncStreamConsumer<T>
> > QueueConsumer<T>       -> consumer.async()  -> AsyncQueueConsumer<T>
> > CheckpointConsumer<T>  -> consumer.async()  -> AsyncCheckpointConsumer<T>
> > Transaction            -> txn.async()       -> AsyncTransaction
> > ```
> >
> > Both views share the same underlying resources.
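
To make the key-to-segment routing from the "Scalable topic integration" section concrete, here is a minimal sketch of hash-range matching. It is not the PIP's `SegmentRouter`: the class name, the 16-bit hash space, and the use of `String.hashCode()` as the hash function are all assumptions for illustration, and a layout update is modelled simply as building a new router from the new layout.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch of hash-range routing; not the PIP's SegmentRouter.
 * Each segment owns the hashes from its range start (inclusive) up to the
 * next segment's range start (exclusive).
 */
final class SegmentRoutingSketch {
    /** Size of the hash space. An assumption for illustration only. */
    static final int HASH_SPACE = 1 << 16;

    // Range start -> segment id, kept sorted so floorEntry finds the owner.
    private final TreeMap<Integer, Long> segmentByRangeStart;

    SegmentRoutingSketch(Map<Integer, Long> layout) {
        if (!layout.containsKey(0)) {
            throw new IllegalArgumentException("layout must start at hash 0");
        }
        this.segmentByRangeStart = new TreeMap<>(layout);
    }

    /** Maps a key into the hash space; String.hashCode is a stand-in hash. */
    static int hashOf(String key) {
        return Math.floorMod(key.hashCode(), HASH_SPACE);
    }

    /** Returns the segment whose range contains a hash in [0, HASH_SPACE). */
    long segmentForHash(int hash) {
        return segmentByRangeStart.floorEntry(hash).getValue();
    }

    /** Routes a message key to a segment id. */
    long segmentForKey(String key) {
        return segmentForHash(hashOf(key));
    }

    public static void main(String[] args) {
        // Two segments, each owning half of the hash space.
        SegmentRoutingSketch before =
                new SegmentRoutingSketch(Map.of(0, 1L, 32768, 2L));
        // After segment 1 splits, segments 3 and 4 take over its range.
        SegmentRoutingSketch after =
                new SegmentRoutingSketch(Map.of(0, 3L, 16384, 4L, 32768, 2L));
        System.out.println(before.segmentForHash(20000)); // prints 1
        System.out.println(after.segmentForHash(20000));  // prints 4
    }
}
```

In this sketch a key keeps mapping to the same hash across layout changes, and only the owner of that hash changes. That is one way to read the "add new segments, drain old" step in the diagram.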
> >
> >
> > # Detailed Design
> >
> > ## Design & Implementation Details
> >
> > ### Module structure
> >
> > ```
> > pulsar-client-api-v5/
> >   org.apache.pulsar.client.api.v5
> >   ├── PulsarClient, PulsarClientBuilder, PulsarClientException
> >   ├── Producer, ProducerBuilder
> >   ├── StreamConsumer, StreamConsumerBuilder
> >   ├── QueueConsumer, QueueConsumerBuilder
> >   ├── CheckpointConsumer, CheckpointConsumerBuilder, Checkpoint
> >   ├── Message, Messages, MessageId, MessageMetadata, MessageBuilder
> >   ├── Transaction
> >   ├── async/     (AsyncProducer, AsyncMessageBuilder, Async*Consumer, AsyncTransaction)
> >   ├── auth/      (Authentication, AuthenticationData, CryptoKeyReader, ...)
> >   ├── config/    (BatchingPolicy, CompressionPolicy, TlsPolicy, BackoffPolicy, ...)
> >   ├── schema/    (Schema, SchemaInfo, SchemaType)
> >   └── internal/  (PulsarClientProvider — ServiceLoader SPI)
> >
> > pulsar-client-v5/
> >   org.apache.pulsar.client.impl.v5
> >   ├── PulsarClientV5, PulsarClientBuilderV5, PulsarClientProviderV5
> >   ├── ScalableTopicProducer, ProducerBuilderV5
> >   ├── ScalableStreamConsumer, StreamConsumerBuilderV5
> >   ├── ScalableQueueConsumer, QueueConsumerBuilderV5
> >   ├── ScalableCheckpointConsumer, CheckpointConsumerBuilderV5
> >   ├── DagWatchClient, ClientSegmentLayout, SegmentRouter
> >   ├── SchemaAdapter, AuthenticationAdapter, CryptoKeyReaderAdapter
> >   ├── MessageV5, MessageIdV5, MessagesV5, CheckpointV5
> >   └── Async*V5 wrappers
> > ```
> >
> > ### Key types
> >
> > **`MessageMetadata<T, BUILDER>`** — A self-referential builder base shared
> > between sync and async message sending:
> >
> > ```java
> > interface MessageMetadata<T, BUILDER extends MessageMetadata<T, BUILDER>> {
> >     BUILDER value(T value);
> >     BUILDER key(String key);
> >     BUILDER property(String name, String value);
> >     BUILDER eventTime(Instant eventTime);
> >     BUILDER deliverAfter(Duration delay);
> >     BUILDER deliverAt(Instant timestamp);
> >     BUILDER transaction(Transaction txn);
> > }
> > ```
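
As a side note on why the `BUILDER` type parameter in the interface above is self-referential: it lets every chained setter return the concrete builder type, so a sync and an async builder can share the setters while differing only in what `send()` returns. The sketch below is a trimmed-down illustration under my own assumptions. All class names are hypothetical, only two setters are mirrored, and a `String` stands in for `MessageId`.

```java
import java.util.concurrent.CompletableFuture;

/** Trimmed-down, hypothetical mirror of the self-referential builder shape. */
interface MetadataSketch<T, B extends MetadataSketch<T, B>> {
    B value(T value);
    B key(String key);
}

/** Shared setter logic; self() hands back the concrete builder type. */
abstract class MetadataBase<T, B extends MetadataBase<T, B>>
        implements MetadataSketch<T, B> {
    protected T value;
    protected String key;

    protected abstract B self();

    @Override public B value(T value) { this.value = value; return self(); }
    @Override public B key(String key) { this.key = key; return self(); }
}

/** Sync flavor: send() returns the (stand-in) message id directly. */
final class SyncBuilderSketch<T> extends MetadataBase<T, SyncBuilderSketch<T>> {
    @Override protected SyncBuilderSketch<T> self() { return this; }

    String send() { return "sync:" + key + "=" + value; }
}

/** Async flavor: same setters, but send() returns a future. */
final class AsyncBuilderSketch<T> extends MetadataBase<T, AsyncBuilderSketch<T>> {
    @Override protected AsyncBuilderSketch<T> self() { return this; }

    CompletableFuture<String> send() {
        return CompletableFuture.completedFuture("async:" + key + "=" + value);
    }
}

final class BuilderSketchDemo {
    public static void main(String[] args) {
        // The chain keeps the concrete type, so send() is still reachable.
        System.out.println(new SyncBuilderSketch<String>().key("k").value("v").send());
        System.out.println(new AsyncBuilderSketch<String>().key("k").value("v").send().join());
    }
}
```

Without the self-referential bound, `key("k")` would return the base type and the call to `send()` at the end of the chain would not compile.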
> >
> > `MessageBuilder<T>` extends it with `MessageId send()`.
> > `AsyncMessageBuilder<T>` extends it with `CompletableFuture<MessageId>
> > send()`.
> >
> > **`Checkpoint`** — Opaque, serializable position vector across all
> > segments:
> >
> > ```java
> > interface Checkpoint {
> >     byte[] toByteArray();
> >     Instant creationTime();
> >
> >     static Checkpoint earliest();
> >     static Checkpoint latest();
> >     static Checkpoint atTimestamp(Instant timestamp);
> >     static Checkpoint fromByteArray(byte[] data);
> > }
> > ```
> >
> > Internally, a `Checkpoint` stores a `Map<Long, MessageId>` mapping segment
> > IDs to positions. The format is forward-compatible — checkpoints saved
> > with fewer segments can be applied after splits/merges.
> >
> > **Configuration records** — Immutable records with static factories:
> >
> > | Record | Purpose | Example |
> > |--------|---------|---------|
> > | `BatchingPolicy` | Batching config | `BatchingPolicy.of(Duration.ofMillis(10), 5000, MemorySize.ofMB(1))` |
> > | `CompressionPolicy` | Compression codec | `CompressionPolicy.of(CompressionType.ZSTD)` |
> > | `TlsPolicy` | TLS/mTLS config | `TlsPolicy.of("/path/to/ca.pem")` |
> > | `BackoffPolicy` | Retry backoff | `BackoffPolicy.exponential(Duration.ofMillis(100), Duration.ofSeconds(30))` |
> > | `DeadLetterPolicy` | Dead letter queue | `DeadLetterPolicy.of(5)` |
> > | `EncryptionPolicy` | E2E encryption | `EncryptionPolicy.forProducer(keyReader, "mykey")` |
> > | `ChunkingPolicy` | Large msg chunking | `ChunkingPolicy.of(MemorySize.ofMB(10))` |
> >
> > ### SPI discovery
> >
> > Implementation is loaded via `java.util.ServiceLoader`:
> >
> > ```java
> > // In pulsar-client-api-v5
> > public interface PulsarClientProvider {
> >     PulsarClientBuilder newClientBuilder();
> >     <T> Schema<T> jsonSchema(Class<T> clazz);
> >     // ... factory methods for all SPI types
> > }
> >
> > // In pulsar-client-v5
> > // META-INF/services/org.apache.pulsar.client.api.v5.internal.PulsarClientProvider
> > //   -> org.apache.pulsar.client.impl.v5.PulsarClientProviderV5
> > ```
> >
> > This replaces the reflection-based `DefaultImplementation` approach used
> > in the current API.
> >
> > ## Public-facing Changes
> >
> > ### Public API
> >
> > This PIP introduces a new public Java API. The existing
> > `pulsar-client-api` is unchanged.
> >
> > **New modules:**
> > - `pulsar-client-api-v5` — interfaces and value types (compile dependency
> >   for applications)
> > - `pulsar-client-v5` — implementation (runtime dependency)
> >
> > **New interfaces (summary):**
> >
> > | Interface | Methods | Description |
> > |-----------|---------|-------------|
> > | `PulsarClient` | `builder()`, `newProducer()`, `newStreamConsumer()`, `newQueueConsumer()`, `newCheckpointConsumer()`, `newTransaction()`, `close()` | Entry point |
> > | `Producer<T>` | `newMessage()`, `flush()`, `close()`, `async()` | Send messages |
> > | `StreamConsumer<T>` | `receive()`, `receive(Duration)`, `acknowledgeCumulative()`, `close()`, `async()` | Ordered consumption |
> > | `QueueConsumer<T>` | `receive()`, `receive(Duration)`, `acknowledge()`, `negativeAcknowledge()`, `close()`, `async()` | Parallel consumption |
> > | `CheckpointConsumer<T>` | `receive()`, `receive(Duration)`, `checkpoint()`, `seek()`, `close()`, `async()` | Framework consumption |
> >
> > ### Configuration
> >
> > No new broker configuration is introduced by this PIP. The V5 client
> > reuses the existing `ClientConfigurationData` internally.
> >
> > ### CLI
> >
> > No new CLI commands specific to the V5 API.
> >
> > ### Metrics
> >
> > No new metrics are introduced by the V5 client API itself. The underlying
> > v4 producers and consumers continue to emit their existing metrics.
> >
> >
> > # Monitoring
> >
> > The V5 client wraps v4 producers and consumers internally, so existing
> > producer/consumer metrics (publish rate, latency, backlog, etc.) continue
> > to work. Each internal segment producer/consumer appears as a separate
> > instance in metrics, identified by the segment topic name.
> >
> > Operators should monitor:
> > - Per-segment publish rates to detect hot segments (candidates for
> >   splitting)
> > - DAG watch session reconnections (indicates broker restarts or network
> >   issues)
> > - Segment producer creation/closure events in client logs during
> >   split/merge operations
> >
> >
> > # Security Considerations
> >
> > The V5 client API does not introduce new security mechanisms. It delegates
> > all authentication and authorization to the underlying v4 client:
> >
> > - Authentication is configured via `PulsarClientBuilder.authentication()`
> >   and delegated to the v4 `AuthenticationProvider` framework via
> >   `AuthenticationAdapter`
> > - Topic-level authorization applies to the parent `topic://` name —
> >   accessing the underlying `segment://` topics uses the same
> >   tenant/namespace permissions
> > - End-to-end encryption is supported via `EncryptionPolicy` on
> >   `ProducerBuilder`, delegated to the v4 `CryptoKeyReader` framework via
> >   `CryptoKeyReaderAdapter`
> > - The new `CommandScalableTopicLookup` protocol command is sent only after
> >   the connection is authenticated and in the `Connected` state, consistent
> >   with other lookup commands
> >
> > No new REST endpoints are introduced by the client API itself.
> >
> >
> > # Backward & Forward Compatibility
> >
> > ## Upgrade
> >
> > The V5 API is a new, additive module. No changes are required to existing
> > applications. This follows the same approach as the 2.0 API redesign,
> > where the old API was preserved in a separate compatibility module —
> > except this time there is no breaking change at all.
> > The existing API is unchanged and existing applications require no
> > modifications.
> >
> > - Applications using `pulsar-client-api` (v4) continue to work without
> >   modification
> > - New applications can adopt the V5 API by depending on `pulsar-client-v5`
> > - The V5 API works with all topic types: scalable topics, partitioned
> >   topics, and non-partitioned topics — applications can migrate to the
> >   new API without changing their topic infrastructure
> > - Both APIs can coexist in the same JVM — the V5 implementation wraps the
> >   v4 transport internally
> > - A detailed migration path from v4 to v5 will be provided in a subsequent
> >   PIP
> > - A seamless migration path to convert existing partitioned and
> >   non-partitioned topics to scalable topics will also be provided,
> >   allowing applications to transition their topic infrastructure without
> >   data loss or downtime
> >
> > To adopt the V5 API, applications add `pulsar-client-v5` as a dependency
> > and use `PulsarClient.builder()` from the
> > `org.apache.pulsar.client.api.v5` package.
> >
> > ## Downgrade / Rollback
> >
> > Since the V5 API is a separate module, rollback is simply removing the
> > dependency and reverting to v4 API calls. No broker-side changes are
> > required.
> >
> > Applications using `CheckpointConsumer` should note that saved
> > `Checkpoint` byte arrays are specific to the V5 implementation and cannot
> > be used with v4 `Reader`/`Consumer`.
> >
> > ## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations
> >
> > The V5 client API itself does not introduce geo-replication concerns — it
> > connects to whichever cluster it is configured for. However,
> > geo-replication of scalable topics has specific considerations (segment
> > layout synchronization across clusters, cross-cluster split/merge
> > coordination) that will be detailed in a subsequent PIP.
> >
> >
> > # Alternatives
> >
> > ## Extend the existing Consumer interface
> >
> > We considered adding scalable topic support to the existing `Consumer`
> > interface by adding new methods for checkpoint/seek and hiding segment
> > details internally. This was rejected because:
> >
> > - The existing interface already has 60+ methods and is difficult to
> >   evolve
> > - Adding checkpoint semantics alongside ack semantics would further
> >   confuse the API
> > - The type system cannot prevent misuse (e.g., calling
> >   `acknowledgeCumulative()` on a Shared subscription)
> > - Removing partition-related methods (`TopicMessageId`, `MessageRouter`)
> >   would break backward compatibility
> >
> > ## Builder-per-subscription-type on existing API
> >
> > We considered keeping a single `Consumer` type but using different builder
> > types per subscription mode (e.g., `StreamConsumerBuilder` returning a
> > `Consumer` with restricted methods). This was rejected because the
> > returned `Consumer` would still expose all methods at the type level — the
> > restriction would only be in documentation, not enforced by the compiler.
> >
> > ## Separate module vs extending existing module
> >
> > We chose a separate module (`pulsar-client-api-v5`) rather than adding new
> > interfaces to `pulsar-client-api` because:
> >
> > - The v5 API uses different naming conventions (`value()` vs
> >   `getValue()`), different types (`Duration` vs `(long, TimeUnit)`), and
> >   different patterns (`Optional` vs nullable)
> > - Having both conventions in the same package would be confusing
> > - A clean module boundary makes it clear which API generation an
> >   application is using
> > - The v4 API can eventually be deprecated without affecting v5 users
> >
> >
> > # General Notes
> >
> > The V5 API targets Java 17, the same as the rest of Pulsar.
> >
> > The implementation module (`pulsar-client-v5`) wraps the existing v4
> > `pulsar-client` for all transport-level operations. This means bug fixes
> > and performance improvements to the v4 client automatically benefit V5
> > users, and the V5 module itself is relatively thin — primarily routing
> > logic and API adaptation.
> >
> >
> > --
> > Matteo Merli
> > <[email protected]>
> >
