+1

Regards,
penghui

On Thu, Feb 26, 2026 at 6:28 PM WenZhi Feng <[email protected]> wrote:

> Sounds good! Pulsar definitely needs such a simplification.
>
> On 2026/02/27 02:07:10 Matteo Merli wrote:
> > Hi everyone,
> >
> > I'd like to start a discussion around a potential redesign of the Java
> > client API, shipped as a new separate module (pulsar-client-api-v5),
> > starting with the next LTS release, 5.0. This is not a fully fledged PIP
> > yet; I want to gather feedback and gauge interest before formalizing
> > a proposal.
> >
> > The draft API (interfaces only, no implementation) is available here:
> >
> > API:
> > https://github.com/merlimat/pulsar/tree/v5-api/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5
> >
> > Usage examples:
> > https://github.com/merlimat/pulsar/blob/v5-api/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
> >
> > Why a new API?
> >
> > Three main drivers:
> >
> > Remove partitioning from the client API
> >
> > The current API leaks partitioning everywhere: TopicMessageId,
> > getPartitionsForTopic(), MessageRouter, MessageRoutingMode,
> > partition-specific consumers. This forces application code to deal
> > with what is fundamentally a server-side scalability concern.
> >
> > In the new API, topics are opaque. Scalability is handled internally
> > by hash-range segments — the client never sees them. Per-key ordering
> > is still guaranteed when a key is specified, but the underlying
> > parallelism is entirely managed by the broker.
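
To make the hash-range idea concrete, here is a minimal conceptual sketch of how a key could be mapped to a segment so that equal keys always land in the same place. Everything in it is an assumption for illustration: the `Segment` record, the 65536-slot hash space, and the use of `Arrays.hashCode` as a stand-in hash are not taken from the draft API, and in the proposal this mapping happens internally and is never exposed to application code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class HashRangeSketch {
    // Assumed size of the hash space; chosen only for illustration.
    static final int HASH_SPACE = 65536;

    // Hypothetical segment owning the inclusive hash range [start, end].
    record Segment(String name, int start, int end) {
        boolean contains(int hash) {
            return hash >= start && hash <= end;
        }
    }

    // Stand-in hash function; a real system would likely use a stronger hash.
    static int hashOf(String key) {
        int raw = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(raw, HASH_SPACE); // always in [0, HASH_SPACE)
    }

    // The same key always hashes to the same value, hence the same segment,
    // which is what preserves per-key ordering.
    static Segment segmentFor(String key, List<Segment> segments) {
        int hash = hashOf(key);
        for (Segment segment : segments) {
            if (segment.contains(hash)) {
                return segment;
            }
        }
        throw new IllegalStateException("No segment covers hash " + hash);
    }

    public static void main(String[] args) {
        List<Segment> segments = List.of(
                new Segment("segment-0", 0, 32767),
                new Segment("segment-1", 32768, 65535));
        for (String key : List.of("order-1", "order-2", "order-1")) {
            System.out.println(key + " -> " + segmentFor(key, segments).name());
        }
    }
}
```

Splitting or merging ranges changes only the segment list, not the application code, which is the property the proposal relies on.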
> >
> > Simplify an API that grew too big and inconsistent
> >
> > After years of organic growth, the current API surface has accumulated
> > a lot of baggage:
> >
> > - Consumer has 60+ methods mixing unrelated concerns (ack, nack, seek,
> >   pause, unsubscribe, stats...)
> > - ConsumerBuilder has 40+ configuration methods with overlapping semantics
> > - Timeouts use (long, TimeUnit) in some places, long millis in others
> > - Nullable returns vs. empty values are used inconsistently across the API
> > - loadConf(Map), clone(), and Serializable on builders are rarely used and
> >   clutter the API
> > - SPI via a reflection hack (DefaultImplementation) instead of the standard
> >   ServiceLoader
> >
> > The v5 API targets modern Java, uses
> > Duration/Instant/Optional/records, and keeps a minimal surface.
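
As a rough illustration of what "modern Java" buys here, the sketch below contrasts a `(long, TimeUnit)` pair with a single `Duration`, models a policy as a validated record, and returns `Optional` instead of `null`. The names `BackoffSketch`, `toDuration`, and `property` are hypothetical and are not the types or methods of the draft API.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

public class ModernTypesSketch {
    // Hypothetical policy record; the field names are assumptions.
    record BackoffSketch(Duration initialDelay, Duration maxDelay) {
        BackoffSketch {
            // Validate once at construction so every instance is consistent.
            if (initialDelay.isNegative() || maxDelay.compareTo(initialDelay) < 0) {
                throw new IllegalArgumentException("expected 0 <= initialDelay <= maxDelay");
            }
        }
    }

    // Bridges the legacy (long, TimeUnit) style to a single Duration value.
    static Duration toDuration(long timeout, TimeUnit unit) {
        return Duration.ofMillis(unit.toMillis(timeout));
    }

    // Returns Optional.empty() rather than null when the key is absent.
    static Optional<String> property(Map<String, String> properties, String key) {
        return Optional.ofNullable(properties.get(key));
    }

    public static void main(String[] args) {
        BackoffSketch backoff =
                new BackoffSketch(Duration.ofMillis(100), Duration.ofSeconds(30));
        System.out.println(backoff);
        System.out.println(toDuration(5, TimeUnit.SECONDS));
        System.out.println(property(Map.of("region", "eu"), "zone").orElse("<unset>"));
    }
}
```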
> >
> > Separate streaming vs queuing consumption
> >
> > The current Consumer mixes all four subscription types (Exclusive,
> > Failover, Shared, Key_Shared) behind a single interface. The result:
> > acknowledgeCumulative() is available but throws at runtime for Shared
> > subscriptions; negativeAcknowledge() semantics differ between modes;
> > seek() behavior varies; dead-letter policy only applies to some modes.
> >
> > The v5 API splits this into purpose-built types:
> >
> > - StreamConsumer: ordered consumption with cumulative ack (maps to
> >   Exclusive/Failover). For event sourcing, CDC, ordered pipelines.
> > - QueueConsumer: unordered parallel consumption with individual ack,
> >   nack, dead-letter support (maps to Shared/Key_Shared). For work
> >   queues, task processing.
> > - CheckpointConsumer: unmanaged consumption for connector frameworks
> >   (Flink, Spark). No subscription, no ack; position is tracked
> >   externally via an opaque Checkpoint type that captures a consistent
> >   position across all internal hash-range segments.
> >
> > Each type only exposes the operations that make sense for its model.
> > No more runtime surprises.
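
The point about purpose-built types can be sketched with two deliberately tiny interfaces: calling a cumulative ack on the queue type is a compile error rather than a runtime exception. The interfaces and in-memory implementations below are simplified assumptions written for this illustration; the method sets and signatures of the real draft interfaces may differ.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ConsumerSplitSketch {
    record Msg(long id, String value) {}

    // Hypothetical ordered consumer: only cumulative ack is offered.
    interface StreamConsumerSketch {
        Msg receive(); // returns null when nothing is pending (sketch only)
        void acknowledgeCumulative(long upToId);
    }

    // Hypothetical work-queue consumer: individual ack and nack only.
    interface QueueConsumerSketch {
        Msg receive(); // returns null when nothing is pending (sketch only)
        void acknowledge(long id);
        void negativeAcknowledge(long id);
    }

    static final class InMemoryStream implements StreamConsumerSketch {
        private final Deque<Msg> pending;
        long ackedUpTo = -1;

        InMemoryStream(List<Msg> messages) { pending = new ArrayDeque<>(messages); }

        public Msg receive() { return pending.poll(); }

        public void acknowledgeCumulative(long upToId) {
            ackedUpTo = Math.max(ackedUpTo, upToId); // the mark never moves backwards
        }
    }

    static final class InMemoryQueue implements QueueConsumerSketch {
        private final Deque<Msg> pending;
        private final Map<Long, Msg> inFlight = new HashMap<>();
        final Set<Long> acked = new HashSet<>();

        InMemoryQueue(List<Msg> messages) { pending = new ArrayDeque<>(messages); }

        public Msg receive() {
            Msg msg = pending.poll();
            if (msg != null) inFlight.put(msg.id(), msg);
            return msg;
        }

        public void acknowledge(long id) {
            if (inFlight.remove(id) != null) acked.add(id);
        }

        public void negativeAcknowledge(long id) {
            Msg msg = inFlight.remove(id);
            if (msg != null) pending.addLast(msg); // schedule redelivery
        }
    }

    public static void main(String[] args) {
        InMemoryQueue queue = new InMemoryQueue(List.of(new Msg(0, "a"), new Msg(1, "b")));
        Msg first = queue.receive();
        queue.negativeAcknowledge(first.id());
        // queue.acknowledgeCumulative(1); // would not compile: no such method
        System.out.println(queue.receive() + " then " + queue.receive());
    }
}
```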
> >
> > API overview
> >
> > Entry point:
> >
> > PulsarClient
> >   .newProducer(schema) -> ProducerBuilder -> Producer<T>
> >   .newStreamConsumer(schema) -> StreamConsumerBuilder -> StreamConsumer<T>
> >   .newQueueConsumer(schema) -> QueueConsumerBuilder -> QueueConsumer<T>
> >   .newCheckpointConsumer(schema) -> CheckpointConsumerBuilder -> CheckpointConsumer<T>
> >   .newTransaction() -> Transaction
> >
> > Sync-first with .async() accessor — each type has an async counterpart
> > (e.g. producer.async() returns AsyncProducer) with
> > CompletableFuture-based operations.
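
One plausible shape for the sync-first pattern with an `.async()` accessor is sketched below. `ProducerSketch`, `AsyncProducerSketch`, and the `long` message id are assumptions made for this example; only the idea that `producer.async()` returns a `CompletableFuture`-based counterpart comes from the proposal.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

public class AsyncAccessorSketch {
    // Hypothetical async view: every operation returns a CompletableFuture.
    interface AsyncProducerSketch<T> {
        CompletableFuture<Long> send(T value);
    }

    // Hypothetical sync-first interface exposing its async counterpart.
    interface ProducerSketch<T> {
        long send(T value);
        AsyncProducerSketch<T> async();
    }

    // In-memory stand-in that just hands out increasing message ids.
    static final class InMemoryProducer<T> implements ProducerSketch<T> {
        private final AtomicLong nextId = new AtomicLong();
        private final AsyncProducerSketch<T> asyncView =
                value -> CompletableFuture.completedFuture(nextId.getAndIncrement());

        @Override
        public long send(T value) {
            // The sync call is a blocking wrapper around the async operation.
            return asyncView.send(value).join();
        }

        @Override
        public AsyncProducerSketch<T> async() {
            return asyncView; // same underlying producer, different call style
        }
    }

    public static void main(String[] args) {
        ProducerSketch<String> producer = new InMemoryProducer<>();
        System.out.println("sync id: " + producer.send("hello"));
        producer.async().send("world")
                .thenAccept(id -> System.out.println("async id: " + id));
    }
}
```

Keeping the async operations behind an accessor lets the common blocking path stay uncluttered while both views share one underlying object.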
> >
> > Configuration grouped into policy records: BatchingPolicy,
> > CompressionPolicy, TlsPolicy, BackoffPolicy, DeadLetterPolicy,
> > EncryptionPolicy, etc.
> >
> > Checkpoint is an opaque serializable position vector with factories:
> > Checkpoint.earliest(), Checkpoint.latest(),
> > Checkpoint.atTimestamp(instant), Checkpoint.fromByteArray(bytes).
> > Connectors can snapshot and restore positions without knowing about
> > internal segments.
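
To show why an opaque, serializable position vector is convenient for connectors, here is a minimal sketch of a snapshot-and-restore round trip. `CheckpointLike`, its per-segment map, the binary layout, and the `toByteArray()` method are all assumptions for illustration; the proposal only names `Checkpoint.fromByteArray(bytes)` and does not specify the encoding.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

public class CheckpointSketch {
    // Hypothetical position vector: one offset per internal segment id.
    record CheckpointLike(Map<Integer, Long> positions) {
        CheckpointLike {
            positions = Map.copyOf(positions); // immutable defensive copy
        }

        // Assumed layout: entry count, then (segment id, offset) pairs
        // written in ascending segment-id order.
        byte[] toByteArray() {
            ByteBuffer buf = ByteBuffer.allocate(
                    Integer.BYTES + positions.size() * (Integer.BYTES + Long.BYTES));
            buf.putInt(positions.size());
            for (Map.Entry<Integer, Long> e : new TreeMap<>(positions).entrySet()) {
                buf.putInt(e.getKey());
                buf.putLong(e.getValue());
            }
            return buf.array();
        }

        static CheckpointLike fromByteArray(byte[] bytes) {
            ByteBuffer buf = ByteBuffer.wrap(bytes);
            int count = buf.getInt();
            Map<Integer, Long> positions = new TreeMap<>();
            for (int i = 0; i < count; i++) {
                int segmentId = buf.getInt();
                long offset = buf.getLong();
                positions.put(segmentId, offset);
            }
            return new CheckpointLike(positions);
        }
    }

    public static void main(String[] args) {
        CheckpointLike snapshot = new CheckpointLike(Map.of(0, 120L, 1, 98L));
        byte[] stored = snapshot.toByteArray(); // e.g. saved in connector state
        CheckpointLike restored = CheckpointLike.fromByteArray(stored);
        System.out.println(restored.equals(snapshot));
    }
}
```

The connector only ever handles the byte array, so the number and layout of segments can change without affecting its code.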
> >
> > What this does NOT change
> >
> > - Wire protocol: unchanged
> > - Broker: unchanged
> > - Existing pulsar-client-api: stays as-is, fully supported
> > - This is API-only (interfaces); the implementation module would come later
> >
> > Looking forward to your thoughts!
> >
> > --
> > Matteo Merli
> > <[email protected]>
> >
>
