Sounds good! Pulsar definitely needs such a simplification.
On 2026/02/27 02:07:10 Matteo Merli wrote:
> Hi everyone,
>
> I'd like to start a discussion around a potential redesign of the Java client API, shipped as a new separate module (pulsar-client-api-v5), starting with the next LTS version, 5.0. This is not a fully fledged PIP yet; I want to gather feedback and gauge interest before formalizing a proposal.
>
> The draft API (interfaces only, no implementation) is available here:
>
> API:
> https://github.com/merlimat/pulsar/tree/v5-api/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5
>
> Usage examples:
> https://github.com/merlimat/pulsar/blob/v5-api/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
>
> Why a new API?
>
> Three main drivers:
>
> Remove partitioning from the client API
>
> The current API leaks partitioning everywhere: TopicMessageId, getPartitionsForTopic(), MessageRouter, MessageRoutingMode, partition-specific consumers. This forces application code to deal with what is fundamentally a server-side scalability concern.
>
> In the new API, topics are opaque. Scalability is handled internally by hash-range segments; the client never sees them. Per-key ordering is still guaranteed when a key is specified, but the underlying parallelism is entirely managed by the broker.
>
> Simplify an API that grew too big and inconsistent
>
> After years of organic growth, the current API surface has accumulated a lot of baggage:
>
> Consumer has 60+ methods mixing unrelated concerns (ack, nack, seek, pause, unsubscribe, stats...)
> ConsumerBuilder has 40+ configuration methods with overlapping semantics
> Timeouts use (long, TimeUnit) in some places, long millis in others
> Nullable returns vs. empty returns: inconsistent across the API
> loadConf(Map), clone(), Serializable on builders: rarely used, and they clutter the API
> SPI via a reflection hack (DefaultImplementation) instead of the standard ServiceLoader
>
> The v5 API targets modern Java, uses Duration/Instant/Optional/records, and keeps a minimal surface.
>
> Separate streaming vs. queuing consumption
>
> The current Consumer mixes all four subscription types (Exclusive, Failover, Shared, Key_Shared) behind a single interface. The result: acknowledgeCumulative() is available but throws at runtime for Shared subscriptions; negativeAcknowledge() semantics differ between modes; seek() behavior varies; dead-letter policy only applies to some modes.
>
> The v5 API splits this into purpose-built types:
>
> StreamConsumer: ordered consumption with cumulative ack (maps to Exclusive/Failover). For event sourcing, CDC, ordered pipelines.
> QueueConsumer: unordered parallel consumption with individual ack, nack, and dead-letter support (maps to Shared/Key_Shared). For work queues, task processing.
> CheckpointConsumer: unmanaged consumption for connector frameworks (Flink, Spark). No subscription, no ack: position is tracked externally via an opaque Checkpoint type that captures a consistent position across all internal hash-range segments.
>
> Each type only exposes the operations that make sense for its model. No more runtime surprises.
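To make the compile-time argument concrete, here is a minimal, self-contained Java sketch. The interface shapes are my own simplification, not the signatures in the draft: the Message record, receive(Duration), the BaseConsumer interface and the InMemory* classes are all hypothetical. The only point it shows is that acknowledgeCumulative() exists on the stream type and is simply absent from the queue type, so misuse fails at compile time instead of throwing at runtime.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

public class ConsumerSplitSketch {

    // Hypothetical, simplified message type (not the draft API's Message interface).
    record Message<T>(long position, T value) {}

    // Hypothetical: operations every consumption model shares.
    interface BaseConsumer<T> {
        // Returns Optional.empty() when nothing is available, instead of null.
        Optional<Message<T>> receive(Duration timeout);
    }

    // Ordered model: acknowledging position N covers everything up to N.
    interface StreamConsumer<T> extends BaseConsumer<T> {
        void acknowledgeCumulative(Message<T> message);
    }

    // Unordered model: per-message ack and nack; there is no cumulative ack here.
    interface QueueConsumer<T> extends BaseConsumer<T> {
        void acknowledge(Message<T> message);
        void negativeAcknowledge(Message<T> message);
    }

    // Toy backing store shared by both models: messages numbered from 0 in arrival order.
    abstract static class InMemoryConsumer<T> implements BaseConsumer<T> {
        protected final Deque<Message<T>> pending = new ArrayDeque<>();

        InMemoryConsumer(Iterable<T> values) {
            long position = 0;
            for (T value : values) {
                pending.addLast(new Message<>(position++, value));
            }
        }

        @Override
        public Optional<Message<T>> receive(Duration timeout) {
            // A real client would wait up to `timeout`; this toy never blocks.
            return Optional.ofNullable(pending.pollFirst());
        }
    }

    static final class InMemoryStreamConsumer<T> extends InMemoryConsumer<T> implements StreamConsumer<T> {
        private long ackedUpTo = -1;

        InMemoryStreamConsumer(Iterable<T> values) { super(values); }

        @Override
        public void acknowledgeCumulative(Message<T> message) {
            ackedUpTo = Math.max(ackedUpTo, message.position());
        }

        long ackedUpTo() { return ackedUpTo; }
    }

    static final class InMemoryQueueConsumer<T> extends InMemoryConsumer<T> implements QueueConsumer<T> {
        private final Set<Long> acked = new TreeSet<>();

        InMemoryQueueConsumer(Iterable<T> values) { super(values); }

        @Override
        public void acknowledge(Message<T> message) { acked.add(message.position()); }

        @Override
        public void negativeAcknowledge(Message<T> message) {
            pending.addLast(message); // schedule redelivery at the back of the queue
        }

        Set<Long> acked() { return acked; }
    }

    public static void main(String[] args) {
        InMemoryQueueConsumer<String> queue = new InMemoryQueueConsumer<>(List.of("a", "b"));
        Message<String> a = queue.receive(Duration.ofSeconds(1)).orElseThrow();
        queue.negativeAcknowledge(a);                                          // "a" is redelivered later
        queue.acknowledge(queue.receive(Duration.ofSeconds(1)).orElseThrow()); // "b"
        queue.acknowledge(queue.receive(Duration.ofSeconds(1)).orElseThrow()); // "a" again
        System.out.println(queue.acked());                                     // [0, 1]
        // queue.acknowledgeCumulative(a); // would not compile: QueueConsumer has no such method
    }
}
```

Uncommenting the last line of main gets rejected by javac, which is exactly the class of runtime surprise the split is meant to remove.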
>
> API overview
>
> Entry point:
>
> PulsarClient
>   .newProducer(schema) -> ProducerBuilder -> Producer<T>
>   .newStreamConsumer(schema) -> StreamConsumerBuilder -> StreamConsumer<T>
>   .newQueueConsumer(schema) -> QueueConsumerBuilder -> QueueConsumer<T>
>   .newCheckpointConsumer(schema) -> CheckpointConsumerBuilder -> CheckpointConsumer<T>
>   .newTransaction() -> Transaction
>
> Sync-first with .async() accessor: each type has an async counterpart (e.g. producer.async() returns AsyncProducer) with CompletableFuture-based operations.
>
> Configuration grouped into policy records: BatchingPolicy, CompressionPolicy, TlsPolicy, BackoffPolicy, DeadLetterPolicy, EncryptionPolicy, etc.
>
> Checkpoint is an opaque serializable position vector with factories: Checkpoint.earliest(), Checkpoint.latest(), Checkpoint.atTimestamp(instant), Checkpoint.fromByteArray(bytes). Connectors can snapshot and restore positions without knowing about internal segments.
>
> What this does NOT change
>
> Wire protocol: unchanged
> Broker: unchanged
> Existing pulsar-client-api: stays as-is, fully supported
> This is API-only (interfaces); the implementation module would come later
>
> Looking forward to your thoughts!
>
> --
> Matteo Merli
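On the Checkpoint idea, a toy model of what an opaque position vector over hash-range segments could look like. Everything in it is an assumption for illustration, not the v5 design: the fixed 16-bit hash space split into equal ranges, String.hashCode() as the key hash, and the byte layout (an entry count followed by segment id / offset pairs). It demonstrates the two properties described above: a given key always maps to the same segment (which is what keeps per-key ordering), and a connector can persist toByteArray() and restore the identical position vector without interpreting it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class CheckpointSketch {

    // Hypothetical: a fixed 16-bit hash space split into equal ranges.
    static final int HASH_SPACE = 1 << 16;

    // Maps a key to a segment index in [0, segmentCount); same key, same segment.
    static int segmentFor(String key, int segmentCount) {
        int hash = key.hashCode() & (HASH_SPACE - 1); // illustrative hash in [0, 65536)
        return (int) ((long) hash * segmentCount / HASH_SPACE);
    }

    // Hypothetical opaque position vector: segment index -> next offset to read.
    record Checkpoint(SortedMap<Integer, Long> positions) {
        Checkpoint {
            // Defensive immutable copy so a checkpoint behaves as a value.
            positions = Collections.unmodifiableSortedMap(new TreeMap<>(positions));
        }

        // Assumed layout: int count, then (int segment, long offset) per entry.
        byte[] toByteArray() {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeInt(positions.size());
                for (Map.Entry<Integer, Long> e : positions.entrySet()) {
                    out.writeInt(e.getKey());
                    out.writeLong(e.getValue());
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return bytes.toByteArray();
        }

        static Checkpoint fromByteArray(byte[] data) {
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
                int count = in.readInt();
                SortedMap<Integer, Long> positions = new TreeMap<>();
                for (int i = 0; i < count; i++) {
                    positions.put(in.readInt(), in.readLong());
                }
                return new Checkpoint(positions);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    public static void main(String[] args) {
        int segments = 4;
        SortedMap<Integer, Long> next = new TreeMap<>();
        // Count consumed messages per segment; with offsets starting at 0 this is the next offset to read.
        for (String key : new String[] {"user-1", "user-2", "user-1", "user-3"}) {
            next.merge(segmentFor(key, segments), 1L, Long::sum);
        }
        Checkpoint snapshot = new Checkpoint(next);
        byte[] stored = snapshot.toByteArray();          // what a connector would persist
        Checkpoint restored = Checkpoint.fromByteArray(stored);
        System.out.println(restored.equals(snapshot));   // true
    }
}
```

If segments can be split or merged over time, a real encoding would presumably also need to capture range boundaries rather than bare indices; this sketch assumes a fixed layout.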
