This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b3bdec20267 [feat] PIP-468: V5 end-to-end encryption API redesign (#25682)
b3bdec20267 is described below

commit b3bdec2026749ef55d4edc1de596d598a3d13977
Author: Matteo Merli <[email protected]>
AuthorDate: Tue May 5 21:47:47 2026 -0700

    [feat] PIP-468: V5 end-to-end encryption API redesign (#25682)
---
 .../pulsar/client/api/v5/V5EncryptionTest.java     | 226 +++++++++++++++++++++
 .../client/api/v5/CheckpointConsumerBuilder.java   |   6 +-
 .../pulsar/client/api/v5/ProducerBuilder.java      |   8 +-
 .../pulsar/client/api/v5/QueueConsumerBuilder.java |   6 +-
 .../client/api/v5/StreamConsumerBuilder.java       |   6 +-
 ...eader.java => ConsumerCryptoFailureAction.java} |  33 +--
 ...cryptionKeyInfo.java => CryptoKeyProvider.java} |  21 +-
 .../pulsar/client/api/v5/auth/EncryptionKey.java   |  81 ++++++++
 .../client/api/v5/auth/PemFileKeyProvider.java     | 139 +++++++++++++
 .../client/api/v5/auth/PrivateKeyProvider.java     |  50 +++++
 ...ction.java => ProducerCryptoFailureAction.java} |  19 +-
 ...CryptoKeyReader.java => PublicKeyProvider.java} |  32 ++-
 .../pulsar/client/api/v5/auth/package-info.java    |   4 +-
 .../api/v5/config/ConsumerEncryptionPolicy.java    | 117 +++++++++++
 .../client/api/v5/config/EncryptionPolicy.java     | 162 ---------------
 .../api/v5/config/ProducerEncryptionPolicy.java    | 149 ++++++++++++++
 .../org/apache/pulsar/client/api/v5/Examples.java  |  41 ++++
 .../client/api/v5/auth/PemFileKeyProviderTest.java | 120 +++++++++++
 .../client/api/v5/config/EncryptionPolicyTest.java | 134 ++++++++++++
 .../impl/v5/CheckpointConsumerBuilderV5.java       |   6 +-
 .../client/impl/v5/CryptoKeyReaderAdapter.java     |  49 ++++-
 .../pulsar/client/impl/v5/ProducerBuilderV5.java   |  13 +-
 .../client/impl/v5/QueueConsumerBuilderV5.java     |   8 +-
 .../client/impl/v5/StreamConsumerBuilderV5.java    |   8 +-
 24 files changed, 1181 insertions(+), 257 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5EncryptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5EncryptionTest.java
new file mode 100644
index 00000000000..59ac067d353
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5EncryptionTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.auth.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.v5.auth.PemFileKeyProvider;
+import org.apache.pulsar.client.api.v5.config.BatchingPolicy;
+import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
+import org.apache.pulsar.client.api.v5.config.ProducerEncryptionPolicy;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end coverage for V5 message encryption: produce → broker → consume
+ * round-trip, with payloads encrypted on the producer side and decrypted on the
+ * consumer side. Reuses the test PEM keys under {@code certificate/} that the v4
+ * tests already use.
+ *
+ * <p>Wiring under test:
+ * <ul>
+ *   <li>{@link PemFileKeyProvider} loads PEM bytes from disk on each side.</li>
+ *   <li>{@link ProducerEncryptionPolicy} / {@link ConsumerEncryptionPolicy} carry
+ *       the providers + key names + failure actions through the V5 builders.</li>
+ *   <li>{@link org.apache.pulsar.client.impl.v5.CryptoKeyReaderAdapter} bridges
+ *       to the v4 {@code ConsumerImpl} / {@code ProducerImpl} crypto paths.</li>
+ * </ul>
+ */
+public class V5EncryptionTest extends V5ClientBaseTest {
+
+    private static final String KEY_NAME = "client-rsa";
+    private static final Path PUB_KEY =
+            Path.of("./src/test/resources/certificate/public-key.client-rsa.pem");
+    private static final Path PRIV_KEY =
+            Path.of("./src/test/resources/certificate/private-key.client-rsa.pem");
+
+    private static PemFileKeyProvider producerKeys() {
+        return PemFileKeyProvider.builder()
+                .publicKey(KEY_NAME, PUB_KEY)
+                .build();
+    }
+
+    private static PemFileKeyProvider consumerKeys() {
+        return PemFileKeyProvider.builder()
+                .privateKey(KEY_NAME, PRIV_KEY)
+                .build();
+    }
+
+    private static ProducerEncryptionPolicy producerPolicy() {
+        return ProducerEncryptionPolicy.builder()
+                .publicKeyProvider(producerKeys())
+                .keyName(KEY_NAME)
+                .build();
+    }
+
+    private static ConsumerEncryptionPolicy consumerPolicy() {
+        return ConsumerEncryptionPolicy.builder()
+                .privateKeyProvider(consumerKeys())
+                .failureAction(ConsumerCryptoFailureAction.FAIL)
+                .build();
+    }
+
+    /** Single-segment round trip: producer encrypts, consumer decrypts, payload matches. */
+    @Test
+    public void testProducerConsumerRoundTrip() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .encryptionPolicy(producerPolicy())
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("crypto-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .encryptionPolicy(consumerPolicy())
+                .subscribe();
+
+        producer.newMessage().value("hello-encrypted").send();
+
+        Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+        assertNotNull(msg, "consumer must receive the encrypted-then-decrypted message");
+        assertEquals(msg.value(), "hello-encrypted");
+        consumer.acknowledge(msg.id());
+    }
+
+    /**
+     * Multi-segment scalable topic: messages spread across segments by key, each
+     * segment's per-segment v4 producer/consumer carries the same crypto config,
+     * so every message decrypts correctly regardless of which segment it landed on.
+     */
+    @Test
+    public void testEncryptionAcrossMultipleSegments() throws Exception {
+        String topic = newScalableTopic(3);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .encryptionPolicy(producerPolicy())
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("crypto-multi-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .encryptionPolicy(consumerPolicy())
+                .subscribe();
+
+        int n = 30;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String value = "msg-" + i;
+            producer.newMessage().key("k-" + i).value(value).send();
+            sent.add(value);
+        }
+
+        Set<String> received = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "expected message #" + (i + 1));
+            received.add(msg.value());
+            consumer.acknowledge(msg.id());
+        }
+        assertEquals(received, sent, "every encrypted message must decrypt to its original value");
+    }
+
+    /**
+     * Consumer with {@link ConsumerCryptoFailureAction#CONSUME} and no
+     * {@link org.apache.pulsar.client.api.v5.auth.PrivateKeyProvider} configured
+     * sees the still-encrypted payload, demonstrating the "I don't decrypt; just
+     * give me the bytes" mode.
+     *
+     * <p>Batching disabled on the producer: v4 drops batched encrypted messages
+     * even under CONSUME because it can't reframe a batch envelope it can't open.
+     */
+    @Test
+    public void testConsumerWithoutProviderAndConsumeAction() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .batchingPolicy(BatchingPolicy.ofDisabled())
+                .encryptionPolicy(producerPolicy())
+                .create();
+
+        @Cleanup
+        QueueConsumer<byte[]> consumer = v5Client.newQueueConsumer(Schema.bytes())
+                .topic(topic)
+                .subscriptionName("crypto-consume-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .encryptionPolicy(ConsumerEncryptionPolicy.builder()
+                        .failureAction(ConsumerCryptoFailureAction.CONSUME)
+                        .build())
+                .subscribe();
+
+        producer.newMessage().value("plaintext-marker").send();
+
+        Message<byte[]> msg = consumer.receive(Duration.ofSeconds(5));
+        assertNotNull(msg, "CONSUME must deliver the message even without a private key");
+        // Payload is still encrypted — must not contain the plaintext marker.
+        String body = new String(msg.value());
+        assertTrue(!body.contains("plaintext-marker"),
+                "payload should still be encrypted, got: " + body);
+        consumer.acknowledge(msg.id());
+    }
+
+    /**
+     * Consumer with {@link ConsumerCryptoFailureAction#DISCARD} and no provider
+     * silently drops undecryptable messages (cursor advances) — the application
+     * never sees them.
+     */
+    @Test
+    public void testConsumerWithoutProviderAndDiscardAction() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .batchingPolicy(BatchingPolicy.ofDisabled())
+                .encryptionPolicy(producerPolicy())
+                .create();
+
+        @Cleanup
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("crypto-discard-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .encryptionPolicy(ConsumerEncryptionPolicy.builder()
+                        .failureAction(ConsumerCryptoFailureAction.DISCARD)
+                        .build())
+                .subscribe();
+
+        producer.newMessage().value("classified").send();
+
+        Message<String> msg = consumer.receive(Duration.ofMillis(500));
+        assertNull(msg, "DISCARD must drop the undecryptable message before delivery");
+    }
+}
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java
index e9c6153fec5..1a67dbcfe84 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.client.api.v5;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
+import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
 
 /**
  * Builder for configuring and creating a {@link CheckpointConsumer}.
@@ -111,9 +111,9 @@ public interface CheckpointConsumerBuilder<T> {
      *
      * @param policy the encryption policy to use
      * @return this builder instance for chaining
-     * @see EncryptionPolicy#forConsumer
+     * @see ConsumerEncryptionPolicy#builder()
      */
-    CheckpointConsumerBuilder<T> encryptionPolicy(EncryptionPolicy policy);
+    CheckpointConsumerBuilder<T> encryptionPolicy(ConsumerEncryptionPolicy policy);
 
     // --- Metadata ---
 
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/ProducerBuilder.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/ProducerBuilder.java
index 65800756c19..207686e88d1 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/ProducerBuilder.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/ProducerBuilder.java
@@ -24,8 +24,8 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.api.v5.config.BatchingPolicy;
 import org.apache.pulsar.client.api.v5.config.ChunkingPolicy;
 import org.apache.pulsar.client.api.v5.config.CompressionPolicy;
-import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
 import org.apache.pulsar.client.api.v5.config.ProducerAccessMode;
+import org.apache.pulsar.client.api.v5.config.ProducerEncryptionPolicy;
 
 /**
  * Builder for configuring and creating a {@link Producer}.
@@ -133,11 +133,11 @@ public interface ProducerBuilder<T> {
     /**
      * Configure end-to-end message encryption.
      *
-     * @param policy the encryption policy for producing encrypted messages
+     * @param policy the producer-side encryption policy
      * @return this builder instance for chaining
-     * @see EncryptionPolicy#forProducer(org.apache.pulsar.client.api.v5.auth.CryptoKeyReader, String...)
+     * @see ProducerEncryptionPolicy#builder()
      */
-    ProducerBuilder<T> encryptionPolicy(EncryptionPolicy policy);
+    ProducerBuilder<T> encryptionPolicy(ProducerEncryptionPolicy policy);
 
     /**
      * Set the initial sequence ID for producer message deduplication. Subsequent messages
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
index db03f3c5469..f616f79beee 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
@@ -22,8 +22,8 @@ import java.time.Duration;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.api.v5.config.BackoffPolicy;
+import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
 import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy;
-import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
 import org.apache.pulsar.client.api.v5.config.ProcessingTimeoutPolicy;
 import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
 
@@ -194,9 +194,9 @@ public interface QueueConsumerBuilder<T> {
      *
      * @param policy the encryption policy to use
      * @return this builder instance for chaining
-     * @see EncryptionPolicy#forConsumer
+     * @see ConsumerEncryptionPolicy#builder()
      */
-    QueueConsumerBuilder<T> encryptionPolicy(EncryptionPolicy policy);
+    QueueConsumerBuilder<T> encryptionPolicy(ConsumerEncryptionPolicy policy);
 
 
     // --- Misc ---
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java
index 0963f792035..ab813842b4f 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java
@@ -22,7 +22,7 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
+import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
 import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
 
 /**
@@ -164,9 +164,9 @@ public interface StreamConsumerBuilder<T> {
      *
      * @param policy the encryption policy to use
      * @return this builder instance for chaining
-     * @see EncryptionPolicy#forConsumer
+     * @see ConsumerEncryptionPolicy#builder()
      */
-    StreamConsumerBuilder<T> encryptionPolicy(EncryptionPolicy policy);
+    StreamConsumerBuilder<T> encryptionPolicy(ConsumerEncryptionPolicy policy);
 
     // --- Metadata ---
 
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoKeyReader.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/ConsumerCryptoFailureAction.java
similarity index 53%
copy from pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoKeyReader.java
copy to pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/ConsumerCryptoFailureAction.java
index bec3eeb6054..f4b1b5d3fe1 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoKeyReader.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/ConsumerCryptoFailureAction.java
@@ -18,28 +18,29 @@
  */
 package org.apache.pulsar.client.api.v5.auth;
 
-import java.util.Map;
-
 /**
- * Interface for loading encryption and decryption keys for end-to-end message encryption.
+ * Action a consumer takes when message decryption fails (e.g. the
+ * {@link PrivateKeyProvider} cannot be reached, returns no key, or the
+ * ciphertext is malformed).
  */
-public interface CryptoKeyReader {
+public enum ConsumerCryptoFailureAction {
+
+    /**
+     * Fail the {@code receive} call. The application sees the decryption error
+     * and the message stays unacknowledged so it will be redelivered.
+     */
+    FAIL,
 
     /**
-     * Get the public key for encrypting messages.
-     *
-     * @param keyName  the name of the key
-     * @param metadata additional metadata associated with the key
-     * @return the encryption key info containing the public key data
+     * Silently acknowledge and skip the message. Useful when the consumer
+     * legitimately cannot read some encrypted streams (e.g. a side channel)
+     * but should keep moving forward through the rest.
      */
-    EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> metadata);
+    DISCARD,
 
     /**
-     * Get the private key for decrypting messages.
-     *
-     * @param keyName  the name of the key
-     * @param metadata additional metadata associated with the key
-     * @return the encryption key info containing the private key data
+     * Deliver the message to the application as-is, with the still-encrypted
+     * payload. The application can then handle decryption out-of-band.
      */
-    EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata);
+    CONSUME
 }
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/EncryptionKeyInfo.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoKeyProvider.java
similarity index 60%
rename from pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/EncryptionKeyInfo.java
rename to pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoKeyProvider.java
index cc429bbc8c9..6360a1590d3 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/EncryptionKeyInfo.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoKeyProvider.java
@@ -18,20 +18,15 @@
  */
 package org.apache.pulsar.client.api.v5.auth;
 
-import java.util.Map;
-import java.util.Objects;
-
 /**
- * Holds an encryption key and associated metadata.
+ * Convenience interface for implementations that serve <em>both</em> public keys (for
+ * producer-side encryption) and private keys (for consumer-side decryption) — for
+ * example, a single PEM-file-backed key store used by both sides of an in-process
+ * round trip.
  *
- * @param key      the raw key bytes
- * @param metadata key-value metadata associated with the key
+ * <p>Producer-only or consumer-only implementations should implement
+ * {@link PublicKeyProvider} or {@link PrivateKeyProvider} directly instead — that
+ * makes the role explicit and avoids stub methods that throw.
  */
-public record EncryptionKeyInfo(byte[] key, Map<String, String> metadata) {
-    public EncryptionKeyInfo {
-        Objects.requireNonNull(key, "key must not be null");
-        if (metadata == null) {
-            metadata = Map.of();
-        }
-    }
+public interface CryptoKeyProvider extends PublicKeyProvider, PrivateKeyProvider {
 }
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/EncryptionKey.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/EncryptionKey.java
new file mode 100644
index 00000000000..4d44235d06d
--- /dev/null
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/EncryptionKey.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.auth;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A single encryption / decryption key returned by a {@link PublicKeyProvider} or
+ * {@link PrivateKeyProvider}.
+ *
+ * <p>The producer-side flow returns just the key bytes. The consumer-side flow may
+ * include {@link #metadata()} that the producer attached when the message was
+ * encrypted (e.g. a key version) — the {@link PrivateKeyProvider} can use it to
+ * pick the right private key when keys have been rotated.
+ *
+ * <p>Use {@link #of(byte[])} when there's no metadata, or {@link #of(byte[], Map)}
+ * to attach producer-side hints alongside the bytes.
+ */
+public final class EncryptionKey {
+
+    private final byte[] key;
+    private final Map<String, String> metadata;
+
+    private EncryptionKey(byte[] key, Map<String, String> metadata) {
+        this.key = Objects.requireNonNull(key, "key must not be null");
+        this.metadata = metadata == null ? Map.of() : Map.copyOf(metadata);
+    }
+
+    /**
+     * @return the raw key bytes
+     */
+    public byte[] key() {
+        return key;
+    }
+
+    /**
+     * @return key-value metadata associated with the key (never {@code null};
+     *         empty when the producer did not attach any)
+     */
+    public Map<String, String> metadata() {
+        return metadata;
+    }
+
+    /**
+     * Create an {@link EncryptionKey} with no metadata.
+     *
+     * @param key the raw key bytes
+     * @return a new {@link EncryptionKey}
+     */
+    public static EncryptionKey of(byte[] key) {
+        return new EncryptionKey(key, Map.of());
+    }
+
+    /**
+     * Create an {@link EncryptionKey} with associated metadata.
+     *
+     * @param key      the raw key bytes
+     * @param metadata key-value metadata to attach to the key
+     * @return a new {@link EncryptionKey}
+     */
+    public static EncryptionKey of(byte[] key, Map<String, String> metadata) {
+        return new EncryptionKey(key, metadata);
+    }
+}
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PemFileKeyProvider.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PemFileKeyProvider.java
new file mode 100644
index 00000000000..7670d5dfecc
--- /dev/null
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PemFileKeyProvider.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.auth;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Batteries-included key provider that loads PEM-encoded keys from local files.
+ *
+ * <p>The same instance can serve as both {@link PublicKeyProvider} and
+ * {@link PrivateKeyProvider} — register public keys for the producer side and
+ * private keys for the consumer side. In typical setups each side instantiates
+ * its own provider, configured only with the keys it actually needs.
+ *
+ * <p>For more complex sources (KMS, Vault, ...) implement {@link PublicKeyProvider}
+ * or {@link PrivateKeyProvider} directly.
+ *
+ * <pre>{@code
+ * var keys = PemFileKeyProvider.builder()
+ *         .publicKey("orders-v1", Path.of("/etc/keys/orders-pub.pem"))
+ *         .privateKey("orders-v1", Path.of("/etc/keys/orders-priv.pem"))
+ *         .build();
+ *
+ * client.newProducer(Schema.string())
+ *       .topic("orders")
+ *       .encryptionPolicy(ProducerEncryptionPolicy.builder()
+ *               .publicKeyProvider(keys)
+ *               .keyName("orders-v1")
+ *               .build())
+ *       .create();
+ * }</pre>
+ */
+public final class PemFileKeyProvider implements CryptoKeyProvider {
+
+    private final Map<String, Path> publicKeys;
+    private final Map<String, Path> privateKeys;
+
+    private PemFileKeyProvider(Map<String, Path> publicKeys, Map<String, Path> privateKeys) {
+        this.publicKeys = Map.copyOf(publicKeys);
+        this.privateKeys = Map.copyOf(privateKeys);
+    }
+
+    @Override
+    public CompletableFuture<EncryptionKey> getPublicKey(String keyName) {
+        return loadKey(keyName, publicKeys, "public");
+    }
+
+    @Override
+    public CompletableFuture<EncryptionKey> getPrivateKey(String keyName, Map<String, String> metadata) {
+        return loadKey(keyName, privateKeys, "private");
+    }
+
+    private static CompletableFuture<EncryptionKey> loadKey(String keyName,
+                                                            Map<String, Path> keys,
+                                                            String role) {
+        Path path = keys.get(keyName);
+        if (path == null) {
+            return CompletableFuture.failedFuture(new IllegalArgumentException(
+                    "no " + role + " key registered for name: " + keyName));
+        }
+        try {
+            byte[] bytes = Files.readAllBytes(path);
+            return CompletableFuture.completedFuture(EncryptionKey.of(bytes));
+        } catch (IOException e) {
+            return CompletableFuture.failedFuture(new IOException(
+                    "failed to read " + role + " key '" + keyName + "' from " + path, e));
+        }
+    }
+
+    /**
+     * @return a new builder for constructing a {@link PemFileKeyProvider}
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link PemFileKeyProvider}.
+     */
+    public static final class Builder {
+        private final Map<String, Path> publicKeys = new HashMap<>();
+        private final Map<String, Path> privateKeys = new HashMap<>();
+
+        private Builder() {
+        }
+
+        /**
+         * Register a public key file under the given name. Producer-side use.
+         *
+         * @param keyName the key identifier the producer will reference
+         * @param path    path to the PEM-encoded public key file
+         * @return this builder
+         */
+        public Builder publicKey(String keyName, Path path) {
+            publicKeys.put(keyName, path);
+            return this;
+        }
+
+        /**
+         * Register a private key file under the given name. Consumer-side use.
+         *
+         * @param keyName the key identifier the producer used to encrypt
+         * @param path    path to the PEM-encoded private key file
+         * @return this builder
+         */
+        public Builder privateKey(String keyName, Path path) {
+            privateKeys.put(keyName, path);
+            return this;
+        }
+
+        /**
+         * @return a new {@link PemFileKeyProvider} instance
+         */
+        public PemFileKeyProvider build() {
+            return new PemFileKeyProvider(publicKeys, privateKeys);
+        }
+    }
+}
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PrivateKeyProvider.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PrivateKeyProvider.java
new file mode 100644
index 00000000000..acdfe60d649
--- /dev/null
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PrivateKeyProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.auth;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Consumer-side SPI: load a private key by name for end-to-end message 
decryption.
+ *
+ * <p>The provider is consulted on every encrypted message the consumer 
receives.
+ * The signature is asynchronous so an implementation backed by a remote KMS
+ * (Vault, AWS KMS, GCP KMS, ...) can fetch keys without blocking the client's 
IO
+ * thread. For local key stores, return a completed future.
+ *
+ * <p>For a simple file-based provider, see {@link PemFileKeyProvider}.
+ */
+public interface PrivateKeyProvider {
+
+    /**
+     * Look up the private key for the given name.
+     *
+     * <p>{@code metadata} carries any hints the producer attached when the 
message
+     * was encrypted — typically a key version or rotation marker. 
Implementations
+     * that don't rotate keys can ignore it.
+     *
+     * @param keyName  the key identifier the producer used to encrypt
+     * @param metadata producer-supplied hints about the key (never {@code 
null};
+     *                 empty when the producer didn't attach any)
+     * @return a future completing with the private key, or completing 
exceptionally
+     *         if the key cannot be found or loaded
+     */
+    CompletableFuture<EncryptionKey> getPrivateKey(String keyName, Map<String, 
String> metadata);
+}
diff --git 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoFailureAction.java
 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/ProducerCryptoFailureAction.java
similarity index 66%
rename from 
pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoFailureAction.java
rename to 
pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/ProducerCryptoFailureAction.java
index 3b48c2a46f7..0ef6e71c2a9 100644
--- 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoFailureAction.java
+++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/ProducerCryptoFailureAction.java
@@ -19,23 +19,20 @@
 package org.apache.pulsar.client.api.v5.auth;
 
 /**
- * Action to take when a message encryption or decryption operation fails.
+ * Action a producer takes when message encryption fails (e.g. the
+ * {@link PublicKeyProvider} cannot be reached or returns no key).
  */
-public enum CryptoFailureAction {
+public enum ProducerCryptoFailureAction {
 
     /**
-     * Fail the operation and return an error to the caller.
+     * Fail the {@code send} call. The send future completes exceptionally and
+     * the application sees the error.
      */
     FAIL,
 
     /**
-     * Silently discard the message (consumer side only).
+     * Send the message unencrypted instead of failing. Useful when encryption
+     * is opportunistic — for example, during a key-rollout migration.
      */
-    DISCARD,
-
-    /**
-     * Deliver the message to the consumer without decrypting (consumer side 
only).
-     * The message will contain encrypted payload.
-     */
-    CONSUME
+    SEND_UNENCRYPTED
 }
diff --git 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoKeyReader.java
 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PublicKeyProvider.java
similarity index 50%
rename from 
pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoKeyReader.java
rename to 
pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PublicKeyProvider.java
index bec3eeb6054..af2b3172c8b 100644
--- 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/CryptoKeyReader.java
+++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/PublicKeyProvider.java
@@ -18,28 +18,26 @@
  */
 package org.apache.pulsar.client.api.v5.auth;
 
-import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 /**
- * Interface for loading encryption and decryption keys for end-to-end message 
encryption.
+ * Producer-side SPI: load a public key by name for end-to-end message 
encryption.
+ *
+ * <p>The provider is consulted at producer creation time and on every key 
rotation.
+ * The signature is asynchronous so an implementation backed by a remote KMS
+ * (Vault, AWS KMS, GCP KMS, ...) can fetch keys without blocking the client's 
IO
+ * thread. For local key stores, return a completed future.
+ *
+ * <p>For a simple file-based provider, see {@link PemFileKeyProvider}.
  */
-public interface CryptoKeyReader {
-
-    /**
-     * Get the public key for encrypting messages.
-     *
-     * @param keyName  the name of the key
-     * @param metadata additional metadata associated with the key
-     * @return the encryption key info containing the public key data
-     */
-    EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> 
metadata);
+public interface PublicKeyProvider {
 
     /**
-     * Get the private key for decrypting messages.
+     * Look up the public key with the given name.
      *
-     * @param keyName  the name of the key
-     * @param metadata additional metadata associated with the key
-     * @return the encryption key info containing the private key data
+     * @param keyName the key identifier as configured on the producer
+     * @return a future completing with the public key, or completing 
exceptionally
+     *         if the key cannot be found or loaded
      */
-    EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> 
metadata);
+    CompletableFuture<EncryptionKey> getPublicKey(String keyName);
 }
diff --git 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/package-info.java
 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/package-info.java
index 7d8141ad558..f7f22f13a37 100644
--- 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/package-info.java
+++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/auth/package-info.java
@@ -23,6 +23,8 @@
  * <p>Provides pluggable authentication via {@link 
org.apache.pulsar.client.api.v5.auth.Authentication}
  * and convenience factories in {@link 
org.apache.pulsar.client.api.v5.auth.AuthenticationFactory},
  * as well as end-to-end encryption support via
- * {@link org.apache.pulsar.client.api.v5.auth.CryptoKeyReader}.
+ * {@link org.apache.pulsar.client.api.v5.auth.PublicKeyProvider} (producer 
side) and
+ * {@link org.apache.pulsar.client.api.v5.auth.PrivateKeyProvider} (consumer 
side).
+ * For local PEM-file-backed setups, use {@link 
org.apache.pulsar.client.api.v5.auth.PemFileKeyProvider}.
  */
 package org.apache.pulsar.client.api.v5.auth;
diff --git 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ConsumerEncryptionPolicy.java
 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ConsumerEncryptionPolicy.java
new file mode 100644
index 00000000000..08dc86b57ac
--- /dev/null
+++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ConsumerEncryptionPolicy.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.config;
+
+import java.util.Objects;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.pulsar.client.api.v5.auth.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.v5.auth.PrivateKeyProvider;
+
+/**
+ * Consumer-side end-to-end decryption configuration.
+ *
+ * <p>Construct via {@link #builder()}. The {@link PrivateKeyProvider} is 
required
+ * when {@link #failureAction()} is {@link ConsumerCryptoFailureAction#FAIL} 
(the
+ * default — strict mode); for {@link ConsumerCryptoFailureAction#DISCARD} or
+ * {@link ConsumerCryptoFailureAction#CONSUME} the provider may be omitted, in
+ * which case the consumer just relies on the failure action to decide what to 
do
+ * with encrypted messages it can't decrypt.
+ */
+@EqualsAndHashCode
+@ToString
+public final class ConsumerEncryptionPolicy {
+
+    private final PrivateKeyProvider privateKeyProvider;
+    private final ConsumerCryptoFailureAction failureAction;
+
+    private ConsumerEncryptionPolicy(PrivateKeyProvider privateKeyProvider,
+                                     ConsumerCryptoFailureAction 
failureAction) {
+        Objects.requireNonNull(failureAction, "failureAction must not be 
null");
+        if (failureAction == ConsumerCryptoFailureAction.FAIL && 
privateKeyProvider == null) {
+            throw new IllegalArgumentException(
+                    "privateKeyProvider must be set when failureAction is 
FAIL");
+        }
+        this.privateKeyProvider = privateKeyProvider;
+        this.failureAction = failureAction;
+    }
+
+    /**
+     * @return the provider used to load private keys for decryption, or 
{@code null}
+     *         when the consumer doesn't decrypt and falls back to the failure 
action
+     *         (DISCARD or CONSUME)
+     */
+    public PrivateKeyProvider privateKeyProvider() {
+        return privateKeyProvider;
+    }
+
+    /**
+     * @return the action the consumer takes when decryption fails
+     */
+    public ConsumerCryptoFailureAction failureAction() {
+        return failureAction;
+    }
+
+    /**
+     * @return a new builder for constructing a {@link 
ConsumerEncryptionPolicy}
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link ConsumerEncryptionPolicy}.
+     */
+    public static final class Builder {
+        private PrivateKeyProvider privateKeyProvider;
+        private ConsumerCryptoFailureAction failureAction = 
ConsumerCryptoFailureAction.FAIL;
+
+        private Builder() {
+        }
+
+        /**
+         * Provider used to load private keys. Required only when the failure action is FAIL.
+         *
+         * @param privateKeyProvider the private-key provider
+         * @return this builder
+         */
+        public Builder privateKeyProvider(PrivateKeyProvider 
privateKeyProvider) {
+            this.privateKeyProvider = privateKeyProvider;
+            return this;
+        }
+
+        /**
+         * Action to take when decryption fails. Default: {@link 
ConsumerCryptoFailureAction#FAIL}.
+         *
+         * @param failureAction the failure action
+         * @return this builder
+         */
+        public Builder failureAction(ConsumerCryptoFailureAction 
failureAction) {
+            this.failureAction = failureAction;
+            return this;
+        }
+
+        /**
+         * @return a new {@link ConsumerEncryptionPolicy} instance
+         */
+        public ConsumerEncryptionPolicy build() {
+            return new ConsumerEncryptionPolicy(privateKeyProvider, 
failureAction);
+        }
+    }
+}
diff --git 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicy.java
 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicy.java
deleted file mode 100644
index c196a8f20fb..00000000000
--- 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicy.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.api.v5.config;
-
-import java.util.List;
-import java.util.Objects;
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
-import org.apache.pulsar.client.api.v5.auth.CryptoFailureAction;
-import org.apache.pulsar.client.api.v5.auth.CryptoKeyReader;
-
-/**
- * End-to-end encryption configuration for producers and consumers.
- *
- * <p>For producers, supply a {@link CryptoKeyReader} and one or more 
encryption key names —
- * use {@link #forProducer(CryptoKeyReader, String...)} as the typical entry 
point.
- * For consumers/readers, supply a {@link CryptoKeyReader} and a {@link 
CryptoFailureAction} —
- * use {@link #forConsumer(CryptoKeyReader, CryptoFailureAction)}.
- *
- * <p>{@link #builder()} is available for callers that need to tune both sides 
explicitly.
- */
-@EqualsAndHashCode
-@ToString
-public final class EncryptionPolicy {
-
-    private final CryptoKeyReader keyReader;
-    private final List<String> keyNames;
-    private final CryptoFailureAction failureAction;
-
-    private EncryptionPolicy(CryptoKeyReader keyReader, List<String> keyNames,
-                             CryptoFailureAction failureAction) {
-        Objects.requireNonNull(keyReader, "keyReader must not be null");
-        if (keyNames == null) {
-            keyNames = List.of();
-        }
-        if (failureAction == null) {
-            failureAction = CryptoFailureAction.FAIL;
-        }
-        this.keyReader = keyReader;
-        this.keyNames = List.copyOf(keyNames);
-        this.failureAction = failureAction;
-    }
-
-    /**
-     * @return the crypto key reader for loading encryption/decryption keys
-     */
-    public CryptoKeyReader keyReader() {
-        return keyReader;
-    }
-
-    /**
-     * @return the producer-side encryption key names (empty list for 
consumer/reader)
-     */
-    public List<String> keyNames() {
-        return keyNames;
-    }
-
-    /**
-     * @return the action to take when encryption or decryption fails
-     */
-    public CryptoFailureAction failureAction() {
-        return failureAction;
-    }
-
-    /**
-     * Create an encryption policy for producers.
-     *
-     * @param keyReader the crypto key reader for loading encryption keys
-     * @param keyNames  one or more encryption key names to use
-     * @return an {@link EncryptionPolicy} configured for producer-side 
encryption
-     */
-    public static EncryptionPolicy forProducer(CryptoKeyReader keyReader, 
String... keyNames) {
-        return new EncryptionPolicy(keyReader, List.of(keyNames), 
CryptoFailureAction.FAIL);
-    }
-
-    /**
-     * Create an encryption policy for consumers/readers.
-     *
-     * @param keyReader     the crypto key reader for loading decryption keys
-     * @param failureAction the action to take when decryption fails
-     * @return an {@link EncryptionPolicy} configured for consumer-side 
decryption
-     */
-    public static EncryptionPolicy forConsumer(CryptoKeyReader keyReader, 
CryptoFailureAction failureAction) {
-        return new EncryptionPolicy(keyReader, List.of(), failureAction);
-    }
-
-    /**
-     * @return a new builder for constructing an {@link EncryptionPolicy}
-     */
-    public static Builder builder() {
-        return new Builder();
-    }
-
-    /**
-     * Builder for {@link EncryptionPolicy}.
-     */
-    public static final class Builder {
-        private CryptoKeyReader keyReader;
-        private List<String> keyNames = List.of();
-        private CryptoFailureAction failureAction = CryptoFailureAction.FAIL;
-
-        private Builder() {
-        }
-
-        /**
-         * Crypto key reader used to load encryption/decryption keys. Required.
-         *
-         * @param keyReader the key reader
-         * @return this builder
-         */
-        public Builder keyReader(CryptoKeyReader keyReader) {
-            this.keyReader = keyReader;
-            return this;
-        }
-
-        /**
-         * Producer-side encryption key names. Leave empty (default) for 
consumer-side use.
-         *
-         * @param keyNames the key names
-         * @return this builder
-         */
-        public Builder keyNames(String... keyNames) {
-            this.keyNames = List.of(keyNames);
-            return this;
-        }
-
-        /**
-         * Action to take when encryption (producer) or decryption (consumer) 
fails.
-         * Default is {@link CryptoFailureAction#FAIL}.
-         *
-         * @param failureAction the failure action
-         * @return this builder
-         */
-        public Builder failureAction(CryptoFailureAction failureAction) {
-            this.failureAction = failureAction;
-            return this;
-        }
-
-        /**
-         * @return a new {@link EncryptionPolicy} instance
-         */
-        public EncryptionPolicy build() {
-            return new EncryptionPolicy(keyReader, keyNames, failureAction);
-        }
-    }
-}
diff --git 
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProducerEncryptionPolicy.java
 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProducerEncryptionPolicy.java
new file mode 100644
index 00000000000..0e793b3737e
--- /dev/null
+++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/ProducerEncryptionPolicy.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.config;
+
+import java.util.List;
+import java.util.Objects;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.pulsar.client.api.v5.auth.ProducerCryptoFailureAction;
+import org.apache.pulsar.client.api.v5.auth.PublicKeyProvider;
+
+/**
+ * Producer-side end-to-end encryption configuration.
+ *
+ * <p>Construct via {@link #builder()}. Required: a {@link PublicKeyProvider} 
and at
+ * least one key name.
+ */
+@EqualsAndHashCode
+@ToString
+public final class ProducerEncryptionPolicy {
+
+    private final PublicKeyProvider publicKeyProvider;
+    private final List<String> keyNames;
+    private final ProducerCryptoFailureAction failureAction;
+
+    private ProducerEncryptionPolicy(PublicKeyProvider publicKeyProvider,
+                                     List<String> keyNames,
+                                     ProducerCryptoFailureAction 
failureAction) {
+        Objects.requireNonNull(publicKeyProvider, "publicKeyProvider must not 
be null");
+        Objects.requireNonNull(keyNames, "keyNames must not be null");
+        if (keyNames.isEmpty()) {
+            throw new IllegalArgumentException("at least one key name must be 
configured");
+        }
+        Objects.requireNonNull(failureAction, "failureAction must not be 
null");
+        this.publicKeyProvider = publicKeyProvider;
+        this.keyNames = List.copyOf(keyNames);
+        this.failureAction = failureAction;
+    }
+
+    /**
+     * @return the provider used to load public keys for encryption
+     */
+    public PublicKeyProvider publicKeyProvider() {
+        return publicKeyProvider;
+    }
+
+    /**
+     * @return the configured key names; the producer encrypts each message's 
data
+     *         key with every public key listed here
+     */
+    public List<String> keyNames() {
+        return keyNames;
+    }
+
+    /**
+     * @return the action the producer takes when encryption fails
+     */
+    public ProducerCryptoFailureAction failureAction() {
+        return failureAction;
+    }
+
+    /**
+     * @return a new builder for constructing a {@link 
ProducerEncryptionPolicy}
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link ProducerEncryptionPolicy}.
+     */
+    public static final class Builder {
+        private PublicKeyProvider publicKeyProvider;
+        private List<String> keyNames = List.of();
+        private ProducerCryptoFailureAction failureAction = 
ProducerCryptoFailureAction.FAIL;
+
+        private Builder() {
+        }
+
+        /**
+         * Provider used to load public keys. Required.
+         *
+         * @param publicKeyProvider the public-key provider
+         * @return this builder
+         */
+        public Builder publicKeyProvider(PublicKeyProvider publicKeyProvider) {
+            this.publicKeyProvider = publicKeyProvider;
+            return this;
+        }
+
+        /**
+         * Single key name shortcut — equivalent to {@code keyNames(keyName)}.
+         *
+         * @param keyName the key name
+         * @return this builder
+         */
+        public Builder keyName(String keyName) {
+            this.keyNames = List.of(keyName);
+            return this;
+        }
+
+        /**
+         * Multiple key names. The producer encrypts each message's data key 
with
+         * every public key listed here, so any consumer with one of the 
matching
+         * private keys can decrypt.
+         *
+         * @param keyNames one or more key names
+         * @return this builder
+         */
+        public Builder keyNames(String... keyNames) {
+            this.keyNames = List.of(keyNames);
+            return this;
+        }
+
+        /**
+         * Action to take when encryption fails. Default: {@link 
ProducerCryptoFailureAction#FAIL}.
+         *
+         * @param failureAction the failure action
+         * @return this builder
+         */
+        public Builder failureAction(ProducerCryptoFailureAction 
failureAction) {
+            this.failureAction = failureAction;
+            return this;
+        }
+
+        /**
+         * @return a new {@link ProducerEncryptionPolicy} instance
+         */
+        public ProducerEncryptionPolicy build() {
+            return new ProducerEncryptionPolicy(publicKeyProvider, keyNames, 
failureAction);
+        }
+    }
+}
diff --git 
a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
 
b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
index f05cfbd4a21..b3439d31c0a 100644
--- 
a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
+++ 
b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.api.v5;
 
+import java.nio.file.Path;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Map;
@@ -26,14 +27,18 @@ import org.apache.pulsar.client.api.v5.async.AsyncProducer;
 import org.apache.pulsar.client.api.v5.async.AsyncQueueConsumer;
 import org.apache.pulsar.client.api.v5.async.AsyncStreamConsumer;
 import org.apache.pulsar.client.api.v5.auth.AuthenticationFactory;
+import org.apache.pulsar.client.api.v5.auth.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.v5.auth.PemFileKeyProvider;
 import org.apache.pulsar.client.api.v5.config.BackoffPolicy;
 import org.apache.pulsar.client.api.v5.config.BatchingPolicy;
 import org.apache.pulsar.client.api.v5.config.CompressionPolicy;
 import org.apache.pulsar.client.api.v5.config.CompressionType;
 import org.apache.pulsar.client.api.v5.config.ConnectionPolicy;
+import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
 import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy;
 import org.apache.pulsar.client.api.v5.config.MemorySize;
 import org.apache.pulsar.client.api.v5.config.ProcessingTimeoutPolicy;
+import org.apache.pulsar.client.api.v5.config.ProducerEncryptionPolicy;
 import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.v5.config.TlsPolicy;
 import org.apache.pulsar.client.api.v5.schema.Schema;
@@ -300,6 +305,42 @@ public class Examples {
         }
     }
 
+    /**
+     * End-to-end encryption — producer encrypts with a public key, consumer 
decrypts
+     * with the matching private key. The {@link PemFileKeyProvider} is the
+     * batteries-included provider for local PEM files; for remote key stores
+     * (KMS, Vault, ...) implement {@link 
org.apache.pulsar.client.api.v5.auth.PublicKeyProvider}
+     * or {@link org.apache.pulsar.client.api.v5.auth.PrivateKeyProvider} 
directly.
+     */
+    void encryptedProducerAndConsumer(PulsarClient client) throws Exception {
+        try (var producer = client.newProducer(Schema.string())
+                .topic("orders")
+                .encryptionPolicy(ProducerEncryptionPolicy.builder()
+                        .publicKeyProvider(PemFileKeyProvider.builder()
+                                .publicKey("orders-v1", 
Path.of("/etc/keys/orders-pub.pem"))
+                                .build())
+                        .keyName("orders-v1")
+                        .build())
+                .create()) {
+            producer.newMessage().value("classified payload").send();
+        }
+
+        try (var consumer = client.newQueueConsumer(Schema.string())
+                .topic("orders")
+                .subscriptionName("trusted")
+                .encryptionPolicy(ConsumerEncryptionPolicy.builder()
+                        .privateKeyProvider(PemFileKeyProvider.builder()
+                                .privateKey("orders-v1", 
Path.of("/etc/keys/orders-priv.pem"))
+                                .build())
+                        .failureAction(ConsumerCryptoFailureAction.FAIL)
+                        .build())
+                .subscribe()) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            // ... use msg.value()
+            consumer.acknowledge(msg.id());
+        }
+    }
+
     /** Async queue consumer — high-throughput parallel processing. */
     void asyncQueueConsumer(PulsarClient client) throws Exception {
         try (var consumer = client.newQueueConsumer(Schema.json(Order.class))
diff --git 
a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/auth/PemFileKeyProviderTest.java
 
b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/auth/PemFileKeyProviderTest.java
new file mode 100644
index 00000000000..74fbc5128fa
--- /dev/null
+++ 
b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/auth/PemFileKeyProviderTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.auth;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.CompletionException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit coverage for {@link PemFileKeyProvider}: registering a key file → 
reading it
+ * back as bytes; missing key → failed future; missing file on disk → failed 
future.
+ */
+public class PemFileKeyProviderTest {
+
+    private Path tempDir;
+
+    @BeforeMethod
+    public void setUp() throws IOException {
+        tempDir = Files.createTempDirectory("pem-file-key-provider-test");
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws IOException {
+        if (tempDir != null) {
+            try (var stream = Files.walk(tempDir)) {
+                stream.sorted(java.util.Comparator.reverseOrder())
+                        .forEach(p -> {
+                            try {
+                                Files.deleteIfExists(p);
+                            } catch (IOException ignored) {
+                                // best effort
+                            }
+                        });
+            }
+        }
+    }
+
+    @Test
+    public void testReadRegisteredPublicKey() throws Exception {
+        Path keyFile = Files.writeString(tempDir.resolve("pub.pem"), 
"PUBLIC-KEY-BYTES");
+        var provider = PemFileKeyProvider.builder()
+                .publicKey("orders-v1", keyFile)
+                .build();
+
+        EncryptionKey key = provider.getPublicKey("orders-v1").get();
+        assertEquals(new String(key.key()), "PUBLIC-KEY-BYTES");
+        assertTrue(key.metadata().isEmpty());
+    }
+
+    @Test
+    public void testReadRegisteredPrivateKey() throws Exception {
+        Path keyFile = Files.writeString(tempDir.resolve("priv.pem"), 
"PRIVATE-KEY-BYTES");
+        var provider = PemFileKeyProvider.builder()
+                .privateKey("orders-v1", keyFile)
+                .build();
+
+        EncryptionKey key = provider.getPrivateKey("orders-v1", 
Map.of()).get();
+        assertEquals(new String(key.key()), "PRIVATE-KEY-BYTES");
+    }
+
+    @Test
+    public void testUnknownKeyNameFailsFuture() {
+        var provider = PemFileKeyProvider.builder().build();
+
+        var ex = expectThrows(CompletionException.class,
+                () -> provider.getPublicKey("missing").join());
+        assertTrue(ex.getCause() instanceof IllegalArgumentException,
+                "expected IllegalArgumentException, got: " + ex.getCause());
+    }
+
+    @Test
+    public void testMissingFileOnDiskFailsFuture() {
+        Path missing = tempDir.resolve("does-not-exist.pem");
+        var provider = PemFileKeyProvider.builder()
+                .publicKey("orders-v1", missing)
+                .build();
+
+        var ex = expectThrows(CompletionException.class,
+                () -> provider.getPublicKey("orders-v1").join());
+        assertTrue(ex.getCause() instanceof IOException,
+                "expected IOException, got: " + ex.getCause());
+    }
+
+    @Test
+    public void testProviderUsableAsBothSides() throws Exception {
+        Path pub = Files.writeString(tempDir.resolve("pub.pem"), "PUB");
+        Path priv = Files.writeString(tempDir.resolve("priv.pem"), "PRIV");
+        var provider = PemFileKeyProvider.builder()
+                .publicKey("k", pub)
+                .privateKey("k", priv)
+                .build();
+
+        assertEquals(new String(provider.getPublicKey("k").get().key()), 
"PUB");
+        assertEquals(new String(provider.getPrivateKey("k", 
Map.of()).get().key()), "PRIV");
+    }
+}
diff --git 
a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicyTest.java
 
b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicyTest.java
new file mode 100644
index 00000000000..079b5b2962e
--- /dev/null
+++ 
b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/config/EncryptionPolicyTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5.config;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.expectThrows;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.api.v5.auth.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.v5.auth.EncryptionKey;
+import org.apache.pulsar.client.api.v5.auth.PrivateKeyProvider;
+import org.apache.pulsar.client.api.v5.auth.ProducerCryptoFailureAction;
+import org.apache.pulsar.client.api.v5.auth.PublicKeyProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Builder validation for {@link ProducerEncryptionPolicy} and
+ * {@link ConsumerEncryptionPolicy}.
+ */
+public class EncryptionPolicyTest {
+
+    /** Public-key stub whose lookups complete immediately with a zero-length key. */
+    private static final PublicKeyProvider STUB_PUB =
+            name -> CompletableFuture.completedFuture(EncryptionKey.of(new byte[0]));
+
+    /** Private-key stub whose lookups complete immediately with a zero-length key. */
+    private static final PrivateKeyProvider STUB_PRIV =
+            (name, keyMetadata) -> CompletableFuture.completedFuture(EncryptionKey.of(new byte[0]));
+
+    // --- Producer side ---
+
+    /** A producer policy built without a public-key provider is rejected with an NPE. */
+    @Test
+    public void testProducerPolicyMissingProviderRejected() {
+        expectThrows(NullPointerException.class,
+                () -> ProducerEncryptionPolicy.builder().keyName("k1").build());
+    }
+
+    /** A producer policy built without any key name is rejected with an IAE. */
+    @Test
+    public void testProducerPolicyMissingKeyNameRejected() {
+        expectThrows(IllegalArgumentException.class,
+                () -> ProducerEncryptionPolicy.builder().publicKeyProvider(STUB_PUB).build());
+    }
+
+    /** With no explicit failure action, a producer policy reports FAIL and keeps its inputs. */
+    @Test
+    public void testProducerPolicyDefaultsFailureActionToFail() {
+        ProducerEncryptionPolicy policy = ProducerEncryptionPolicy.builder()
+                .publicKeyProvider(STUB_PUB)
+                .keyName("k1")
+                .build();
+
+        assertSame(policy.failureAction(), ProducerCryptoFailureAction.FAIL);
+        assertSame(policy.publicKeyProvider(), STUB_PUB);
+        assertEquals(policy.keyNames(), List.of("k1"));
+    }
+
+    /** Several key names and an explicit failure action are both reflected by the built policy. */
+    @Test
+    public void testProducerPolicyMultipleKeyNames() {
+        ProducerEncryptionPolicy policy = ProducerEncryptionPolicy.builder()
+                .publicKeyProvider(STUB_PUB)
+                .keyNames("k1", "k2", "k3")
+                .failureAction(ProducerCryptoFailureAction.SEND_UNENCRYPTED)
+                .build();
+
+        assertEquals(policy.keyNames(), List.of("k1", "k2", "k3"));
+        assertSame(policy.failureAction(), ProducerCryptoFailureAction.SEND_UNENCRYPTED);
+    }
+
+    // --- Consumer side ---
+
+    /** The default failure action is FAIL, and FAIL cannot be built without a provider. */
+    @Test
+    public void testConsumerPolicyFailModeRequiresProvider() {
+        expectThrows(IllegalArgumentException.class,
+                () -> ConsumerEncryptionPolicy.builder().build());
+    }
+
+    /** DISCARD may be built without a provider; the provider accessor then returns null. */
+    @Test
+    public void testConsumerPolicyDiscardModeWithoutProviderAllowed() {
+        ConsumerEncryptionPolicy policy = ConsumerEncryptionPolicy.builder()
+                .failureAction(ConsumerCryptoFailureAction.DISCARD)
+                .build();
+
+        assertSame(policy.failureAction(), ConsumerCryptoFailureAction.DISCARD);
+        org.testng.Assert.assertNull(policy.privateKeyProvider());
+    }
+
+    /** With only a provider configured, the consumer policy reports FAIL and keeps the provider. */
+    @Test
+    public void testConsumerPolicyDefaultsFailureActionToFail() {
+        ConsumerEncryptionPolicy policy = ConsumerEncryptionPolicy.builder()
+                .privateKeyProvider(STUB_PRIV)
+                .build();
+
+        assertSame(policy.failureAction(), ConsumerCryptoFailureAction.FAIL);
+        assertSame(policy.privateKeyProvider(), STUB_PRIV);
+    }
+
+    /** An explicitly chosen failure action is the one the built consumer policy reports. */
+    @Test
+    public void testConsumerPolicyExplicitFailureAction() {
+        ConsumerEncryptionPolicy policy = ConsumerEncryptionPolicy.builder()
+                .privateKeyProvider(STUB_PRIV)
+                .failureAction(ConsumerCryptoFailureAction.DISCARD)
+                .build();
+
+        assertSame(policy.failureAction(), ConsumerCryptoFailureAction.DISCARD);
+    }
+
+    // --- EncryptionKey factories ---
+
+    /**
+     * {@code EncryptionKey.of(bytes)} exposes the same array instance with empty metadata;
+     * the two-argument form exposes the supplied metadata.
+     */
+    @Test
+    public void testEncryptionKeyFactories() {
+        byte[] material = "key-material".getBytes();
+
+        EncryptionKey withoutMetadata = EncryptionKey.of(material);
+        assertSame(withoutMetadata.key(), material);
+        assertEquals(withoutMetadata.metadata(), Map.of());
+
+        EncryptionKey withMetadata = EncryptionKey.of(material, Map.of("version", "v1"));
+        assertEquals(withMetadata.metadata(), Map.of("version", "v1"));
+    }
+}
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java
index c65e16d0fc5..119702e9497 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.client.api.v5.Checkpoint;
 import org.apache.pulsar.client.api.v5.CheckpointConsumer;
 import org.apache.pulsar.client.api.v5.CheckpointConsumerBuilder;
 import org.apache.pulsar.client.api.v5.PulsarClientException;
-import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
+import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
 import org.apache.pulsar.client.api.v5.schema.Schema;
 import org.apache.pulsar.common.api.proto.ScalableConsumerType;
 import org.apache.pulsar.common.naming.TopicName;
@@ -40,7 +40,7 @@ final class CheckpointConsumerBuilderV5<T> implements 
CheckpointConsumerBuilder<
     private Checkpoint startPosition = CheckpointV5.LATEST;
     private String consumerName;
     private String consumerGroup;
-    private EncryptionPolicy encryptionPolicy;
+    private ConsumerEncryptionPolicy encryptionPolicy;
 
     CheckpointConsumerBuilderV5(PulsarClientV5 client, Schema<T> v5Schema) {
         this.client = client;
@@ -116,7 +116,7 @@ final class CheckpointConsumerBuilderV5<T> implements 
CheckpointConsumerBuilder<
     }
 
     @Override
-    public CheckpointConsumerBuilderV5<T> encryptionPolicy(EncryptionPolicy 
policy) {
+    public CheckpointConsumerBuilderV5<T> 
encryptionPolicy(ConsumerEncryptionPolicy policy) { // only records the policy on this builder
         this.encryptionPolicy = policy;
         return this;
     }
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CryptoKeyReaderAdapter.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CryptoKeyReaderAdapter.java
index d9d2f7e8258..e5161d848f0 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CryptoKeyReaderAdapter.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CryptoKeyReaderAdapter.java
@@ -21,32 +21,63 @@ package org.apache.pulsar.client.impl.v5;
 import java.util.Map;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.v5.auth.PrivateKeyProvider;
+import org.apache.pulsar.client.api.v5.auth.PublicKeyProvider;
 
 /**
- * Adapts a V5 {@link org.apache.pulsar.client.api.v5.auth.CryptoKeyReader} to 
the V4
- * {@link CryptoKeyReader} interface used by the underlying client 
implementation.
+ * Bridges the V5 split-by-role key SPIs ({@link PublicKeyProvider},
+ * {@link PrivateKeyProvider}) to the v4 {@link CryptoKeyReader}, which has 
both
+ * methods on a single interface.
+ *
+ * <p>v4's {@code CryptoKeyReader} is synchronous, so the adapter blocks on 
the V5
+ * provider's {@link java.util.concurrent.CompletableFuture future} via {@code 
join()}.
+ * For local providers (e.g. {@code PemFileKeyProvider}) the future is already
+ * complete; for remote providers (e.g. KMS-backed) this blocks the v4 thread 
that
+ * called {@code getXxxKey} — same constraint v4 already imposes today. Async
+ * end-to-end requires deeper plumbing into {@code MessageCrypto}; out of 
scope here.
  */
 final class CryptoKeyReaderAdapter implements CryptoKeyReader {
 
-    private final org.apache.pulsar.client.api.v5.auth.CryptoKeyReader 
v5Reader;
+    private final PublicKeyProvider publicKeyProvider; // null on a consumer-side adapter
+    private final PrivateKeyProvider privateKeyProvider; // null on a producer-side adapter
+
+    private CryptoKeyReaderAdapter(PublicKeyProvider publicKeyProvider,
+                                   PrivateKeyProvider privateKeyProvider) {
+        this.publicKeyProvider = publicKeyProvider;
+        this.privateKeyProvider = privateKeyProvider;
+    }
 
-    private 
CryptoKeyReaderAdapter(org.apache.pulsar.client.api.v5.auth.CryptoKeyReader 
v5Reader) {
-        this.v5Reader = v5Reader;
+    /**
+     * Producer-side adapter: only {@link CryptoKeyReader#getPublicKey} is supported;
+     * {@link CryptoKeyReader#getPrivateKey} throws {@link UnsupportedOperationException}.
+     *
+     * @param provider the V5 provider that public-key lookups are delegated to
+     * @return a v4 {@link CryptoKeyReader} backed by {@code provider}
+     */
+    static CryptoKeyReader forProducer(PublicKeyProvider provider) {
+        return new CryptoKeyReaderAdapter(provider, null);
     }
 
-    static CryptoKeyReader 
wrap(org.apache.pulsar.client.api.v5.auth.CryptoKeyReader v5Reader) {
-        return new CryptoKeyReaderAdapter(v5Reader);
+    /**
+     * Consumer-side adapter: only {@link CryptoKeyReader#getPrivateKey} is supported;
+     * {@link CryptoKeyReader#getPublicKey} throws {@link UnsupportedOperationException}.
+     *
+     * @param provider the V5 provider that private-key lookups are delegated to
+     * @return a v4 {@link CryptoKeyReader} backed by {@code provider}
+     */
+    static CryptoKeyReader forConsumer(PrivateKeyProvider provider) {
+        return new CryptoKeyReaderAdapter(null, provider);
     }
 
     @Override
     public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> 
metadata) {
-        var v5Key = v5Reader.getPublicKey(keyName, metadata);
+        if (publicKeyProvider == null) {
+            throw new UnsupportedOperationException(
+                    "getPublicKey called on a consumer-side 
CryptoKeyReaderAdapter");
+        }
+        var v5Key = publicKeyProvider.getPublicKey(keyName).join(); // blocks on the future; metadata is not forwarded
         return new EncryptionKeyInfo(v5Key.key(), v5Key.metadata());
     }
 
     @Override
     public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> 
metadata) {
-        var v5Key = v5Reader.getPrivateKey(keyName, metadata);
+        if (privateKeyProvider == null) {
+            throw new UnsupportedOperationException(
+                    "getPrivateKey called on a producer-side 
CryptoKeyReaderAdapter");
+        }
+        var v5Key = privateKeyProvider.getPrivateKey(keyName, metadata).join(); // blocks on the V5 future
         return new EncryptionKeyInfo(v5Key.key(), v5Key.metadata());
     }
 }
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
index d5c261ede89..922eb95885c 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
@@ -29,8 +29,8 @@ import org.apache.pulsar.client.api.v5.PulsarClientException;
 import org.apache.pulsar.client.api.v5.config.BatchingPolicy;
 import org.apache.pulsar.client.api.v5.config.ChunkingPolicy;
 import org.apache.pulsar.client.api.v5.config.CompressionPolicy;
-import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
 import org.apache.pulsar.client.api.v5.config.ProducerAccessMode;
+import org.apache.pulsar.client.api.v5.config.ProducerEncryptionPolicy;
 import org.apache.pulsar.client.api.v5.schema.Schema;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.naming.TopicName;
@@ -156,12 +156,13 @@ final class ProducerBuilderV5<T> implements 
ProducerBuilder<T> {
     }
 
     @Override
-    public ProducerBuilderV5<T> encryptionPolicy(EncryptionPolicy policy) {
-        
conf.setCryptoKeyReader(CryptoKeyReaderAdapter.wrap(policy.keyReader()));
+    public ProducerBuilderV5<T> encryptionPolicy(ProducerEncryptionPolicy 
policy) {
+        
conf.setCryptoKeyReader(CryptoKeyReaderAdapter.forProducer(policy.publicKeyProvider()));
         conf.setEncryptionKeys(new HashSet<>(policy.keyNames()));
-        conf.setCryptoFailureAction(
-                
org.apache.pulsar.client.api.ProducerCryptoFailureAction.valueOf(
-                        policy.failureAction().name()));
+        conf.setCryptoFailureAction(switch (policy.failureAction()) { // V5 SEND_UNENCRYPTED is v4 SEND: map explicitly
+            case FAIL -> 
org.apache.pulsar.client.api.ProducerCryptoFailureAction.FAIL;
+            case SEND_UNENCRYPTED -> 
org.apache.pulsar.client.api.ProducerCryptoFailureAction.SEND;
+        });
         return this;
     }
 
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
index 7f9e32963be..d153a77b0f4 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
@@ -26,8 +26,8 @@ import org.apache.pulsar.client.api.v5.PulsarClientException;
 import org.apache.pulsar.client.api.v5.QueueConsumer;
 import org.apache.pulsar.client.api.v5.QueueConsumerBuilder;
 import org.apache.pulsar.client.api.v5.config.BackoffPolicy;
+import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
 import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy;
-import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
 import org.apache.pulsar.client.api.v5.config.ProcessingTimeoutPolicy;
 import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.v5.schema.Schema;
@@ -205,8 +205,10 @@ final class QueueConsumerBuilderV5<T> implements 
QueueConsumerBuilder<T> {
     }
 
     @Override
-    public QueueConsumerBuilderV5<T> encryptionPolicy(EncryptionPolicy policy) 
{
-        
conf.setCryptoKeyReader(CryptoKeyReaderAdapter.wrap(policy.keyReader()));
+    public QueueConsumerBuilderV5<T> encryptionPolicy(ConsumerEncryptionPolicy 
policy) {
+        if (policy.privateKeyProvider() != null) {
+            
conf.setCryptoKeyReader(CryptoKeyReaderAdapter.forConsumer(policy.privateKeyProvider()));
+        }
         conf.setCryptoFailureAction(
                 
org.apache.pulsar.client.api.ConsumerCryptoFailureAction.valueOf(
                         policy.failureAction().name()));
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
index 6b9f6b67e07..24cf63dacbc 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
@@ -27,7 +27,7 @@ import org.apache.pulsar.client.api.v5.MessageId;
 import org.apache.pulsar.client.api.v5.PulsarClientException;
 import org.apache.pulsar.client.api.v5.StreamConsumer;
 import org.apache.pulsar.client.api.v5.StreamConsumerBuilder;
-import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
+import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
 import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.v5.schema.Schema;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -178,8 +178,10 @@ final class StreamConsumerBuilderV5<T> implements 
StreamConsumerBuilder<T> {
     }
 
     @Override
-    public StreamConsumerBuilderV5<T> encryptionPolicy(EncryptionPolicy 
policy) {
-        
conf.setCryptoKeyReader(CryptoKeyReaderAdapter.wrap(policy.keyReader()));
+    public StreamConsumerBuilderV5<T> 
encryptionPolicy(ConsumerEncryptionPolicy policy) {
+        if (policy.privateKeyProvider() != null) {
+            
conf.setCryptoKeyReader(CryptoKeyReaderAdapter.forConsumer(policy.privateKeyProvider()));
+        }
         conf.setCryptoFailureAction(
                 
org.apache.pulsar.client.api.ConsumerCryptoFailureAction.valueOf(
                         policy.failureAction().name()));

Reply via email to