+1

On Sat, 28 Feb 2026 at 02:23, PengHui Li <[email protected]> wrote:
> +1
>
> Regards,
> penghui
>
> On Thu, Feb 26, 2026 at 6:28 PM WenZhi Feng <[email protected]> wrote:
>
> > Sounds good! Pulsar definitely needs such a simplification.
> >
> > On 2026/02/27 02:07:10 Matteo Merli wrote:
> > > Hi everyone,
> > >
> > > I'd like to start a discussion around a potential redesign of the Java
> > > client API, shipped as a new separate module (pulsar-client-api-v5),
> > > starting from the next LTS version, 5.0. This is not a fully fledged PIP
> > > yet; I want to gather feedback and gauge interest before formalizing
> > > a proposal.
> > >
> > > The draft API (interfaces only, no implementation) is available here:
> > >
> > > API:
> > > https://github.com/merlimat/pulsar/tree/v5-api/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5
> > >
> > > Usage examples:
> > >
> > > https://github.com/merlimat/pulsar/blob/v5-api/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
> > >
> > > Why a new API?
> > >
> > > Three main drivers:
> > >
> > > Remove partitioning from the client API
> > >
> > > The current API leaks partitioning everywhere: TopicMessageId,
> > > getPartitionsForTopic(), MessageRouter, MessageRoutingMode, and
> > > partition-specific consumers. This forces application code to deal
> > > with what is fundamentally a server-side scalability concern.
> > >
> > > In the new API, topics are opaque. Scalability is handled internally
> > > by hash-range segments, which the client never sees. Per-key ordering
> > > is still guaranteed when a key is specified, but the underlying
> > > parallelism is entirely managed by the broker.
> > >
> > > Simplify an API that grew too big and inconsistent
> > >
> > > After years of organic growth, the current API surface has accumulated
> > > a lot of baggage:
> > >
> > > Consumer has 60+ methods mixing unrelated concerns (ack, nack, seek, pause, unsubscribe, stats...)
> > > ConsumerBuilder has 40+ configuration methods with overlapping semantics
> > > Timeouts use (long, TimeUnit) in some places and long millis in others
> > > Nullable returns vs. empty values: inconsistent across the API
> > > loadConf(Map), clone(), and Serializable on builders: rarely used, and they clutter the API
> > > SPI via a reflection hack (DefaultImplementation) instead of the standard ServiceLoader
> > >
> > > The v5 API targets modern Java, uses
> > > Duration/Instant/Optional/records, and keeps a minimal surface.
> > >
> > > Separate streaming vs. queuing consumption
> > >
> > > The current Consumer mixes all four subscription types (Exclusive,
> > > Failover, Shared, Key_Shared) behind a single interface. The result:
> > > acknowledgeCumulative() is available but throws at runtime for Shared
> > > subscriptions; negativeAcknowledge() semantics differ between modes;
> > > seek() behavior varies; and the dead-letter policy only applies to some
> > > modes.
> > >
> > > The v5 API splits this into purpose-built types:
> > >
> > > StreamConsumer: ordered consumption with cumulative ack (maps to Exclusive/Failover). For event sourcing, CDC, and ordered pipelines.
> > > QueueConsumer: unordered parallel consumption with individual ack, nack, and dead-letter support (maps to Shared/Key_Shared). For work queues and task processing.
> > > CheckpointConsumer: unmanaged consumption for connector frameworks (Flink, Spark). No subscription and no ack; the position is tracked externally via an opaque Checkpoint type that captures a consistent position across all internal hash-range segments.
> > >
> > > Each type only exposes the operations that make sense for its model.
> > > No more runtime surprises.
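The partitioning section above says per-key ordering survives even though hash-range segments are hidden from the client. The Java sketch below illustrates why: if the key-hash space is split into contiguous ranges, every message with a given key hashes into the same range and therefore the same segment. Everything here is an assumption for illustration only: the HashRange and HashRangeSegments types, the 16-bit hash space, and String.hashCode() as the key hash are hypothetical, not the broker's actual scheme.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical: an inclusive range of key hashes owned by one internal segment. */
record HashRange(int start, int end) {
    HashRange {
        if (start < 0 || end < start) {
            throw new IllegalArgumentException("invalid range " + start + ".." + end);
        }
    }

    boolean contains(int hash) {
        return hash >= start && hash <= end;
    }
}

/** Hypothetical routing table that splits [0, HASH_SPACE) evenly across segments. */
final class HashRangeSegments {
    // Assumed 16-bit hash space; the broker's real size and hash function may differ.
    static final int HASH_SPACE = 1 << 16;

    private final List<HashRange> ranges = new ArrayList<>();

    HashRangeSegments(int segmentCount) {
        if (segmentCount < 1 || segmentCount > HASH_SPACE) {
            throw new IllegalArgumentException("segmentCount must be in [1, " + HASH_SPACE + "]");
        }
        int size = HASH_SPACE / segmentCount;
        for (int i = 0; i < segmentCount; i++) {
            int start = i * size;
            // The last range absorbs the remainder so the whole hash space is covered.
            int end = (i == segmentCount - 1) ? HASH_SPACE - 1 : start + size - 1;
            ranges.add(new HashRange(start, end));
        }
    }

    /** Illustrative key hash: String.hashCode() folded into the hash space. */
    static int keyHash(String key) {
        return Math.floorMod(key.hashCode(), HASH_SPACE);
    }

    /** Index of the segment owning this key; equal keys always get the same index. */
    int segmentFor(String key) {
        int hash = keyHash(key);
        for (int i = 0; i < ranges.size(); i++) {
            if (ranges.get(i).contains(hash)) {
                return i;
            }
        }
        throw new IllegalStateException("hash " + hash + " is not covered");
    }

    int segmentCount() {
        return ranges.size();
    }
}
```

For example, with new HashRangeSegments(4), segmentFor("order-42") returns the same index on every call, so that key's messages stay in one ordered segment while other keys spread across the rest. None of this is visible to the application.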
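The list above mentions replacing the reflection-based DefaultImplementation lookup with the standard java.util.ServiceLoader. A minimal sketch of that pattern follows; ClientProvider and ClientProviders are hypothetical names, not types from the draft. An implementation module would register its class in a META-INF/services file named after the interface's fully qualified name, or with a provides clause in module-info.java.

```java
import java.util.Optional;
import java.util.ServiceLoader;

/** Hypothetical SPI that an implementation module would provide. */
interface ClientProvider {
    String name();
}

/** Hypothetical lookup helper built on the JDK's ServiceLoader. */
final class ClientProviders {
    private ClientProviders() {}

    /** First provider registered on the class path or module path, if any. */
    static Optional<ClientProvider> find() {
        return ServiceLoader.load(ClientProvider.class).findFirst();
    }

    /** Fails with a clear message when no implementation module is present. */
    static ClientProvider require() {
        return find().orElseThrow(() -> new IllegalStateException(
                "No ClientProvider found; add an implementation module to the class path"));
    }
}
```

Compared with a reflection hack, the missing-implementation case becomes an ordinary empty Optional, and the JDK handles class loading and module visibility.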
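To illustrate how splitting StreamConsumer and QueueConsumer turns runtime surprises into compile errors, here is a trimmed-down model. The method names, the MessageId and Message records, and the in-memory classes are hypothetical stand-ins, not the draft's actual signatures.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;

/** Hypothetical message position; a real client would keep this opaque. */
record MessageId(long sequence) {}

record Message<T>(MessageId id, T value) {}

/** Ordered consumption (Exclusive/Failover): only cumulative ack is offered. */
interface StreamConsumer<T> {
    Optional<Message<T>> receive();

    void acknowledgeCumulative(MessageId upTo);
}

/** Unordered parallel consumption (Shared/Key_Shared): individual ack and nack. */
interface QueueConsumer<T> {
    Optional<Message<T>> receive();

    void acknowledge(MessageId id);

    void negativeAcknowledge(MessageId id);
}

/** In-memory stand-in for a stream consumer. */
final class InMemoryStream<T> implements StreamConsumer<T> {
    private final Queue<Message<T>> pending = new ArrayDeque<>();
    private long ackedUpTo = -1;

    void publish(long sequence, T value) {
        pending.add(new Message<>(new MessageId(sequence), value));
    }

    @Override
    public Optional<Message<T>> receive() {
        return Optional.ofNullable(pending.poll());
    }

    // Marks every message up to and including upTo as processed.
    @Override
    public void acknowledgeCumulative(MessageId upTo) {
        ackedUpTo = Math.max(ackedUpTo, upTo.sequence());
    }

    long ackedUpTo() {
        return ackedUpTo;
    }
}

/** In-memory stand-in for a queue consumer with redelivery on nack. */
final class InMemoryQueue<T> implements QueueConsumer<T> {
    private final Queue<Message<T>> pending = new ArrayDeque<>();
    private final Map<MessageId, Message<T>> inFlight = new HashMap<>();
    private final Set<MessageId> acked = new HashSet<>();

    void publish(long sequence, T value) {
        pending.add(new Message<>(new MessageId(sequence), value));
    }

    @Override
    public Optional<Message<T>> receive() {
        Message<T> next = pending.poll();
        if (next != null) {
            inFlight.put(next.id(), next);
        }
        return Optional.ofNullable(next);
    }

    @Override
    public void acknowledge(MessageId id) {
        if (inFlight.remove(id) != null) {
            acked.add(id);
        }
    }

    // A nacked message goes back to the pending queue for redelivery.
    @Override
    public void negativeAcknowledge(MessageId id) {
        Message<T> message = inFlight.remove(id);
        if (message != null) {
            pending.add(message);
        }
    }

    boolean isAcked(MessageId id) {
        return acked.contains(id);
    }
}
```

Because QueueConsumer never declares acknowledgeCumulative(), the Shared-subscription failure described above cannot even be written: the call fails at compile time instead of throwing at runtime.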
> > >
> > > API overview
> > >
> > > Entry point:
> > >
> > > PulsarClient
> > >   .newProducer(schema) -> ProducerBuilder -> Producer<T>
> > >   .newStreamConsumer(schema) -> StreamConsumerBuilder -> StreamConsumer<T>
> > >   .newQueueConsumer(schema) -> QueueConsumerBuilder -> QueueConsumer<T>
> > >   .newCheckpointConsumer(schema) -> CheckpointConsumerBuilder -> CheckpointConsumer<T>
> > >   .newTransaction() -> Transaction
> > >
> > > Sync-first, with an .async() accessor: each type has an async counterpart
> > > (e.g. producer.async() returns an AsyncProducer) with
> > > CompletableFuture-based operations.
> > >
> > > Configuration is grouped into policy records: BatchingPolicy,
> > > CompressionPolicy, TlsPolicy, BackoffPolicy, DeadLetterPolicy,
> > > EncryptionPolicy, etc.
> > >
> > > Checkpoint is an opaque, serializable position vector with factories:
> > > Checkpoint.earliest(), Checkpoint.latest(),
> > > Checkpoint.atTimestamp(instant), and Checkpoint.fromByteArray(bytes).
> > > Connectors can snapshot and restore positions without knowing about
> > > internal segments.
> > >
> > > What this does NOT change
> > >
> > > Wire protocol: unchanged
> > > Broker: unchanged
> > > Existing pulsar-client-api: stays as-is, fully supported
> > > This is API-only (interfaces); the implementation module would come later
> > >
> > > Looking forward to your thoughts!
> > >
> > > --
> > > Matteo Merli
> > > <[email protected]>
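The sync-first shape with an .async() accessor described in the API overview can be sketched as follows. Producer, AsyncProducer, the send(T) signature, MessageId and InMemoryProducer are hypothetical simplifications for illustration; the draft's real interfaces may differ. Here the blocking send() is simply a default method that joins the future returned by the async view.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical message position returned by a send. */
record MessageId(long sequence) {}

/** Hypothetical async view: same operations, CompletableFuture-based results. */
interface AsyncProducer<T> {
    CompletableFuture<MessageId> send(T value);
}

/** Hypothetical sync-first producer that exposes its async view via async(). */
interface Producer<T> {
    AsyncProducer<T> async();

    /**
     * Blocking send built on the async path. join() waits for completion and
     * rethrows a failure wrapped in an unchecked CompletionException.
     */
    default MessageId send(T value) {
        return async().send(value).join();
    }
}

/** In-memory stand-in that "sends" by assigning increasing sequence numbers. */
final class InMemoryProducer<T> implements Producer<T> {
    private final AtomicLong nextSequence = new AtomicLong();
    private final AsyncProducer<T> asyncView =
            value -> CompletableFuture.completedFuture(new MessageId(nextSequence.getAndIncrement()));

    @Override
    public AsyncProducer<T> async() {
        return asyncView;
    }
}
```

Application code calls producer.send(value) to block, or producer.async().send(value).thenAccept(...) to stay non-blocking. Building the blocking call on the async one keeps a single implementation path.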
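Grouping options into immutable policy records, with Duration for timeouts and Optional for values that may be absent, could look like the sketch below. BatchingPolicy is named in the proposal, but the fields and factories shown here are hypothetical. So are the ProducerConfigSketch class and its 30-second placeholder default.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

/**
 * Hypothetical batching policy: the field names are illustrative, not the draft's.
 * Validation runs once in the compact constructor, so every instance is consistent.
 */
record BatchingPolicy(boolean enabled, Duration maxDelay, int maxMessages) {
    BatchingPolicy {
        Objects.requireNonNull(maxDelay, "maxDelay");
        if (maxDelay.isNegative()) {
            throw new IllegalArgumentException("maxDelay must not be negative");
        }
        if (maxMessages < 1) {
            throw new IllegalArgumentException("maxMessages must be at least 1");
        }
    }

    static BatchingPolicy disabled() {
        return new BatchingPolicy(false, Duration.ZERO, 1);
    }

    static BatchingPolicy of(Duration maxDelay, int maxMessages) {
        return new BatchingPolicy(true, maxDelay, maxMessages);
    }
}

/** Hypothetical producer settings: Duration everywhere, Optional instead of null. */
final class ProducerConfigSketch {
    private BatchingPolicy batching = BatchingPolicy.disabled();
    private Duration sendTimeout = Duration.ofSeconds(30); // placeholder default
    private String producerName; // null means "not set"; never returned as null

    ProducerConfigSketch batching(BatchingPolicy policy) {
        this.batching = Objects.requireNonNull(policy, "policy");
        return this;
    }

    ProducerConfigSketch sendTimeout(Duration timeout) {
        this.sendTimeout = Objects.requireNonNull(timeout, "timeout");
        return this;
    }

    ProducerConfigSketch producerName(String name) {
        this.producerName = Objects.requireNonNull(name, "name");
        return this;
    }

    BatchingPolicy batching() {
        return batching;
    }

    Duration sendTimeout() {
        return sendTimeout;
    }

    Optional<String> producerName() {
        return Optional.ofNullable(producerName);
    }
}
```

A single Duration parameter replaces both the (long, TimeUnit) and raw-millisecond styles. Validating inside the record means an invalid policy can never reach a builder.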
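Checkpoint is described as an opaque, serializable position vector across internal hash-range segments. One possible encoding is sketched below with java.nio.ByteBuffer. The CheckpointSketch name, the segment-to-position map, and the byte layout (version byte, count, then id/position pairs) are all assumptions; the real format is intentionally hidden from connectors.

```java
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical checkpoint: one position per internal segment, captured together.
 * Callers treat the bytes as opaque; the layout is an illustrative assumption:
 * [version:1 byte][count:int][(segmentId:long, position:long) * count]
 */
record CheckpointSketch(Map<Long, Long> positions) {
    private static final byte VERSION = 1;

    CheckpointSketch {
        // Sorted, immutable copy: encoding is deterministic and the record stays immutable.
        positions = Collections.unmodifiableMap(new TreeMap<>(positions));
    }

    byte[] toByteArray() {
        ByteBuffer buf = ByteBuffer.allocate(1 + Integer.BYTES + positions.size() * 2 * Long.BYTES);
        buf.put(VERSION);
        buf.putInt(positions.size());
        positions.forEach((segment, position) -> {
            buf.putLong(segment);
            buf.putLong(position);
        });
        return buf.array();
    }

    static CheckpointSketch fromByteArray(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte version = buf.get();
        if (version != VERSION) {
            throw new IllegalArgumentException("Unsupported checkpoint version: " + version);
        }
        int count = buf.getInt();
        Map<Long, Long> positions = new TreeMap<>();
        for (int i = 0; i < count; i++) {
            long segment = buf.getLong();
            long position = buf.getLong();
            positions.put(segment, position);
        }
        return new CheckpointSketch(positions);
    }
}
```

A connector would store toByteArray() in its own state backend and call fromByteArray() on restore without interpreting the contents. The version byte leaves room to change the layout later.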
