Sounds good! Pulsar definitely needs such a simplification.

On 2026/02/27 02:07:10 Matteo Merli wrote:
> Hi everyone,
> 
> I'd like to start a discussion around a potential redesign of the Java
> client API, shipped as a new separate module (pulsar-client-api-v5),
> starting with the next LTS version, 5.0. This is not a fully fledged PIP
> yet; I want to gather feedback and gauge interest before formalizing
> a proposal.
> 
> The draft API (interfaces only, no implementation) is available here:
> 
> API: 
> https://github.com/merlimat/pulsar/tree/v5-api/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5
> 
> Usage examples:
> https://github.com/merlimat/pulsar/blob/v5-api/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
> 
> Why a new API?
> 
> Three main drivers:
> 
> Remove partitioning from the client API
> 
> The current API leaks partitioning everywhere: TopicMessageId,
> getPartitionsForTopic(), MessageRouter, MessageRoutingMode,
> partition-specific consumers. This forces application code to deal
> with what is fundamentally a server-side scalability concern.
> 
> In the new API, topics are opaque. Scalability is handled internally
> by hash-range segments — the client never sees them. Per-key ordering
> is still guaranteed when a key is specified, but the underlying
> parallelism is entirely managed by the broker.
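
A minimal sketch of the hash-range idea described above, to make the per-key ordering argument concrete. Everything here is hypothetical: `Segment`, `SegmentRouter`, the 65536-slot hash space, and the use of CRC32 as the hash are stand-ins for illustration and are not taken from the draft API or from the broker. The only point is that a key always hashes into the same range, so all messages for that key land in one segment and stay ordered.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.CRC32;

// Hypothetical: a segment owns the half-open hash range [startInclusive, endExclusive).
record Segment(int id, int startInclusive, int endExclusive) {
    boolean contains(int hash) {
        return hash >= startInclusive && hash < endExclusive;
    }
}

// Hypothetical router; in the proposal this logic is internal and never visible to applications.
final class SegmentRouter {
    static final int HASH_SPACE = 65536; // assumed size of the hash space

    private final List<Segment> segments;

    SegmentRouter(List<Segment> segments) {
        this.segments = List.copyOf(segments);
    }

    // CRC32 is only a stand-in hash function for this sketch.
    static int hash(String key) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % HASH_SPACE);
    }

    // The same key always yields the same hash, hence the same segment.
    Segment route(String key) {
        int h = hash(key);
        return segments.stream()
                .filter(s -> s.contains(h))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("no segment covers hash " + h));
    }
}
```

Splitting a segment would only change which ranges exist, not how a key is hashed, which is why the application would not need to know about it.
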
> 
> Simplify an API that grew too big and inconsistent
> 
> After years of organic growth, the current API surface has accumulated
> a lot of baggage:
> 
> - Consumer has 60+ methods mixing unrelated concerns (ack, nack, seek,
>   pause, unsubscribe, stats...)
> - ConsumerBuilder has 40+ configuration methods with overlapping semantics
> - Timeouts use (long, TimeUnit) in some places, long millis in others
> - Nullable vs. empty returns are inconsistent across the API
> - loadConf(Map), clone(), and Serializable on builders are rarely used
>   and clutter the API
> - SPI goes through a reflection hack (DefaultImplementation) instead of
>   the standard ServiceLoader
> 
> The v5 API targets modern Java, uses
> Duration/Instant/Optional/records, and keeps a minimal surface.
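
To illustrate what "Duration/Optional/records" could buy over `(long, TimeUnit)` pairs and nullable returns, here is a small sketch. `BackoffSketch` and `MessageSketch` are made-up names with made-up fields; they are not the `BackoffPolicy` or message types from the draft, whose actual shape should be checked in the linked branch.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical policy record: one Duration replaces a (long, TimeUnit) pair,
// and the compact constructor validates the values once, at construction time.
record BackoffSketch(Duration initialDelay, Duration maxDelay, double multiplier) {
    BackoffSketch {
        if (initialDelay.isNegative() || initialDelay.isZero()) {
            throw new IllegalArgumentException("initialDelay must be positive");
        }
        if (maxDelay.compareTo(initialDelay) < 0) {
            throw new IllegalArgumentException("maxDelay must be >= initialDelay");
        }
        if (multiplier < 1.0) {
            throw new IllegalArgumentException("multiplier must be >= 1.0");
        }
    }

    // Exponential backoff capped at maxDelay; attempt 0 returns initialDelay.
    Duration delayForAttempt(int attempt) {
        double scaled = initialDelay.toMillis() * Math.pow(multiplier, attempt);
        long capped = (long) Math.min(scaled, (double) maxDelay.toMillis());
        return Duration.ofMillis(capped);
    }
}

// Hypothetical message view: an absent key is an empty Optional, never null.
record MessageSketch(String value, Optional<String> key) {}
```

A caller would write something like `new BackoffSketch(Duration.ofMillis(100), Duration.ofSeconds(1), 2.0)` and never have to guess the unit of a bare `long`.
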
> 
> Separate streaming vs queuing consumption
> 
> The current Consumer mixes all four subscription types (Exclusive,
> Failover, Shared, Key_Shared) behind a single interface. The result:
> acknowledgeCumulative() is available but throws at runtime for Shared
> subscriptions; negativeAcknowledge() semantics differ between modes;
> seek() behavior varies; dead-letter policy only applies to some modes.
> 
> The v5 API splits this into purpose-built types:
> 
> - StreamConsumer: ordered consumption with cumulative ack (maps to
>   Exclusive/Failover). For event sourcing, CDC, ordered pipelines.
> - QueueConsumer: unordered parallel consumption with individual ack,
>   nack, dead-letter support (maps to Shared/Key_Shared). For work
>   queues, task processing.
> - CheckpointConsumer: unmanaged consumption for connector frameworks
>   (Flink, Spark). No subscription, no ack; position is tracked
>   externally via an opaque Checkpoint type that captures a consistent
>   position across all internal hash-range segments.
> 
> Each type only exposes the operations that make sense for its model.
> No more runtime surprises.
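
A sketch of why splitting the types removes the runtime surprise: if the queue-style interface simply has no cumulative-ack method, misuse becomes a compile error rather than an exception. The interfaces and the in-memory implementation below are hypothetical and heavily simplified (message ids are plain `long`s, there is no broker); they are not the `StreamConsumer` or `QueueConsumer` signatures from the draft.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical message type for this sketch.
record Msg<T>(long id, T value) {}

// Ordered model: only cumulative ack is offered.
interface StreamConsumerSketch<T> {
    Msg<T> receive();
    void acknowledgeCumulative(long upToIdInclusive);
}

// Work-queue model: individual ack and nack, and no cumulative ack at all.
interface QueueConsumerSketch<T> {
    Msg<T> receive();
    void acknowledge(long id);
    void negativeAcknowledge(long id);
}

// Toy in-memory queue consumer; a nacked message goes to the back of the queue for redelivery.
final class InMemoryQueueConsumer<T> implements QueueConsumerSketch<T> {
    private final Deque<Msg<T>> pending = new ArrayDeque<>();
    private final Map<Long, Msg<T>> inFlight = new HashMap<>();

    InMemoryQueueConsumer(List<Msg<T>> messages) {
        pending.addAll(messages);
    }

    @Override
    public Msg<T> receive() {
        Msg<T> m = pending.poll(); // null when nothing is pending
        if (m != null) {
            inFlight.put(m.id(), m);
        }
        return m;
    }

    @Override
    public void acknowledge(long id) {
        inFlight.remove(id);
    }

    @Override
    public void negativeAcknowledge(long id) {
        Msg<T> m = inFlight.remove(id);
        if (m != null) {
            pending.addLast(m);
        }
    }

    int inFlightCount() {
        return inFlight.size();
    }
}
```

With these shapes, calling `acknowledgeCumulative` on a queue consumer does not compile, which is the behavior the paragraph above is asking for.
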
> 
> API overview
> 
> Entry point:
> 
> PulsarClient
>   .newProducer(schema) -> ProducerBuilder -> Producer<T>
>   .newStreamConsumer(schema) -> StreamConsumerBuilder -> StreamConsumer<T>
>   .newQueueConsumer(schema) -> QueueConsumerBuilder -> QueueConsumer<T>
>   .newCheckpointConsumer(schema) -> CheckpointConsumerBuilder -> CheckpointConsumer<T>
>   .newTransaction() -> Transaction
> 
> Sync-first with .async() accessor — each type has an async counterpart
> (e.g. producer.async() returns AsyncProducer) with
> CompletableFuture-based operations.
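
One way the "sync-first with .async() accessor" pattern could look, as a sketch only. `ProducerSketch`, `AsyncProducerSketch`, and the in-memory implementation are hypothetical; the draft's `Producer` and `AsyncProducer` may differ in method names and return types. Here the async view simply delegates to the blocking call on the common pool, which a real client would presumably not do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical async view: same operation, CompletableFuture-based.
interface AsyncProducerSketch<T> {
    CompletableFuture<Long> send(T value);
}

// Hypothetical sync-first type exposing its async counterpart via async().
interface ProducerSketch<T> {
    long send(T value);              // blocks until the value is stored
    AsyncProducerSketch<T> async();  // accessor for the async counterpart
}

// Toy implementation: "sending" appends to a list and returns the index.
final class InMemoryProducer<T> implements ProducerSketch<T> {
    private final List<T> log = new ArrayList<>();

    // The async view shares state with the sync producer and reuses send().
    private final AsyncProducerSketch<T> asyncView =
            value -> CompletableFuture.supplyAsync(() -> send(value));

    @Override
    public synchronized long send(T value) {
        log.add(value);
        return log.size() - 1;
    }

    @Override
    public AsyncProducerSketch<T> async() {
        return asyncView;
    }
}
```

The appeal of this shape is that the common blocking case stays short, while async callers get a type where every operation returns a future instead of a mix of `send` and `sendAsync` on one interface.
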
> 
> Configuration grouped into policy records: BatchingPolicy,
> CompressionPolicy, TlsPolicy, BackoffPolicy, DeadLetterPolicy,
> EncryptionPolicy, etc.
> 
> Checkpoint is an opaque serializable position vector with factories:
> Checkpoint.earliest(), Checkpoint.latest(),
> Checkpoint.atTimestamp(instant), Checkpoint.fromByteArray(bytes).
> Connectors can snapshot and restore positions without knowing about
> internal segments.
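
A sketch of the snapshot/restore round trip a connector might rely on. `CheckpointSketch`, its segment-id-to-position map, the byte layout, and `toByteArray()` are assumptions made up for this example; the mail only names `Checkpoint.fromByteArray(bytes)` and the other factories, and says nothing about the encoding. The connector side would only ever see the `byte[]`.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical position vector: one position per internal segment id.
record CheckpointSketch(Map<Integer, Long> positions) {
    CheckpointSketch {
        positions = Map.copyOf(positions); // defensive, immutable copy
    }

    // Assumed layout: entry count (int), then (segment id: int, position: long) pairs
    // in ascending segment-id order so the output is deterministic.
    byte[] toByteArray() {
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + positions.size() * (Integer.BYTES + Long.BYTES));
        buf.putInt(positions.size());
        for (Map.Entry<Integer, Long> e : new TreeMap<>(positions).entrySet()) {
            buf.putInt(e.getKey());
            buf.putLong(e.getValue());
        }
        return buf.array();
    }

    // Inverse of toByteArray() for the assumed layout above.
    static CheckpointSketch fromByteArray(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int count = buf.getInt();
        Map<Integer, Long> restored = new HashMap<>();
        for (int i = 0; i < count; i++) {
            int segmentId = buf.getInt();
            long position = buf.getLong();
            restored.put(segmentId, position);
        }
        return new CheckpointSketch(restored);
    }
}
```

A framework such as Flink would store the bytes in its own state backend and hand them back on recovery, without ever interpreting the segment ids.
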
> 
> What this does NOT change
> 
> - Wire protocol: unchanged
> - Broker: unchanged
> - Existing pulsar-client-api: stays as-is, fully supported
> 
> This is API-only (interfaces); the implementation module would come later.
> 
> Looking forward to your thoughts!
> 
> --
> Matteo Merli
> <[email protected]>
> 
