Closing the vote with:

3 binding +1s:
 * Matteo
 * Lari
 * Xiangying

1 non-binding +1:
 * Tao

Thank you,
Matteo

--
Matteo Merli

On Tue, Apr 7, 2026 at 7:53 PM xiangying meng <[email protected]> wrote:
>
> +1 (binding)
>
> Thanks,
> xiangying
>
> On Tue, Apr 7, 2026 at 7:52 PM Lari Hotari <[email protected]> wrote:
> >
> > +1 (binding)
> >
> > -Lari
> >
> > On 2026/04/06 16:17:01 Matteo Merli wrote:
> > > Matteo Merli <[email protected]>
> > > Wed, Apr 1, 9:45 AM (5 days ago)
> > > to Dev
> > > https://github.com/apache/pulsar/pull/25455
> > >
> > > # PIP-466: New Java Client API (V5) with Scalable Topic Support
> > >
> > > # Background Knowledge
> > >
> > > Apache Pulsar's Java client API (`pulsar-client-api`) has been the primary interface for Java
> > > applications since Pulsar's inception. The API surface has grown organically over the years to
> > > support partitioned topics, multiple subscription types, transactions, schema evolution, and more.
> > >
> > > **API versioning precedent.** Pulsar already went through a breaking API redesign in the 2.0
> > > release, where the old API was moved into a separate compatibility module
> > > (`pulsar-client-1x-base`) and the main module was replaced with the new API. This PIP takes a
> > > less disruptive approach: the existing `pulsar-client-api` and `pulsar-client` modules stay
> > > completely unchanged, and the new API is introduced in additional modules
> > > (`pulsar-client-api-v5` and `pulsar-client-v5`). Existing applications are unaffected; new
> > > applications can opt in to the V5 API.
> > >
> > > **Partitioned topics** are Pulsar's current mechanism for topic-level parallelism. A partitioned
> > > topic is a collection of N independent internal topics (partitions), each backed by a separate
> > > managed ledger. The client is responsible for routing messages to partitions (via `MessageRouter`)
> > > and is exposed to partition-level details through `TopicMessageId`, `getPartitionsForTopic()`,
> > > and partition-specific consumers.
> > >
> > > **Subscription types** control how messages are distributed to consumers. Pulsar supports four
> > > types — Exclusive, Failover, Shared, and Key_Shared — all accessed through a single `Consumer`
> > > interface. The interface exposes all operations regardless of which subscription type is in use,
> > > even though some operations (e.g., `acknowledgeCumulative()` on Shared) throw at runtime.
> > >
> > > **Scalable topics** ([PIP-460](https://github.com/apache/pulsar/blob/master/pip/pip-460.md)) are
> > > a new server-side mechanism where a topic is composed of a DAG of hash-range segments that can be
> > > dynamically split and merged by the broker. Unlike partitioned topics, the number of segments is
> > > invisible to the client and can change at runtime without application awareness. The client
> > > receives segment layout updates via a dedicated protocol command (DAG watch session) and routes
> > > messages based on hash-range matching. This PIP defines the client API designed to work with
> > > scalable topics; the broker-side design is covered by PIP-460.
> > >
> > >
> > > # Motivation
> > >
> > > ## 1. Remove partitioning from the client API
> > >
> > > The current API leaks partitioning everywhere: `TopicMessageId` carries partition indexes,
> > > `getPartitionsForTopic()` exposes the count, `MessageRouter` forces the application to make
> > > routing decisions, and consumers can be bound to specific partitions. This forces application
> > > code to deal with what is fundamentally a server-side scalability concern.
> > >
> > > With scalable topics, parallelism is achieved via hash-range segments managed entirely by the
> > > broker. The client should treat topics as opaque endpoints — per-key ordering is guaranteed
> > > when a key is specified, but the underlying parallelism (how many segments, which broker owns
> > > each) is invisible and dynamic.
> > >
> > > The current API cannot cleanly support this model because partitioning is baked into the type
> > > system (`TopicMessageId`), the builder API (`MessageRouter`, `MessageRoutingMode`), and the
> > > consumer model (partition-specific consumers, `getPartitionsForTopic()`).
> > >
> > > ## 2. Simplify an oversized API
> > >
> > > After years of organic growth, the API surface has accumulated significant baggage:
> > >
> > > - `Consumer` has 60+ methods mixing unrelated concerns (ack, nack, seek, pause, unsubscribe,
> > >   stats, get topic name, is connected, etc.)
> > > - `ConsumerBuilder` has 40+ configuration methods with overlapping semantics
> > > - Timeouts use `(long, TimeUnit)` in some places and `long` millis in others
> > > - Nullable returns and empty values are used inconsistently across the API
> > > - `loadConf(Map)`, `clone()`, and `Serializable` on builders are rarely used and clutter the API
> > > - SPI is loaded via a reflection hack (`DefaultImplementation`) instead of the standard
> > >   `ServiceLoader`
> > >
> > > A new module can start with a clean, minimal surface using modern Java idioms.
> > >
> > > ## 3. Separate streaming vs queuing consumption
> > >
> > > The current `Consumer` mixes all four subscription types behind a single interface:
> > >
> > > - `acknowledgeCumulative()` is available but throws at runtime for Shared subscriptions
> > > - `negativeAcknowledge()` semantics differ between modes
> > > - `seek()` behavior varies depending on subscription type
> > > - Dead-letter policy only applies to Shared/Key_Shared
> > >
> > > This design means the compiler cannot help you — you discover misuse at runtime. Splitting into
> > > purpose-built consumer types where each exposes only the operations that make sense for its model
> > > improves both usability and correctness.
> > >
> > > ## 4. Native support for connector frameworks
> > >
> > > Connector frameworks like Apache Flink and Apache Spark need to manage their own offsets across
> > > all segments of a topic, take atomic snapshots, and seek back to a checkpoint on recovery. The
> > > current API has no first-class support for this: connectors resort to combining the low-level
> > > `Reader` with manual partition tracking and brittle offset management.
> > >
> > > A dedicated `CheckpointConsumer` with opaque, serializable `Checkpoint` objects provides a clean
> > > integration point.
> > >
> > >
> > > ## Relationship to PIP-460 and long-term vision
> > >
> > > This PIP is a companion to [PIP-460: Scalable Topics](https://github.com/apache/pulsar/blob/master/pip/pip-460.md),
> > > which defines the broker-side segment management, metadata storage, and admin APIs. The V5
> > > client API is the primary interface for applications to use scalable topics. While the protocol
> > > commands and segment routing could theoretically be added to the v4 client, the V5 API was
> > > designed from the ground up to support the opaque, dynamically segmented topic model that
> > > scalable topics provide.
> > >
> > > The V5 API is designed to support all use cases currently supported by the existing API:
> > > producing messages, consuming with ordered/shared/key-shared semantics, transactions, schema
> > > evolution, and end-to-end encryption. It is not a subset — it is a full replacement API. It
> > > also works with existing partitioned and non-partitioned topics, so applications can adopt the
> > > new API without changing their topic infrastructure.
> > >
> > > The long-term vision is for scalable topics and the V5 API to become the primary model,
> > > eventually deprecating partitioned/non-partitioned topics and the v4 API. However, this
> > > deprecation is explicitly **not** planned for the 5.0 release. The 5.0 release will ship both
> > > APIs side by side, with the V5 API recommended for new applications. A subsequent PIP will
> > > detail the migration path and deprecation timeline.
> > >
> > > While this PIP covers the Java client, the same API model (purpose-built consumer types, opaque
> > > topics, checkpoint-based connector support) will also be introduced in non-Java client SDKs
> > > (Python, Go, C++, Node.js) with language-appropriate idioms. Each SDK will mirror the same
> > > concepts and follow the same approach of supporting both old and new topic types side by side.
> > > The non-Java SDKs will be covered by separate PIPs.
> > >
> > >
> > > # Goals
> > >
> > > ## In Scope
> > >
> > > - A new `pulsar-client-api-v5` module with new Java interfaces for Producer, StreamConsumer,
> > >   QueueConsumer, CheckpointConsumer, and PulsarClient
> > > - A new `pulsar-client-v5` implementation module that wraps the existing v4 client transport
> > >   and adds scalable topic routing
> > > - Support for all use cases currently supported by the existing API (produce, consume with
> > >   ordered/shared/key-shared semantics, transactions, schema, encryption)
> > > - Purpose-built consumer types that separate streaming (ordered, cumulative ack), queuing
> > >   (parallel, individual ack), and checkpoint (unmanaged, framework-driven) consumption
> > > - Opaque topic model where partition/segment details are hidden from the application
> > > - Modern Java API conventions: `Duration`, `Instant`, `Optional`, records, `ServiceLoader`
> > > - First-class transaction support in the main package
> > > - DAG watch protocol integration for live segment layout updates
> > >
> > > ## Out of Scope
> > >
> > > - Changes to the existing `pulsar-client-api` — it remains fully supported and unchanged
> > > - Changes to the wire protocol beyond what is needed for scalable topic DAG watch
> > > - Broker-side scalable topic management (split/merge algorithms, load balancing) — covered by
> > >   [PIP-460](https://github.com/apache/pulsar/blob/master/pip/pip-460.md) and subsequent more
> > >   specific PIPs
> > > - Migration path from v4 to v5 API — will be detailed in a subsequent PIP
> > > - Implementation details — this PIP focuses on the public API surface
> > > - Deprecation of the existing API or partitioned/non-partitioned topic types
> > > - TableView equivalent in v5 — may be added in a follow-up PIP
> > >
> > >
> > > # High Level Design
> > >
> > > The V5 client API is shipped as two new modules alongside the existing client:
> > >
> > > ```
> > > pulsar-client-api-v5    (interfaces and value types only — no implementation)
> > > pulsar-client-v5        (implementation, depends on pulsar-client for transport)
> > > ```
> > >
> > > The existing `pulsar-client-api` and `pulsar-client` modules are unchanged. Applications can use
> > > v4 and v5 in the same JVM.
> > >
> > > ## Entry point
> > >
> > > ```java
> > > PulsarClient client = PulsarClient.builder()
> > >         .serviceUrl("pulsar://localhost:6650")
> > >         .build();
> > > ```
> > >
> > > The `PulsarClient` interface provides builder methods for all producer/consumer types:
> > >
> > > ```
> > > PulsarClient
> > >   .newProducer(schema)            -> ProducerBuilder           -> Producer<T>
> > >   .newStreamConsumer(schema)      -> StreamConsumerBuilder     -> StreamConsumer<T>
> > >   .newQueueConsumer(schema)       -> QueueConsumerBuilder      -> QueueConsumer<T>
> > >   .newCheckpointConsumer(schema)  -> CheckpointConsumerBuilder -> CheckpointConsumer<T>
> > >   .newTransaction()               -> Transaction
> > > ```
> > >
> > > ## Consumer types
> > >
> > > Instead of a single `Consumer` with a `SubscriptionType` enum, the V5 API provides three
> > > distinct consumer types:
> > >
> > > ```mermaid
> > > graph TD
> > >     A[PulsarClient] -->|newStreamConsumer| B[StreamConsumer]
> > >     A -->|newQueueConsumer| C[QueueConsumer]
> > >     A -->|newCheckpointConsumer| D[CheckpointConsumer]
> > >
> > >     B -->|"Ordered, cumulative ack"| E[Event sourcing, CDC, ordered pipelines]
> > >     C -->|"Parallel, individual ack"| F[Work queues, task processing]
> > >     D -->|"Unmanaged, checkpoint/seek"| G[Flink, Spark connectors]
> > > ```
> > >
> > > **StreamConsumer** — Ordered consumption with cumulative acknowledgment. Maps to
> > > Exclusive/Failover subscription semantics. Messages are delivered in order (per-key when keyed).
> > >
> > > **QueueConsumer** — Unordered parallel consumption with individual acknowledgment. Maps to
> > > Shared/Key_Shared subscription semantics. Includes dead-letter policy, ack timeout, and
> > > redelivery backoff.
> > >
> > > **CheckpointConsumer** — Unmanaged consumption for connector frameworks. No subscription, no
> > > ack — position tracking is entirely external. Provides `checkpoint()` for atomic position
> > > snapshots and `seek(Checkpoint)` for recovery.
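> > >
> > > To make the compile-time separation concrete, here is a minimal in-memory sketch of the two
> > > acknowledgment models. It is not the Pulsar implementation: the interfaces are trimmed to a few
> > > methods, message IDs are plain `long` sequence numbers, and `SimpleStreamConsumer` and
> > > `SimpleQueueConsumer` are hypothetical classes. Because `acknowledgeCumulative()` exists only on
> > > the stream type, calling it on a queue consumer fails to compile instead of throwing at runtime.
> > >
> > > ```java
> > > import java.util.Set;
> > > import java.util.TreeSet;
> > >
> > > // Hypothetical, trimmed-down shapes; message IDs are plain long sequence numbers.
> > > interface StreamConsumer {
> > >     // Acknowledges every message up to and including msgId.
> > >     void acknowledgeCumulative(long msgId);
> > >
> > >     boolean isAcknowledged(long msgId);
> > > }
> > >
> > > interface QueueConsumer {
> > >     // Acknowledges exactly one message; the others stay pending.
> > >     void acknowledge(long msgId);
> > >
> > >     // Marks one unacknowledged message for redelivery.
> > >     void negativeAcknowledge(long msgId);
> > >
> > >     boolean isAcknowledged(long msgId);
> > > }
> > >
> > > class SimpleStreamConsumer implements StreamConsumer {
> > >     private long ackedUpTo = -1; // a single watermark suffices for ordered delivery
> > >
> > >     public void acknowledgeCumulative(long msgId) {
> > >         ackedUpTo = Math.max(ackedUpTo, msgId);
> > >     }
> > >
> > >     public boolean isAcknowledged(long msgId) {
> > >         return msgId <= ackedUpTo;
> > >     }
> > > }
> > >
> > > class SimpleQueueConsumer implements QueueConsumer {
> > >     private final Set<Long> acked = new TreeSet<>();     // individually acknowledged IDs
> > >     private final Set<Long> redeliver = new TreeSet<>(); // nacked IDs awaiting redelivery
> > >
> > >     public void acknowledge(long msgId) {
> > >         acked.add(msgId);
> > >         redeliver.remove(msgId);
> > >     }
> > >
> > >     public void negativeAcknowledge(long msgId) {
> > >         if (!acked.contains(msgId)) {
> > >             redeliver.add(msgId);
> > >         }
> > >     }
> > >
> > >     public boolean isAcknowledged(long msgId) {
> > >         return acked.contains(msgId);
> > >     }
> > >
> > >     Set<Long> pendingRedelivery() {
> > >         return redeliver;
> > >     }
> > > }
> > >
> > > public class ConsumerTypesSketch {
> > >     public static void main(String[] args) {
> > >         StreamConsumer stream = new SimpleStreamConsumer();
> > >         stream.acknowledgeCumulative(5);
> > >         System.out.println(stream.isAcknowledged(3)); // true: covered by the watermark
> > >
> > >         SimpleQueueConsumer queue = new SimpleQueueConsumer();
> > >         queue.acknowledge(5);
> > >         queue.negativeAcknowledge(3);
> > >         System.out.println(queue.isAcknowledged(3)); // false: only message 5 was acked
> > >         // queue.acknowledgeCumulative(5); // does not compile: QueueConsumer has no such method
> > >     }
> > > }
> > > ```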
> > >
> > > ## Scalable topic integration
> > >
> > > When a V5 client connects to a `topic://` domain topic, it establishes a DAG watch session with
> > > the broker. The broker sends the current segment layout (which segments exist, their hash
> > > ranges, and which broker owns each) and pushes updates when the layout changes (splits,
> > > merges).
> > >
> > > ```mermaid
> > > sequenceDiagram
> > >     participant Client as V5 Client
> > >     participant Broker as Broker
> > >     participant Meta as Metadata Store
> > >
> > >     Client->>Broker: ScalableTopicLookup(topic)
> > >     Broker->>Meta: Watch topic DAG
> > >     Broker-->>Client: ScalableTopicUpdate(DAG)
> > >     Note over Client: Create per-segment producers/consumers
> > >
> > >     Meta-->>Broker: Segment split notification
> > >     Broker-->>Client: ScalableTopicUpdate(new DAG)
> > >     Note over Client: Add new segments, drain old
> > > ```
> > >
> > > The `Producer` hashes message keys to determine which segment to send to, maintaining one
> > > internal producer per active segment. When segments split or merge, the client transparently
> > > creates new internal producers and drains old ones.
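> > >
> > > The routing and layout-update steps above can be sketched as follows, assuming a 16-bit hash
> > > space split into contiguous `[start, end)` ranges. The hash function (`String.hashCode()` masked
> > > to 16 bits), the `SegmentRouterSketch` class, and the segment names are stand-ins for
> > > illustration; the real hash function and layout encoding are not specified in this PIP.
> > >
> > > ```java
> > > import java.util.HashSet;
> > > import java.util.Map;
> > > import java.util.Set;
> > > import java.util.TreeMap;
> > >
> > > public class SegmentRouterSketch {
> > >     static final int HASH_SPACE = 1 << 16; // assumed hash space: [0, 65536)
> > >
> > >     // Maps each range's inclusive start to the segment owning [start, nextStart).
> > >     private final TreeMap<Integer, String> rangeStartToSegment = new TreeMap<>();
> > >
> > >     // Stand-in hash; a real client would use a well-distributed hash function.
> > >     static int hash(String key) {
> > >         return key.hashCode() & (HASH_SPACE - 1);
> > >     }
> > >
> > >     // Routes a key to the segment whose hash range contains hash(key).
> > >     String route(String key) {
> > >         Map.Entry<Integer, String> e = rangeStartToSegment.floorEntry(hash(key));
> > >         if (e == null) {
> > >             throw new IllegalStateException("layout does not cover hash " + hash(key));
> > >         }
> > >         return e.getValue();
> > >     }
> > >
> > >     // Installs a new layout (range start -> segment) and returns the segments that are
> > >     // no longer active, i.e. whose internal producers should be drained.
> > >     Set<String> applyLayout(Map<Integer, String> newLayout) {
> > >         Set<String> drained = new HashSet<>(rangeStartToSegment.values());
> > >         drained.removeAll(newLayout.values());
> > >         rangeStartToSegment.clear();
> > >         rangeStartToSegment.putAll(newLayout);
> > >         return drained;
> > >     }
> > >
> > >     public static void main(String[] args) {
> > >         SegmentRouterSketch router = new SegmentRouterSketch();
> > >         // Initial layout: two segments, each covering half of the hash space.
> > >         router.applyLayout(Map.of(0, "seg-1", 32768, "seg-2"));
> > >         System.out.println("order-42 -> " + router.route("order-42"));
> > >
> > >         // seg-2 splits into seg-3 [32768, 49152) and seg-4 [49152, 65536).
> > >         Set<String> drained =
> > >                 router.applyLayout(Map.of(0, "seg-1", 32768, "seg-3", 49152, "seg-4"));
> > >         System.out.println("drained: " + drained); // drained: [seg-2]
> > >     }
> > > }
> > > ```
> > >
> > > Keying a `TreeMap` by range start turns each lookup into a single `floorEntry` call, and
> > > diffing the old and new segment sets identifies which internal producers to drain.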
> > >
> > > ## Sync/async model
> > >
> > > All types are sync-first with an `.async()` accessor:
> > >
> > > ```
> > > Producer<T>            -> producer.async()   -> AsyncProducer<T>
> > > StreamConsumer<T>      -> consumer.async()   -> AsyncStreamConsumer<T>
> > > QueueConsumer<T>       -> consumer.async()   -> AsyncQueueConsumer<T>
> > > CheckpointConsumer<T>  -> consumer.async()   -> AsyncCheckpointConsumer<T>
> > > Transaction            -> txn.async()        -> AsyncTransaction
> > > ```
> > >
> > > Both views share the same underlying resources.
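> > >
> > > One way to realize the sync-first model is a blocking facade over the async view, so that both
> > > views share a single underlying object. In this sketch `ProducerSketch` and
> > > `AsyncProducerSketch` are hypothetical, and a `long` counter stands in for the connection state
> > > and the returned `MessageId`.
> > >
> > > ```java
> > > import java.util.concurrent.CompletableFuture;
> > > import java.util.concurrent.atomic.AtomicLong;
> > >
> > > // Hypothetical async view: every operation returns a future.
> > > class AsyncProducerSketch {
> > >     private final AtomicLong nextId = new AtomicLong(); // stands in for shared connection state
> > >
> > >     CompletableFuture<Long> send(String value) {
> > >         // A real client would complete this when the broker acknowledges the publish.
> > >         return CompletableFuture.supplyAsync(nextId::getAndIncrement);
> > >     }
> > > }
> > >
> > > // Hypothetical sync view: blocks on the async view, so both share the same resources.
> > > class ProducerSketch {
> > >     private final AsyncProducerSketch async = new AsyncProducerSketch();
> > >
> > >     long send(String value) {
> > >         return async.send(value).join(); // wait for the async send to complete
> > >     }
> > >
> > >     AsyncProducerSketch async() {
> > >         return async; // the same instance on every call, not a new producer
> > >     }
> > > }
> > >
> > > public class SyncAsyncSketch {
> > >     public static void main(String[] args) {
> > >         ProducerSketch producer = new ProducerSketch();
> > >         long first = producer.send("a");                 // blocking call
> > >         long second = producer.async().send("b").join(); // async view, same ID sequence
> > >         System.out.println(first + " " + second);        // prints "0 1"
> > >     }
> > > }
> > > ```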
> > >
> > >
> > > # Detailed Design
> > >
> > > ## Design & Implementation Details
> > >
> > > ### Module structure
> > >
> > > ```
> > > pulsar-client-api-v5/
> > >   org.apache.pulsar.client.api.v5
> > >   ├── PulsarClient, PulsarClientBuilder, PulsarClientException
> > >   ├── Producer, ProducerBuilder
> > >   ├── StreamConsumer, StreamConsumerBuilder
> > >   ├── QueueConsumer, QueueConsumerBuilder
> > >   ├── CheckpointConsumer, CheckpointConsumerBuilder, Checkpoint
> > >   ├── Message, Messages, MessageId, MessageMetadata, MessageBuilder
> > >   ├── Transaction
> > >   ├── async/       (AsyncProducer, AsyncMessageBuilder, Async*Consumer, AsyncTransaction)
> > >   ├── auth/        (Authentication, AuthenticationData, CryptoKeyReader, ...)
> > >   ├── config/      (BatchingPolicy, CompressionPolicy, TlsPolicy, BackoffPolicy, ...)
> > >   ├── schema/      (Schema, SchemaInfo, SchemaType)
> > >   └── internal/    (PulsarClientProvider — ServiceLoader SPI)
> > >
> > > pulsar-client-v5/
> > >   org.apache.pulsar.client.impl.v5
> > >   ├── PulsarClientV5, PulsarClientBuilderV5, PulsarClientProviderV5
> > >   ├── ScalableTopicProducer, ProducerBuilderV5
> > >   ├── ScalableStreamConsumer, StreamConsumerBuilderV5
> > >   ├── ScalableQueueConsumer, QueueConsumerBuilderV5
> > >   ├── ScalableCheckpointConsumer, CheckpointConsumerBuilderV5
> > >   ├── DagWatchClient, ClientSegmentLayout, SegmentRouter
> > >   ├── SchemaAdapter, AuthenticationAdapter, CryptoKeyReaderAdapter
> > >   ├── MessageV5, MessageIdV5, MessagesV5, CheckpointV5
> > >   └── Async*V5 wrappers
> > > ```
> > >
> > > ### Key types
> > >
> > > **`MessageMetadata<T, BUILDER>`** — A self-referential builder base shared between sync and
> > > async message sending:
> > >
> > > ```java
> > > interface MessageMetadata<T, BUILDER extends MessageMetadata<T, BUILDER>> {
> > >     BUILDER value(T value);
> > >     BUILDER key(String key);
> > >     BUILDER property(String name, String value);
> > >     BUILDER eventTime(Instant eventTime);
> > >     BUILDER deliverAfter(Duration delay);
> > >     BUILDER deliverAt(Instant timestamp);
> > >     BUILDER transaction(Transaction txn);
> > > }
> > > ```
> > >
> > > `MessageBuilder<T>` extends it with `MessageId send()`. `AsyncMessageBuilder<T>` extends it with
> > > `CompletableFuture<MessageId> send()`.
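> > >
> > > The self-referential type parameter lets the shared setters return the concrete builder type,
> > > so `send()` stays reachable at the end of a fluent chain in both variants. The sketch below keeps
> > > only `value()`, `key()`, and `property()`; the `*Sketch` classes are hypothetical and a `String`
> > > stands in for `MessageId`.
> > >
> > > ```java
> > > import java.util.LinkedHashMap;
> > > import java.util.Map;
> > > import java.util.concurrent.CompletableFuture;
> > >
> > > // Shared setters return BUILDER, i.e. the concrete subtype rather than the base type.
> > > interface MessageMetadata<T, BUILDER extends MessageMetadata<T, BUILDER>> {
> > >     BUILDER value(T value);
> > >     BUILDER key(String key);
> > >     BUILDER property(String name, String value);
> > > }
> > >
> > > // Hypothetical base class holding the state common to both variants.
> > > abstract class AbstractMessageSketch<T, B extends AbstractMessageSketch<T, B>>
> > >         implements MessageMetadata<T, B> {
> > >     T value;
> > >     String key;
> > >     final Map<String, String> properties = new LinkedHashMap<>();
> > >
> > >     abstract B self();
> > >
> > >     public B value(T value) { this.value = value; return self(); }
> > >     public B key(String key) { this.key = key; return self(); }
> > >     public B property(String name, String value) { properties.put(name, value); return self(); }
> > >
> > >     String describe() { return key + "=" + value + " " + properties; }
> > > }
> > >
> > > // Sync variant: send() returns the ID directly.
> > > class MessageBuilderSketch<T> extends AbstractMessageSketch<T, MessageBuilderSketch<T>> {
> > >     MessageBuilderSketch<T> self() { return this; }
> > >     String send() { return "sent " + describe(); }
> > > }
> > >
> > > // Async variant: the same setters, but send() returns a future.
> > > class AsyncMessageBuilderSketch<T> extends AbstractMessageSketch<T, AsyncMessageBuilderSketch<T>> {
> > >     AsyncMessageBuilderSketch<T> self() { return this; }
> > >     CompletableFuture<String> send() { return CompletableFuture.completedFuture("sent " + describe()); }
> > > }
> > >
> > > public class BuilderSketch {
> > >     public static void main(String[] args) {
> > >         // key() and property() return MessageBuilderSketch<String>, so send() is in scope.
> > >         String id = new MessageBuilderSketch<String>()
> > >                 .key("k1").property("p", "v").value("hello").send();
> > >         System.out.println(id); // sent k1=hello {p=v}
> > >
> > >         String asyncId = new AsyncMessageBuilderSketch<String>().key("k2").value("hi").send().join();
> > >         System.out.println(asyncId); // sent k2=hi {}
> > >     }
> > > }
> > > ```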
> > >
> > > **`Checkpoint`** — Opaque, serializable position vector across all segments:
> > >
> > > ```java
> > > interface Checkpoint {
> > >     byte[] toByteArray();
> > >     Instant creationTime();
> > >
> > >     static Checkpoint earliest();
> > >     static Checkpoint latest();
> > >     static Checkpoint atTimestamp(Instant timestamp);
> > >     static Checkpoint fromByteArray(byte[] data);
> > > }
> > > ```
> > >
> > > Internally, a `Checkpoint` stores a `Map<Long, MessageId>` mapping segment IDs to positions.
> > > The format is forward-compatible — checkpoints saved with fewer segments can be applied after
> > > splits/merges.
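> > >
> > > Because a checkpoint is described as a map from segment ID to position, its serialization can be
> > > sketched as below. Everything here is assumed for illustration: positions are plain `long`
> > > values instead of `MessageId`, the byte layout (version byte, entry count, then ID/position
> > > pairs) is invented, and returning `Optional.empty()` for a segment missing from the checkpoint
> > > (for example, one created by a later split) is one plausible reading of "forward-compatible",
> > > not the specified behavior.
> > >
> > > ```java
> > > import java.io.ByteArrayInputStream;
> > > import java.io.ByteArrayOutputStream;
> > > import java.io.DataInputStream;
> > > import java.io.DataOutputStream;
> > > import java.io.IOException;
> > > import java.io.UncheckedIOException;
> > > import java.util.Collections;
> > > import java.util.Map;
> > > import java.util.Optional;
> > > import java.util.TreeMap;
> > >
> > > // Hypothetical checkpoint: segment ID -> position (a long stands in for MessageId).
> > > final class CheckpointSketch {
> > >     private static final byte FORMAT_VERSION = 1; // lets readers reject unknown layouts
> > >     private final Map<Long, Long> positions;
> > >
> > >     CheckpointSketch(Map<Long, Long> positions) {
> > >         this.positions = Collections.unmodifiableMap(new TreeMap<>(positions));
> > >     }
> > >
> > >     // Segments absent from the checkpoint yield empty; the caller picks their start position.
> > >     Optional<Long> positionFor(long segmentId) {
> > >         return Optional.ofNullable(positions.get(segmentId));
> > >     }
> > >
> > >     Map<Long, Long> positions() {
> > >         return positions;
> > >     }
> > >
> > >     byte[] toByteArray() {
> > >         ByteArrayOutputStream bytes = new ByteArrayOutputStream();
> > >         try (DataOutputStream out = new DataOutputStream(bytes)) {
> > >             out.writeByte(FORMAT_VERSION);
> > >             out.writeInt(positions.size());
> > >             for (Map.Entry<Long, Long> e : positions.entrySet()) {
> > >                 out.writeLong(e.getKey());
> > >                 out.writeLong(e.getValue());
> > >             }
> > >         } catch (IOException e) {
> > >             throw new UncheckedIOException(e);
> > >         }
> > >         return bytes.toByteArray();
> > >     }
> > >
> > >     static CheckpointSketch fromByteArray(byte[] data) {
> > >         try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
> > >             byte version = in.readByte();
> > >             if (version != FORMAT_VERSION) {
> > >                 throw new IllegalArgumentException("unsupported checkpoint version " + version);
> > >             }
> > >             int count = in.readInt();
> > >             Map<Long, Long> positions = new TreeMap<>();
> > >             for (int i = 0; i < count; i++) {
> > >                 positions.put(in.readLong(), in.readLong()); // key is read before value
> > >             }
> > >             return new CheckpointSketch(positions);
> > >         } catch (IOException e) {
> > >             throw new UncheckedIOException(e);
> > >         }
> > >     }
> > > }
> > >
> > > public class CheckpointDemo {
> > >     public static void main(String[] args) {
> > >         CheckpointSketch saved = new CheckpointSketch(Map.of(1L, 100L, 2L, 250L));
> > >         byte[] stored = saved.toByteArray(); // what a connector would persist in its state
> > >         CheckpointSketch restored = CheckpointSketch.fromByteArray(stored);
> > >         System.out.println(restored.positions());     // {1=100, 2=250}
> > >         System.out.println(restored.positionFor(3L)); // Optional.empty
> > >     }
> > > }
> > > ```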
> > >
> > > **Configuration records** — Immutable records with static factories:
> > >
> > > | Record | Purpose | Example |
> > > |--------|---------|---------|
> > > | `BatchingPolicy` | Batching config | `BatchingPolicy.of(Duration.ofMillis(10), 5000, MemorySize.ofMB(1))` |
> > > | `CompressionPolicy` | Compression codec | `CompressionPolicy.of(CompressionType.ZSTD)` |
> > > | `TlsPolicy` | TLS/mTLS config | `TlsPolicy.of("/path/to/ca.pem")` |
> > > | `BackoffPolicy` | Retry backoff | `BackoffPolicy.exponential(Duration.ofMillis(100), Duration.ofSeconds(30))` |
> > > | `DeadLetterPolicy` | Dead letter queue | `DeadLetterPolicy.of(5)` |
> > > | `EncryptionPolicy` | E2E encryption | `EncryptionPolicy.forProducer(keyReader, "mykey")` |
> > > | `ChunkingPolicy` | Large msg chunking | `ChunkingPolicy.of(MemorySize.ofMB(10))` |
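> > >
> > > As an illustration of the immutable-record style, here is a hypothetical `BackoffPolicySketch`
> > > record. Its component names, the doubling multiplier, and the `delayForAttempt` helper are
> > > assumptions of this sketch; the table above only shows the `exponential(initial, max)` factory.
> > > Validation lives in the compact constructor, so every instance is checked however it is created.
> > >
> > > ```java
> > > import java.time.Duration;
> > > import java.util.Objects;
> > >
> > > // Hypothetical record mirroring BackoffPolicy.exponential(initial, max) from the table.
> > > record BackoffPolicySketch(Duration initialDelay, Duration maxDelay, double multiplier) {
> > >
> > >     // Compact constructor: runs for every instance, including factory-built ones.
> > >     BackoffPolicySketch {
> > >         Objects.requireNonNull(initialDelay, "initialDelay");
> > >         Objects.requireNonNull(maxDelay, "maxDelay");
> > >         if (initialDelay.isNegative() || maxDelay.compareTo(initialDelay) < 0 || multiplier < 1.0) {
> > >             throw new IllegalArgumentException("invalid backoff settings");
> > >         }
> > >     }
> > >
> > >     // Static factory; the doubling multiplier is an assumption of this sketch.
> > >     static BackoffPolicySketch exponential(Duration initialDelay, Duration maxDelay) {
> > >         return new BackoffPolicySketch(initialDelay, maxDelay, 2.0);
> > >     }
> > >
> > >     // Delay before retry number `attempt` (0-based), capped at maxDelay.
> > >     Duration delayForAttempt(int attempt) {
> > >         double millis = initialDelay.toMillis() * Math.pow(multiplier, attempt);
> > >         return millis >= maxDelay.toMillis() ? maxDelay : Duration.ofMillis((long) millis);
> > >     }
> > > }
> > >
> > > public class BackoffDemo {
> > >     public static void main(String[] args) {
> > >         BackoffPolicySketch policy =
> > >                 BackoffPolicySketch.exponential(Duration.ofMillis(100), Duration.ofSeconds(30));
> > >         for (int attempt = 0; attempt < 4; attempt++) {
> > >             System.out.println(policy.delayForAttempt(attempt).toMillis()); // 100, 200, 400, 800
> > >         }
> > >         System.out.println(policy.delayForAttempt(20)); // PT30S (capped at maxDelay)
> > >     }
> > > }
> > > ```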
> > >
> > > ### SPI discovery
> > >
> > > Implementation is loaded via `java.util.ServiceLoader`:
> > >
> > > ```java
> > > // In pulsar-client-api-v5
> > > public interface PulsarClientProvider {
> > >     PulsarClientBuilder newClientBuilder();
> > >     <T> Schema<T> jsonSchema(Class<T> clazz);
> > >     // ... factory methods for all SPI types
> > > }
> > >
> > > // In pulsar-client-v5
> > > // META-INF/services/org.apache.pulsar.client.api.v5.internal.PulsarClientProvider
> > > // -> org.apache.pulsar.client.impl.v5.PulsarClientProviderV5
> > > ```
> > >
> > > This replaces the reflection-based `DefaultImplementation` approach used in the current API.
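> > >
> > > On the API side, `java.util.ServiceLoader` discovery comes down to a few lines. In this sketch
> > > `ClientProviderSketch` stands in for `PulsarClientProvider` and `loadProvider()` is a hypothetical
> > > helper. Run standalone, with no `META-INF/services` entry on the classpath, it finds no provider
> > > and fails with a descriptive `IllegalStateException`.
> > >
> > > ```java
> > > import java.util.ServiceLoader;
> > >
> > > public class ProviderLookupSketch {
> > >     // Stand-in for the SPI interface that pulsar-client-api-v5 declares.
> > >     public interface ClientProviderSketch {
> > >         String name();
> > >     }
> > >
> > >     // Returns the first implementation listed in a classpath file named
> > >     // META-INF/services/ProviderLookupSketch$ClientProviderSketch.
> > >     static ClientProviderSketch loadProvider() {
> > >         return ServiceLoader.load(ClientProviderSketch.class)
> > >                 .findFirst()
> > >                 .orElseThrow(() -> new IllegalStateException(
> > >                         "No ClientProviderSketch implementation found on the classpath"));
> > >     }
> > >
> > >     public static void main(String[] args) {
> > >         try {
> > >             System.out.println("Using provider: " + loadProvider().name());
> > >         } catch (IllegalStateException e) {
> > >             // Expected when running this sketch alone: no implementation module is present.
> > >             System.out.println(e.getMessage());
> > >         }
> > >     }
> > > }
> > > ```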
> > >
> > > ## Public-facing Changes
> > >
> > > ### Public API
> > >
> > > This PIP introduces a new public Java API. The existing `pulsar-client-api` is unchanged.
> > >
> > > **New modules:**
> > > - `pulsar-client-api-v5` — interfaces and value types (compile dependency for applications)
> > > - `pulsar-client-v5` — implementation (runtime dependency)
> > >
> > > **New interfaces (summary):**
> > >
> > > | Interface | Methods | Description |
> > > |-----------|---------|-------------|
> > > | `PulsarClient` | `builder()`, `newProducer()`, `newStreamConsumer()`, `newQueueConsumer()`, `newCheckpointConsumer()`, `newTransaction()`, `close()` | Entry point |
> > > | `Producer<T>` | `newMessage()`, `flush()`, `close()`, `async()` | Send messages |
> > > | `StreamConsumer<T>` | `receive()`, `receive(Duration)`, `acknowledgeCumulative()`, `close()`, `async()` | Ordered consumption |
> > > | `QueueConsumer<T>` | `receive()`, `receive(Duration)`, `acknowledge()`, `negativeAcknowledge()`, `close()`, `async()` | Parallel consumption |
> > > | `CheckpointConsumer<T>` | `receive()`, `receive(Duration)`, `checkpoint()`, `seek()`, `close()`, `async()` | Framework consumption |
> > >
> > > ### Configuration
> > >
> > > No new broker configuration is introduced by this PIP. The V5 client reuses the existing
> > > `ClientConfigurationData` internally.
> > >
> > > ### CLI
> > >
> > > No new CLI commands specific to the V5 API.
> > >
> > > ### Metrics
> > >
> > > No new metrics are introduced by the V5 client API itself. The underlying v4 producers and
> > > consumers continue to emit their existing metrics.
> > >
> > >
> > > # Monitoring
> > >
> > > The V5 client wraps v4 producers and consumers internally, so existing producer/consumer
> > > metrics (publish rate, latency, backlog, etc.) continue to work. Each internal segment
> > > producer/consumer appears as a separate instance in metrics, identified by the segment topic
> > > name.
> > >
> > > Operators should monitor:
> > > - Per-segment publish rates to detect hot segments (candidates for splitting)
> > > - DAG watch session reconnections (which indicate broker restarts or network issues)
> > > - Segment producer creation/closure events in client logs during split/merge operations
> > >
> > >
> > > # Security Considerations
> > >
> > > The V5 client API does not introduce new security mechanisms. It delegates all authentication
> > > and authorization to the underlying v4 client:
> > >
> > > - Authentication is configured via `PulsarClientBuilder.authentication()` and delegated to the
> > >   v4 `AuthenticationProvider` framework via `AuthenticationAdapter`
> > > - Topic-level authorization applies to the parent `topic://` name — accessing the underlying
> > >   `segment://` topics uses the same tenant/namespace permissions
> > > - End-to-end encryption is supported via `EncryptionPolicy` on `ProducerBuilder`, delegated to
> > >   the v4 `CryptoKeyReader` framework via `CryptoKeyReaderAdapter`
> > > - The new `CommandScalableTopicLookup` protocol command is sent only after the connection is
> > >   authenticated and in the `Connected` state, consistent with other lookup commands
> > >
> > > No new REST endpoints are introduced by the client API itself.
> > >
> > >
> > > # Backward & Forward Compatibility
> > >
> > > ## Upgrade
> > >
> > > The V5 API is a new, additive module. No changes are required to existing applications.
> > > This follows the same approach as the 2.0 API redesign, where the old API was preserved in a
> > > separate compatibility module — except this time there is no breaking change at all. The
> > > existing API is unchanged and existing applications require no modifications.
> > >
> > > - Applications using `pulsar-client-api` (v4) continue to work without modification
> > > - New applications can adopt the V5 API by depending on `pulsar-client-v5`
> > > - The V5 API works with all topic types: scalable topics, partitioned topics, and
> > >   non-partitioned topics — applications can migrate to the new API without changing their
> > >   topic infrastructure
> > > - Both APIs can coexist in the same JVM — the V5 implementation wraps the v4 transport
> > >   internally
> > > - A detailed migration path from v4 to v5 will be provided in a subsequent PIP
> > > - A seamless migration path to convert existing partitioned and non-partitioned topics to
> > >   scalable topics will also be provided, allowing applications to transition their topic
> > >   infrastructure without data loss or downtime
> > >
> > > To adopt the V5 API, applications add `pulsar-client-v5` as a dependency and use
> > > `PulsarClient.builder()` from the `org.apache.pulsar.client.api.v5` package.
> > >
> > > ## Downgrade / Rollback
> > >
> > > Since the V5 API is a separate module, rollback is simply removing the dependency and reverting
> > > to v4 API calls. No broker-side changes are required.
> > >
> > > Applications using `CheckpointConsumer` should note that saved `Checkpoint` byte arrays are
> > > specific to the V5 implementation and cannot be used with v4 `Reader`/`Consumer`.
> > >
> > > ## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations
> > >
> > > The V5 client API itself does not introduce geo-replication concerns — it connects to whichever
> > > cluster it is configured for. However, geo-replication of scalable topics has specific
> > > considerations (segment layout synchronization across clusters, cross-cluster split/merge
> > > coordination) that will be detailed in a subsequent PIP.
> > >
> > >
> > > # Alternatives
> > >
> > > ## Extend the existing Consumer interface
> > >
> > > We considered adding scalable topic support to the existing `Consumer` interface by adding new
> > > methods for checkpoint/seek and hiding segment details internally. This was rejected because:
> > >
> > > - The existing interface already has 60+ methods and is difficult to evolve
> > > - Adding checkpoint semantics alongside ack semantics would further confuse the API
> > > - The type system cannot prevent misuse (e.g., calling `acknowledgeCumulative()` on a Shared
> > >   subscription)
> > > - Removing partition-related types and methods (`TopicMessageId`, `MessageRouter`) would break
> > >   backward compatibility
> > >
> > > ## Builder-per-subscription-type on existing API
> > >
> > > We considered keeping a single `Consumer` type but using different builder types per subscription
> > > mode (e.g., `StreamConsumerBuilder` returning a `Consumer` with restricted methods). This was
> > > rejected because the returned `Consumer` would still expose all methods at the type level — the
> > > restriction would only be in documentation, not enforced by the compiler.
> > >
> > > ## Separate module vs extending existing module
> > >
> > > We chose a separate module (`pulsar-client-api-v5`) rather than adding new interfaces to
> > > `pulsar-client-api` because:
> > >
> > > - The v5 API uses different naming conventions (`value()` vs `getValue()`), different types
> > >   (`Duration` vs `(long, TimeUnit)`), and different patterns (`Optional` vs nullable)
> > > - Having both conventions in the same package would be confusing
> > > - A clean module boundary makes it clear which API generation an application is using
> > > - The v4 API can eventually be deprecated without affecting v5 users
> > >
> > >
> > > # General Notes
> > >
> > > The V5 API targets Java 17, the same as the rest of Pulsar.
> > >
> > > The implementation module (`pulsar-client-v5`) wraps the existing v4 `pulsar-client` for all
> > > transport-level operations. This means bug fixes and performance improvements to the v4 client
> > > automatically benefit V5 users, and the V5 module itself is relatively thin — primarily routing
> > > logic and API adaptation.
> > >
> > >
> > > --
> > > Matteo Merli
> > >
