+1

On Sat, Feb 28, 2026 at 14:53, Jia Zhai <[email protected]> wrote:

> +1
>
> Best Regards.
>
>
> Jia Zhai
>
> Beijing, China
>
> Mobile: +86 15810491983
>
> On Sat, Feb 28, 2026 at 9:53 AM Tao Jiuming <[email protected]> wrote:
>
> > great change!
> >
> > On Fri, Feb 27, 2026 at 10:07, Matteo Merli <[email protected]> wrote:
> >
> > > Hi everyone,
> > >
> > > I'd like to start a discussion around a potential redesign of the Java
> > > client API, shipped as a new separate module (pulsar-client-api-v5),
> > > starting with the next LTS release, 5.0. This is not a fully fledged PIP
> > > yet — I want to gather feedback and gauge interest before formalizing
> > > a proposal.
> > >
> > > The draft API (interfaces only, no implementation) is available here:
> > >
> > > API:
> > >
> > > https://github.com/merlimat/pulsar/tree/v5-api/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5
> > >
> > > Usage examples:
> > >
> > > https://github.com/merlimat/pulsar/blob/v5-api/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
> > >
> > > Why a new API?
> > >
> > > Three main drivers:
> > >
> > > Remove partitioning from the client API
> > >
> > > The current API leaks partitioning everywhere: TopicMessageId,
> > > getPartitionsForTopic(), MessageRouter, MessageRoutingMode,
> > > partition-specific consumers. This forces application code to deal
> > > with what is fundamentally a server-side scalability concern.
> > >
> > > In the new API, topics are opaque. Scalability is handled internally
> > > by hash-range segments — the client never sees them. Per-key ordering
> > > is still guaranteed when a key is specified, but the underlying
> > > parallelism is entirely managed by the broker.
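To make the per-key ordering guarantee concrete, here is a minimal sketch of how a key could be routed to one of several hash-range segments without the application ever seeing them. The email does not say which hash function or range layout the broker uses: the 16-bit hash space, the `Segment` record and `SegmentRouter` are hypothetical, and `String.hashCode()` only stands in for whatever hash a real implementation would pick.

```java
import java.util.List;

// Hypothetical router: maps a message key to the segment owning its hash.
final class SegmentRouter {
    static final int HASH_SPACE = 1 << 16; // assumption: 65536 hash slots

    private final List<Segment> segments;

    SegmentRouter(List<Segment> segments) {
        this.segments = List.copyOf(segments);
    }

    // Same key -> same hash -> same segment, which is what preserves per-key ordering.
    int hash(String key) {
        return Math.floorMod(key.hashCode(), HASH_SPACE);
    }

    Segment route(String key) {
        int h = hash(key);
        for (Segment s : segments) {
            if (s.contains(h)) {
                return s;
            }
        }
        throw new IllegalStateException("hash " + h + " not covered by any segment");
    }

    public static void main(String[] args) {
        // Two segments splitting the hash space in half; the broker could later split or merge them.
        SegmentRouter router = new SegmentRouter(List.of(
                new Segment(0, 0, HASH_SPACE / 2 - 1),
                new Segment(1, HASH_SPACE / 2, HASH_SPACE - 1)));
        System.out.println("order-42 -> segment " + router.route("order-42").id());
    }
}

// Hypothetical segment owning the inclusive hash range [start, end].
record Segment(int id, int start, int end) {
    boolean contains(int hash) {
        return hash >= start && hash <= end;
    }
}
```

Because only the broker knows the segment list, it can change the layout while the client keeps the same contract: one key, one ordered stream.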
> > >
> > > Simplify an API that grew too big and inconsistent
> > >
> > > After years of organic growth, the current API surface has accumulated
> > > a lot of baggage:
> > >
> > > Consumer has 60+ methods mixing unrelated concerns (ack, nack, seek,
> > > pause, unsubscribe, stats...)
> > > ConsumerBuilder has 40+ configuration methods with overlapping
> > > semantics
> > > Timeouts use (long, TimeUnit) in some places, long millis in others
> > > Nullable returns vs empty — inconsistent across the API
> > > loadConf(Map), clone(), Serializable on builders: rarely used, and they
> > > clutter the API
> > > SPI via reflection hack (DefaultImplementation) instead of standard
> > > ServiceLoader
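For comparison with the reflection hack, the standard `java.util.ServiceLoader` lookup looks roughly like this. `ClientBootstrap` and `ClientProvider` are hypothetical names, not part of the draft API; an implementation jar would register its provider under `META-INF/services/` (or with `provides ... with ...` in `module-info.java`), and the API module just asks the loader for the first one.

```java
import java.util.Optional;
import java.util.ServiceLoader;

// Hypothetical bootstrap; the real v5 API may name and shape this differently.
final class ClientBootstrap {

    // Hypothetical SPI that an implementation module would provide.
    interface ClientProvider {
        String name();
    }

    // Returns the first provider registered on the class path or module path, if any.
    static Optional<ClientProvider> findProvider() {
        return ServiceLoader.load(ClientProvider.class).findFirst();
    }

    public static void main(String[] args) {
        String message = findProvider()
                .map(p -> "using provider " + p.name())
                .orElse("no implementation found; add a client implementation to the class path");
        System.out.println(message);
    }
}
```

With no provider registered, `findFirst()` simply returns an empty `Optional`, so a missing implementation jar becomes an explicit, checkable condition rather than a reflection failure.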
> > >
> > > The v5 API targets modern Java, uses
> > > Duration/Instant/Optional/records, and keeps a minimal surface.
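A small illustration of the difference, using hypothetical method names (none of these come from the draft API): a timeout passed as `(long, TimeUnit)` versus `Duration`, and a lookup that returns `null` versus `Optional`.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

final class ModernTypes {

    // Old style: value and unit travel as two parameters the caller must keep in sync.
    static long legacyTimeoutMillis(long value, TimeUnit unit) {
        return unit.toMillis(value);
    }

    // Duration carries its unit with the value.
    static long timeoutMillis(Duration timeout) {
        return timeout.toMillis();
    }

    // Old style: null means "absent", and nothing forces the caller to check.
    static String legacyProperty(Map<String, String> props, String key) {
        return props.get(key);
    }

    // Optional makes absence part of the signature.
    static Optional<String> property(Map<String, String> props, String key) {
        return Optional.ofNullable(props.get(key));
    }

    public static void main(String[] args) {
        System.out.println(legacyTimeoutMillis(30, TimeUnit.SECONDS)); // 30000
        System.out.println(timeoutMillis(Duration.ofSeconds(30)));      // 30000
        System.out.println(property(Map.of("region", "eu"), "zone").orElse("default")); // default
    }
}
```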
> > >
> > > Separate streaming vs queuing consumption
> > >
> > > The current Consumer mixes all four subscription types (Exclusive,
> > > Failover, Shared, Key_Shared) behind a single interface. The result:
> > > acknowledgeCumulative() is available but throws at runtime for Shared
> > > subscriptions; negativeAcknowledge() semantics differ between modes;
> > > seek() behavior varies; dead-letter policy only applies to some modes.
> > >
> > > The v5 API splits this into purpose-built types:
> > >
> > > StreamConsumer — ordered consumption with cumulative ack (maps to
> > > Exclusive/Failover). For event sourcing, CDC, ordered pipelines.
> > > QueueConsumer — unordered parallel consumption with individual ack,
> > > nack, dead-letter support (maps to Shared/Key_Shared). For work
> > > queues, task processing.
> > > CheckpointConsumer — unmanaged consumption for connector frameworks
> > > (Flink, Spark). No subscription, no ack — position is tracked
> > > externally via an opaque Checkpoint type that captures a consistent
> > > position across all internal hash-range segments.
> > >
> > > Each type only exposes the operations that make sense for its model.
> > > No more runtime surprises.
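The benefit of the split is that misuse becomes a compile error instead of a runtime exception. A minimal sketch of the idea: the type names mirror the email, but the methods, `MessageId`, and the in-memory implementations are simplified and hypothetical, not the draft's actual signatures. Cumulative ack exists only on the stream type; nack exists only on the queue type.

```java
import java.util.HashSet;
import java.util.Set;

final class ConsumerSplitDemo {

    // In-memory queue consumer that only records acked and nacked ids.
    static final class InMemoryQueueConsumer implements QueueConsumer {
        final Set<MessageId> acked = new HashSet<>();
        final Set<MessageId> redeliver = new HashSet<>();

        @Override public void acknowledge(MessageId id) {
            acked.add(id);
            redeliver.remove(id);
        }

        @Override public void negativeAcknowledge(MessageId id) {
            redeliver.add(id);
        }
    }

    // In-memory stream consumer that tracks the highest cumulatively acked position.
    static final class InMemoryStreamConsumer implements StreamConsumer {
        long ackedUpTo = -1;

        @Override public void acknowledgeCumulative(MessageId id) {
            ackedUpTo = Math.max(ackedUpTo, id.position());
        }
    }

    public static void main(String[] args) {
        InMemoryQueueConsumer queue = new InMemoryQueueConsumer();
        queue.negativeAcknowledge(new MessageId(7));
        queue.acknowledge(new MessageId(3));
        // queue.acknowledgeCumulative(...) does not compile: QueueConsumer has no such method.
        System.out.println("acked=" + queue.acked + " redeliver=" + queue.redeliver);
    }
}

// Hypothetical message id: a position within the topic.
record MessageId(long position) {}

// Stream model: ordered, so acking a position covers everything before it.
interface StreamConsumer {
    void acknowledgeCumulative(MessageId id);
}

// Queue model: each message is acked or nacked individually.
interface QueueConsumer {
    void acknowledge(MessageId id);
    void negativeAcknowledge(MessageId id);
}
```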
> > >
> > > API overview
> > >
> > > Entry point:
> > >
> > > PulsarClient
> > >   .newProducer(schema) -> ProducerBuilder -> Producer<T>
> > >   .newStreamConsumer(schema) -> StreamConsumerBuilder -> StreamConsumer<T>
> > >   .newQueueConsumer(schema) -> QueueConsumerBuilder -> QueueConsumer<T>
> > >   .newCheckpointConsumer(schema) -> CheckpointConsumerBuilder -> CheckpointConsumer<T>
> > >   .newTransaction() -> Transaction
> > >
> > > Sync-first with .async() accessor — each type has an async counterpart
> > > (e.g. producer.async() returns AsyncProducer) with
> > > CompletableFuture-based operations.
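One way a sync-first type with an `.async()` accessor can be layered, as a hypothetical in-memory sketch rather than the draft's code: the `send` signatures and the use of a `long` as the message id are assumptions. Here the sync call simply joins the future returned by the async view, so both paths share one implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AsyncAccessorDemo {

    // Hypothetical async view: every operation returns a CompletableFuture.
    interface AsyncProducer<T> {
        CompletableFuture<Long> send(T value);
    }

    // Hypothetical sync-first producer exposing its async counterpart.
    interface Producer<T> {
        long send(T value);
        AsyncProducer<T> async();
    }

    // In-memory implementation: the "message id" is the index in a list.
    static final class InMemoryProducer<T> implements Producer<T> {
        private final List<T> log = new ArrayList<>();
        private final AsyncProducer<T> asyncView =
                value -> CompletableFuture.supplyAsync(() -> append(value));

        private synchronized long append(T value) {
            log.add(value);
            return log.size() - 1;
        }

        @Override public long send(T value) {
            // The sync call blocks on the async path.
            return asyncView.send(value).join();
        }

        @Override public AsyncProducer<T> async() {
            return asyncView;
        }
    }

    public static void main(String[] args) {
        Producer<String> producer = new InMemoryProducer<>();
        long first = producer.send("hello");
        long second = producer.async().send("world").join();
        System.out.println(first + " " + second); // 0 1
    }
}
```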
> > >
> > > Configuration grouped into policy records: BatchingPolicy,
> > > CompressionPolicy, TlsPolicy, BackoffPolicy, DeadLetterPolicy,
> > > EncryptionPolicy, etc.
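As an illustration of grouping configuration into records, here is a hypothetical `BatchingPolicy`. The component names, validation rules and default values are assumptions for the sketch, not taken from the draft.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical policy record: validated once at construction, immutable afterwards.
record BatchingPolicy(boolean enabled, int maxMessages, int maxBytes, Duration maxDelay) {

    BatchingPolicy {
        Objects.requireNonNull(maxDelay, "maxDelay");
        if (maxMessages <= 0 || maxBytes <= 0) {
            throw new IllegalArgumentException("batch limits must be positive");
        }
        if (maxDelay.isNegative()) {
            throw new IllegalArgumentException("maxDelay must not be negative");
        }
    }

    // Illustrative defaults only.
    static BatchingPolicy defaults() {
        return new BatchingPolicy(true, 1000, 128 * 1024, Duration.ofMillis(1));
    }

    static BatchingPolicy disabled() {
        return new BatchingPolicy(false, 1, 1, Duration.ZERO);
    }

    // Records are immutable, so "changing" a field returns a modified copy.
    BatchingPolicy withMaxDelay(Duration delay) {
        return new BatchingPolicy(enabled, maxMessages, maxBytes, delay);
    }

    public static void main(String[] args) {
        BatchingPolicy policy = BatchingPolicy.defaults().withMaxDelay(Duration.ofMillis(5));
        System.out.println(policy);
    }
}
```

A builder could then take a single `batching(BatchingPolicy)` argument instead of several loosely related setters.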
> > >
> > > Checkpoint is an opaque serializable position vector with factories:
> > > Checkpoint.earliest(), Checkpoint.latest(),
> > > Checkpoint.atTimestamp(instant), Checkpoint.fromByteArray(bytes).
> > > Connectors can snapshot and restore positions without knowing about
> > > internal segments.
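A minimal sketch of what an opaque position vector could look like internally, assuming a checkpoint is just a map from segment id to position within that segment. The email only names the factories `earliest()`, `latest()`, `atTimestamp(instant)` and `fromByteArray(bytes)`; the internal fields, the `of(...)` helper, the `toByteArray()` method and the byte encoding below are hypothetical, and only the byte round-trip is implemented. A connector would store the bytes in its own state (for example a Flink state backend) without ever looking inside.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical internals: segment id -> position within that segment.
final class Checkpoint {
    private final Map<Long, Long> positions;

    private Checkpoint(Map<Long, Long> positions) {
        this.positions = new TreeMap<>(positions); // sorted, so the encoding is stable
    }

    static Checkpoint of(Map<Long, Long> positions) {
        return new Checkpoint(positions);
    }

    // Encoding: entry count, then (segmentId, position) pairs as longs.
    byte[] toByteArray() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(positions.size());
            for (Map.Entry<Long, Long> e : positions.entrySet()) {
                out.writeLong(e.getKey());
                out.writeLong(e.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Checkpoint fromByteArray(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            Map<Long, Long> positions = new TreeMap<>();
            for (int i = 0; i < count; i++) {
                positions.put(in.readLong(), in.readLong()); // key is read before value
            }
            return new Checkpoint(positions);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override public boolean equals(Object o) {
        return o instanceof Checkpoint c && positions.equals(c.positions);
    }

    @Override public int hashCode() {
        return positions.hashCode();
    }

    public static void main(String[] args) {
        Checkpoint cp = Checkpoint.of(Map.of(0L, 120L, 1L, 98L));
        byte[] saved = cp.toByteArray();
        System.out.println(Checkpoint.fromByteArray(saved).equals(cp)); // true
        System.out.println(saved.length); // 36
    }
}
```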
> > >
> > > What this does NOT change
> > >
> > > Wire protocol — unchanged
> > > Broker — unchanged
> > > Existing pulsar-client-api — stays as-is, fully supported
> > > This is API-only (interfaces); the implementation module would come
> > > later
> > >
> > > Looking forward to your thoughts!
> > >
> > > --
> > > Matteo Merli
> > > <[email protected]>
> > >
> >
>
