+1 (binding)

-Lari

On 2026/04/06 16:17:01 Matteo Merli wrote:
> Matteo Merli
> Wed, Apr 1, 9:45 AM
> to Dev
> https://github.com/apache/pulsar/pull/25455
> 
> # PIP-466: New Java Client API (V5) with Scalable Topic Support
> 
> # Background Knowledge
> 
> Apache Pulsar's Java client API (`pulsar-client-api`) has been the primary
> interface for Java
> applications since Pulsar's inception. The API surface has grown
> organically over the years to
> support partitioned topics, multiple subscription types, transactions,
> schema evolution, and more.
> 
> **API versioning precedent.** Pulsar already went through a breaking API
> redesign in the 2.0
> release, where the old API was moved into a separate compatibility module
> (`pulsar-client-1x-base`) and the main module was replaced with the new
> API. This PIP takes
> a less disruptive approach: the existing `pulsar-client-api` and
> `pulsar-client` modules stay
> completely unchanged, and the new API is introduced in two additional modules,
> `pulsar-client-api-v5` (interfaces) and `pulsar-client-v5` (implementation).
> Existing applications are unaffected; new applications can opt in to the V5
> API.
> 
> **Partitioned topics** are Pulsar's current mechanism for topic-level
> parallelism. A partitioned
> topic is a collection of N independent internal topics (partitions), each
> backed by a separate
> managed ledger. The client is responsible for routing messages to
> partitions (via `MessageRouter`)
> and is exposed to partition-level details through `TopicMessageId`,
> `getPartitionsForTopic()`,
> and partition-specific consumers.
> 
> **Subscription types** control how messages are distributed to consumers.
> Pulsar supports four
> types — Exclusive, Failover, Shared, and Key_Shared — all accessed through
> a single `Consumer`
> interface. The interface exposes all operations regardless of which
> subscription type is in use,
> even though some operations (e.g., `acknowledgeCumulative()` on Shared)
> throw at runtime.
> 
> **Scalable topics** ([PIP-460](https://github.com/apache/pulsar/blob/master/pip/pip-460.md)) are a new
> server-side mechanism where a topic is composed of a DAG of hash-range
> segments that can be
> dynamically split and merged by the broker. Unlike partitioned topics, the
> number of segments
> is invisible to the client and can change at runtime without application
> awareness. The client
> receives segment layout updates via a dedicated protocol command (DAG watch
> session) and routes
> messages based on hash-range matching. This PIP defines the client API
> designed to work with
> scalable topics; the broker-side design is covered by PIP-460.
> 
> 
> # Motivation
> 
> ## 1. Remove partitioning from the client API
> 
> The current API leaks partitioning everywhere: `TopicMessageId` carries
> partition indexes,
> `getPartitionsForTopic()` exposes the count, `MessageRouter` forces the
> application to make
> routing decisions, and consumers can be bound to specific partitions. This
> forces application
> code to deal with what is fundamentally a server-side scalability concern.
> 
> With scalable topics, parallelism is achieved via hash-range segments
> managed entirely by the
> broker. The client should treat topics as opaque endpoints — per-key
> ordering is guaranteed
> when a key is specified, but the underlying parallelism (how many segments,
> which broker owns
> each) is invisible and dynamic.
> 
> The current API cannot cleanly support this model because partitioning is
> baked into the type
> system (`TopicMessageId`), the builder API (`MessageRouter`,
> `MessageRoutingMode`), and the
> consumer model (partition-specific consumers, `getPartitionsForTopic()`).
> 
> ## 2. Simplify an oversized API
> 
> After years of organic growth, the API surface has accumulated significant
> baggage:
> 
> - `Consumer` has 60+ methods mixing unrelated concerns (ack, nack, seek,
> pause, unsubscribe,
>   stats, get topic name, is connected, etc.)
> - `ConsumerBuilder` has 40+ configuration methods with overlapping semantics
> - Timeouts use `(long, TimeUnit)` in some places and `long` millis in others
> - Nullable returns vs empty — inconsistent across the API
> - `loadConf(Map)`, `clone()`, `Serializable` on builders — rarely used,
> clutters the API
> - SPI via reflection hack (`DefaultImplementation`) instead of standard
> `ServiceLoader`
> 
> A new module can start with a clean, minimal surface using modern Java
> idioms.
> 
> ## 3. Separate streaming vs queuing consumption
> 
> The current `Consumer` mixes all four subscription types behind a single
> interface:
> 
> - `acknowledgeCumulative()` is available but throws at runtime for Shared
> subscriptions
> - `negativeAcknowledge()` semantics differ between modes
> - `seek()` behavior varies depending on subscription type
> - Dead-letter policy only applies to Shared/Key_Shared
> 
> This design means the compiler cannot help you — you discover misuse at
> runtime. Splitting into
> purpose-built consumer types where each exposes only the operations that
> make sense for its model
> improves both usability and correctness.
> 
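A minimal plain-Java sketch of the compile-time argument above. The interfaces are simplified, hypothetical stand-ins (`StreamConsumerSketch`, `QueueConsumerSketch`, `SimpleId`), not the actual V5 signatures. Each type declares only the acknowledgment operations that fit its model, so calling `acknowledgeCumulative()` on a queue consumer fails to compile instead of throwing at runtime.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the V5 consumer types (not the real API).
interface MessageIdSketch {
    long sequence();
}

record SimpleId(long sequence) implements MessageIdSketch {}

// Streaming model: ordered delivery, cumulative ack only.
interface StreamConsumerSketch {
    void acknowledgeCumulative(MessageIdSketch upTo);
}

// Queuing model: parallel delivery, individual ack and nack only.
interface QueueConsumerSketch {
    void acknowledge(MessageIdSketch id);
    void negativeAcknowledge(MessageIdSketch id);
}

class RecordingStreamConsumer implements StreamConsumerSketch {
    long ackedUpTo = -1;

    @Override
    public void acknowledgeCumulative(MessageIdSketch upTo) {
        // Cumulative ack: everything up to and including upTo counts as acknowledged.
        ackedUpTo = Math.max(ackedUpTo, upTo.sequence());
    }
}

class RecordingQueueConsumer implements QueueConsumerSketch {
    final List<Long> acked = new ArrayList<>();
    final List<Long> nacked = new ArrayList<>();

    @Override
    public void acknowledge(MessageIdSketch id) { acked.add(id.sequence()); }

    @Override
    public void negativeAcknowledge(MessageIdSketch id) { nacked.add(id.sequence()); }
}

class ConsumerTypesDemo {
    static RecordingQueueConsumer run() {
        RecordingQueueConsumer queue = new RecordingQueueConsumer();
        queue.acknowledge(new SimpleId(1));
        queue.negativeAcknowledge(new SimpleId(2));
        // queue.acknowledgeCumulative(new SimpleId(3)); // does not compile: no such method
        return queue;
    }
}
```

The commented-out line in `run()` is the kind of misuse that the current single `Consumer` interface only reports at runtime.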
> ## 4. Native support for connector frameworks
> 
> Connector frameworks like Apache Flink and Apache Spark need to manage
> their own offsets across
> all segments of a topic, take atomic snapshots, and seek back to a
> checkpoint on recovery. The
> current API has no first-class support for this — connectors resort to
> low-level `Reader` plus
> manual partition tracking plus brittle offset management.
> 
> A dedicated `CheckpointConsumer` with opaque, serializable `Checkpoint`
> objects provides a clean
> integration point.
> 
> 
> ## Relationship to PIP-460 and long-term vision
> 
> This PIP is a companion to [PIP-460: Scalable Topics](https://github.com/apache/pulsar/blob/master/pip/pip-460.md),
> which defines the broker-side segment management, metadata storage, and
> admin APIs. The V5 client API is the primary interface through which
> applications use scalable topics. While the protocol commands and segment
> routing could theoretically be added to the v4 client, the V5 API was
> designed from the ground up for the opaque, dynamically segmented topic
> model that scalable topics provide.
> 
> The V5 API is designed to support all use cases currently supported by the
> existing API:
> producing messages, consuming with ordered/shared/key-shared semantics,
> transactions, schema
> evolution, and end-to-end encryption. It is not a subset — it is a full
> replacement API. It
> also works with existing partitioned and non-partitioned topics, so
> applications can adopt the
> new API without changing their topic infrastructure.
> 
> The long-term vision is for scalable topics and the V5 API to become the
> primary model,
> eventually deprecating partitioned/non-partitioned topics and the v4 API.
> However, this
> deprecation is explicitly **not** planned for the 5.0 release. The 5.0
> release will ship both
> APIs side by side, with the V5 API recommended for new applications. A
> subsequent PIP will
> detail the migration path and deprecation timeline.
> 
> While this PIP covers the Java client, the same API model (purpose-built
> consumer types, opaque
> topics, checkpoint-based connector support) will also be introduced in
> non-Java client SDKs
> (Python, Go, C++, Node.js) with language-appropriate idioms. Each SDK will
> mirror the same
> concepts and follow the same approach of supporting both old and new topic
> types side by side.
> The non-Java SDKs will be covered by separate PIPs.
> 
> 
> # Goals
> 
> ## In Scope
> 
> - A new `pulsar-client-api-v5` module with new Java interfaces for
> Producer, StreamConsumer,
>   QueueConsumer, CheckpointConsumer, and PulsarClient
> - A new `pulsar-client-v5` implementation module that wraps the existing v4
> client transport
>   and adds scalable topic routing
> - Support for all use cases currently supported by the existing API
> (produce, consume with
>   ordered/shared/key-shared semantics, transactions, schema, encryption)
> - Purpose-built consumer types that separate streaming (ordered, cumulative
> ack) from queuing
>   (parallel, individual ack) from checkpoint (unmanaged, framework-driven)
> - Opaque topic model where partition/segment details are hidden from the
> application
> - Modern Java API conventions: `Duration`, `Instant`, `Optional`, records,
> `ServiceLoader`
> - First-class transaction support in the main package
> - DAG watch protocol integration for live segment layout updates
> 
> ## Out of Scope
> 
> - Changes to the existing `pulsar-client-api` — it remains fully supported
> and unchanged
> - Changes to the wire protocol beyond what is needed for scalable topic DAG
> watch
> - Broker-side scalable topic management (split/merge algorithms, load
> balancing) — covered by
>   [PIP-460](https://github.com/apache/pulsar/blob/master/pip/pip-460.md)
>   and subsequent, more specific PIPs
> - Migration path from v4 to v5 API — will be detailed in a subsequent PIP
> - Implementation details — this PIP focuses on the public API surface
> - Deprecation of the existing API or partitioned/non-partitioned topic types
> - TableView equivalent in v5 — may be added in a follow-up PIP
> 
> 
> # High Level Design
> 
> The V5 client API is shipped as two new modules alongside the existing
> client:
> 
> ```
> pulsar-client-api-v5    (interfaces and value types only; no implementation)
> pulsar-client-v5        (implementation, depends on pulsar-client for transport)
> ```
> 
> The existing `pulsar-client-api` and `pulsar-client` modules are unchanged.
> Applications
> can use v4 and v5 in the same JVM.
> 
> ## Entry point
> 
> ```java
> PulsarClient client = PulsarClient.builder()
>         .serviceUrl("pulsar://localhost:6650")
>         .build();
> ```
> 
> The `PulsarClient` interface provides builder methods for all
> producer/consumer types:
> 
> ```
> PulsarClient
>   .newProducer(schema)            -> ProducerBuilder           -> Producer<T>
>   .newStreamConsumer(schema)      -> StreamConsumerBuilder     -> StreamConsumer<T>
>   .newQueueConsumer(schema)       -> QueueConsumerBuilder      -> QueueConsumer<T>
>   .newCheckpointConsumer(schema)  -> CheckpointConsumerBuilder -> CheckpointConsumer<T>
>   .newTransaction()               -> Transaction
> ```
> 
> ## Consumer types
> 
> Instead of a single `Consumer` with a `SubscriptionType` enum, the V5 API
> provides three
> distinct consumer types:
> 
> ```mermaid
> graph TD
>     A[PulsarClient] -->|newStreamConsumer| B[StreamConsumer]
>     A -->|newQueueConsumer| C[QueueConsumer]
>     A -->|newCheckpointConsumer| D[CheckpointConsumer]
> 
>     B -->|"Ordered, cumulative ack"| E[Event sourcing, CDC, ordered pipelines]
>     C -->|"Parallel, individual ack"| F[Work queues, task processing]
>     D -->|"Unmanaged, checkpoint/seek"| G[Flink, Spark connectors]
> ```
> 
> **StreamConsumer** — Ordered consumption with cumulative acknowledgment.
> Maps to
> Exclusive/Failover subscription semantics. Messages are delivered in order
> (per-key when keyed).
> 
> **QueueConsumer** — Unordered parallel consumption with individual
> acknowledgment. Maps to
> Shared/Key_Shared subscription semantics. Includes dead-letter policy, ack
> timeout, and
> redelivery backoff.
> 
> **CheckpointConsumer** — Unmanaged consumption for connector frameworks. No
> subscription, no
> ack — position tracking is entirely external. Provides `checkpoint()` for
> atomic position
> snapshots and `seek(Checkpoint)` for recovery.
> 
> ## Scalable topic integration
> 
> When a V5 client connects to a `topic://` domain topic, it establishes a
> DAG watch session
> with the broker. The broker sends the current segment layout (which
> segments exist, their
> hash ranges, and which broker owns each) and pushes updates when the layout
> changes (splits,
> merges).
> 
> ```mermaid
> sequenceDiagram
>     participant Client as V5 Client
>     participant Broker as Broker
>     participant Meta as Metadata Store
> 
>     Client->>Broker: ScalableTopicLookup(topic)
>     Broker->>Meta: Watch topic DAG
>     Broker-->>Client: ScalableTopicUpdate(DAG)
>     Note over Client: Create per-segment producers/consumers
> 
>     Meta-->>Broker: Segment split notification
>     Broker-->>Client: ScalableTopicUpdate(new DAG)
>     Note over Client: Add new segments, drain old
> ```
> 
> The `Producer` hashes message keys to determine which segment to send to,
> maintaining one
> internal producer per active segment. When segments split or merge, the
> client transparently
> creates new internal producers and drains old ones.
> 
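The routing and layout-update steps above can be sketched as follows. This is an illustration under stated assumptions, not the `SegmentRouter` in `pulsar-client-v5`: the 16-bit hash space and the use of `String.hashCode()` are hypothetical choices (the PIP does not specify the hash function), and a layout is modeled as a map from each hash range's inclusive start to its segment ID.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of client-side hash-range routing for a scalable topic.
// Assumptions (not from the PIP): a 16-bit hash space [0, 65536) and
// String.hashCode() as the key hash. The real hash function may differ.
final class SegmentRouterSketch {
    static final int HASH_SPACE = 1 << 16;

    // Maps the inclusive start of each active hash range to its segment id.
    private final TreeMap<Integer, Long> rangeStartToSegment = new TreeMap<>();

    SegmentRouterSketch(Map<Integer, Long> layout) {
        rangeStartToSegment.putAll(layout);
    }

    static int hash(String key) {
        return key.hashCode() & (HASH_SPACE - 1);
    }

    // Finds the active segment whose range contains the key's hash.
    long route(String key) {
        Map.Entry<Integer, Long> entry = rangeStartToSegment.floorEntry(hash(key));
        if (entry == null) {
            throw new IllegalStateException("layout does not cover hash " + hash(key));
        }
        return entry.getValue();
    }

    Set<Long> activeSegments() {
        return new HashSet<>(rangeStartToSegment.values());
    }

    // Applies a new layout pushed by the broker and returns the segments whose
    // internal producers would be drained (present before, absent now).
    Set<Long> applyLayout(Map<Integer, Long> newLayout) {
        Set<Long> drained = activeSegments();
        rangeStartToSegment.clear();
        rangeStartToSegment.putAll(newLayout);
        drained.removeAll(activeSegments());
        return drained;
    }
}
```

For example, starting from two segments `{0: 1, 32768: 2}` and splitting segment 1 into `{0: 3, 16384: 4}`, keys that routed to segment 1 move to one of its children, keys in segment 2's range are unaffected, and `applyLayout` reports segment 1 as the one to drain. Because a given key always hashes to the same point, it lands on exactly one active segment at a time, which is what keeps per-key ordering possible.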
> ## Sync/async model
> 
> All types are sync-first with an `.async()` accessor:
> 
> ```
> Producer<T>            -> producer.async()   -> AsyncProducer<T>
> StreamConsumer<T>      -> consumer.async()   -> AsyncStreamConsumer<T>
> QueueConsumer<T>       -> consumer.async()   -> AsyncQueueConsumer<T>
> CheckpointConsumer<T>  -> consumer.async()   -> AsyncCheckpointConsumer<T>
> Transaction            -> txn.async()        -> AsyncTransaction
> ```
> 
> Both views share the same underlying resources.
> 
> 
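A minimal sketch of the sync-first pattern with an `.async()` view over shared state. `ProducerSketch` and `AsyncProducerSketch` are hypothetical names. The sketch assumes the sync method is a blocking wrapper over the async path, which is one plausible way to realize "both views share the same underlying resources", not a description of the actual implementation. The message payload is ignored; only a shared sequence counter is modeled.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the "sync-first with .async() view" pattern.
// Names are illustrative; they are not the actual V5 interfaces.
interface AsyncProducerSketch {
    CompletableFuture<Long> send(String value);
}

interface ProducerSketch {
    long send(String value);        // blocks until the send completes
    AsyncProducerSketch async();    // non-blocking view over the same producer
}

final class ProducerSketchImpl implements ProducerSketch {
    // Shared state: both views assign ids from this one counter.
    private final AtomicLong nextSequence = new AtomicLong();

    // The async view; the payload is ignored in this sketch.
    private final AsyncProducerSketch asyncView =
            value -> CompletableFuture.supplyAsync(nextSequence::getAndIncrement);

    @Override
    public long send(String value) {
        // The sync call is a thin blocking wrapper around the async path.
        return asyncView.send(value).join();
    }

    @Override
    public AsyncProducerSketch async() {
        return asyncView;
    }
}
```

Returning the same view object from every `async()` call, rather than creating a new producer, is what keeps both views on one set of resources.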
> # Detailed Design
> 
> ## Design & Implementation Details
> 
> ### Module structure
> 
> ```
> pulsar-client-api-v5/
>   org.apache.pulsar.client.api.v5
>   ├── PulsarClient, PulsarClientBuilder, PulsarClientException
>   ├── Producer, ProducerBuilder
>   ├── StreamConsumer, StreamConsumerBuilder
>   ├── QueueConsumer, QueueConsumerBuilder
>   ├── CheckpointConsumer, CheckpointConsumerBuilder, Checkpoint
>   ├── Message, Messages, MessageId, MessageMetadata, MessageBuilder
>   ├── Transaction
>   ├── async/       (AsyncProducer, AsyncMessageBuilder, Async*Consumer, AsyncTransaction)
>   ├── auth/        (Authentication, AuthenticationData, CryptoKeyReader, ...)
>   ├── config/      (BatchingPolicy, CompressionPolicy, TlsPolicy, BackoffPolicy, ...)
>   ├── schema/      (Schema, SchemaInfo, SchemaType)
>   └── internal/    (PulsarClientProvider — ServiceLoader SPI)
> 
> pulsar-client-v5/
>   org.apache.pulsar.client.impl.v5
>   ├── PulsarClientV5, PulsarClientBuilderV5, PulsarClientProviderV5
>   ├── ScalableTopicProducer, ProducerBuilderV5
>   ├── ScalableStreamConsumer, StreamConsumerBuilderV5
>   ├── ScalableQueueConsumer, QueueConsumerBuilderV5
>   ├── ScalableCheckpointConsumer, CheckpointConsumerBuilderV5
>   ├── DagWatchClient, ClientSegmentLayout, SegmentRouter
>   ├── SchemaAdapter, AuthenticationAdapter, CryptoKeyReaderAdapter
>   ├── MessageV5, MessageIdV5, MessagesV5, CheckpointV5
>   └── Async*V5 wrappers
> ```
> 
> ### Key types
> 
> **`MessageMetadata<T, BUILDER>`** — A self-referential builder base shared
> between sync and
> async message sending:
> 
> ```java
> interface MessageMetadata<T, BUILDER extends MessageMetadata<T, BUILDER>> {
>     BUILDER value(T value);
>     BUILDER key(String key);
>     BUILDER property(String name, String value);
>     BUILDER eventTime(Instant eventTime);
>     BUILDER deliverAfter(Duration delay);
>     BUILDER deliverAt(Instant timestamp);
>     BUILDER transaction(Transaction txn);
> }
> ```
> 
> `MessageBuilder<T>` extends it with `MessageId send()`.
> `AsyncMessageBuilder<T>` extends it with `CompletableFuture<MessageId>
> send()`.
> 
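The self-referential bound is what lets setters inherited from a shared base return the concrete builder type, so a chain started from the sync builder ends in the blocking `send()` and one started from the async builder ends in the future-returning `send()`. Below is a simplified sketch with hypothetical types (`MetadataSketch`, `SyncBuilder`, `AsyncBuilder`); only a few setters are shown, and `send()` returns a description string instead of a `MessageId`.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Simplified sketch of the self-referential builder pattern. The types are
// hypothetical stand-ins; send() returns a description, not a MessageId.
interface MetadataSketch<T, BUILDER extends MetadataSketch<T, BUILDER>> {
    BUILDER value(T value);
    BUILDER key(String key);
    BUILDER property(String name, String value);
    BUILDER deliverAfter(Duration delay);
}

// Shared setter implementations; each returns the concrete builder type.
abstract class BaseBuilder<T, B extends BaseBuilder<T, B>> implements MetadataSketch<T, B> {
    T value;
    String key;
    final Map<String, String> properties = new LinkedHashMap<>();
    Duration delay = Duration.ZERO;

    abstract B self();

    public B value(T value) { this.value = value; return self(); }
    public B key(String key) { this.key = key; return self(); }
    public B property(String name, String v) { properties.put(name, v); return self(); }
    public B deliverAfter(Duration d) { this.delay = d; return self(); }

    String describe() { return key + "=" + value + " " + properties + " after " + delay; }
}

final class SyncBuilder<T> extends BaseBuilder<T, SyncBuilder<T>> {
    @Override SyncBuilder<T> self() { return this; }
    String send() { return describe(); }  // a real client would block here
}

final class AsyncBuilder<T> extends BaseBuilder<T, AsyncBuilder<T>> {
    @Override AsyncBuilder<T> self() { return this; }
    CompletableFuture<String> send() { return CompletableFuture.completedFuture(describe()); }
}
```

For instance, `new SyncBuilder<String>().value("hello").key("order-42").send()` compiles because `key(...)` returns `SyncBuilder<String>`, not the base interface.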
> **`Checkpoint`** — Opaque, serializable position vector across all segments:
> 
> ```java
> interface Checkpoint {
>     byte[] toByteArray();
>     Instant creationTime();
> 
>     static Checkpoint earliest();
>     static Checkpoint latest();
>     static Checkpoint atTimestamp(Instant timestamp);
>     static Checkpoint fromByteArray(byte[] data);
> }
> ```
> 
> Internally, a `Checkpoint` stores a `Map<Long, MessageId>` mapping segment
> IDs to positions.
> The format is forward-compatible — checkpoints saved with fewer segments
> can be applied after
> splits/merges.
> 
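A sketch of what an opaque, serializable position vector could look like. The PIP does not define the byte format: the version byte, the field layout, and the `Position(ledgerId, entryId)` record standing in for `MessageId` are all assumptions. A leading version byte is a common way to keep a persisted format evolvable. The sketch does not model how a checkpoint is remapped after splits or merges, since the PIP does not describe that mechanism.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for MessageId.
record Position(long ledgerId, long entryId) {}

// Hypothetical checkpoint: segment id -> position, plus a creation time.
record CheckpointSketch(Instant creationTime, Map<Long, Position> positions) {
    private static final byte FORMAT_VERSION = 1;

    CheckpointSketch {
        positions = Map.copyOf(positions); // defensive, immutable copy
    }

    byte[] toByteArray() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeByte(FORMAT_VERSION);
            out.writeLong(creationTime.toEpochMilli());
            out.writeInt(positions.size());
            for (Map.Entry<Long, Position> e : positions.entrySet()) {
                out.writeLong(e.getKey());
                out.writeLong(e.getValue().ledgerId());
                out.writeLong(e.getValue().entryId());
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static CheckpointSketch fromByteArray(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            byte version = in.readByte();
            if (version != FORMAT_VERSION) {
                throw new IllegalArgumentException("Unsupported checkpoint version " + version);
            }
            Instant created = Instant.ofEpochMilli(in.readLong());
            int count = in.readInt();
            Map<Long, Position> positions = new TreeMap<>();
            for (int i = 0; i < count; i++) {
                long segmentId = in.readLong();
                // Arguments are evaluated left to right: ledgerId, then entryId.
                positions.put(segmentId, new Position(in.readLong(), in.readLong()));
            }
            return new CheckpointSketch(created, positions);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A connector would store `toByteArray()` in its own state backend and call `fromByteArray` on recovery; the bytes carry no meaning outside the client that produced them.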
> **Configuration records** — Immutable records with static factories:
> 
> | Record | Purpose | Example |
> |--------|---------|---------|
> | `BatchingPolicy` | Batching config | `BatchingPolicy.of(Duration.ofMillis(10), 5000, MemorySize.ofMB(1))` |
> | `CompressionPolicy` | Compression codec | `CompressionPolicy.of(CompressionType.ZSTD)` |
> | `TlsPolicy` | TLS/mTLS config | `TlsPolicy.of("/path/to/ca.pem")` |
> | `BackoffPolicy` | Retry backoff | `BackoffPolicy.exponential(Duration.ofMillis(100), Duration.ofSeconds(30))` |
> | `DeadLetterPolicy` | Dead letter queue | `DeadLetterPolicy.of(5)` |
> | `EncryptionPolicy` | E2E encryption | `EncryptionPolicy.forProducer(keyReader, "mykey")` |
> | `ChunkingPolicy` | Large message chunking | `ChunkingPolicy.of(MemorySize.ofMB(10))` |
> 
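A sketch of the immutable-record style for a batching policy. The record name, the component names, their order (mirroring the `BatchingPolicy.of(...)` example in the table), and the validation rules are assumptions; the size limit is a plain byte count here rather than the PIP's `MemorySize` type.

```java
import java.time.Duration;

// Hypothetical sketch of an immutable configuration record with a static
// factory. Names, order, and validation rules are assumptions.
record BatchingPolicySketch(Duration maxPublishDelay, int maxMessages, long maxBytes) {

    // Compact constructor: validates every instance, however it is created.
    BatchingPolicySketch {
        if (maxPublishDelay == null || maxPublishDelay.isNegative()) {
            throw new IllegalArgumentException("maxPublishDelay must be >= 0");
        }
        if (maxMessages <= 0 || maxBytes <= 0) {
            throw new IllegalArgumentException("limits must be positive");
        }
    }

    static BatchingPolicySketch of(Duration maxPublishDelay, int maxMessages, long maxBytes) {
        return new BatchingPolicySketch(maxPublishDelay, maxMessages, maxBytes);
    }

    // Records are immutable, so "changing" a field means returning a copy.
    BatchingPolicySketch withMaxMessages(int newMaxMessages) {
        return new BatchingPolicySketch(maxPublishDelay, newMaxMessages, maxBytes);
    }
}
```

Because validation lives in the compact constructor, every instance, whether built through `of(...)` or through a `with...` copy, is checked the same way, and records get value-based `equals`/`hashCode` for free.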
> ### SPI discovery
> 
> Implementation is loaded via `java.util.ServiceLoader`:
> 
> ```java
> // In pulsar-client-api-v5
> public interface PulsarClientProvider {
>     PulsarClientBuilder newClientBuilder();
>     <T> Schema<T> jsonSchema(Class<T> clazz);
>     // ... factory methods for all SPI types
> }
> 
> // In pulsar-client-v5
> // META-INF/services/org.apache.pulsar.client.api.v5.internal.PulsarClientProvider
> // -> org.apache.pulsar.client.impl.v5.PulsarClientProviderV5
> ```
> 
> This replaces the reflection-based `DefaultImplementation` approach used in
> the current API.
> 
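A minimal sketch of how an API module can locate its implementation with `java.util.ServiceLoader`. `ClientProviderSketch` and `ProviderLookup` are hypothetical stand-ins for `PulsarClientProvider` and whatever lookup code the API module uses. With no provider registered under `META-INF/services/`, the lookup fails fast with a descriptive error.

```java
import java.util.ServiceLoader;

// Hypothetical stand-in for the PIP's PulsarClientProvider. An implementation
// jar would register its class in
// META-INF/services/<fully.qualified.InterfaceName>.
interface ClientProviderSketch {
    String name();
}

final class ProviderLookup {
    private ProviderLookup() {}

    static ClientProviderSketch load() {
        // Picks the first registered implementation on the classpath, if any.
        return ServiceLoader.load(ClientProviderSketch.class)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException(
                        "No ClientProviderSketch implementation found; "
                                + "is the implementation module on the classpath?"));
    }
}
```

The API module never names the implementation class, so swapping or shading the implementation only requires changing the services file.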
> ## Public-facing Changes
> 
> ### Public API
> 
> This PIP introduces a new public Java API. The existing `pulsar-client-api`
> is unchanged.
> 
> **New modules:**
> - `pulsar-client-api-v5` — interfaces and value types (compile dependency
> for applications)
> - `pulsar-client-v5` — implementation (runtime dependency)
> 
> **New interfaces (summary):**
> 
> | Interface | Methods | Description |
> |-----------|---------|-------------|
> | `PulsarClient` | `builder()`, `newProducer()`, `newStreamConsumer()`, `newQueueConsumer()`, `newCheckpointConsumer()`, `newTransaction()`, `close()` | Entry point |
> | `Producer<T>` | `newMessage()`, `flush()`, `close()`, `async()` | Send messages |
> | `StreamConsumer<T>` | `receive()`, `receive(Duration)`, `acknowledgeCumulative()`, `close()`, `async()` | Ordered consumption |
> | `QueueConsumer<T>` | `receive()`, `receive(Duration)`, `acknowledge()`, `negativeAcknowledge()`, `close()`, `async()` | Parallel consumption |
> | `CheckpointConsumer<T>` | `receive()`, `receive(Duration)`, `checkpoint()`, `seek()`, `close()`, `async()` | Framework consumption |
> 
> ### Configuration
> 
> No new broker configuration is introduced by this PIP. The V5 client reuses
> the existing
> `ClientConfigurationData` internally.
> 
> ### CLI
> 
> No new CLI commands specific to the V5 API.
> 
> ### Metrics
> 
> No new metrics are introduced by the V5 client API itself. The underlying
> v4 producers and
> consumers continue to emit their existing metrics.
> 
> 
> # Monitoring
> 
> The V5 client wraps v4 producers and consumers internally, so existing
> producer/consumer
> metrics (publish rate, latency, backlog, etc.) continue to work. Each
> internal segment
> producer/consumer appears as a separate instance in metrics, identified by
> the segment topic
> name.
> 
> Operators should monitor:
> - Per-segment publish rates to detect hot segments (candidates for
> splitting)
> - DAG watch session reconnections (indicates broker restarts or network
> issues)
> - Segment producer creation/closure events in client logs during
> split/merge operations
> 
> 
> # Security Considerations
> 
> The V5 client API does not introduce new security mechanisms. It delegates
> all authentication
> and authorization to the underlying v4 client:
> 
> - Authentication is configured via `PulsarClientBuilder.authentication()`
> and delegated to the
>   v4 `AuthenticationProvider` framework via `AuthenticationAdapter`
> - Topic-level authorization applies to the parent `topic://` name —
> accessing the underlying
>   `segment://` topics uses the same tenant/namespace permissions
> - End-to-end encryption is supported via `EncryptionPolicy` on
> `ProducerBuilder`, delegated to
>   the v4 `CryptoKeyReader` framework via `CryptoKeyReaderAdapter`
> - The new `CommandScalableTopicLookup` protocol command is sent only after
> the connection is
>   authenticated and in the `Connected` state, consistent with other lookup
> commands
> 
> No new REST endpoints are introduced by the client API itself.
> 
> 
> # Backward & Forward Compatibility
> 
> ## Upgrade
> 
> The V5 API is a new, additive module. No changes are required to existing
> applications.
> This is similar to the 2.0 API redesign, where the old API was preserved in a
> separate compatibility module, except that this time nothing breaks: the
> existing API stays in its current module, unchanged.
> 
> - Applications using `pulsar-client-api` (v4) continue to work without
> modification
> - New applications can adopt the V5 API by depending on `pulsar-client-v5`
> - The V5 API works with all topic types: scalable topics, partitioned
> topics, and
>   non-partitioned topics — applications can migrate to the new API without
> changing their
>   topic infrastructure
> - Both APIs can coexist in the same JVM — the V5 implementation wraps the
> v4 transport
>   internally
> - A detailed migration path from v4 to v5 will be provided in a subsequent
> PIP
> - A seamless migration path to convert existing partitioned and
> non-partitioned topics to
>   scalable topics will also be provided, allowing applications to
> transition their topic
>   infrastructure without data loss or downtime
> 
> To adopt the V5 API, applications add `pulsar-client-v5` as a dependency
> and use
> `PulsarClient.builder()` from the `org.apache.pulsar.client.api.v5` package.
> 
> ## Downgrade / Rollback
> 
> Since the V5 API is a separate module, rollback is simply removing the
> dependency and reverting
> to v4 API calls. No broker-side changes are required.
> 
> Applications using `CheckpointConsumer` should note that saved `Checkpoint`
> byte arrays are
> specific to the V5 implementation and cannot be used with v4
> `Reader`/`Consumer`.
> 
> ## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations
> 
> The V5 client API itself does not introduce geo-replication concerns — it
> connects to whichever
> cluster it is configured for. However, geo-replication of scalable topics
> has specific
> considerations (segment layout synchronization across clusters,
> cross-cluster split/merge
> coordination) that will be detailed in a subsequent PIP.
> 
> 
> # Alternatives
> 
> ## Extend the existing Consumer interface
> 
> We considered adding scalable topic support to the existing `Consumer`
> interface by adding new
> methods for checkpoint/seek and hiding segment details internally. This was
> rejected because:
> 
> - The existing interface already has 60+ methods and is difficult to evolve
> - Adding checkpoint semantics alongside ack semantics would further confuse
> the API
> - The type system cannot prevent misuse (e.g., calling
> `acknowledgeCumulative()` on a Shared
>   subscription)
> - Removing partition-related methods (`TopicMessageId`, `MessageRouter`)
> would break backward
>   compatibility
> 
> ## Builder-per-subscription-type on existing API
> 
> We considered keeping a single `Consumer` type but using different builder
> types per subscription
> mode (e.g., `StreamConsumerBuilder` returning a `Consumer` with restricted
> methods). This was
> rejected because the returned `Consumer` would still expose all methods at
> the type level — the
> restriction would only be in documentation, not enforced by the compiler.
> 
> ## Separate module vs extending existing module
> 
> We chose a separate module (`pulsar-client-api-v5`) rather than adding new
> interfaces to
> `pulsar-client-api` because:
> 
> - The v5 API uses different naming conventions (`value()` vs `getValue()`),
> different types
>   (`Duration` vs `(long, TimeUnit)`), and different patterns (`Optional` vs
> nullable)
> - Having both conventions in the same package would be confusing
> - A clean module boundary makes it clear which API generation an
> application is using
> - The v4 API can eventually be deprecated without affecting v5 users
> 
> 
> # General Notes
> 
> The V5 API targets Java 17, the same as the rest of Pulsar.
> 
> The implementation module (`pulsar-client-v5`) wraps the existing v4
> `pulsar-client` for all
> transport-level operations. This means bug fixes and performance
> improvements to the v4 client
> automatically benefit V5 users, and the V5 module itself is relatively thin
> — primarily routing
> logic and API adaptation.
> 
> 
> --
> Matteo Merli
> 
