This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 12d5f26bec8 PIP-468: V5 policy options use builder pattern (#25657)
12d5f26bec8 is described below
commit 12d5f26bec8d5f413e459a94b835a22d80e58f15
Author: Matteo Merli <[email protected]>
AuthorDate: Sun May 3 10:24:52 2026 -0700
PIP-468: V5 policy options use builder pattern (#25657)
---
.../client/api/v5/V5DeadLetterPolicyTest.java | 17 ++-
.../client/api/v5/V5ProducerBatchingTest.java | 16 +-
.../pulsar/client/api/v5/V5TransactionTest.java | 2 +-
.../pulsar/client/api/v5/config/BackoffPolicy.java | 110 +++++++++++---
.../client/api/v5/config/BatchingPolicy.java | 146 +++++++++++++++----
.../client/api/v5/config/ChunkingPolicy.java | 100 ++++++++++++-
.../client/api/v5/config/CompressionPolicy.java | 61 +++++++-
.../client/api/v5/config/ConnectionPolicy.java | 133 ++++++++++++++---
.../client/api/v5/config/DeadLetterPolicy.java | 135 ++++++++++++++---
.../client/api/v5/config/EncryptionPolicy.java | 112 ++++++++++++--
.../api/v5/config/ProcessingTimeoutPolicy.java | 92 ++++++++++--
.../pulsar/client/api/v5/config/TlsPolicy.java | 162 +++++++++++++++++----
.../client/api/v5/config/TransactionPolicy.java | 64 +++++++-
.../org/apache/pulsar/client/api/v5/Examples.java | 11 +-
14 files changed, 999 insertions(+), 162 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java
index 11f5cd0b462..dd29e618985 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java
@@ -59,7 +59,10 @@ public class V5DeadLetterPolicyTest extends V5ClientBaseTest
{
.subscriptionName("dlq-sub")
.negativeAckRedeliveryBackoff(BackoffPolicy.exponential(
Duration.ofMillis(200), Duration.ofMillis(200)))
- .deadLetterPolicy(new DeadLetterPolicy(2, null, dlqTopic,
null))
+ .deadLetterPolicy(DeadLetterPolicy.builder()
+ .maxRedeliverCount(2)
+ .deadLetterTopic(dlqTopic)
+ .build())
.subscribe();
@Cleanup
@@ -104,7 +107,7 @@ public class V5DeadLetterPolicyTest extends
V5ClientBaseTest {
.subscriptionName("default-dlq-sub")
.negativeAckRedeliveryBackoff(BackoffPolicy.exponential(
Duration.ofMillis(200), Duration.ofMillis(200)))
- .deadLetterPolicy(DeadLetterPolicy.of(1))
+
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
.subscribe();
@Cleanup
@@ -150,7 +153,10 @@ public class V5DeadLetterPolicyTest extends
V5ClientBaseTest {
.subscriptionName("dlq-meta-sub")
.negativeAckRedeliveryBackoff(BackoffPolicy.exponential(
Duration.ofMillis(200), Duration.ofMillis(200)))
- .deadLetterPolicy(new DeadLetterPolicy(1, null, dlqTopic,
null))
+ .deadLetterPolicy(DeadLetterPolicy.builder()
+ .maxRedeliverCount(1)
+ .deadLetterTopic(dlqTopic)
+ .build())
.subscribe();
@Cleanup
QueueConsumer<byte[]> dlqConsumer =
v5Client.newQueueConsumer(Schema.bytes())
@@ -212,7 +218,10 @@ public class V5DeadLetterPolicyTest extends
V5ClientBaseTest {
.subscriptionName("dlq-multi-sub")
.negativeAckRedeliveryBackoff(BackoffPolicy.exponential(
Duration.ofMillis(200), Duration.ofMillis(200)))
- .deadLetterPolicy(new DeadLetterPolicy(1, null, dlqTopic,
null))
+ .deadLetterPolicy(DeadLetterPolicy.builder()
+ .maxRedeliverCount(1)
+ .deadLetterTopic(dlqTopic)
+ .build())
.subscribe();
@Cleanup
QueueConsumer<byte[]> dlqConsumer =
v5Client.newQueueConsumer(Schema.bytes())
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerBatchingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerBatchingTest.java
index 7e816b0663e..1c84bf4cd92 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerBatchingTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerBatchingTest.java
@@ -78,7 +78,7 @@ public class V5ProducerBatchingTest extends V5ClientBaseTest {
@Test
public void testDefaultBatchingDeliversAllMessages() throws Exception {
- produceAndVerify(BatchingPolicy.ofDefault(), 100, "default");
+ produceAndVerify(BatchingPolicy.builder().build(), 100, "default");
}
@Test
@@ -90,8 +90,11 @@ public class V5ProducerBatchingTest extends V5ClientBaseTest
{
public void testTightBatchingByDelay() throws Exception {
// Small max-publish-delay forces batches to flush quickly; both ends
still see
// every message in order.
- BatchingPolicy tight = BatchingPolicy.of(
- Duration.ofMillis(5), 100, MemorySize.ofMegabytes(1));
+ BatchingPolicy tight = BatchingPolicy.builder()
+ .maxPublishDelay(Duration.ofMillis(5))
+ .maxMessages(100)
+ .maxSize(MemorySize.ofMegabytes(1))
+ .build();
produceAndVerify(tight, 50, "tight-delay");
}
@@ -99,8 +102,11 @@ public class V5ProducerBatchingTest extends
V5ClientBaseTest {
public void testBatchingWithSmallBatchSize() throws Exception {
// Cap the batch at 5 messages — exercises the maxMessages branch of
the batching
// packer so a 50-message stream gets cut into 10 batches.
- BatchingPolicy small = BatchingPolicy.of(
- Duration.ofSeconds(1), 5, MemorySize.ofMegabytes(1));
+ BatchingPolicy small = BatchingPolicy.builder()
+ .maxPublishDelay(Duration.ofSeconds(1))
+ .maxMessages(5)
+ .maxSize(MemorySize.ofMegabytes(1))
+ .build();
produceAndVerify(small, 50, "small-batch");
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTest.java
index 23ede822b65..6cdc197709f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTest.java
@@ -44,7 +44,7 @@ public class V5TransactionTest extends V5ClientBaseTest {
private PulsarClient newTxnClient() throws Exception {
return track(PulsarClient.builder()
.serviceUrl(getBrokerServiceUrl())
- .transactionPolicy(new
TransactionPolicy(Duration.ofMinutes(1)))
+
.transactionPolicy(TransactionPolicy.builder().timeout(Duration.ofMinutes(1)).build())
.build());
}
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BackoffPolicy.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BackoffPolicy.java
index eb7ab9a4ecd..bca04a2a596 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BackoffPolicy.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BackoffPolicy.java
@@ -20,27 +20,55 @@ package org.apache.pulsar.client.api.v5.config;
import java.time.Duration;
import java.util.Objects;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
/**
* Backoff configuration for broker reconnection attempts.
*
* <p>The delay for attempt {@code n} is {@code min(initialInterval *
multiplier^(n-1), maxInterval)}.
*
- * @param initialInterval the delay before the first reconnection attempt
- * @param maxInterval the maximum delay between reconnection attempts
- * @param multiplier the multiplier applied after each attempt
+ * <p>Use {@link #fixed(Duration, Duration)} or {@link #exponential(Duration,
Duration)} for
+ * the common cases, or {@link #builder()} to configure all knobs explicitly.
*/
-public record BackoffPolicy(
- Duration initialInterval,
- Duration maxInterval,
- double multiplier
-) {
- public BackoffPolicy {
+@EqualsAndHashCode
+@ToString
+public final class BackoffPolicy {
+
+ private final Duration initialInterval;
+ private final Duration maxInterval;
+ private final double multiplier;
+
+ private BackoffPolicy(Duration initialInterval, Duration maxInterval,
double multiplier) {
Objects.requireNonNull(initialInterval, "initialInterval must not be
null");
Objects.requireNonNull(maxInterval, "maxInterval must not be null");
if (multiplier < 1.0) {
throw new IllegalArgumentException("multiplier must be >= 1.0");
}
+ this.initialInterval = initialInterval;
+ this.maxInterval = maxInterval;
+ this.multiplier = multiplier;
+ }
+
+ /**
+ * @return the delay before the first reconnection attempt
+ */
+ public Duration initialInterval() {
+ return initialInterval;
+ }
+
+ /**
+ * @return the maximum delay between reconnection attempts
+ */
+ public Duration maxInterval() {
+ return maxInterval;
+ }
+
+ /**
+ * @return the multiplier applied after each attempt
+ */
+ public double multiplier() {
+ return multiplier;
}
/**
@@ -66,14 +94,62 @@ public record BackoffPolicy(
}
/**
- * Create an exponential backoff with a custom multiplier.
- *
- * @param initialInterval the delay before the first reconnection attempt
- * @param maxInterval the maximum delay between reconnection attempts
- * @param multiplier the multiplier applied after each attempt, must
be >= 1.0
- * @return a {@link BackoffPolicy} with the specified parameters
+ * @return a new builder for constructing a {@link BackoffPolicy}
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for {@link BackoffPolicy}.
*/
- public static BackoffPolicy exponential(Duration initialInterval, Duration
maxInterval, double multiplier) {
- return new BackoffPolicy(initialInterval, maxInterval, multiplier);
+ public static final class Builder {
+ private Duration initialInterval;
+ private Duration maxInterval;
+ private double multiplier = 2.0;
+
+ private Builder() {
+ }
+
+ /**
+ * Delay before the first reconnection attempt. Required.
+ *
+ * @param initialInterval the initial backoff delay
+ * @return this builder
+ */
+ public Builder initialInterval(Duration initialInterval) {
+ this.initialInterval = initialInterval;
+ return this;
+ }
+
+ /**
+ * Upper bound on the backoff delay. Required.
+ *
+ * @param maxInterval the maximum backoff delay
+ * @return this builder
+ */
+ public Builder maxInterval(Duration maxInterval) {
+ this.maxInterval = maxInterval;
+ return this;
+ }
+
+ /**
+ * Multiplier applied to the previous delay on each retry. Must be
{@code >= 1.0}.
+ * Default is {@code 2.0} (exponential backoff). Use {@code 1.0} for
fixed backoff.
+ *
+ * @param multiplier the per-attempt multiplier
+ * @return this builder
+ */
+ public Builder multiplier(double multiplier) {
+ this.multiplier = multiplier;
+ return this;
+ }
+
+ /**
+ * @return a new {@link BackoffPolicy} instance
+ */
+ public BackoffPolicy build() {
+ return new BackoffPolicy(initialInterval, maxInterval, multiplier);
+ }
}
}
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BatchingPolicy.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BatchingPolicy.java
index 13ef7cbc4bd..187ac4e8636 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BatchingPolicy.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BatchingPolicy.java
@@ -19,6 +19,9 @@
package org.apache.pulsar.client.api.v5.config;
import java.time.Duration;
+import java.util.Objects;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
/**
* Configuration for producer message batching.
@@ -27,33 +30,68 @@ import java.time.Duration;
* broker request to improve throughput. A batch is flushed when any of the
* configured thresholds is reached.
*
- * @param enabled whether batching is enabled
- * @param maxPublishDelay maximum time to wait before flushing a batch
- * @param maxMessages maximum number of messages in a single batch
- * @param maxSize maximum size of a single batch
+ * <p>Construct via {@link #builder()}, or use {@link #ofDisabled()} to opt
out.
*/
-public record BatchingPolicy(
- boolean enabled,
- Duration maxPublishDelay,
- int maxMessages,
- MemorySize maxSize
-) {
- public BatchingPolicy {
+@EqualsAndHashCode
+@ToString
+public final class BatchingPolicy {
+
+ private static final Duration DEFAULT_MAX_PUBLISH_DELAY =
Duration.ofMillis(1);
+ private static final int DEFAULT_MAX_MESSAGES = 1000;
+ private static final MemorySize DEFAULT_MAX_SIZE =
MemorySize.ofKilobytes(128);
+
+ private static final BatchingPolicy DISABLED =
+ new BatchingPolicy(false, DEFAULT_MAX_PUBLISH_DELAY,
DEFAULT_MAX_MESSAGES, DEFAULT_MAX_SIZE);
+
+ private final boolean enabled;
+ private final Duration maxPublishDelay;
+ private final int maxMessages;
+ private final MemorySize maxSize;
+
+ private BatchingPolicy(boolean enabled, Duration maxPublishDelay, int
maxMessages, MemorySize maxSize) {
if (maxPublishDelay == null) {
- maxPublishDelay = Duration.ofMillis(1);
+ maxPublishDelay = DEFAULT_MAX_PUBLISH_DELAY;
}
+ Objects.requireNonNull(maxSize, "maxSize must not be null");
if (maxMessages < 0) {
throw new IllegalArgumentException("maxMessages must be >= 0");
}
if (maxSize.bytes() < 0) {
throw new IllegalArgumentException("maxBytes must be >= 0");
}
+ this.enabled = enabled;
+ this.maxPublishDelay = maxPublishDelay;
+ this.maxMessages = maxMessages;
+ this.maxSize = maxSize;
}
- private static final BatchingPolicy DISABLED =
- new BatchingPolicy(false, Duration.ofMillis(1), 1000,
MemorySize.ofKilobytes(128));
- private static final BatchingPolicy DEFAULT =
- new BatchingPolicy(true, Duration.ofMillis(1), 1000,
MemorySize.ofKilobytes(128));
+ /**
+ * @return whether batching is enabled
+ */
+ public boolean enabled() {
+ return enabled;
+ }
+
+ /**
+ * @return the maximum time to wait before flushing a batch
+ */
+ public Duration maxPublishDelay() {
+ return maxPublishDelay;
+ }
+
+ /**
+ * @return the maximum number of messages in a single batch
+ */
+ public int maxMessages() {
+ return maxMessages;
+ }
+
+ /**
+ * @return the maximum size of a single batch
+ */
+ public MemorySize maxSize() {
+ return maxSize;
+ }
/**
* Batching disabled.
@@ -65,23 +103,73 @@ public record BatchingPolicy(
}
/**
- * Batching enabled with default thresholds (1ms delay, 1000 messages,
128KB).
- *
- * @return a {@link BatchingPolicy} with default batching thresholds
+ * @return a new builder for constructing a {@link BatchingPolicy}
*/
- public static BatchingPolicy ofDefault() {
- return DEFAULT;
+ public static Builder builder() {
+ return new Builder();
}
/**
- * Batching enabled with custom thresholds.
- *
- * @param maxPublishDelay the maximum time to wait before flushing a batch
- * @param maxMessages the maximum number of messages in a single batch
- * @param maxSize the maximum size of a single batch
- * @return a {@link BatchingPolicy} with batching enabled and the
specified thresholds
+ * Builder for {@link BatchingPolicy}.
*/
- public static BatchingPolicy of(Duration maxPublishDelay, int maxMessages,
MemorySize maxSize) {
- return new BatchingPolicy(true, maxPublishDelay, maxMessages, maxSize);
+ public static final class Builder {
+ private boolean enabled = true;
+ private Duration maxPublishDelay = DEFAULT_MAX_PUBLISH_DELAY;
+ private int maxMessages = DEFAULT_MAX_MESSAGES;
+ private MemorySize maxSize = DEFAULT_MAX_SIZE;
+
+ private Builder() {
+ }
+
+ /**
+ * Whether batching is enabled. Default is {@code true}.
+ *
+ * @param enabled whether to batch outgoing messages
+ * @return this builder
+ */
+ public Builder enabled(boolean enabled) {
+ this.enabled = enabled;
+ return this;
+ }
+
+ /**
+ * Maximum time the producer waits before flushing a partial batch.
+ *
+ * @param maxPublishDelay the upper bound on per-message latency added
by batching
+ * @return this builder
+ */
+ public Builder maxPublishDelay(Duration maxPublishDelay) {
+ this.maxPublishDelay = maxPublishDelay;
+ return this;
+ }
+
+ /**
+ * Maximum number of messages per batch.
+ *
+ * @param maxMessages the message-count flush threshold
+ * @return this builder
+ */
+ public Builder maxMessages(int maxMessages) {
+ this.maxMessages = maxMessages;
+ return this;
+ }
+
+ /**
+ * Maximum payload size per batch.
+ *
+ * @param maxSize the size flush threshold
+ * @return this builder
+ */
+ public Builder maxSize(MemorySize maxSize) {
+ this.maxSize = maxSize;
+ return this;
+ }
+
+ /**
+ * @return a new {@link BatchingPolicy} instance
+ */
+ public BatchingPolicy build() {
+ return new BatchingPolicy(enabled, maxPublishDelay, maxMessages,
maxSize);
+ }
}
}
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ChunkingPolicy.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ChunkingPolicy.java
index ff32a287a36..e523a73328d 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ChunkingPolicy.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ChunkingPolicy.java
@@ -18,14 +18,102 @@
*/
package org.apache.pulsar.client.api.v5.config;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
/**
* Configuration for chunking large messages that exceed the broker's max
message size.
*
- * @param enabled whether chunking is enabled
- * @param chunkSize maximum size of each chunk in bytes
+ * <p>When chunking is enabled, the producer splits a payload larger than the
+ * configured chunk size into smaller pieces that are reassembled by the
consumer.
+ *
+ * <p>Use {@link #ofDisabled()} to opt out, or {@link #builder()} to enable
with a
+ * specific chunk size.
*/
-public record ChunkingPolicy(
- boolean enabled,
- int chunkSize
-) {
+@EqualsAndHashCode
+@ToString
+public final class ChunkingPolicy {
+
+ private static final ChunkingPolicy DISABLED = new ChunkingPolicy(false,
0);
+
+ private final boolean enabled;
+ private final int chunkSize;
+
+ private ChunkingPolicy(boolean enabled, int chunkSize) {
+ if (enabled && chunkSize <= 0) {
+ throw new IllegalArgumentException("chunkSize must be > 0 when
chunking is enabled");
+ }
+ this.enabled = enabled;
+ this.chunkSize = chunkSize;
+ }
+
+ /**
+ * @return whether chunking is enabled
+ */
+ public boolean enabled() {
+ return enabled;
+ }
+
+ /**
+ * @return the maximum size of each chunk in bytes (only meaningful when
{@link #enabled()})
+ */
+ public int chunkSize() {
+ return chunkSize;
+ }
+
+ /**
+ * Chunking disabled.
+ *
+ * @return a {@link ChunkingPolicy} with chunking disabled
+ */
+ public static ChunkingPolicy ofDisabled() {
+ return DISABLED;
+ }
+
+ /**
+ * @return a new builder for constructing a {@link ChunkingPolicy}
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for {@link ChunkingPolicy}.
+ */
+ public static final class Builder {
+ private boolean enabled = true;
+ private int chunkSize;
+
+ private Builder() {
+ }
+
+ /**
+ * Whether chunking is enabled. Default is {@code true}.
+ *
+ * @param enabled whether to chunk oversized payloads
+ * @return this builder
+ */
+ public Builder enabled(boolean enabled) {
+ this.enabled = enabled;
+ return this;
+ }
+
+ /**
+ * Maximum size of each chunk in bytes. Required when chunking is
enabled.
+ *
+ * @param chunkSize the per-chunk size in bytes
+ * @return this builder
+ */
+ public Builder chunkSize(int chunkSize) {
+ this.chunkSize = chunkSize;
+ return this;
+ }
+
+ /**
+ * @return a new {@link ChunkingPolicy} instance
+ */
+ public ChunkingPolicy build() {
+ return new ChunkingPolicy(enabled, chunkSize);
+ }
+ }
}
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/CompressionPolicy.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/CompressionPolicy.java
index 79c318211b3..4e83e9c7a81 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/CompressionPolicy.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/CompressionPolicy.java
@@ -18,12 +18,34 @@
*/
package org.apache.pulsar.client.api.v5.config;
+import java.util.Objects;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
/**
* Compression configuration for producer message payloads.
*
- * @param type the compression codec to use
+ * <p>The dominant configuration is the codec, so {@link #of(CompressionType)}
is the
+ * primary entry point. Use {@link #builder()} if you need to set additional
knobs in
+ * the future.
*/
-public record CompressionPolicy(CompressionType type) {
+@EqualsAndHashCode
+@ToString
+public final class CompressionPolicy {
+
+ private final CompressionType type;
+
+ private CompressionPolicy(CompressionType type) {
+ Objects.requireNonNull(type, "type must not be null");
+ this.type = type;
+ }
+
+ /**
+ * @return the compression codec
+ */
+ public CompressionType type() {
+ return type;
+ }
/**
* No compression.
@@ -43,4 +65,39 @@ public record CompressionPolicy(CompressionType type) {
public static CompressionPolicy of(CompressionType type) {
return new CompressionPolicy(type);
}
+
+ /**
+ * @return a new builder for constructing a {@link CompressionPolicy}
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for {@link CompressionPolicy}.
+ */
+ public static final class Builder {
+ private CompressionType type = CompressionType.NONE;
+
+ private Builder() {
+ }
+
+ /**
+ * Compression codec to use for message payloads.
+ *
+ * @param type the compression codec
+ * @return this builder
+ */
+ public Builder type(CompressionType type) {
+ this.type = type;
+ return this;
+ }
+
+ /**
+ * @return a new {@link CompressionPolicy} instance
+ */
+ public CompressionPolicy build() {
+ return new CompressionPolicy(type);
+ }
+ }
}
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ConnectionPolicy.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ConnectionPolicy.java
index b502dde7315..83601c5f780 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ConnectionPolicy.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ConnectionPolicy.java
@@ -20,35 +20,42 @@ package org.apache.pulsar.client.api.v5.config;
import java.time.Duration;
import java.util.Objects;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
/**
* Connection-level settings for the Pulsar client.
*
* <p>Groups TCP connection timeout, connection pool sizing, keep-alive, idle
timeout,
* TCP no-delay, I/O and callback threading, and proxy configuration.
+ *
+ * <p>Construct via {@link #builder()}.
*/
-public record ConnectionPolicy(
- Duration connectionTimeout,
- int connectionsPerBroker,
- boolean enableTcpNoDelay,
- Duration keepAliveInterval,
- Duration connectionMaxIdleTime,
- int ioThreads,
- int callbackThreads,
- String proxyServiceUrl,
- ProxyProtocol proxyProtocol,
- BackoffPolicy connectionBackoff
-) {
+@EqualsAndHashCode
+@ToString
+public final class ConnectionPolicy {
- /**
- * Create a connection policy with the given parameters.
- *
- * @throws NullPointerException if {@code connectionTimeout}, {@code
keepAliveInterval},
- * {@code connectionMaxIdleTime}, or {@code connectionBackoff} is
null
- * @throws IllegalArgumentException if {@code connectionsPerBroker},
{@code ioThreads},
- * or {@code callbackThreads} is less than 1
- */
- public ConnectionPolicy {
+ private final Duration connectionTimeout;
+ private final int connectionsPerBroker;
+ private final boolean enableTcpNoDelay;
+ private final Duration keepAliveInterval;
+ private final Duration connectionMaxIdleTime;
+ private final int ioThreads;
+ private final int callbackThreads;
+ private final String proxyServiceUrl;
+ private final ProxyProtocol proxyProtocol;
+ private final BackoffPolicy connectionBackoff;
+
+ private ConnectionPolicy(Duration connectionTimeout,
+ int connectionsPerBroker,
+ boolean enableTcpNoDelay,
+ Duration keepAliveInterval,
+ Duration connectionMaxIdleTime,
+ int ioThreads,
+ int callbackThreads,
+ String proxyServiceUrl,
+ ProxyProtocol proxyProtocol,
+ BackoffPolicy connectionBackoff) {
Objects.requireNonNull(connectionTimeout, "connectionTimeout must not
be null");
Objects.requireNonNull(keepAliveInterval, "keepAliveInterval must not
be null");
Objects.requireNonNull(connectionMaxIdleTime, "connectionMaxIdleTime
must not be null");
@@ -62,12 +69,90 @@ public record ConnectionPolicy(
if (callbackThreads < 1) {
throw new IllegalArgumentException("callbackThreads must be >= 1");
}
+ this.connectionTimeout = connectionTimeout;
+ this.connectionsPerBroker = connectionsPerBroker;
+ this.enableTcpNoDelay = enableTcpNoDelay;
+ this.keepAliveInterval = keepAliveInterval;
+ this.connectionMaxIdleTime = connectionMaxIdleTime;
+ this.ioThreads = ioThreads;
+ this.callbackThreads = callbackThreads;
+ this.proxyServiceUrl = proxyServiceUrl;
+ this.proxyProtocol = proxyProtocol;
+ this.connectionBackoff = connectionBackoff;
+ }
+
+ /**
+ * @return the maximum duration to wait for a TCP connection to a broker
+ */
+ public Duration connectionTimeout() {
+ return connectionTimeout;
+ }
+
+ /**
+ * @return the number of TCP connections maintained per broker
+ */
+ public int connectionsPerBroker() {
+ return connectionsPerBroker;
+ }
+
+ /**
+ * @return whether TCP no-delay (Nagle disabled) is enabled
+ */
+ public boolean enableTcpNoDelay() {
+ return enableTcpNoDelay;
+ }
+
+ /**
+ * @return the interval between TCP keep-alive probes
+ */
+ public Duration keepAliveInterval() {
+ return keepAliveInterval;
+ }
+
+ /**
+ * @return the maximum idle duration before a connection is closed
+ */
+ public Duration connectionMaxIdleTime() {
+ return connectionMaxIdleTime;
+ }
+
+ /**
+ * @return the number of I/O threads
+ */
+ public int ioThreads() {
+ return ioThreads;
+ }
+
+ /**
+ * @return the number of callback threads
+ */
+ public int callbackThreads() {
+ return callbackThreads;
+ }
+
+ /**
+ * @return the proxy service URL, or {@code null} if no proxy is configured
+ */
+ public String proxyServiceUrl() {
+ return proxyServiceUrl;
+ }
+
+ /**
+ * @return the proxy protocol, or {@code null} if no proxy is configured
+ */
+ public ProxyProtocol proxyProtocol() {
+ return proxyProtocol;
+ }
+
+ /**
+ * @return the broker-reconnection backoff policy
+ */
+ public BackoffPolicy connectionBackoff() {
+ return connectionBackoff;
}
/**
- * Create a builder for constructing a {@link ConnectionPolicy}.
- *
- * @return a new builder with sensible defaults
+ * @return a new builder for constructing a {@link ConnectionPolicy}
*/
public static Builder builder() {
return new Builder();
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/DeadLetterPolicy.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/DeadLetterPolicy.java
index cb4f34ad4a4..4b610b58770 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/DeadLetterPolicy.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/DeadLetterPolicy.java
@@ -18,36 +18,137 @@
*/
package org.apache.pulsar.client.api.v5.config;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
/**
* Configuration for the dead letter queue mechanism.
*
* <p>When a message has been redelivered more than {@code maxRedeliverCount}
times,
* it is moved to the dead letter topic instead of being redelivered again.
*
- * @param maxRedeliverCount maximum number of redelivery attempts before
sending to the dead letter topic
- * @param retryLetterTopic custom topic for retry messages (nullable,
auto-generated if null)
- * @param deadLetterTopic custom topic for dead letter messages
(nullable, auto-generated if null)
- * @param initialSubscriptionName subscription name to create on the dead
letter topic (nullable)
+ * <p>Construct via {@link #builder()}.
*/
-public record DeadLetterPolicy(
- int maxRedeliverCount,
- String retryLetterTopic,
- String deadLetterTopic,
- String initialSubscriptionName
-) {
- public DeadLetterPolicy {
+@EqualsAndHashCode
+@ToString
+public final class DeadLetterPolicy {
+
+ private final int maxRedeliverCount;
+ private final String retryLetterTopic;
+ private final String deadLetterTopic;
+ private final String initialSubscriptionName;
+
+ private DeadLetterPolicy(int maxRedeliverCount, String retryLetterTopic,
+ String deadLetterTopic, String
initialSubscriptionName) {
if (maxRedeliverCount < 0) {
throw new IllegalArgumentException("maxRedeliverCount must be >=
0");
}
+ this.maxRedeliverCount = maxRedeliverCount;
+ this.retryLetterTopic = retryLetterTopic;
+ this.deadLetterTopic = deadLetterTopic;
+ this.initialSubscriptionName = initialSubscriptionName;
+ }
+
+ /**
+ * @return the maximum number of redelivery attempts before sending to the
dead letter topic
+ */
+ public int maxRedeliverCount() {
+ return maxRedeliverCount;
+ }
+
+ /**
+ * @return the custom retry letter topic, or {@code null} for the
auto-generated default
+ */
+ public String retryLetterTopic() {
+ return retryLetterTopic;
}
/**
- * Create a dead letter policy with just a max redeliver count, using
default topic names.
- *
- * @param maxRedeliverCount the maximum number of redelivery attempts
before sending to the dead letter topic
- * @return a {@link DeadLetterPolicy} with auto-generated topic names and
no initial subscription
+ * @return the custom dead letter topic, or {@code null} for the
auto-generated default
*/
- public static DeadLetterPolicy of(int maxRedeliverCount) {
- return new DeadLetterPolicy(maxRedeliverCount, null, null, null);
+ public String deadLetterTopic() {
+ return deadLetterTopic;
+ }
+
+ /**
+ * @return the subscription name to create on the dead letter topic, or
{@code null} if none
+ */
+ public String initialSubscriptionName() {
+ return initialSubscriptionName;
+ }
+
+ /**
+ * @return a new builder for constructing a {@link DeadLetterPolicy}
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for {@link DeadLetterPolicy}.
+ */
+ public static final class Builder {
+ private int maxRedeliverCount;
+ private String retryLetterTopic;
+ private String deadLetterTopic;
+ private String initialSubscriptionName;
+
+ private Builder() {
+ }
+
+ /**
+ * Maximum number of redelivery attempts before sending to the dead
letter topic.
+ * Required.
+ *
+ * @param maxRedeliverCount the redelivery threshold
+ * @return this builder
+ */
+ public Builder maxRedeliverCount(int maxRedeliverCount) {
+ this.maxRedeliverCount = maxRedeliverCount;
+ return this;
+ }
+
+ /**
+ * Custom retry letter topic. {@code null} (default) auto-generates
the name.
+ *
+ * @param retryLetterTopic the retry letter topic name
+ * @return this builder
+ */
+ public Builder retryLetterTopic(String retryLetterTopic) {
+ this.retryLetterTopic = retryLetterTopic;
+ return this;
+ }
+
+ /**
+ * Custom dead letter topic. {@code null} (default) auto-generates the
name as
+ * {@code <source>-DLQ}.
+ *
+ * @param deadLetterTopic the dead letter topic name
+ * @return this builder
+ */
+ public Builder deadLetterTopic(String deadLetterTopic) {
+ this.deadLetterTopic = deadLetterTopic;
+ return this;
+ }
+
+ /**
+ * Subscription name to create on the dead letter topic, ensuring
messages
+ * forwarded to the DLQ are retained until consumed.
+ *
+ * @param initialSubscriptionName the DLQ initial subscription
+ * @return this builder
+ */
+ public Builder initialSubscriptionName(String initialSubscriptionName)
{
+ this.initialSubscriptionName = initialSubscriptionName;
+ return this;
+ }
+
+ /**
+ * @return a new {@link DeadLetterPolicy} instance
+ */
+ public DeadLetterPolicy build() {
+ return new DeadLetterPolicy(maxRedeliverCount, retryLetterTopic,
+ deadLetterTopic, initialSubscriptionName);
+ }
}
}
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicy.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicy.java
index d443274c6e7..c196a8f20fb 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicy.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicy.java
@@ -20,25 +20,31 @@ package org.apache.pulsar.client.api.v5.config;
import java.util.List;
import java.util.Objects;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
import org.apache.pulsar.client.api.v5.auth.CryptoFailureAction;
import org.apache.pulsar.client.api.v5.auth.CryptoKeyReader;
/**
* End-to-end encryption configuration for producers and consumers.
*
- * <p>For producers, supply a {@link CryptoKeyReader} and one or more
encryption key names.
- * For consumers/readers, supply a {@link CryptoKeyReader} and a {@link
CryptoFailureAction}.
+ * <p>For producers, supply a {@link CryptoKeyReader} and one or more
encryption key names —
+ * use {@link #forProducer(CryptoKeyReader, String...)} as the typical entry
point.
+ * For consumers/readers, supply a {@link CryptoKeyReader} and a {@link
CryptoFailureAction} —
+ * use {@link #forConsumer(CryptoKeyReader, CryptoFailureAction)}.
*
- * @param keyReader the crypto key reader for loading
encryption/decryption keys
- * @param keyNames encryption key names (producer-side; empty list for
consumer/reader)
- * @param failureAction action to take when encryption or decryption fails
+ * <p>{@link #builder()} is available for callers that need to tune both sides
explicitly.
*/
-public record EncryptionPolicy(
- CryptoKeyReader keyReader,
- List<String> keyNames,
- CryptoFailureAction failureAction
-) {
- public EncryptionPolicy {
+@EqualsAndHashCode
+@ToString
+public final class EncryptionPolicy {
+
+ private final CryptoKeyReader keyReader;
+ private final List<String> keyNames;
+ private final CryptoFailureAction failureAction;
+
+ private EncryptionPolicy(CryptoKeyReader keyReader, List<String> keyNames,
+ CryptoFailureAction failureAction) {
Objects.requireNonNull(keyReader, "keyReader must not be null");
if (keyNames == null) {
keyNames = List.of();
@@ -46,6 +52,30 @@ public record EncryptionPolicy(
if (failureAction == null) {
failureAction = CryptoFailureAction.FAIL;
}
+ this.keyReader = keyReader;
+ this.keyNames = List.copyOf(keyNames);
+ this.failureAction = failureAction;
+ }
+
+ /**
+ * @return the crypto key reader for loading encryption/decryption keys
+ */
+ public CryptoKeyReader keyReader() {
+ return keyReader;
+ }
+
+ /**
+ * @return the producer-side encryption key names (empty list for
consumer/reader)
+ */
+ public List<String> keyNames() {
+ return keyNames;
+ }
+
+ /**
+ * @return the action to take when encryption or decryption fails
+ */
+ public CryptoFailureAction failureAction() {
+ return failureAction;
}
/**
@@ -69,4 +99,64 @@ public record EncryptionPolicy(
public static EncryptionPolicy forConsumer(CryptoKeyReader keyReader,
CryptoFailureAction failureAction) {
return new EncryptionPolicy(keyReader, List.of(), failureAction);
}
+
+ /**
+ * @return a new builder for constructing an {@link EncryptionPolicy}
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for {@link EncryptionPolicy}.
+ */
+ public static final class Builder {
+ private CryptoKeyReader keyReader;
+ private List<String> keyNames = List.of();
+ private CryptoFailureAction failureAction = CryptoFailureAction.FAIL;
+
+ private Builder() {
+ }
+
+ /**
+ * Crypto key reader used to load encryption/decryption keys. Required.
+ *
+ * @param keyReader the key reader
+ * @return this builder
+ */
+ public Builder keyReader(CryptoKeyReader keyReader) {
+ this.keyReader = keyReader;
+ return this;
+ }
+
+ /**
+ * Producer-side encryption key names. Leave empty (default) for
consumer-side use.
+ *
+ * @param keyNames the key names
+ * @return this builder
+ */
+ public Builder keyNames(String... keyNames) {
+ this.keyNames = List.of(keyNames);
+ return this;
+ }
+
+ /**
+ * Action to take when encryption (producer) or decryption (consumer)
fails.
+ * Default is {@link CryptoFailureAction#FAIL}.
+ *
+ * @param failureAction the failure action
+ * @return this builder
+ */
+ public Builder failureAction(CryptoFailureAction failureAction) {
+ this.failureAction = failureAction;
+ return this;
+ }
+
+ /**
+ * @return a new {@link EncryptionPolicy} instance
+ */
+ public EncryptionPolicy build() {
+ return new EncryptionPolicy(keyReader, keyNames, failureAction);
+ }
+ }
}
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProcessingTimeoutPolicy.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProcessingTimeoutPolicy.java
index 0cda6c3ee04..0c71f14590e 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProcessingTimeoutPolicy.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProcessingTimeoutPolicy.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.client.api.v5.config;
import java.time.Duration;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
/**
* Optional safety net for slow / stalled queue consumers: if the application
doesn't
@@ -36,37 +38,101 @@ import java.time.Duration;
* when the application's processing time is bounded and you want stalled
deliveries to
* be reattempted automatically.
*
- * @param timeout how long the client waits for the application to
ack a
- * delivery before requesting redelivery. {@link
Duration#ZERO}
- * disables.
- * @param redeliveryBackoff optional backoff applied between redeliveries.
May be
- * {@code null} for the default (no extra delay).
+ * <p>Use {@link #of(Duration)} for the common case (no extra backoff), or
+ * {@link #builder()} to also configure {@code redeliveryBackoff}.
*/
-public record ProcessingTimeoutPolicy(
- Duration timeout,
- BackoffPolicy redeliveryBackoff
-) {
- public ProcessingTimeoutPolicy {
+@EqualsAndHashCode
+@ToString
+public final class ProcessingTimeoutPolicy {
+
+ private final Duration timeout;
+ private final BackoffPolicy redeliveryBackoff;
+
+ private ProcessingTimeoutPolicy(Duration timeout, BackoffPolicy
redeliveryBackoff) {
if (timeout == null) {
throw new IllegalArgumentException("timeout must not be null");
}
if (timeout.isNegative()) {
throw new IllegalArgumentException("timeout must not be negative");
}
+ this.timeout = timeout;
+ this.redeliveryBackoff = redeliveryBackoff;
+ }
+
+ /**
+ * @return how long the client waits for the application to ack a delivery
before
+ * requesting redelivery; {@link Duration#ZERO} disables
+ */
+ public Duration timeout() {
+ return timeout;
+ }
+
+ /**
+ * @return optional backoff applied between redeliveries, or {@code null}
for the
+ * default (no extra delay)
+ */
+ public BackoffPolicy redeliveryBackoff() {
+ return redeliveryBackoff;
}
/**
* Create a policy with just a timeout — the broker uses its default
redelivery
* cadence (no extra backoff between retries).
+ *
+ * @param timeout the processing-timeout duration
+ * @return a {@link ProcessingTimeoutPolicy} with no extra redelivery
backoff
*/
public static ProcessingTimeoutPolicy of(Duration timeout) {
return new ProcessingTimeoutPolicy(timeout, null);
}
/**
- * Create a policy with a timeout and an explicit redelivery backoff.
+ * @return a new builder for constructing a {@link ProcessingTimeoutPolicy}
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for {@link ProcessingTimeoutPolicy}.
*/
- public static ProcessingTimeoutPolicy of(Duration timeout, BackoffPolicy
redeliveryBackoff) {
- return new ProcessingTimeoutPolicy(timeout, redeliveryBackoff);
+ public static final class Builder {
+ private Duration timeout;
+ private BackoffPolicy redeliveryBackoff;
+
+ private Builder() {
+ }
+
+ /**
+ * Processing-timeout duration — how long the client waits for the
application
+ * to ack before asking the broker to redeliver. {@link Duration#ZERO}
disables.
+ * Required.
+ *
+ * @param timeout the processing timeout
+ * @return this builder
+ */
+ public Builder timeout(Duration timeout) {
+ this.timeout = timeout;
+ return this;
+ }
+
+ /**
+ * Optional backoff applied between redeliveries. {@code null}
(default) means
+ * "redeliver immediately on the next sweep".
+ *
+ * @param redeliveryBackoff the backoff policy
+ * @return this builder
+ */
+ public Builder redeliveryBackoff(BackoffPolicy redeliveryBackoff) {
+ this.redeliveryBackoff = redeliveryBackoff;
+ return this;
+ }
+
+ /**
+ * @return a new {@link ProcessingTimeoutPolicy} instance
+ */
+ public ProcessingTimeoutPolicy build() {
+ return new ProcessingTimeoutPolicy(timeout, redeliveryBackoff);
+ }
}
}
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/TlsPolicy.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/TlsPolicy.java
index d47b8f71f73..ebd2e8ca196 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/TlsPolicy.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/TlsPolicy.java
@@ -18,42 +18,66 @@
*/
package org.apache.pulsar.client.api.v5.config;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
/**
* TLS configuration for the Pulsar client connection.
*
- * @param trustCertsFilePath path to the trusted CA certificate file
(PEM format)
- * @param keyFilePath path to the client private key file (PEM
format), or {@code null}
- * @param certificateFilePath path to the client certificate file (PEM
format), or {@code null}
- * @param allowInsecureConnection whether to allow connecting to brokers
with untrusted certificates
- * @param enableHostnameVerification whether to verify the broker hostname
against the certificate
+ * <p>Construct via {@link #builder()}, or use {@link #ofInsecure()} for
development.
*/
-public record TlsPolicy(
- String trustCertsFilePath,
- String keyFilePath,
- String certificateFilePath,
- boolean allowInsecureConnection,
- boolean enableHostnameVerification
-) {
+@EqualsAndHashCode
+@ToString
+public final class TlsPolicy {
+
+ private final String trustCertsFilePath;
+ private final String keyFilePath;
+ private final String certificateFilePath;
+ private final boolean allowInsecureConnection;
+ private final boolean enableHostnameVerification;
+
+ private TlsPolicy(String trustCertsFilePath, String keyFilePath, String
certificateFilePath,
+ boolean allowInsecureConnection, boolean
enableHostnameVerification) {
+ this.trustCertsFilePath = trustCertsFilePath;
+ this.keyFilePath = keyFilePath;
+ this.certificateFilePath = certificateFilePath;
+ this.allowInsecureConnection = allowInsecureConnection;
+ this.enableHostnameVerification = enableHostnameVerification;
+ }
+
/**
- * Create a TLS policy that trusts the given CA certificate and verifies
hostnames.
- *
- * @param trustCertsFilePath the path to the trusted CA certificate file
(PEM format)
- * @return a {@link TlsPolicy} with hostname verification enabled and
insecure connections disabled
+ * @return path to the trusted CA certificate file (PEM), or {@code null}
when not set
*/
- public static TlsPolicy of(String trustCertsFilePath) {
- return new TlsPolicy(trustCertsFilePath, null, null, false, true);
+ public String trustCertsFilePath() {
+ return trustCertsFilePath;
}
/**
- * Create a TLS policy with mutual TLS (mTLS) authentication.
- *
- * @param trustCertsFilePath the path to the trusted CA certificate file
(PEM format)
- * @param keyFilePath the path to the client private key file (PEM
format)
- * @param certificateFilePath the path to the client certificate file (PEM
format)
- * @return a {@link TlsPolicy} configured for mutual TLS with hostname
verification enabled
+ * @return path to the client private key file (PEM), or {@code null} when
not using mTLS
+ */
+ public String keyFilePath() {
+ return keyFilePath;
+ }
+
+ /**
+ * @return path to the client certificate file (PEM), or {@code null} when
not using mTLS
+ */
+ public String certificateFilePath() {
+ return certificateFilePath;
+ }
+
+ /**
+ * @return whether connections to brokers with untrusted certificates are
allowed
+ */
+ public boolean allowInsecureConnection() {
+ return allowInsecureConnection;
+ }
+
+ /**
+ * @return whether the broker hostname is verified against the certificate
*/
- public static TlsPolicy ofMutualTls(String trustCertsFilePath, String
keyFilePath, String certificateFilePath) {
- return new TlsPolicy(trustCertsFilePath, keyFilePath,
certificateFilePath, false, true);
+ public boolean enableHostnameVerification() {
+ return enableHostnameVerification;
}
/**
@@ -64,4 +88,90 @@ public record TlsPolicy(
public static TlsPolicy ofInsecure() {
return new TlsPolicy(null, null, null, true, false);
}
+
+ /**
+ * @return a new builder for constructing a {@link TlsPolicy}
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for {@link TlsPolicy}.
+ */
+ public static final class Builder {
+ private String trustCertsFilePath;
+ private String keyFilePath;
+ private String certificateFilePath;
+ private boolean allowInsecureConnection = false;
+ private boolean enableHostnameVerification = true;
+
+ private Builder() {
+ }
+
+ /**
+ * Path to the trusted CA certificate file (PEM format).
+ *
+ * @param trustCertsFilePath the trust store path
+ * @return this builder
+ */
+ public Builder trustCertsFilePath(String trustCertsFilePath) {
+ this.trustCertsFilePath = trustCertsFilePath;
+ return this;
+ }
+
+ /**
+ * Path to the client private key file (PEM format) for mutual TLS.
+ *
+ * @param keyFilePath the client key path
+ * @return this builder
+ */
+ public Builder keyFilePath(String keyFilePath) {
+ this.keyFilePath = keyFilePath;
+ return this;
+ }
+
+ /**
+ * Path to the client certificate file (PEM format) for mutual TLS.
+ *
+ * @param certificateFilePath the client certificate path
+ * @return this builder
+ */
+ public Builder certificateFilePath(String certificateFilePath) {
+ this.certificateFilePath = certificateFilePath;
+ return this;
+ }
+
+ /**
+ * Whether to allow connecting to brokers with untrusted certificates.
+ * Default {@code false}. Enable only for development.
+ *
+ * @param allowInsecureConnection whether to skip certificate
validation
+ * @return this builder
+ */
+ public Builder allowInsecureConnection(boolean
allowInsecureConnection) {
+ this.allowInsecureConnection = allowInsecureConnection;
+ return this;
+ }
+
+ /**
+ * Whether to verify the broker hostname against the certificate.
Default
+ * {@code true}.
+ *
+ * @param enableHostnameVerification whether to verify the hostname
+ * @return this builder
+ */
+ public Builder enableHostnameVerification(boolean
enableHostnameVerification) {
+ this.enableHostnameVerification = enableHostnameVerification;
+ return this;
+ }
+
+ /**
+ * @return a new {@link TlsPolicy} instance
+ */
+ public TlsPolicy build() {
+ return new TlsPolicy(trustCertsFilePath, keyFilePath,
certificateFilePath,
+ allowInsecureConnection, enableHostnameVerification);
+ }
+ }
}
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/TransactionPolicy.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/TransactionPolicy.java
index 228bdeed020..8787d68db17 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/TransactionPolicy.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/TransactionPolicy.java
@@ -19,12 +19,70 @@
package org.apache.pulsar.client.api.v5.config;
import java.time.Duration;
+import java.util.Objects;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
/**
* Transaction configuration for the Pulsar client.
*
- * @param timeout transaction timeout. If the transaction is not committed or
aborted within this duration, it will
- * be automatically aborted by the broker.
+ * <p>Construct via {@link #builder()}.
*/
-public record TransactionPolicy(Duration timeout) {
+@EqualsAndHashCode
+@ToString
+public final class TransactionPolicy {
+
+ private final Duration timeout;
+
+ private TransactionPolicy(Duration timeout) {
+ Objects.requireNonNull(timeout, "timeout must not be null");
+ if (timeout.isNegative() || timeout.isZero()) {
+ throw new IllegalArgumentException("timeout must be > 0");
+ }
+ this.timeout = timeout;
+ }
+
+ /**
+ * @return transaction timeout — if the transaction is not committed or
aborted within this duration,
+ * the broker automatically aborts it
+ */
+ public Duration timeout() {
+ return timeout;
+ }
+
+ /**
+ * @return a new builder for constructing a {@link TransactionPolicy}
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for {@link TransactionPolicy}.
+ */
+ public static final class Builder {
+ private Duration timeout;
+
+ private Builder() {
+ }
+
+ /**
+ * Transaction timeout. Required. The broker auto-aborts transactions
that
+ * neither commit nor abort within this duration.
+ *
+ * @param timeout the transaction timeout
+ * @return this builder
+ */
+ public Builder timeout(Duration timeout) {
+ this.timeout = timeout;
+ return this;
+ }
+
+ /**
+ * @return a new {@link TransactionPolicy} instance
+ */
+ public TransactionPolicy build() {
+ return new TransactionPolicy(timeout);
+ }
+ }
}
diff --git
a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
index 582a41a0574..f05cfbd4a21 100644
---
a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
+++
b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
@@ -65,7 +65,7 @@ public class Examples {
try (var client = PulsarClient.builder()
.serviceUrl("pulsar+ssl://pulsar.example.com:6651")
.authentication(AuthenticationFactory.token("eyJhbGci..."))
- .tlsPolicy(TlsPolicy.of("/etc/pulsar/ca.pem"))
+
.tlsPolicy(TlsPolicy.builder().trustCertsFilePath("/etc/pulsar/ca.pem").build())
.operationTimeout(Duration.ofSeconds(30))
.connectionPolicy(ConnectionPolicy.builder()
.connectionTimeout(Duration.ofSeconds(10))
@@ -107,8 +107,11 @@ public class Examples {
try (var producer =
client.newProducer(Schema.json(SensorReading.class))
.topic("sensor-data")
.compressionPolicy(CompressionPolicy.of(CompressionType.ZSTD))
- .batchingPolicy(BatchingPolicy.of(
- Duration.ofMillis(10), 5000,
MemorySize.ofMegabytes(1)))
+ .batchingPolicy(BatchingPolicy.builder()
+ .maxPublishDelay(Duration.ofMillis(10))
+ .maxMessages(5000)
+ .maxSize(MemorySize.ofMegabytes(1))
+ .build())
.create()) {
for (int i = 0; i < 100_000; i++) {
@@ -281,7 +284,7 @@ public class Examples {
.processingTimeout(ProcessingTimeoutPolicy.of(Duration.ofSeconds(30)))
.negativeAckRedeliveryBackoff(
BackoffPolicy.exponential(Duration.ofSeconds(1),
Duration.ofMinutes(5)))
- .deadLetterPolicy(DeadLetterPolicy.of(5))
+
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(5).build())
.subscribe()) {
while (true) {