This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 12d5f26bec8 PIP-468: V5 policy options use builder pattern (#25657)
12d5f26bec8 is described below

commit 12d5f26bec8d5f413e459a94b835a22d80e58f15
Author: Matteo Merli <[email protected]>
AuthorDate: Sun May 3 10:24:52 2026 -0700

    PIP-468: V5 policy options use builder pattern (#25657)
---
 .../client/api/v5/V5DeadLetterPolicyTest.java      |  17 ++-
 .../client/api/v5/V5ProducerBatchingTest.java      |  16 +-
 .../pulsar/client/api/v5/V5TransactionTest.java    |   2 +-
 .../pulsar/client/api/v5/config/BackoffPolicy.java | 110 +++++++++++---
 .../client/api/v5/config/BatchingPolicy.java       | 146 +++++++++++++++----
 .../client/api/v5/config/ChunkingPolicy.java       | 100 ++++++++++++-
 .../client/api/v5/config/CompressionPolicy.java    |  61 +++++++-
 .../client/api/v5/config/ConnectionPolicy.java     | 133 ++++++++++++++---
 .../client/api/v5/config/DeadLetterPolicy.java     | 135 ++++++++++++++---
 .../client/api/v5/config/EncryptionPolicy.java     | 112 ++++++++++++--
 .../api/v5/config/ProcessingTimeoutPolicy.java     |  92 ++++++++++--
 .../pulsar/client/api/v5/config/TlsPolicy.java     | 162 +++++++++++++++++----
 .../client/api/v5/config/TransactionPolicy.java    |  64 +++++++-
 .../org/apache/pulsar/client/api/v5/Examples.java  |  11 +-
 14 files changed, 999 insertions(+), 162 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java
index 11f5cd0b462..dd29e618985 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java
@@ -59,7 +59,10 @@ public class V5DeadLetterPolicyTest extends V5ClientBaseTest 
{
                 .subscriptionName("dlq-sub")
                 .negativeAckRedeliveryBackoff(BackoffPolicy.exponential(
                         Duration.ofMillis(200), Duration.ofMillis(200)))
-                .deadLetterPolicy(new DeadLetterPolicy(2, null, dlqTopic, 
null))
+                .deadLetterPolicy(DeadLetterPolicy.builder()
+                        .maxRedeliverCount(2)
+                        .deadLetterTopic(dlqTopic)
+                        .build())
                 .subscribe();
 
         @Cleanup
@@ -104,7 +107,7 @@ public class V5DeadLetterPolicyTest extends 
V5ClientBaseTest {
                 .subscriptionName("default-dlq-sub")
                 .negativeAckRedeliveryBackoff(BackoffPolicy.exponential(
                         Duration.ofMillis(200), Duration.ofMillis(200)))
-                .deadLetterPolicy(DeadLetterPolicy.of(1))
+                
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
                 .subscribe();
 
         @Cleanup
@@ -150,7 +153,10 @@ public class V5DeadLetterPolicyTest extends 
V5ClientBaseTest {
                 .subscriptionName("dlq-meta-sub")
                 .negativeAckRedeliveryBackoff(BackoffPolicy.exponential(
                         Duration.ofMillis(200), Duration.ofMillis(200)))
-                .deadLetterPolicy(new DeadLetterPolicy(1, null, dlqTopic, 
null))
+                .deadLetterPolicy(DeadLetterPolicy.builder()
+                        .maxRedeliverCount(1)
+                        .deadLetterTopic(dlqTopic)
+                        .build())
                 .subscribe();
         @Cleanup
         QueueConsumer<byte[]> dlqConsumer = 
v5Client.newQueueConsumer(Schema.bytes())
@@ -212,7 +218,10 @@ public class V5DeadLetterPolicyTest extends 
V5ClientBaseTest {
                 .subscriptionName("dlq-multi-sub")
                 .negativeAckRedeliveryBackoff(BackoffPolicy.exponential(
                         Duration.ofMillis(200), Duration.ofMillis(200)))
-                .deadLetterPolicy(new DeadLetterPolicy(1, null, dlqTopic, 
null))
+                .deadLetterPolicy(DeadLetterPolicy.builder()
+                        .maxRedeliverCount(1)
+                        .deadLetterTopic(dlqTopic)
+                        .build())
                 .subscribe();
         @Cleanup
         QueueConsumer<byte[]> dlqConsumer = 
v5Client.newQueueConsumer(Schema.bytes())
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerBatchingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerBatchingTest.java
index 7e816b0663e..1c84bf4cd92 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerBatchingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerBatchingTest.java
@@ -78,7 +78,7 @@ public class V5ProducerBatchingTest extends V5ClientBaseTest {
 
     @Test
     public void testDefaultBatchingDeliversAllMessages() throws Exception {
-        produceAndVerify(BatchingPolicy.ofDefault(), 100, "default");
+        produceAndVerify(BatchingPolicy.builder().build(), 100, "default");
     }
 
     @Test
@@ -90,8 +90,11 @@ public class V5ProducerBatchingTest extends V5ClientBaseTest 
{
     public void testTightBatchingByDelay() throws Exception {
         // Small max-publish-delay forces batches to flush quickly; both ends 
still see
         // every message in order.
-        BatchingPolicy tight = BatchingPolicy.of(
-                Duration.ofMillis(5), 100, MemorySize.ofMegabytes(1));
+        BatchingPolicy tight = BatchingPolicy.builder()
+                .maxPublishDelay(Duration.ofMillis(5))
+                .maxMessages(100)
+                .maxSize(MemorySize.ofMegabytes(1))
+                .build();
         produceAndVerify(tight, 50, "tight-delay");
     }
 
@@ -99,8 +102,11 @@ public class V5ProducerBatchingTest extends 
V5ClientBaseTest {
     public void testBatchingWithSmallBatchSize() throws Exception {
         // Cap the batch at 5 messages — exercises the maxMessages branch of 
the batching
         // packer so a 50-message stream gets cut into 10 batches.
-        BatchingPolicy small = BatchingPolicy.of(
-                Duration.ofSeconds(1), 5, MemorySize.ofMegabytes(1));
+        BatchingPolicy small = BatchingPolicy.builder()
+                .maxPublishDelay(Duration.ofSeconds(1))
+                .maxMessages(5)
+                .maxSize(MemorySize.ofMegabytes(1))
+                .build();
         produceAndVerify(small, 50, "small-batch");
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTest.java
index 23ede822b65..6cdc197709f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTest.java
@@ -44,7 +44,7 @@ public class V5TransactionTest extends V5ClientBaseTest {
     private PulsarClient newTxnClient() throws Exception {
         return track(PulsarClient.builder()
                 .serviceUrl(getBrokerServiceUrl())
-                .transactionPolicy(new 
TransactionPolicy(Duration.ofMinutes(1)))
+                
.transactionPolicy(TransactionPolicy.builder().timeout(Duration.ofMinutes(1)).build())
                 .build());
     }
 
diff --git 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BackoffPolicy.java
 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BackoffPolicy.java
index eb7ab9a4ecd..bca04a2a596 100644
--- 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BackoffPolicy.java
+++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BackoffPolicy.java
@@ -20,27 +20,55 @@ package org.apache.pulsar.client.api.v5.config;
 
 import java.time.Duration;
 import java.util.Objects;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
 
 /**
  * Backoff configuration for broker reconnection attempts.
  *
  * <p>The delay for attempt {@code n} is {@code min(initialInterval * 
multiplier^(n-1), maxInterval)}.
  *
- * @param initialInterval the delay before the first reconnection attempt
- * @param maxInterval     the maximum delay between reconnection attempts
- * @param multiplier      the multiplier applied after each attempt
+ * <p>Use {@link #fixed(Duration, Duration)} or {@link #exponential(Duration, 
Duration)} for
+ * the common cases, or {@link #builder()} to configure all knobs explicitly.
  */
-public record BackoffPolicy(
-        Duration initialInterval,
-        Duration maxInterval,
-        double multiplier
-) {
-    public BackoffPolicy {
+@EqualsAndHashCode
+@ToString
+public final class BackoffPolicy {
+
+    private final Duration initialInterval;
+    private final Duration maxInterval;
+    private final double multiplier;
+
+    private BackoffPolicy(Duration initialInterval, Duration maxInterval, 
double multiplier) {
         Objects.requireNonNull(initialInterval, "initialInterval must not be 
null");
         Objects.requireNonNull(maxInterval, "maxInterval must not be null");
         if (multiplier < 1.0) {
             throw new IllegalArgumentException("multiplier must be >= 1.0");
         }
+        this.initialInterval = initialInterval;
+        this.maxInterval = maxInterval;
+        this.multiplier = multiplier;
+    }
+
+    /**
+     * @return the delay before the first reconnection attempt
+     */
+    public Duration initialInterval() {
+        return initialInterval;
+    }
+
+    /**
+     * @return the maximum delay between reconnection attempts
+     */
+    public Duration maxInterval() {
+        return maxInterval;
+    }
+
+    /**
+     * @return the multiplier applied after each attempt
+     */
+    public double multiplier() {
+        return multiplier;
     }
 
     /**
@@ -66,14 +94,62 @@ public record BackoffPolicy(
     }
 
     /**
-     * Create an exponential backoff with a custom multiplier.
-     *
-     * @param initialInterval the delay before the first reconnection attempt
-     * @param maxInterval     the maximum delay between reconnection attempts
-     * @param multiplier      the multiplier applied after each attempt, must 
be &gt;= 1.0
-     * @return a {@link BackoffPolicy} with the specified parameters
+     * @return a new builder for constructing a {@link BackoffPolicy}
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link BackoffPolicy}.
      */
-    public static BackoffPolicy exponential(Duration initialInterval, Duration 
maxInterval, double multiplier) {
-        return new BackoffPolicy(initialInterval, maxInterval, multiplier);
+    public static final class Builder {
+        private Duration initialInterval;
+        private Duration maxInterval;
+        private double multiplier = 2.0;
+
+        private Builder() {
+        }
+
+        /**
+         * Delay before the first reconnection attempt. Required.
+         *
+         * @param initialInterval the initial backoff delay
+         * @return this builder
+         */
+        public Builder initialInterval(Duration initialInterval) {
+            this.initialInterval = initialInterval;
+            return this;
+        }
+
+        /**
+         * Upper bound on the backoff delay. Required.
+         *
+         * @param maxInterval the maximum backoff delay
+         * @return this builder
+         */
+        public Builder maxInterval(Duration maxInterval) {
+            this.maxInterval = maxInterval;
+            return this;
+        }
+
+        /**
+         * Multiplier applied to the previous delay on each retry. Must be 
{@code >= 1.0}.
+         * Default is {@code 2.0} (exponential backoff). Use {@code 1.0} for 
fixed backoff.
+         *
+         * @param multiplier the per-attempt multiplier
+         * @return this builder
+         */
+        public Builder multiplier(double multiplier) {
+            this.multiplier = multiplier;
+            return this;
+        }
+
+        /**
+         * @return a new {@link BackoffPolicy} instance
+         */
+        public BackoffPolicy build() {
+            return new BackoffPolicy(initialInterval, maxInterval, multiplier);
+        }
     }
 }
diff --git 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BatchingPolicy.java
 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BatchingPolicy.java
index 13ef7cbc4bd..187ac4e8636 100644
--- 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BatchingPolicy.java
+++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BatchingPolicy.java
@@ -19,6 +19,9 @@
 package org.apache.pulsar.client.api.v5.config;
 
 import java.time.Duration;
+import java.util.Objects;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
 
 /**
  * Configuration for producer message batching.
@@ -27,33 +30,68 @@ import java.time.Duration;
  * broker request to improve throughput. A batch is flushed when any of the
  * configured thresholds is reached.
  *
- * @param enabled        whether batching is enabled
- * @param maxPublishDelay maximum time to wait before flushing a batch
- * @param maxMessages    maximum number of messages in a single batch
- * @param maxSize       maximum size of a single batch
+ * <p>Construct via {@link #builder()}, or use {@link #ofDisabled()} to opt 
out.
  */
-public record BatchingPolicy(
-        boolean enabled,
-        Duration maxPublishDelay,
-        int maxMessages,
-        MemorySize maxSize
-) {
-    public BatchingPolicy {
+@EqualsAndHashCode
+@ToString
+public final class BatchingPolicy {
+
+    private static final Duration DEFAULT_MAX_PUBLISH_DELAY = 
Duration.ofMillis(1);
+    private static final int DEFAULT_MAX_MESSAGES = 1000;
+    private static final MemorySize DEFAULT_MAX_SIZE = 
MemorySize.ofKilobytes(128);
+
+    private static final BatchingPolicy DISABLED =
+            new BatchingPolicy(false, DEFAULT_MAX_PUBLISH_DELAY, 
DEFAULT_MAX_MESSAGES, DEFAULT_MAX_SIZE);
+
+    private final boolean enabled;
+    private final Duration maxPublishDelay;
+    private final int maxMessages;
+    private final MemorySize maxSize;
+
+    private BatchingPolicy(boolean enabled, Duration maxPublishDelay, int 
maxMessages, MemorySize maxSize) {
         if (maxPublishDelay == null) {
-            maxPublishDelay = Duration.ofMillis(1);
+            maxPublishDelay = DEFAULT_MAX_PUBLISH_DELAY;
         }
+        Objects.requireNonNull(maxSize, "maxSize must not be null");
         if (maxMessages < 0) {
             throw new IllegalArgumentException("maxMessages must be >= 0");
         }
         if (maxSize.bytes() < 0) {
             throw new IllegalArgumentException("maxBytes must be >= 0");
         }
+        this.enabled = enabled;
+        this.maxPublishDelay = maxPublishDelay;
+        this.maxMessages = maxMessages;
+        this.maxSize = maxSize;
     }
 
-    private static final BatchingPolicy DISABLED =
-            new BatchingPolicy(false, Duration.ofMillis(1), 1000, 
MemorySize.ofKilobytes(128));
-    private static final BatchingPolicy DEFAULT =
-            new BatchingPolicy(true, Duration.ofMillis(1), 1000, 
MemorySize.ofKilobytes(128));
+    /**
+     * @return whether batching is enabled
+     */
+    public boolean enabled() {
+        return enabled;
+    }
+
+    /**
+     * @return the maximum time to wait before flushing a batch
+     */
+    public Duration maxPublishDelay() {
+        return maxPublishDelay;
+    }
+
+    /**
+     * @return the maximum number of messages in a single batch
+     */
+    public int maxMessages() {
+        return maxMessages;
+    }
+
+    /**
+     * @return the maximum size of a single batch
+     */
+    public MemorySize maxSize() {
+        return maxSize;
+    }
 
     /**
      * Batching disabled.
@@ -65,23 +103,73 @@ public record BatchingPolicy(
     }
 
     /**
-     * Batching enabled with default thresholds (1ms delay, 1000 messages, 
128KB).
-     *
-     * @return a {@link BatchingPolicy} with default batching thresholds
+     * @return a new builder for constructing a {@link BatchingPolicy}
      */
-    public static BatchingPolicy ofDefault() {
-        return DEFAULT;
+    public static Builder builder() {
+        return new Builder();
     }
 
     /**
-     * Batching enabled with custom thresholds.
-     *
-     * @param maxPublishDelay the maximum time to wait before flushing a batch
-     * @param maxMessages     the maximum number of messages in a single batch
-     * @param maxSize         the maximum size of a single batch
-     * @return a {@link BatchingPolicy} with batching enabled and the 
specified thresholds
+     * Builder for {@link BatchingPolicy}.
      */
-    public static BatchingPolicy of(Duration maxPublishDelay, int maxMessages, 
MemorySize maxSize) {
-        return new BatchingPolicy(true, maxPublishDelay, maxMessages, maxSize);
+    public static final class Builder {
+        private boolean enabled = true;
+        private Duration maxPublishDelay = DEFAULT_MAX_PUBLISH_DELAY;
+        private int maxMessages = DEFAULT_MAX_MESSAGES;
+        private MemorySize maxSize = DEFAULT_MAX_SIZE;
+
+        private Builder() {
+        }
+
+        /**
+         * Whether batching is enabled. Default is {@code true}.
+         *
+         * @param enabled whether to batch outgoing messages
+         * @return this builder
+         */
+        public Builder enabled(boolean enabled) {
+            this.enabled = enabled;
+            return this;
+        }
+
+        /**
+         * Maximum time the producer waits before flushing a partial batch.
+         *
+         * @param maxPublishDelay the upper bound on per-message latency added 
by batching
+         * @return this builder
+         */
+        public Builder maxPublishDelay(Duration maxPublishDelay) {
+            this.maxPublishDelay = maxPublishDelay;
+            return this;
+        }
+
+        /**
+         * Maximum number of messages per batch.
+         *
+         * @param maxMessages the message-count flush threshold
+         * @return this builder
+         */
+        public Builder maxMessages(int maxMessages) {
+            this.maxMessages = maxMessages;
+            return this;
+        }
+
+        /**
+         * Maximum payload size per batch.
+         *
+         * @param maxSize the size flush threshold
+         * @return this builder
+         */
+        public Builder maxSize(MemorySize maxSize) {
+            this.maxSize = maxSize;
+            return this;
+        }
+
+        /**
+         * @return a new {@link BatchingPolicy} instance
+         */
+        public BatchingPolicy build() {
+            return new BatchingPolicy(enabled, maxPublishDelay, maxMessages, 
maxSize);
+        }
     }
 }
diff --git 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ChunkingPolicy.java
 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ChunkingPolicy.java
index ff32a287a36..e523a73328d 100644
--- 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ChunkingPolicy.java
+++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ChunkingPolicy.java
@@ -18,14 +18,102 @@
  */
 package org.apache.pulsar.client.api.v5.config;
 
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
 /**
  * Configuration for chunking large messages that exceed the broker's max 
message size.
  *
- * @param enabled   whether chunking is enabled
- * @param chunkSize maximum size of each chunk in bytes
+ * <p>When chunking is enabled, the producer splits a payload larger than the
+ * configured chunk size into smaller pieces that are reassembled by the 
consumer.
+ *
+ * <p>Use {@link #ofDisabled()} to opt out, or {@link #builder()} to enable 
with a
+ * specific chunk size.
  */
-public record ChunkingPolicy(
-        boolean enabled,
-        int chunkSize
-) {
+@EqualsAndHashCode
+@ToString
+public final class ChunkingPolicy {
+
+    private static final ChunkingPolicy DISABLED = new ChunkingPolicy(false, 
0);
+
+    private final boolean enabled;
+    private final int chunkSize;
+
+    private ChunkingPolicy(boolean enabled, int chunkSize) {
+        if (enabled && chunkSize <= 0) {
+            throw new IllegalArgumentException("chunkSize must be > 0 when 
chunking is enabled");
+        }
+        this.enabled = enabled;
+        this.chunkSize = chunkSize;
+    }
+
+    /**
+     * @return whether chunking is enabled
+     */
+    public boolean enabled() {
+        return enabled;
+    }
+
+    /**
+     * @return the maximum size of each chunk in bytes (only meaningful when 
{@link #enabled()})
+     */
+    public int chunkSize() {
+        return chunkSize;
+    }
+
+    /**
+     * Chunking disabled.
+     *
+     * @return a {@link ChunkingPolicy} with chunking disabled
+     */
+    public static ChunkingPolicy ofDisabled() {
+        return DISABLED;
+    }
+
+    /**
+     * @return a new builder for constructing a {@link ChunkingPolicy}
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link ChunkingPolicy}.
+     */
+    public static final class Builder {
+        private boolean enabled = true;
+        private int chunkSize;
+
+        private Builder() {
+        }
+
+        /**
+         * Whether chunking is enabled. Default is {@code true}.
+         *
+         * @param enabled whether to chunk oversized payloads
+         * @return this builder
+         */
+        public Builder enabled(boolean enabled) {
+            this.enabled = enabled;
+            return this;
+        }
+
+        /**
+         * Maximum size of each chunk in bytes. Required when chunking is 
enabled.
+         *
+         * @param chunkSize the per-chunk size in bytes
+         * @return this builder
+         */
+        public Builder chunkSize(int chunkSize) {
+            this.chunkSize = chunkSize;
+            return this;
+        }
+
+        /**
+         * @return a new {@link ChunkingPolicy} instance
+         */
+        public ChunkingPolicy build() {
+            return new ChunkingPolicy(enabled, chunkSize);
+        }
+    }
 }
diff --git 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/CompressionPolicy.java
 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/CompressionPolicy.java
index 79c318211b3..4e83e9c7a81 100644
--- 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/CompressionPolicy.java
+++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/CompressionPolicy.java
@@ -18,12 +18,34 @@
  */
 package org.apache.pulsar.client.api.v5.config;
 
+import java.util.Objects;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
 /**
  * Compression configuration for producer message payloads.
  *
- * @param type the compression codec to use
+ * <p>The dominant configuration is the codec, so {@link #of(CompressionType)} 
is the
+ * primary entry point. Use {@link #builder()} if you need to set additional 
knobs in
+ * the future.
  */
-public record CompressionPolicy(CompressionType type) {
+@EqualsAndHashCode
+@ToString
+public final class CompressionPolicy {
+
+    private final CompressionType type;
+
+    private CompressionPolicy(CompressionType type) {
+        Objects.requireNonNull(type, "type must not be null");
+        this.type = type;
+    }
+
+    /**
+     * @return the compression codec
+     */
+    public CompressionType type() {
+        return type;
+    }
 
     /**
      * No compression.
@@ -43,4 +65,39 @@ public record CompressionPolicy(CompressionType type) {
     public static CompressionPolicy of(CompressionType type) {
         return new CompressionPolicy(type);
     }
+
+    /**
+     * @return a new builder for constructing a {@link CompressionPolicy}
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link CompressionPolicy}.
+     */
+    public static final class Builder {
+        private CompressionType type = CompressionType.NONE;
+
+        private Builder() {
+        }
+
+        /**
+         * Compression codec to use for message payloads.
+         *
+         * @param type the compression codec
+         * @return this builder
+         */
+        public Builder type(CompressionType type) {
+            this.type = type;
+            return this;
+        }
+
+        /**
+         * @return a new {@link CompressionPolicy} instance
+         */
+        public CompressionPolicy build() {
+            return new CompressionPolicy(type);
+        }
+    }
 }
diff --git 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ConnectionPolicy.java
 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ConnectionPolicy.java
index b502dde7315..83601c5f780 100644
--- 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ConnectionPolicy.java
+++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ConnectionPolicy.java
@@ -20,35 +20,42 @@ package org.apache.pulsar.client.api.v5.config;
 
 import java.time.Duration;
 import java.util.Objects;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
 
 /**
  * Connection-level settings for the Pulsar client.
  *
  * <p>Groups TCP connection timeout, connection pool sizing, keep-alive, idle 
timeout,
  * TCP no-delay, I/O and callback threading, and proxy configuration.
+ *
+ * <p>Construct via {@link #builder()}.
  */
-public record ConnectionPolicy(
-        Duration connectionTimeout,
-        int connectionsPerBroker,
-        boolean enableTcpNoDelay,
-        Duration keepAliveInterval,
-        Duration connectionMaxIdleTime,
-        int ioThreads,
-        int callbackThreads,
-        String proxyServiceUrl,
-        ProxyProtocol proxyProtocol,
-        BackoffPolicy connectionBackoff
-) {
+@EqualsAndHashCode
+@ToString
+public final class ConnectionPolicy {
 
-    /**
-     * Create a connection policy with the given parameters.
-     *
-     * @throws NullPointerException if {@code connectionTimeout}, {@code 
keepAliveInterval},
-     *         {@code connectionMaxIdleTime}, or {@code connectionBackoff} is 
null
-     * @throws IllegalArgumentException if {@code connectionsPerBroker}, 
{@code ioThreads},
-     *         or {@code callbackThreads} is less than 1
-     */
-    public ConnectionPolicy {
+    private final Duration connectionTimeout;
+    private final int connectionsPerBroker;
+    private final boolean enableTcpNoDelay;
+    private final Duration keepAliveInterval;
+    private final Duration connectionMaxIdleTime;
+    private final int ioThreads;
+    private final int callbackThreads;
+    private final String proxyServiceUrl;
+    private final ProxyProtocol proxyProtocol;
+    private final BackoffPolicy connectionBackoff;
+
+    private ConnectionPolicy(Duration connectionTimeout,
+                             int connectionsPerBroker,
+                             boolean enableTcpNoDelay,
+                             Duration keepAliveInterval,
+                             Duration connectionMaxIdleTime,
+                             int ioThreads,
+                             int callbackThreads,
+                             String proxyServiceUrl,
+                             ProxyProtocol proxyProtocol,
+                             BackoffPolicy connectionBackoff) {
         Objects.requireNonNull(connectionTimeout, "connectionTimeout must not 
be null");
         Objects.requireNonNull(keepAliveInterval, "keepAliveInterval must not 
be null");
         Objects.requireNonNull(connectionMaxIdleTime, "connectionMaxIdleTime 
must not be null");
@@ -62,12 +69,90 @@ public record ConnectionPolicy(
         if (callbackThreads < 1) {
             throw new IllegalArgumentException("callbackThreads must be >= 1");
         }
+        this.connectionTimeout = connectionTimeout;
+        this.connectionsPerBroker = connectionsPerBroker;
+        this.enableTcpNoDelay = enableTcpNoDelay;
+        this.keepAliveInterval = keepAliveInterval;
+        this.connectionMaxIdleTime = connectionMaxIdleTime;
+        this.ioThreads = ioThreads;
+        this.callbackThreads = callbackThreads;
+        this.proxyServiceUrl = proxyServiceUrl;
+        this.proxyProtocol = proxyProtocol;
+        this.connectionBackoff = connectionBackoff;
+    }
+
+    /**
+     * @return the maximum duration to wait for a TCP connection to a broker
+     */
+    public Duration connectionTimeout() {
+        return connectionTimeout;
+    }
+
+    /**
+     * @return the number of TCP connections maintained per broker
+     */
+    public int connectionsPerBroker() {
+        return connectionsPerBroker;
+    }
+
+    /**
+     * @return whether TCP no-delay (Nagle disabled) is enabled
+     */
+    public boolean enableTcpNoDelay() {
+        return enableTcpNoDelay;
+    }
+
+    /**
+     * @return the interval between TCP keep-alive probes
+     */
+    public Duration keepAliveInterval() {
+        return keepAliveInterval;
+    }
+
+    /**
+     * @return the maximum idle duration before a connection is closed
+     */
+    public Duration connectionMaxIdleTime() {
+        return connectionMaxIdleTime;
+    }
+
+    /**
+     * @return the number of I/O threads
+     */
+    public int ioThreads() {
+        return ioThreads;
+    }
+
+    /**
+     * @return the number of callback threads
+     */
+    public int callbackThreads() {
+        return callbackThreads;
+    }
+
+    /**
+     * @return the proxy service URL, or {@code null} if no proxy is configured
+     */
+    public String proxyServiceUrl() {
+        return proxyServiceUrl;
+    }
+
+    /**
+     * @return the proxy protocol, or {@code null} if no proxy is configured
+     */
+    public ProxyProtocol proxyProtocol() {
+        return proxyProtocol;
+    }
+
+    /**
+     * @return the broker-reconnection backoff policy
+     */
+    public BackoffPolicy connectionBackoff() {
+        return connectionBackoff;
     }
 
     /**
-     * Create a builder for constructing a {@link ConnectionPolicy}.
-     *
-     * @return a new builder with sensible defaults
+     * @return a new builder for constructing a {@link ConnectionPolicy}
      */
     public static Builder builder() {
         return new Builder();
diff --git 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/DeadLetterPolicy.java
 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/DeadLetterPolicy.java
index cb4f34ad4a4..4b610b58770 100644
--- 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/DeadLetterPolicy.java
+++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/DeadLetterPolicy.java
@@ -18,36 +18,137 @@
  */
 package org.apache.pulsar.client.api.v5.config;
 
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
 /**
  * Configuration for the dead letter queue mechanism.
  *
  * <p>When a message has been redelivered more than {@code maxRedeliverCount} 
times,
  * it is moved to the dead letter topic instead of being redelivered again.
  *
- * @param maxRedeliverCount     maximum number of redelivery attempts before 
sending to the dead letter topic
- * @param retryLetterTopic      custom topic for retry messages (nullable, 
auto-generated if null)
- * @param deadLetterTopic       custom topic for dead letter messages 
(nullable, auto-generated if null)
- * @param initialSubscriptionName subscription name to create on the dead 
letter topic (nullable)
+ * <p>Construct via {@link #builder()}.
  */
-public record DeadLetterPolicy(
-        int maxRedeliverCount,
-        String retryLetterTopic,
-        String deadLetterTopic,
-        String initialSubscriptionName
-) {
-    public DeadLetterPolicy {
+@EqualsAndHashCode
+@ToString
+public final class DeadLetterPolicy {
+
+    private final int maxRedeliverCount;
+    private final String retryLetterTopic;
+    private final String deadLetterTopic;
+    private final String initialSubscriptionName;
+
+    private DeadLetterPolicy(int maxRedeliverCount, String retryLetterTopic,
+                             String deadLetterTopic, String 
initialSubscriptionName) {
         if (maxRedeliverCount < 0) {
             throw new IllegalArgumentException("maxRedeliverCount must be >= 
0");
         }
+        this.maxRedeliverCount = maxRedeliverCount;
+        this.retryLetterTopic = retryLetterTopic;
+        this.deadLetterTopic = deadLetterTopic;
+        this.initialSubscriptionName = initialSubscriptionName;
+    }
+
+    /**
+     * @return the maximum number of redelivery attempts before sending to the 
dead letter topic
+     */
+    public int maxRedeliverCount() {
+        return maxRedeliverCount;
+    }
+
+    /**
+     * @return the custom retry letter topic, or {@code null} for the 
auto-generated default
+     */
+    public String retryLetterTopic() {
+        return retryLetterTopic;
     }
 
     /**
-     * Create a dead letter policy with just a max redeliver count, using 
default topic names.
-     *
-     * @param maxRedeliverCount the maximum number of redelivery attempts 
before sending to the dead letter topic
-     * @return a {@link DeadLetterPolicy} with auto-generated topic names and 
no initial subscription
+     * @return the custom dead letter topic, or {@code null} for the 
auto-generated default
      */
-    public static DeadLetterPolicy of(int maxRedeliverCount) {
-        return new DeadLetterPolicy(maxRedeliverCount, null, null, null);
+    public String deadLetterTopic() {
+        return deadLetterTopic;
+    }
+
+    /**
+     * @return the subscription name to create on the dead letter topic, or 
{@code null} if none
+     */
+    public String initialSubscriptionName() {
+        return initialSubscriptionName;
+    }
+
+    /**
+     * @return a new builder for constructing a {@link DeadLetterPolicy}
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link DeadLetterPolicy}.
+     */
+    public static final class Builder {
+        private int maxRedeliverCount;
+        private String retryLetterTopic;
+        private String deadLetterTopic;
+        private String initialSubscriptionName;
+
+        private Builder() {
+        }
+
+        /**
+         * Maximum number of redelivery attempts before sending to the dead 
letter topic.
+         * Required.
+         *
+         * @param maxRedeliverCount the redelivery threshold
+         * @return this builder
+         */
+        public Builder maxRedeliverCount(int maxRedeliverCount) {
+            this.maxRedeliverCount = maxRedeliverCount;
+            return this;
+        }
+
+        /**
+         * Custom retry letter topic. {@code null} (default) auto-generates 
the name.
+         *
+         * @param retryLetterTopic the retry letter topic name
+         * @return this builder
+         */
+        public Builder retryLetterTopic(String retryLetterTopic) {
+            this.retryLetterTopic = retryLetterTopic;
+            return this;
+        }
+
+        /**
+         * Custom dead letter topic. {@code null} (default) auto-generates the 
name as
+         * {@code <source>-DLQ}.
+         *
+         * @param deadLetterTopic the dead letter topic name
+         * @return this builder
+         */
+        public Builder deadLetterTopic(String deadLetterTopic) {
+            this.deadLetterTopic = deadLetterTopic;
+            return this;
+        }
+
+        /**
+         * Subscription name to create on the dead letter topic, ensuring 
messages
+         * forwarded to the DLQ are retained until consumed.
+         *
+         * @param initialSubscriptionName the DLQ initial subscription
+         * @return this builder
+         */
+        public Builder initialSubscriptionName(String initialSubscriptionName) 
{
+            this.initialSubscriptionName = initialSubscriptionName;
+            return this;
+        }
+
+        /**
+         * @return a new {@link DeadLetterPolicy} instance
+         */
+        public DeadLetterPolicy build() {
+            return new DeadLetterPolicy(maxRedeliverCount, retryLetterTopic,
+                    deadLetterTopic, initialSubscriptionName);
+        }
     }
 }
diff --git 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicy.java
 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicy.java
index d443274c6e7..c196a8f20fb 100644
--- 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicy.java
+++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicy.java
@@ -20,25 +20,31 @@ package org.apache.pulsar.client.api.v5.config;
 
 import java.util.List;
 import java.util.Objects;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
 import org.apache.pulsar.client.api.v5.auth.CryptoFailureAction;
 import org.apache.pulsar.client.api.v5.auth.CryptoKeyReader;
 
 /**
  * End-to-end encryption configuration for producers and consumers.
  *
- * <p>For producers, supply a {@link CryptoKeyReader} and one or more 
encryption key names.
- * For consumers/readers, supply a {@link CryptoKeyReader} and a {@link 
CryptoFailureAction}.
+ * <p>For producers, supply a {@link CryptoKeyReader} and one or more 
encryption key names —
+ * use {@link #forProducer(CryptoKeyReader, String...)} as the typical entry 
point.
+ * For consumers/readers, supply a {@link CryptoKeyReader} and a {@link 
CryptoFailureAction} —
+ * use {@link #forConsumer(CryptoKeyReader, CryptoFailureAction)}.
  *
- * @param keyReader     the crypto key reader for loading 
encryption/decryption keys
- * @param keyNames      encryption key names (producer-side; empty list for 
consumer/reader)
- * @param failureAction action to take when encryption or decryption fails
+ * <p>{@link #builder()} is available for callers that need to tune both sides 
explicitly.
  */
-public record EncryptionPolicy(
-        CryptoKeyReader keyReader,
-        List<String> keyNames,
-        CryptoFailureAction failureAction
-) {
-    public EncryptionPolicy {
+@EqualsAndHashCode
+@ToString
+public final class EncryptionPolicy {
+
+    private final CryptoKeyReader keyReader;
+    private final List<String> keyNames;
+    private final CryptoFailureAction failureAction;
+
+    private EncryptionPolicy(CryptoKeyReader keyReader, List<String> keyNames,
+                             CryptoFailureAction failureAction) {
         Objects.requireNonNull(keyReader, "keyReader must not be null");
         if (keyNames == null) {
             keyNames = List.of();
@@ -46,6 +52,30 @@ public record EncryptionPolicy(
         if (failureAction == null) {
             failureAction = CryptoFailureAction.FAIL;
         }
+        this.keyReader = keyReader;
+        this.keyNames = List.copyOf(keyNames);
+        this.failureAction = failureAction;
+    }
+
+    /**
+     * @return the crypto key reader for loading encryption/decryption keys
+     */
+    public CryptoKeyReader keyReader() {
+        return keyReader;
+    }
+
+    /**
+     * @return the producer-side encryption key names (empty list for 
consumer/reader)
+     */
+    public List<String> keyNames() {
+        return keyNames;
+    }
+
+    /**
+     * @return the action to take when encryption or decryption fails
+     */
+    public CryptoFailureAction failureAction() {
+        return failureAction;
     }
 
     /**
@@ -69,4 +99,64 @@ public record EncryptionPolicy(
     public static EncryptionPolicy forConsumer(CryptoKeyReader keyReader, 
CryptoFailureAction failureAction) {
         return new EncryptionPolicy(keyReader, List.of(), failureAction);
     }
+
+    /**
+     * @return a new builder for constructing an {@link EncryptionPolicy}
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link EncryptionPolicy}.
+     */
+    public static final class Builder {
+        private CryptoKeyReader keyReader;
+        private List<String> keyNames = List.of();
+        private CryptoFailureAction failureAction = CryptoFailureAction.FAIL;
+
+        private Builder() {
+        }
+
+        /**
+         * Crypto key reader used to load encryption/decryption keys. Required.
+         *
+         * @param keyReader the key reader
+         * @return this builder
+         */
+        public Builder keyReader(CryptoKeyReader keyReader) {
+            this.keyReader = keyReader;
+            return this;
+        }
+
+        /**
+         * Producer-side encryption key names. Leave empty (default) for 
consumer-side use.
+         *
+         * @param keyNames the key names
+         * @return this builder
+         */
+        public Builder keyNames(String... keyNames) {
+            this.keyNames = List.of(keyNames);
+            return this;
+        }
+
+        /**
+         * Action to take when encryption (producer) or decryption (consumer) 
fails.
+         * Default is {@link CryptoFailureAction#FAIL}.
+         *
+         * @param failureAction the failure action
+         * @return this builder
+         */
+        public Builder failureAction(CryptoFailureAction failureAction) {
+            this.failureAction = failureAction;
+            return this;
+        }
+
+        /**
+         * @return a new {@link EncryptionPolicy} instance
+         */
+        public EncryptionPolicy build() {
+            return new EncryptionPolicy(keyReader, keyNames, failureAction);
+        }
+    }
 }
diff --git 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProcessingTimeoutPolicy.java
 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProcessingTimeoutPolicy.java
index 0cda6c3ee04..0c71f14590e 100644
--- 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProcessingTimeoutPolicy.java
+++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProcessingTimeoutPolicy.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.client.api.v5.config;
 
 import java.time.Duration;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
 
 /**
  * Optional safety net for slow / stalled queue consumers: if the application 
doesn't
@@ -36,37 +38,101 @@ import java.time.Duration;
  * when the application's processing time is bounded and you want stalled 
deliveries to
  * be reattempted automatically.
  *
- * @param timeout            how long the client waits for the application to 
ack a
- *                           delivery before requesting redelivery. {@link 
Duration#ZERO}
- *                           disables.
- * @param redeliveryBackoff  optional backoff applied between redeliveries. 
May be
- *                           {@code null} for the default (no extra delay).
+ * <p>Use {@link #of(Duration)} for the common case (no extra backoff), or
+ * {@link #builder()} to also configure {@code redeliveryBackoff}.
  */
-public record ProcessingTimeoutPolicy(
-        Duration timeout,
-        BackoffPolicy redeliveryBackoff
-) {
-    public ProcessingTimeoutPolicy {
+@EqualsAndHashCode
+@ToString
+public final class ProcessingTimeoutPolicy {
+
+    private final Duration timeout;
+    private final BackoffPolicy redeliveryBackoff;
+
+    private ProcessingTimeoutPolicy(Duration timeout, BackoffPolicy 
redeliveryBackoff) {
         if (timeout == null) {
             throw new IllegalArgumentException("timeout must not be null");
         }
         if (timeout.isNegative()) {
             throw new IllegalArgumentException("timeout must not be negative");
         }
+        this.timeout = timeout;
+        this.redeliveryBackoff = redeliveryBackoff;
+    }
+
+    /**
+     * @return how long the client waits for the application to ack a delivery 
before
+     *         requesting redelivery; {@link Duration#ZERO} disables
+     */
+    public Duration timeout() {
+        return timeout;
+    }
+
+    /**
+     * @return optional backoff applied between redeliveries, or {@code null} 
for the
+     *         default (no extra delay)
+     */
+    public BackoffPolicy redeliveryBackoff() {
+        return redeliveryBackoff;
     }
 
     /**
      * Create a policy with just a timeout — the broker uses its default 
redelivery
      * cadence (no extra backoff between retries).
+     *
+     * @param timeout the processing-timeout duration
+     * @return a {@link ProcessingTimeoutPolicy} with no extra redelivery 
backoff
      */
     public static ProcessingTimeoutPolicy of(Duration timeout) {
         return new ProcessingTimeoutPolicy(timeout, null);
     }
 
     /**
-     * Create a policy with a timeout and an explicit redelivery backoff.
+     * @return a new builder for constructing a {@link ProcessingTimeoutPolicy}
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link ProcessingTimeoutPolicy}.
      */
-    public static ProcessingTimeoutPolicy of(Duration timeout, BackoffPolicy 
redeliveryBackoff) {
-        return new ProcessingTimeoutPolicy(timeout, redeliveryBackoff);
+    public static final class Builder {
+        private Duration timeout;
+        private BackoffPolicy redeliveryBackoff;
+
+        private Builder() {
+        }
+
+        /**
+         * Processing-timeout duration — how long the client waits for the 
application
+         * to ack before asking the broker to redeliver. {@link Duration#ZERO} 
disables.
+         * Required.
+         *
+         * @param timeout the processing timeout
+         * @return this builder
+         */
+        public Builder timeout(Duration timeout) {
+            this.timeout = timeout;
+            return this;
+        }
+
+        /**
+         * Optional backoff applied between redeliveries. {@code null} 
(default) means
+         * "redeliver immediately on the next sweep".
+         *
+         * @param redeliveryBackoff the backoff policy
+         * @return this builder
+         */
+        public Builder redeliveryBackoff(BackoffPolicy redeliveryBackoff) {
+            this.redeliveryBackoff = redeliveryBackoff;
+            return this;
+        }
+
+        /**
+         * @return a new {@link ProcessingTimeoutPolicy} instance
+         */
+        public ProcessingTimeoutPolicy build() {
+            return new ProcessingTimeoutPolicy(timeout, redeliveryBackoff);
+        }
     }
 }
diff --git 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/TlsPolicy.java
 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/TlsPolicy.java
index d47b8f71f73..ebd2e8ca196 100644
--- 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/TlsPolicy.java
+++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/TlsPolicy.java
@@ -18,42 +18,66 @@
  */
 package org.apache.pulsar.client.api.v5.config;
 
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
 /**
  * TLS configuration for the Pulsar client connection.
  *
- * @param trustCertsFilePath          path to the trusted CA certificate file 
(PEM format)
- * @param keyFilePath                 path to the client private key file (PEM 
format), or {@code null}
- * @param certificateFilePath         path to the client certificate file (PEM 
format), or {@code null}
- * @param allowInsecureConnection     whether to allow connecting to brokers 
with untrusted certificates
- * @param enableHostnameVerification  whether to verify the broker hostname 
against the certificate
+ * <p>Construct via {@link #builder()}, or use {@link #ofInsecure()} for 
development.
  */
-public record TlsPolicy(
-        String trustCertsFilePath,
-        String keyFilePath,
-        String certificateFilePath,
-        boolean allowInsecureConnection,
-        boolean enableHostnameVerification
-) {
+@EqualsAndHashCode
+@ToString
+public final class TlsPolicy {
+
+    private final String trustCertsFilePath;
+    private final String keyFilePath;
+    private final String certificateFilePath;
+    private final boolean allowInsecureConnection;
+    private final boolean enableHostnameVerification;
+
+    private TlsPolicy(String trustCertsFilePath, String keyFilePath, String 
certificateFilePath,
+                      boolean allowInsecureConnection, boolean 
enableHostnameVerification) {
+        this.trustCertsFilePath = trustCertsFilePath;
+        this.keyFilePath = keyFilePath;
+        this.certificateFilePath = certificateFilePath;
+        this.allowInsecureConnection = allowInsecureConnection;
+        this.enableHostnameVerification = enableHostnameVerification;
+    }
+
     /**
-     * Create a TLS policy that trusts the given CA certificate and verifies 
hostnames.
-     *
-     * @param trustCertsFilePath the path to the trusted CA certificate file 
(PEM format)
-     * @return a {@link TlsPolicy} with hostname verification enabled and 
insecure connections disabled
+     * @return path to the trusted CA certificate file (PEM), or {@code null} 
when not set
      */
-    public static TlsPolicy of(String trustCertsFilePath) {
-        return new TlsPolicy(trustCertsFilePath, null, null, false, true);
+    public String trustCertsFilePath() {
+        return trustCertsFilePath;
     }
 
     /**
-     * Create a TLS policy with mutual TLS (mTLS) authentication.
-     *
-     * @param trustCertsFilePath the path to the trusted CA certificate file 
(PEM format)
-     * @param keyFilePath        the path to the client private key file (PEM 
format)
-     * @param certificateFilePath the path to the client certificate file (PEM 
format)
-     * @return a {@link TlsPolicy} configured for mutual TLS with hostname 
verification enabled
+     * @return path to the client private key file (PEM), or {@code null} when 
not using mTLS
+     */
+    public String keyFilePath() {
+        return keyFilePath;
+    }
+
+    /**
+     * @return path to the client certificate file (PEM), or {@code null} when 
not using mTLS
+     */
+    public String certificateFilePath() {
+        return certificateFilePath;
+    }
+
+    /**
+     * @return whether connections to brokers with untrusted certificates are 
allowed
+     */
+    public boolean allowInsecureConnection() {
+        return allowInsecureConnection;
+    }
+
+    /**
+     * @return whether the broker hostname is verified against the certificate
      */
-    public static TlsPolicy ofMutualTls(String trustCertsFilePath, String 
keyFilePath, String certificateFilePath) {
-        return new TlsPolicy(trustCertsFilePath, keyFilePath, 
certificateFilePath, false, true);
+    public boolean enableHostnameVerification() {
+        return enableHostnameVerification;
     }
 
     /**
@@ -64,4 +88,90 @@ public record TlsPolicy(
     public static TlsPolicy ofInsecure() {
         return new TlsPolicy(null, null, null, true, false);
     }
+
+    /**
+     * @return a new builder for constructing a {@link TlsPolicy}
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link TlsPolicy}.
+     */
+    public static final class Builder {
+        private String trustCertsFilePath;
+        private String keyFilePath;
+        private String certificateFilePath;
+        private boolean allowInsecureConnection = false;
+        private boolean enableHostnameVerification = true;
+
+        private Builder() {
+        }
+
+        /**
+         * Path to the trusted CA certificate file (PEM format).
+         *
+         * @param trustCertsFilePath the trust store path
+         * @return this builder
+         */
+        public Builder trustCertsFilePath(String trustCertsFilePath) {
+            this.trustCertsFilePath = trustCertsFilePath;
+            return this;
+        }
+
+        /**
+         * Path to the client private key file (PEM format) for mutual TLS.
+         *
+         * @param keyFilePath the client key path
+         * @return this builder
+         */
+        public Builder keyFilePath(String keyFilePath) {
+            this.keyFilePath = keyFilePath;
+            return this;
+        }
+
+        /**
+         * Path to the client certificate file (PEM format) for mutual TLS.
+         *
+         * @param certificateFilePath the client certificate path
+         * @return this builder
+         */
+        public Builder certificateFilePath(String certificateFilePath) {
+            this.certificateFilePath = certificateFilePath;
+            return this;
+        }
+
+        /**
+         * Whether to allow connecting to brokers with untrusted certificates.
+         * Default {@code false}. Enable only for development.
+         *
+         * @param allowInsecureConnection whether to skip certificate 
validation
+         * @return this builder
+         */
+        public Builder allowInsecureConnection(boolean 
allowInsecureConnection) {
+            this.allowInsecureConnection = allowInsecureConnection;
+            return this;
+        }
+
+        /**
+         * Whether to verify the broker hostname against the certificate. 
Default
+         * {@code true}.
+         *
+         * @param enableHostnameVerification whether to verify the hostname
+         * @return this builder
+         */
+        public Builder enableHostnameVerification(boolean 
enableHostnameVerification) {
+            this.enableHostnameVerification = enableHostnameVerification;
+            return this;
+        }
+
+        /**
+         * @return a new {@link TlsPolicy} instance
+         */
+        public TlsPolicy build() {
+            return new TlsPolicy(trustCertsFilePath, keyFilePath, 
certificateFilePath,
+                    allowInsecureConnection, enableHostnameVerification);
+        }
+    }
 }
diff --git 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/TransactionPolicy.java
 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/TransactionPolicy.java
index 228bdeed020..8787d68db17 100644
--- 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/TransactionPolicy.java
+++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/TransactionPolicy.java
@@ -19,12 +19,70 @@
 package org.apache.pulsar.client.api.v5.config;
 
 import java.time.Duration;
+import java.util.Objects;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
 
 /**
  * Transaction configuration for the Pulsar client.
  *
- * @param timeout transaction timeout. If the transaction is not committed or 
aborted within this duration, it will
- *                be automatically aborted by the broker.
+ * <p>Construct via {@link #builder()}.
  */
-public record TransactionPolicy(Duration timeout) {
+@EqualsAndHashCode
+@ToString
+public final class TransactionPolicy {
+
+    private final Duration timeout;
+
+    private TransactionPolicy(Duration timeout) {
+        Objects.requireNonNull(timeout, "timeout must not be null");
+        if (timeout.isNegative() || timeout.isZero()) {
+            throw new IllegalArgumentException("timeout must be > 0");
+        }
+        this.timeout = timeout;
+    }
+
+    /**
+     * @return transaction timeout — if the transaction is not committed or 
aborted within this duration,
+     *         the broker automatically aborts it
+     */
+    public Duration timeout() {
+        return timeout;
+    }
+
+    /**
+     * @return a new builder for constructing a {@link TransactionPolicy}
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link TransactionPolicy}.
+     */
+    public static final class Builder {
+        private Duration timeout;
+
+        private Builder() {
+        }
+
+        /**
+         * Transaction timeout. Required. The broker auto-aborts transactions 
that
+         * neither commit nor abort within this duration.
+         *
+         * @param timeout the transaction timeout
+         * @return this builder
+         */
+        public Builder timeout(Duration timeout) {
+            this.timeout = timeout;
+            return this;
+        }
+
+        /**
+         * @return a new {@link TransactionPolicy} instance
+         */
+        public TransactionPolicy build() {
+            return new TransactionPolicy(timeout);
+        }
+    }
 }
diff --git 
a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
 
b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
index 582a41a0574..f05cfbd4a21 100644
--- 
a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
+++ 
b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
@@ -65,7 +65,7 @@ public class Examples {
         try (var client = PulsarClient.builder()
                 .serviceUrl("pulsar+ssl://pulsar.example.com:6651")
                 .authentication(AuthenticationFactory.token("eyJhbGci..."))
-                .tlsPolicy(TlsPolicy.of("/etc/pulsar/ca.pem"))
+                
.tlsPolicy(TlsPolicy.builder().trustCertsFilePath("/etc/pulsar/ca.pem").build())
                 .operationTimeout(Duration.ofSeconds(30))
                 .connectionPolicy(ConnectionPolicy.builder()
                         .connectionTimeout(Duration.ofSeconds(10))
@@ -107,8 +107,11 @@ public class Examples {
         try (var producer = 
client.newProducer(Schema.json(SensorReading.class))
                 .topic("sensor-data")
                 .compressionPolicy(CompressionPolicy.of(CompressionType.ZSTD))
-                .batchingPolicy(BatchingPolicy.of(
-                        Duration.ofMillis(10), 5000, 
MemorySize.ofMegabytes(1)))
+                .batchingPolicy(BatchingPolicy.builder()
+                        .maxPublishDelay(Duration.ofMillis(10))
+                        .maxMessages(5000)
+                        .maxSize(MemorySize.ofMegabytes(1))
+                        .build())
                 .create()) {
 
             for (int i = 0; i < 100_000; i++) {
@@ -281,7 +284,7 @@ public class Examples {
                 
.processingTimeout(ProcessingTimeoutPolicy.of(Duration.ofSeconds(30)))
                 .negativeAckRedeliveryBackoff(
                         BackoffPolicy.exponential(Duration.ofSeconds(1), 
Duration.ofMinutes(5)))
-                .deadLetterPolicy(DeadLetterPolicy.of(5))
+                
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(5).build())
                 .subscribe()) {
 
             while (true) {

Reply via email to