Hi everyone,

I'd like to start a discussion around a potential redesign of the Java
client API, shipped as a new, separate module (pulsar-client-api-v5)
starting with the next LTS release, 5.0. This is not a fully fledged
PIP yet; I want to gather feedback and gauge interest before
formalizing a proposal.

The draft API (interfaces only, no implementation) is available here:

API: 
https://github.com/merlimat/pulsar/tree/v5-api/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5

Usage examples:
https://github.com/merlimat/pulsar/blob/v5-api/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java

Why a new API?

Three main drivers:

Remove partitioning from the client API

The current API leaks partitioning everywhere: TopicMessageId,
getPartitionsForTopic(), MessageRouter, MessageRoutingMode,
partition-specific consumers. This forces application code to deal
with what is fundamentally a server-side scalability concern.

In the new API, topics are opaque. Scalability is handled internally
by hash-range segments — the client never sees them. Per-key ordering
is still guaranteed when a key is specified, but the underlying
parallelism is entirely managed by the broker.
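
To make the per-key ordering argument concrete, here is a minimal sketch of hash-range routing. Everything in it is hypothetical: the Segment record, the 16-bit hash space, and the use of String.hashCode() are illustration choices, not the broker's actual algorithm and not part of the draft API.

```java
import java.util.List;

// Hypothetical illustration only: not part of the draft v5 API.
// A segment owns a contiguous, inclusive range of the hash space.
record Segment(String name, int startInclusive, int endInclusive) {
    boolean owns(int slot) {
        return slot >= startInclusive && slot <= endInclusive;
    }
}

final class HashRangeRouter {
    static final int HASH_SPACE = 1 << 16; // assumed size of the hash space

    private final List<Segment> segments;

    HashRangeRouter(List<Segment> segments) {
        this.segments = List.copyOf(segments);
    }

    // Map a key to a slot in [0, HASH_SPACE). String.hashCode() stands in
    // for whatever hash function a real implementation would use.
    static int slotFor(String key) {
        return Math.floorMod(key.hashCode(), HASH_SPACE);
    }

    // The same key always resolves to the same segment, which is what keeps
    // per-key ordering intact without exposing segments to the application.
    Segment segmentFor(String key) {
        int slot = slotFor(key);
        return segments.stream()
                .filter(s -> s.owns(slot))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("no segment covers slot " + slot));
    }
}

final class HashRangeDemo {
    public static void main(String[] args) {
        HashRangeRouter router = new HashRangeRouter(List.of(
                new Segment("segment-0", 0, 32767),
                new Segment("segment-1", 32768, 65535)));
        System.out.println(router.segmentFor("order-42").name());
    }
}
```

In this sketch the application only ever supplies a key; how many segments exist and where their boundaries sit stays an internal detail that can change without touching application code.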

Simplify an API that grew too big and inconsistent

After years of organic growth, the current API surface has accumulated
a lot of baggage:

- Consumer has 60+ methods mixing unrelated concerns (ack, nack, seek,
  pause, unsubscribe, stats...)
- ConsumerBuilder has 40+ configuration methods with overlapping semantics
- Timeouts use (long, TimeUnit) in some places, long millis in others
- Nullable returns vs. empty values are used inconsistently across the API
- loadConf(Map), clone(), and Serializable on builders are rarely used
  and clutter the API
- SPI goes through a reflection hack (DefaultImplementation) instead of
  the standard ServiceLoader

The v5 API targets modern Java, uses
Duration/Instant/Optional/records, and keeps a minimal surface.
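
As a rough illustration of what the move to Duration/Instant/Optional/records buys, the sketch below contrasts the two styles. The type and method names (LegacyStyleMessage, ModernStyleMessage, TimeoutStyles) are made up for this example and are not taken from either the current API or the v5 draft.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

// Hypothetical types for illustration; not the draft v5 interfaces.

// Older style: absence is signalled by null or by a magic value.
interface LegacyStyleMessage {
    String getKey();      // may return null
    long getEventTime();  // epoch millis, 0 when unset
}

// Modern style: absence and time are explicit in the types.
record ModernStyleMessage(Optional<String> key, Optional<Instant> eventTime) {}

final class TimeoutStyles {
    // Older style: two parameters the caller has to keep consistent.
    static long toMillis(long timeout, TimeUnit unit) {
        return unit.toMillis(timeout);
    }

    // Modern style: a single self-describing value.
    static long toMillis(Duration timeout) {
        return timeout.toMillis();
    }

    // Adapter showing how the legacy conventions map onto the modern ones.
    static ModernStyleMessage adapt(LegacyStyleMessage legacy) {
        Optional<Instant> eventTime = legacy.getEventTime() > 0
                ? Optional.of(Instant.ofEpochMilli(legacy.getEventTime()))
                : Optional.empty();
        return new ModernStyleMessage(Optional.ofNullable(legacy.getKey()), eventTime);
    }
}
```

With the second style, the compiler forces callers to handle a missing key or event time instead of relying on them to remember a null check.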

Separate streaming vs queuing consumption

The current Consumer mixes all four subscription types (Exclusive,
Failover, Shared, Key_Shared) behind a single interface. The result:
acknowledgeCumulative() is available but throws at runtime for Shared
subscriptions; negativeAcknowledge() semantics differ between modes;
seek() behavior varies; dead-letter policy only applies to some modes.

The v5 API splits this into purpose-built types:

- StreamConsumer: ordered consumption with cumulative ack (maps to
  Exclusive/Failover). For event sourcing, CDC, ordered pipelines.
- QueueConsumer: unordered parallel consumption with individual ack,
  nack, dead-letter support (maps to Shared/Key_Shared). For work
  queues, task processing.
- CheckpointConsumer: unmanaged consumption for connector frameworks
  (Flink, Spark). No subscription, no ack; position is tracked
  externally via an opaque Checkpoint type that captures a consistent
  position across all internal hash-range segments.

Each type only exposes the operations that make sense for its model.
No more runtime surprises.
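
The effect of the split can be sketched with two small interfaces. The shapes below are assumptions made for illustration (the Sketch-suffixed names, poll(), and the in-memory implementation are all hypothetical), not the signatures in the draft. The point is only that an operation missing from a type becomes a compile-time error rather than a runtime exception.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

// Hypothetical shapes; method names are assumptions, not the draft v5 API.
record Msg<T>(long sequence, T value) {}

// Ordered consumption: only cumulative acknowledgment is offered.
interface StreamConsumerSketch<T> {
    Optional<Msg<T>> poll();
    void acknowledgeCumulative(Msg<T> upTo);
}

// Unordered consumption: individual ack and nack, no cumulative ack.
interface QueueConsumerSketch<T> {
    Optional<Msg<T>> poll();
    void acknowledge(Msg<T> message);
    void negativeAcknowledge(Msg<T> message);
}

// Tiny in-memory queue consumer so the sketch can be exercised.
final class InMemoryQueueConsumer<T> implements QueueConsumerSketch<T> {
    private final Deque<Msg<T>> pending = new ArrayDeque<>();
    private int acked;

    InMemoryQueueConsumer(List<T> values) {
        long sequence = 0;
        for (T value : values) {
            pending.addLast(new Msg<>(sequence++, value));
        }
    }

    @Override
    public Optional<Msg<T>> poll() {
        return Optional.ofNullable(pending.pollFirst());
    }

    @Override
    public void acknowledge(Msg<T> message) {
        acked++;
    }

    @Override
    public void negativeAcknowledge(Msg<T> message) {
        pending.addLast(message); // put it back for redelivery
    }

    int ackedCount() {
        return acked;
    }
}
```

Calling acknowledgeCumulative() on a QueueConsumerSketch simply does not compile, which is the "no runtime surprises" property in miniature.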

API overview

Entry point:

PulsarClient
  .newProducer(schema)           -> ProducerBuilder           -> Producer<T>
  .newStreamConsumer(schema)     -> StreamConsumerBuilder     -> StreamConsumer<T>
  .newQueueConsumer(schema)      -> QueueConsumerBuilder      -> QueueConsumer<T>
  .newCheckpointConsumer(schema) -> CheckpointConsumerBuilder -> CheckpointConsumer<T>
  .newTransaction()              -> Transaction

Sync-first with .async() accessor — each type has an async counterpart
(e.g. producer.async() returns AsyncProducer) with
CompletableFuture-based operations.
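
The sync-first pattern with an .async() accessor could look roughly like this. Only the async() accessor and the use of CompletableFuture come from the description above; the interface names, the send() signatures, and the in-memory implementation are assumptions for the sake of a runnable sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical shapes; everything except async() is an assumption.
interface AsyncProducerSketch<T> {
    CompletableFuture<Long> send(T value);
}

interface ProducerSketch<T> {
    long send(T value);              // blocking by default
    AsyncProducerSketch<T> async();  // non-blocking view of the same producer
}

// In-memory stand-in that hands out increasing sequence numbers.
final class InMemoryProducer<T> implements ProducerSketch<T> {
    private final AtomicLong nextSequence = new AtomicLong();

    private CompletableFuture<Long> sendAsync(T value) {
        return CompletableFuture.completedFuture(nextSequence.getAndIncrement());
    }

    @Override
    public long send(T value) {
        // The blocking call is just the async call plus a wait.
        return sendAsync(value).join();
    }

    @Override
    public AsyncProducerSketch<T> async() {
        return this::sendAsync;
    }
}
```

Both views share the same underlying state here, so mixing producer.send(...) and producer.async().send(...) on one instance yields a single sequence.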

Configuration grouped into policy records: BatchingPolicy,
CompressionPolicy, TlsPolicy, BackoffPolicy, DeadLetterPolicy,
EncryptionPolicy, etc.
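
A policy record might be shaped along these lines. BatchingPolicy is named in the draft, but the components, defaults, and validation rules below are guesses made purely for illustration, hence the Sketch suffix.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical components and defaults; not the draft's BatchingPolicy.
record BatchingPolicySketch(boolean enabled, int maxMessages, Duration maxDelay) {

    // Compact constructor: validate once, so every instance is well-formed.
    BatchingPolicySketch {
        Objects.requireNonNull(maxDelay, "maxDelay");
        if (maxMessages <= 0) {
            throw new IllegalArgumentException("maxMessages must be positive");
        }
        if (maxDelay.isNegative()) {
            throw new IllegalArgumentException("maxDelay must not be negative");
        }
    }

    // Assumed defaults, chosen only to make the example concrete.
    static BatchingPolicySketch defaults() {
        return new BatchingPolicySketch(true, 1000, Duration.ofMillis(1));
    }

    // Records are immutable, so "changing" a setting returns a new policy.
    BatchingPolicySketch withMaxDelay(Duration newMaxDelay) {
        return new BatchingPolicySketch(enabled, maxMessages, newMaxDelay);
    }
}
```

Grouping related settings this way means a builder needs one method per policy rather than one per knob, and the record gives equals(), hashCode(), and toString() for free.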

Checkpoint is an opaque serializable position vector with factories:
Checkpoint.earliest(), Checkpoint.latest(),
Checkpoint.atTimestamp(instant), Checkpoint.fromByteArray(bytes).
Connectors can snapshot and restore positions without knowing about
internal segments.
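
To show what "opaque serializable position vector" can mean in practice, here is a minimal sketch of a snapshot/restore round trip. Only fromByteArray(bytes) appears in the description above; toByteArray(), the of() factory, the segment-id-to-position map, and the binary layout are all assumptions for this example.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical internals; the real Checkpoint type is opaque by design.
final class CheckpointSketch {
    // One position per internal segment, hidden from callers.
    private final TreeMap<Integer, Long> positions;

    private CheckpointSketch(Map<Integer, Long> positions) {
        this.positions = new TreeMap<>(positions);
    }

    static CheckpointSketch of(Map<Integer, Long> positions) {
        return new CheckpointSketch(positions);
    }

    // Assumed layout: entry count, then (segment id, position) pairs.
    byte[] toByteArray() {
        ByteBuffer buf = ByteBuffer.allocate(
                Integer.BYTES + positions.size() * (Integer.BYTES + Long.BYTES));
        buf.putInt(positions.size());
        positions.forEach((segment, position) -> buf.putInt(segment).putLong(position));
        return buf.array();
    }

    static CheckpointSketch fromByteArray(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int count = buf.getInt();
        Map<Integer, Long> positions = new TreeMap<>();
        for (int i = 0; i < count; i++) {
            int segment = buf.getInt();
            positions.put(segment, buf.getLong());
        }
        return new CheckpointSketch(positions);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof CheckpointSketch
                && positions.equals(((CheckpointSketch) o).positions);
    }

    @Override
    public int hashCode() {
        return positions.hashCode();
    }
}
```

A connector would store the byte array in its own state backend and hand it back on restore, never inspecting the contents.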

What this does NOT change

- Wire protocol: unchanged
- Broker: unchanged
- Existing pulsar-client-api: stays as-is, fully supported
- This is API-only (interfaces); the implementation module would come later

Looking forward to your thoughts!

--
Matteo Merli
