This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ca94db5f90a3e686bed03af7689726ce12b1b10c
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Mon Feb 1 11:10:21 2021 +0900

    Add default implementation of CryptoKeyReader (#9379)
    
    Currently, users must implement the `CryptoKeyReader` interface for
    end-to-end message encryption in Java. I thought it would be useful to have a
    default implementation, so I added `DefaultCryptoKeyReader`.
    
    How to use:
    ```java
    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic("persistent://public/default/test")
            .addEncryptionKey("my-app-key1")
            .defaultCryptoKeyReader("file:///path/to/public.key")
            .create();
    
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic("persistent://public/default/test")
            .subscriptionName("sub1")
            .defaultCryptoKeyReader(new HashMap<String, String>() {
                {
                    put("my-app-key1", "file:///path/to/private.key");
                    put("my-app-key2", "data:application/x-pem-file;base64,*****");
                }
            })
            .subscribe();
    ```
    
    (cherry picked from commit 85b1c7ef8a37e0bc06052f04ecd5da076804e8cb)
---
 .../client/api/SimpleProducerConsumerTest.java     |  96 +++++++++++++++++
 .../apache/pulsar/client/api/TopicReaderTest.java  |  88 ++++++++++++++++
 .../apache/pulsar/client/api/ConsumerBuilder.java  |  24 +++++
 .../apache/pulsar/client/api/ProducerBuilder.java  |  24 +++++
 .../apache/pulsar/client/api/ReaderBuilder.java    |  24 +++++
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |  13 +++
 .../pulsar/client/impl/DefaultCryptoKeyReader.java | 107 +++++++++++++++++++
 .../client/impl/DefaultCryptoKeyReaderBuilder.java |  77 ++++++++++++++
 .../pulsar/client/impl/ProducerBuilderImpl.java    |  13 +++
 .../pulsar/client/impl/ReaderBuilderImpl.java      |  15 +++
 .../DefaultCryptoKeyReaderConfigurationData.java   | 114 +++++++++++++++++++++
 .../client/impl/ConsumerBuilderImplTest.java       |  23 +++++
 .../client/impl/DefaultCryptoKeyReaderTest.java    | 113 ++++++++++++++++++++
 .../client/impl/ProducerBuilderImplTest.java       |  24 +++++
 ...efaultCryptoKeyReaderConfigurationDataTest.java |  73 +++++++++++++
 .../src/test/resources/crypto_ecdsa_private.key    |  29 ++++++
 .../src/test/resources/crypto_ecdsa_public.key     |  15 +++
 .../src/test/resources/crypto_rsa_private.key      |  27 +++++
 .../src/test/resources/crypto_rsa_public.key       |   9 ++
 19 files changed, 908 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 214ec31..4db81d3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -2617,6 +2617,102 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
     }
 
     @Test(groups = "encryption")
+    public void testDefaultCryptoKeyReader() throws Exception {
+        final String topic = 
"persistent://my-property/my-ns/default-crypto-key-reader" + 
System.currentTimeMillis();
+        final String ecdsaPublicKeyFile = 
"file:./src/test/resources/certificate/public-key.client-ecdsa.pem";
+        final String ecdsaPrivateKeyFile = 
"file:./src/test/resources/certificate/private-key.client-ecdsa.pem";
+        final String ecdsaPublicKeyData = 
"data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlIS01JR2pCZ2NxaGtqT1BRSUJNSUdYQWdFQk1Cd0dCeXFHU000OUFRRUNFUUQvLy8vOS8vLy8vLy8vLy8vLwovLy8vTURzRUVQLy8vLzMvLy8vLy8vLy8vLy8vLy93RUVPaDFlY0VRZWZROTJDU1pQQ3p1WHRNREZRQUFEZzFOCmFXNW5hSFZoVVhVTXdEcEVjOUEyZVFRaEJCWWY5MUtMaVpzdERDaGdmS1VzVzRiUFdzZzVXNi9yRThBdG9wTGQKN1hxREFoRUEvLy8vL2dBQUFBQjFvdzBia0RpaEZRSUJBUU1pQUFUcktqNlJQSEdQTktjWktJT2NjTjR0Z0VOTQpuMWR6S2pMck1aVGtKNG9BYVE9PQotLS
 [...]
+        final String ecdsaPrivateKeyData = 
"data:application/x-pem-file;base64,LS0tLS1CRUdJTiBFQyBQQVJBTUVURVJTLS0tLS0KTUlHWEFnRUJNQndHQnlxR1NNNDlBUUVDRVFELy8vLzkvLy8vLy8vLy8vLy8vLy8vTURzRUVQLy8vLzMvLy8vLwovLy8vLy8vLy8vd0VFT2gxZWNFUWVmUTkyQ1NaUEN6dVh0TURGUUFBRGcxTmFXNW5hSFZoVVhVTXdEcEVjOUEyCmVRUWhCQllmOTFLTGlac3REQ2hnZktVc1c0YlBXc2c1VzYvckU4QXRvcExkN1hxREFoRUEvLy8vL2dBQUFBQjEKb3cwYmtEaWhGUUlCQVE9PQotLS0tLUVORCBFQyBQQVJBTUVURVJTLS0tLS0KLS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1JSFlBZ0VCQ
 [...]
+        final String rsaPublicKeyFile = 
"file:./src/test/resources/certificate/public-key.client-rsa.pem";
+        final String rsaPrivateKeyFile = 
"file:./src/test/resources/certificate/private-key.client-rsa.pem";
+        final String rsaPublicKeyData = 
"data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRu
 [...]
+        final String rsaPrivateKeyData = 
"data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQ
 [...]
+        final int numMsg = 10;
+
+        Map<String, String> privateKeyFileMap = Maps.newHashMap();
+        privateKeyFileMap.put("client-ecdsa.pem", ecdsaPrivateKeyFile);
+        privateKeyFileMap.put("client-rsa.pem", rsaPrivateKeyFile);
+        Map<String, String> privateKeyDataMap = Maps.newHashMap();
+        privateKeyDataMap.put("client-ecdsa.pem", ecdsaPrivateKeyData);
+        privateKeyDataMap.put("client-rsa.pem", rsaPrivateKeyData);
+
+        Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+                .defaultCryptoKeyReader(ecdsaPrivateKeyFile).subscribe();
+        Consumer<byte[]> consumer2 = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub2")
+                .defaultCryptoKeyReader(ecdsaPrivateKeyData).subscribe();
+        Consumer<byte[]> consumer3 = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub3")
+                .defaultCryptoKeyReader(privateKeyFileMap).subscribe();
+        Consumer<byte[]> consumer4 = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub4")
+                .defaultCryptoKeyReader(privateKeyDataMap).subscribe();
+
+        Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem")
+                .defaultCryptoKeyReader(ecdsaPublicKeyFile).create();
+        Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem")
+                .defaultCryptoKeyReader(ecdsaPublicKeyData).create();
+
+        for (int i = 0; i < numMsg; i++) {
+            producer1.send(("my-message-" + i).getBytes());
+        }
+        for (int i = numMsg; i < numMsg * 2; i++) {
+            producer2.send(("my-message-" + i).getBytes());
+        }
+
+        producer1.close();
+        producer2.close();
+
+        for (Consumer<byte[]> consumer : (List<Consumer<byte[]>>) 
Lists.newArrayList(consumer1, consumer2)) {
+            MessageImpl<byte[]> msg = null;
+
+            for (int i = 0; i < numMsg * 2; i++) {
+                msg = (MessageImpl<byte[]>) consumer.receive(5, 
TimeUnit.SECONDS);
+                // verify that encrypted message contains encryption-context
+                msg.getEncryptionCtx().orElseThrow(
+                        () -> new IllegalStateException("encryption-ctx not 
present for encrypted message"));
+                assertEquals(new String(msg.getData()), "my-message-" + i);
+            }
+
+            // Acknowledge the consumption of all messages at once
+            consumer.acknowledgeCumulative(msg);
+        }
+
+        consumer1.unsubscribe();
+        consumer2.unsubscribe();
+
+        Producer<byte[]> producer3 = 
pulsarClient.newProducer().topic(topic).addEncryptionKey("client-rsa.pem")
+                .defaultCryptoKeyReader(rsaPublicKeyFile).create();
+        Producer<byte[]> producer4 = 
pulsarClient.newProducer().topic(topic).addEncryptionKey("client-rsa.pem")
+                .defaultCryptoKeyReader(rsaPublicKeyData).create();
+
+        for (int i = numMsg * 2; i < numMsg * 3; i++) {
+            producer3.send(("my-message-" + i).getBytes());
+        }
+        for (int i = numMsg * 3; i < numMsg * 4; i++) {
+            producer4.send(("my-message-" + i).getBytes());
+        }
+
+        producer3.close();
+        producer4.close();
+
+        for (Consumer<byte[]> consumer : (List<Consumer<byte[]>>) 
Lists.newArrayList(consumer3, consumer4)) {
+            MessageImpl<byte[]> msg = null;
+
+            for (int i = 0; i < numMsg * 4; i++) {
+                msg = (MessageImpl<byte[]>) consumer.receive(5, 
TimeUnit.SECONDS);
+                // verify that encrypted message contains encryption-context
+                msg.getEncryptionCtx().orElseThrow(
+                        () -> new IllegalStateException("encryption-ctx not 
present for encrypted message"));
+                assertEquals(new String(msg.getData()), "my-message-" + i);
+            }
+
+            // Acknowledge the consumption of all messages at once
+            consumer.acknowledgeCumulative(msg);
+        }
+
+        consumer3.unsubscribe();
+        consumer4.unsubscribe();
+    }
+
+    @Test(groups = "encryption")
     public void testRedeliveryOfFailedMessages() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 7a55d2f..8c8e83a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -25,6 +25,7 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.nio.file.Files;
@@ -645,6 +646,93 @@ public class TopicReaderTest extends ProducerConsumerBase {
         reader.close();
     }
 
+    @Test(groups = "encryption")
+    public void testDefaultCryptoKeyReader() throws Exception {
+        final String topic = 
"persistent://my-property/my-ns/test-reader-default-crypto-key-reader"
+                + System.currentTimeMillis();
+        final String ecdsaPublicKeyFile = 
"file:./src/test/resources/certificate/public-key.client-ecdsa.pem";
+        final String ecdsaPrivateKeyFile = 
"file:./src/test/resources/certificate/private-key.client-ecdsa.pem";
+        final String ecdsaPublicKeyData = 
"data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlIS01JR2pCZ2NxaGtqT1BRSUJNSUdYQWdFQk1Cd0dCeXFHU000OUFRRUNFUUQvLy8vOS8vLy8vLy8vLy8vLwovLy8vTURzRUVQLy8vLzMvLy8vLy8vLy8vLy8vLy93RUVPaDFlY0VRZWZROTJDU1pQQ3p1WHRNREZRQUFEZzFOCmFXNW5hSFZoVVhVTXdEcEVjOUEyZVFRaEJCWWY5MUtMaVpzdERDaGdmS1VzVzRiUFdzZzVXNi9yRThBdG9wTGQKN1hxREFoRUEvLy8vL2dBQUFBQjFvdzBia0RpaEZRSUJBUU1pQUFUcktqNlJQSEdQTktjWktJT2NjTjR0Z0VOTQpuMWR6S2pMck1aVGtKNG9BYVE9PQotLS
 [...]
+        final String ecdsaPrivateKeyData = 
"data:application/x-pem-file;base64,LS0tLS1CRUdJTiBFQyBQQVJBTUVURVJTLS0tLS0KTUlHWEFnRUJNQndHQnlxR1NNNDlBUUVDRVFELy8vLzkvLy8vLy8vLy8vLy8vLy8vTURzRUVQLy8vLzMvLy8vLwovLy8vLy8vLy8vd0VFT2gxZWNFUWVmUTkyQ1NaUEN6dVh0TURGUUFBRGcxTmFXNW5hSFZoVVhVTXdEcEVjOUEyCmVRUWhCQllmOTFLTGlac3REQ2hnZktVc1c0YlBXc2c1VzYvckU4QXRvcExkN1hxREFoRUEvLy8vL2dBQUFBQjEKb3cwYmtEaWhGUUlCQVE9PQotLS0tLUVORCBFQyBQQVJBTUVURVJTLS0tLS0KLS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1JSFlBZ0VCQ
 [...]
+        final String rsaPublicKeyFile = 
"file:./src/test/resources/certificate/public-key.client-rsa.pem";
+        final String rsaPrivateKeyFile = 
"file:./src/test/resources/certificate/private-key.client-rsa.pem";
+        final String rsaPublicKeyData = 
"data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRu
 [...]
+        final String rsaPrivateKeyData = 
"data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQ
 [...]
+        final int numMsg = 10;
+
+        Map<String, String> privateKeyFileMap = Maps.newHashMap();
+        privateKeyFileMap.put("client-ecdsa.pem", ecdsaPrivateKeyFile);
+        privateKeyFileMap.put("client-rsa.pem", rsaPrivateKeyFile);
+        Map<String, String> privateKeyDataMap = Maps.newHashMap();
+        privateKeyDataMap.put("client-ecdsa.pem", ecdsaPrivateKeyData);
+        privateKeyDataMap.put("client-rsa.pem", rsaPrivateKeyData);
+
+        Reader<byte[]> reader1 = 
pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest)
+                .defaultCryptoKeyReader(ecdsaPrivateKeyFile).create();
+        Reader<byte[]> reader2 = 
pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest)
+                .defaultCryptoKeyReader(ecdsaPrivateKeyData).create();
+        Reader<byte[]> reader3 = 
pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest)
+                .defaultCryptoKeyReader(privateKeyFileMap).create();
+        Reader<byte[]> reader4 = 
pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest)
+                .defaultCryptoKeyReader(privateKeyDataMap).create();
+
+        Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem")
+                .defaultCryptoKeyReader(ecdsaPublicKeyFile).create();
+        Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem")
+                .defaultCryptoKeyReader(ecdsaPublicKeyData).create();
+
+        for (int i = 0; i < numMsg; i++) {
+            producer1.send(("my-message-" + i).getBytes());
+        }
+        for (int i = numMsg; i < numMsg * 2; i++) {
+            producer2.send(("my-message-" + i).getBytes());
+        }
+
+        producer1.close();
+        producer2.close();
+
+        for (Reader<byte[]> reader : (List<Reader<byte[]>>) 
Lists.newArrayList(reader1, reader2)) {
+            for (int i = 0; i < numMsg * 2; i++) {
+                MessageImpl<byte[]> msg = (MessageImpl<byte[]>) 
reader.readNext(5, TimeUnit.SECONDS);
+                // verify that encrypted message contains encryption-context
+                msg.getEncryptionCtx().orElseThrow(
+                        () -> new IllegalStateException("encryption-ctx not 
present for encrypted message"));
+                assertEquals(new String(msg.getData()), "my-message-" + i);
+            }
+        }
+
+        reader1.close();
+        reader2.close();
+
+        Producer<byte[]> producer3 = 
pulsarClient.newProducer().topic(topic).addEncryptionKey("client-rsa.pem")
+                .defaultCryptoKeyReader(rsaPublicKeyFile).create();
+        Producer<byte[]> producer4 = 
pulsarClient.newProducer().topic(topic).addEncryptionKey("client-rsa.pem")
+                .defaultCryptoKeyReader(rsaPublicKeyData).create();
+
+        for (int i = numMsg * 2; i < numMsg * 3; i++) {
+            producer3.send(("my-message-" + i).getBytes());
+        }
+        for (int i = numMsg * 3; i < numMsg * 4; i++) {
+            producer4.send(("my-message-" + i).getBytes());
+        }
+
+        producer3.close();
+        producer4.close();
+
+        for (Reader<byte[]> reader : (List<Reader<byte[]>>) 
Lists.newArrayList(reader3, reader4)) {
+            for (int i = 0; i < numMsg * 4; i++) {
+                MessageImpl<byte[]> msg = (MessageImpl<byte[]>) 
reader.readNext(5, TimeUnit.SECONDS);
+                // verify that encrypted message contains encryption-context
+                msg.getEncryptionCtx().orElseThrow(
+                        () -> new IllegalStateException("encryption-ctx not 
present for encrypted message"));
+                assertEquals(new String(msg.getData()), "my-message-" + i);
+            }
+        }
+
+        reader3.close();
+        reader4.close();
+    }
+
     @Test
     public void testSimpleReaderReachEndOfTopic() throws Exception {
         Reader<byte[]> reader = 
pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReaderReachEndOfTopic")
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index c6a6555..85a4ca5 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -271,6 +271,30 @@ public interface ConsumerBuilder<T> extends Cloneable {
     ConsumerBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);
 
     /**
+     * Sets the default implementation of {@link CryptoKeyReader}.
+     *
+     * <p>Configure the key reader to be used to decrypt the message payloads.
+     *
+     * @param privateKey
+     *            the private key that is always used to decrypt message 
payloads.
+     * @return the consumer builder instance
+     * @since 2.8.0
+     */
+    ConsumerBuilder<T> defaultCryptoKeyReader(String privateKey);
+
+    /**
+     * Sets the default implementation of {@link CryptoKeyReader}.
+     *
+     * <p>Configure the key reader to be used to decrypt the message payloads.
+     *
+     * @param privateKeys
+     *            the map of private key names and their URIs used to decrypt 
message payloads.
+     * @return the consumer builder instance
+     * @since 2.8.0
+     */
+    ConsumerBuilder<T> defaultCryptoKeyReader(Map<String, String> privateKeys);
+
+    /**
      * Sets a {@link MessageCrypto}.
      *
      * <p>Contains methods to encrypt/decrypt message for End to End 
Encryption.
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index 8c40343..62d9b20 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -335,6 +335,30 @@ public interface ProducerBuilder<T> extends Cloneable {
     ProducerBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);
 
     /**
+     * Sets the default implementation of {@link CryptoKeyReader}.
+     *
+     * <p>Configure the key reader to be used to encrypt the message payloads.
+     *
+     * @param publicKey
+     *            the public key that is always used to encrypt message 
payloads.
+     * @return the producer builder instance
+     * @since 2.8.0
+     */
+    ProducerBuilder<T> defaultCryptoKeyReader(String publicKey);
+
+    /**
+     * Sets the default implementation of {@link CryptoKeyReader}.
+     *
+     * <p>Configure the key reader to be used to encrypt the message payloads.
+     *
+     * @param publicKeys
+     *            the map of public key names and their URIs used to encrypt 
message payloads.
+     * @return the producer builder instance
+     * @since 2.8.0
+     */
+    ProducerBuilder<T> defaultCryptoKeyReader(Map<String, String> publicKeys);
+
+    /**
      * Add public encryption key, used by producer to encrypt the data key.
      *
      * <p>At the time of producer creation, Pulsar client checks if there are 
keys added to encryptionKeys. If keys are
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
index 506ad86..4adf492 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
@@ -170,6 +170,30 @@ public interface ReaderBuilder<T> extends Cloneable {
     ReaderBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader);
 
     /**
+     * Sets the default implementation of {@link CryptoKeyReader}.
+     *
+     * <p>Configure the key reader to be used to decrypt the message payloads.
+     *
+     * @param privateKey
+     *            the private key that is always used to decrypt message 
payloads.
+     * @return the reader builder instance
+     * @since 2.8.0
+     */
+    ReaderBuilder<T> defaultCryptoKeyReader(String privateKey);
+
+    /**
+     * Sets the default implementation of {@link CryptoKeyReader}.
+     *
+     * <p>Configure the key reader to be used to decrypt the message payloads.
+     *
+     * @param privateKeys
+     *            the map of private key names and their URIs used to decrypt 
message payloads.
+     * @return the reader builder instance
+     * @since 2.8.0
+     */
+    ReaderBuilder<T> defaultCryptoKeyReader(Map<String, String> privateKeys);
+
+    /**
      * Sets the {@link ConsumerCryptoFailureAction} to specify.
      *
      * @param action
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 8429598..629e751 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.DefaultCryptoKeyReader;
 import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.util.RetryMessageUtil;
@@ -240,6 +241,18 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
     }
 
     @Override
+    public ConsumerBuilder<T> defaultCryptoKeyReader(String privateKey) {
+        checkArgument(StringUtils.isNotBlank(privateKey), "privateKey cannot 
be blank");
+        return 
cryptoKeyReader(DefaultCryptoKeyReader.builder().defaultPrivateKey(privateKey).build());
+    }
+
+    @Override
+    public ConsumerBuilder<T> defaultCryptoKeyReader(@NonNull Map<String, 
String> privateKeys) {
+        checkArgument(!privateKeys.isEmpty(), "privateKeys cannot be empty");
+        return 
cryptoKeyReader(DefaultCryptoKeyReader.builder().privateKeys(privateKeys).build());
+    }
+
+    @Override
     public ConsumerBuilder<T> messageCrypto(@NonNull MessageCrypto 
messageCrypto) {
         conf.setMessageCrypto(messageCrypto);
         return this;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReader.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReader.java
new file mode 100644
index 0000000..4e8f6c3
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReader.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URLConnection;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.url.URL;
+import 
org.apache.pulsar.client.impl.conf.DefaultCryptoKeyReaderConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+ 
+public class DefaultCryptoKeyReader implements CryptoKeyReader {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultCryptoKeyReader.class);
+
+    private static final String APPLICATION_X_PEM_FILE = 
"application/x-pem-file";
+ 
+    private String defaultPublicKey;
+    private String defaultPrivateKey;
+
+    private Map<String, String> publicKeys;
+    private Map<String, String> privateKeys;
+
+    public static DefaultCryptoKeyReaderBuilder builder() {
+        return new DefaultCryptoKeyReaderBuilder();
+    }
+ 
+    DefaultCryptoKeyReader(DefaultCryptoKeyReaderConfigurationData conf) {
+        this.defaultPublicKey = conf.getDefaultPublicKey();
+        this.defaultPrivateKey = conf.getDefaultPrivateKey();
+        this.publicKeys = conf.getPublicKeys();
+        this.privateKeys = conf.getPrivateKeys();
+    }
+ 
+    @Override
+    public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> 
metadata) {
+        EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+        String publicKey = publicKeys.getOrDefault(keyName, defaultPublicKey);
+
+        if (publicKey == null) {
+            LOG.warn("Public key named {} is not set", keyName);
+        } else {
+            try {
+                keyInfo.setKey(loadKey(publicKey));
+            } catch (Exception e) {
+                LOG.error("Failed to load public key named {}", keyName, e);
+            }
+        }
+
+        return keyInfo;
+    }
+ 
+    @Override
+    public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> 
metadata) {
+        EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+        String privateKey = privateKeys.getOrDefault(keyName, 
defaultPrivateKey);
+
+        if (privateKey == null) {
+            LOG.warn("Private key named {} is not set", keyName);
+        } else {
+            try {
+                keyInfo.setKey(loadKey(privateKey));
+            } catch (Exception e) {
+                LOG.error("Failed to load private key named {}", keyName, e);
+            }
+        }
+
+        return keyInfo;
+    }
+
+    private byte[] loadKey(String keyUrl) throws IOException, 
IllegalAccessException, InstantiationException {
+        try {
+            URLConnection urlConnection = new URL(keyUrl).openConnection();
+            String protocol = urlConnection.getURL().getProtocol();
+            if ("data".equals(protocol) && 
!APPLICATION_X_PEM_FILE.equals(urlConnection.getContentType())) {
+                throw new IllegalArgumentException(
+                        "Unsupported media type or encoding format: " + 
urlConnection.getContentType());
+            }
+            return IOUtils.toByteArray(urlConnection);
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Invalid key format");
+        }
+    }
+
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderBuilder.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderBuilder.java
new file mode 100644
index 0000000..4d98a01
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderBuilder.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+ 
+import 
org.apache.pulsar.client.impl.conf.DefaultCryptoKeyReaderConfigurationData;
+ 
+public class DefaultCryptoKeyReaderBuilder implements Cloneable {
+
+    private DefaultCryptoKeyReaderConfigurationData conf;
+ 
+    DefaultCryptoKeyReaderBuilder() {
+        this(new DefaultCryptoKeyReaderConfigurationData());
+    }
+
+    DefaultCryptoKeyReaderBuilder(DefaultCryptoKeyReaderConfigurationData 
conf) {
+        this.conf = conf;
+    }
+
+    public DefaultCryptoKeyReaderBuilder defaultPublicKey(String 
defaultPublicKey) {
+        conf.setDefaultPublicKey(defaultPublicKey);
+        return this;
+    }
+
+    public DefaultCryptoKeyReaderBuilder defaultPrivateKey(String 
defaultPrivateKey) {
+        conf.setDefaultPrivateKey(defaultPrivateKey);
+        return this;
+    }
+
+    public DefaultCryptoKeyReaderBuilder publicKey(String keyName, String 
publicKey) {
+        conf.setPublicKey(keyName, publicKey);
+        return this;
+    }
+
+    public DefaultCryptoKeyReaderBuilder privateKey(String keyName, String 
privateKey) {
+        conf.setPrivateKey(keyName, privateKey);
+        return this;
+    }
+
+    public DefaultCryptoKeyReaderBuilder publicKeys(Map<String, String> 
publicKeys) {
+        conf.getPublicKeys().putAll(publicKeys);
+        return this;
+    }
+
+    public DefaultCryptoKeyReaderBuilder privateKeys(Map<String, String> 
privateKeys) {
+        conf.getPrivateKeys().putAll(privateKeys);
+        return this;
+    }
+
+    public DefaultCryptoKeyReader build() {
+        return new DefaultCryptoKeyReader(conf);
+    }
+
+    @Override
+    public DefaultCryptoKeyReaderBuilder clone() {
+        return new DefaultCryptoKeyReaderBuilder(conf.clone());
+    }
+
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 64a22d7..2ae20c4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
 import org.apache.pulsar.client.api.interceptor.ProducerInterceptorWrapper;
+import org.apache.pulsar.client.impl.DefaultCryptoKeyReader;
 import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -202,6 +203,18 @@ public class ProducerBuilderImpl<T> implements 
ProducerBuilder<T> {
     }
 
     @Override
+    public ProducerBuilder<T> defaultCryptoKeyReader(String publicKey) { // convenience overload: installs a DefaultCryptoKeyReader with publicKey as its default public key
+        checkArgument(StringUtils.isNotBlank(publicKey), "publicKey cannot be 
blank"); // null, empty or whitespace-only input -> IllegalArgumentException
+        return 
cryptoKeyReader(DefaultCryptoKeyReader.builder().defaultPublicKey(publicKey).build()); // delegates to the cryptoKeyReader(...) setter
+    }
+
+    @Override
+    public ProducerBuilder<T> defaultCryptoKeyReader(@NonNull Map<String, 
String> publicKeys) { // convenience overload taking key-name -> public-key entries; @NonNull rejects a null map with NullPointerException
+        checkArgument(!publicKeys.isEmpty(), "publicKeys cannot be empty"); // empty map -> IllegalArgumentException
+        return 
cryptoKeyReader(DefaultCryptoKeyReader.builder().publicKeys(publicKeys).build()); // delegates to the cryptoKeyReader(...) setter
+    }
+
+    @Override
     public ProducerBuilder<T> addEncryptionKey(String key) {
         checkArgument(StringUtils.isNotBlank(key), "Encryption key cannot be 
blank");
         conf.getEncryptionKeys().add(key);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index 1c62a47..e4f02b6 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -26,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import lombok.AccessLevel;
 import lombok.Getter;
 import com.google.common.base.Preconditions;
+import lombok.NonNull;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.CryptoKeyReader;
@@ -36,6 +38,7 @@ import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.ReaderListener;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.DefaultCryptoKeyReader;
 import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -141,6 +144,18 @@ public class ReaderBuilderImpl<T> implements 
ReaderBuilder<T> {
     }
 
     @Override
+    public ReaderBuilder<T> defaultCryptoKeyReader(String privateKey) { // convenience overload: installs a DefaultCryptoKeyReader with privateKey as its default private key
+        checkArgument(StringUtils.isNotBlank(privateKey), "privateKey cannot 
be blank"); // null, empty or whitespace-only input -> IllegalArgumentException
+        return 
cryptoKeyReader(DefaultCryptoKeyReader.builder().defaultPrivateKey(privateKey).build()); // delegates to the cryptoKeyReader(...) setter
+    }
+
+    @Override
+    public ReaderBuilder<T> defaultCryptoKeyReader(@NonNull Map<String, 
String> privateKeys) { // convenience overload taking key-name -> private-key entries; @NonNull rejects a null map with NullPointerException
+        checkArgument(!privateKeys.isEmpty(), "privateKeys cannot be empty"); // empty map -> IllegalArgumentException
+        return 
cryptoKeyReader(DefaultCryptoKeyReader.builder().privateKeys(privateKeys).build()); // delegates to the cryptoKeyReader(...) setter
+    }
+
+    @Override
     public ReaderBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction 
action) {
         conf.setCryptoFailureAction(action);
         return this;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/DefaultCryptoKeyReaderConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/DefaultCryptoKeyReaderConfigurationData.java
new file mode 100644
index 0000000..9db9099
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/DefaultCryptoKeyReaderConfigurationData.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.conf;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+
+@Data // Lombok accessors; toString() is written by hand below so inline key data can be masked
+@NoArgsConstructor
+@AllArgsConstructor
+public class DefaultCryptoKeyReaderConfigurationData implements Serializable, 
Cloneable { // holder for the default and per-name public/private key strings of a DefaultCryptoKeyReader
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String TO_STRING_FORMAT = "%s(defaultPublicKey=%s, 
defaultPrivateKey=%s, publicKeys=%s, privateKeys=%s)"; // first %s is filled with the simple class name
+
+    @NonNull
+    private String defaultPublicKey; // remains null until set; clone() and toString() both tolerate null
+    @NonNull
+    private String defaultPrivateKey; // remains null until set; clone() and toString() both tolerate null
+
+    @NonNull
+    private Map<String, String> publicKeys = new HashMap<>(); // key name -> public key string
+    @NonNull
+    private Map<String, String> privateKeys = new HashMap<>(); // key name -> private key string
+
+    public void setPublicKey(@NonNull String keyName, @NonNull String 
publicKey) { // adds or overwrites a single public-key entry
+        publicKeys.put(keyName, publicKey);
+    }
+
+    public void setPrivateKey(@NonNull String keyName, @NonNull String 
privateKey) { // adds or overwrites a single private-key entry
+        privateKeys.put(keyName, privateKey);
+    }
+
+    @Override
+    public DefaultCryptoKeyReaderConfigurationData clone() { // copy built by hand (super.clone() is not used)
+        DefaultCryptoKeyReaderConfigurationData clone = new 
DefaultCryptoKeyReaderConfigurationData();
+
+        if (defaultPublicKey != null) { // the setter is @NonNull-guarded, so unset values are skipped instead of copied
+            clone.setDefaultPublicKey(defaultPublicKey);
+        }
+
+        if (defaultPrivateKey != null) { // same guard as for the default public key
+            clone.setDefaultPrivateKey(defaultPrivateKey);
+        }
+
+        if (publicKeys != null) {
+            clone.setPublicKeys(new HashMap<String, String>(publicKeys)); // fresh map: later puts on the original do not reach the clone
+        }
+
+        if (privateKeys != null) {
+            clone.setPrivateKeys(new HashMap<String, String>(privateKeys)); // fresh map: later puts on the original do not reach the clone
+        }
+
+        return clone;
+    }
+
+    @Override
+    public String toString() { // same field layout as TO_STRING_FORMAT, with every value passed through maskKeyData
+        return String.format(TO_STRING_FORMAT, getClass().getSimpleName(), 
maskKeyData(defaultPublicKey),
+                maskKeyData(defaultPrivateKey), maskKeyData(publicKeys), 
maskKeyData(privateKeys));
+    }
+
+    private static String maskKeyData(Map<String, String> keys) { // renders a map as {name=value, ...} with masked values
+        if (keys == null) {
+            return "null";
+        } else {
+            StringBuilder keysStr = new StringBuilder();
+            keysStr.append("{");
+
+            List<String> kvList = new ArrayList<>();
+            keys.forEach((k, v) -> kvList.add(k + "=" + maskKeyData(v))); // key names are printed as-is; only values are masked
+            keysStr.append(String.join(", ", kvList));
+
+            keysStr.append("}");
+            return keysStr.toString();
+        }
+    }
+
+    private static String maskKeyData(String key) { // masks a single key string for display
+        if (key == null) {
+            return "null";
+        } else if (key.startsWith("data:")) { // values with the data: prefix carry the key inline, so the payload is hidden
+            return "data:*****";
+        } else {
+            return key; // anything else (e.g. file: paths) is shown unchanged
+        }
+    }
+
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
index 02d77a7..b2ce170 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -121,6 +121,29 @@ public class ConsumerBuilderImplTest {
                 .cryptoKeyReader(null);
     }
 
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void 
testConsumerBuilderImplWhenDefaultCryptoKeyReaderIsNullString() { // a null key string must be rejected with IllegalArgumentException
+        
consumerBuilderImpl.topic(TOPIC_NAME).subscriptionName("subscriptionName")
+                .defaultCryptoKeyReader((String) null); // cast selects the String overload over the Map one
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void 
testConsumerBuilderImplWhenDefaultCryptoKeyReaderIsEmptyString() { // an empty key string must be rejected with IllegalArgumentException
+        
consumerBuilderImpl.topic(TOPIC_NAME).subscriptionName("subscriptionName").defaultCryptoKeyReader("");
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testConsumerBuilderImplWhenDefaultCryptoKeyReaderIsNullMap() { // a null map must be rejected with NullPointerException
+        
consumerBuilderImpl.topic(TOPIC_NAME).subscriptionName("subscriptionName")
+                .defaultCryptoKeyReader((Map<String, String>) null); // cast selects the Map overload over the String one
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testConsumerBuilderImplWhenDefaultCryptoKeyReaderIsEmptyMap() { // an empty map must be rejected with IllegalArgumentException
+        
consumerBuilderImpl.topic(TOPIC_NAME).subscriptionName("subscriptionName")
+                .defaultCryptoKeyReader(new HashMap<String, String>());
+    }
+
     @Test(expectedExceptions = NullPointerException.class)
     public void testConsumerBuilderImplWhenCryptoFailureActionIsNull() {
         consumerBuilderImpl.topic(TOPIC_NAME)
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderTest.java
new file mode 100644
index 0000000..f1e4cce
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+import java.lang.reflect.Field;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.testng.annotations.Test;
+
+public class DefaultCryptoKeyReaderTest { // unit tests for DefaultCryptoKeyReader: builder wiring and key lookup
+
+    @Test
+    public void testBuild() throws Exception { // every builder setter must end up in the matching field of the built reader
+        Map<String, String> publicKeys = new HashMap<>();
+        publicKeys.put("key1", "file:///path/to/public1.key");
+        publicKeys.put("key2", "file:///path/to/public2.key");
+
+        Map<String, String> privateKeys = new HashMap<>();
+        privateKeys.put("key3", "file:///path/to/private3.key");
+
+        DefaultCryptoKeyReader keyReader = DefaultCryptoKeyReader.builder()
+                .defaultPublicKey("file:///path/to/default-public.key")
+                .defaultPrivateKey("file:///path/to/default-private.key")
+                .publicKey("key4", 
"file:///path/to/public4.key").publicKeys(publicKeys)
+                .publicKey("key5", 
"file:///path/to/public5.key").privateKey("key6", 
"file:///path/to/private6.key")
+                .privateKeys(privateKeys).privateKey("key7", 
"file:///path/to/private7.key").build(); // single and bulk setters are interleaved on purpose: bulk calls must merge, not replace
+
+        Field defaultPublicKeyField = 
keyReader.getClass().getDeclaredField("defaultPublicKey");
+        defaultPublicKeyField.setAccessible(true); // the reader's fields are inspected through reflection
+        Field defaultPrivateKeyField = 
keyReader.getClass().getDeclaredField("defaultPrivateKey");
+        defaultPrivateKeyField.setAccessible(true);
+        Field publicKeysField = 
keyReader.getClass().getDeclaredField("publicKeys");
+        publicKeysField.setAccessible(true);
+        Field privateKeysField = 
keyReader.getClass().getDeclaredField("privateKeys");
+        privateKeysField.setAccessible(true);
+
+        Map<String, String> expectedPublicKeys = new HashMap<>();
+        expectedPublicKeys.put("key1", "file:///path/to/public1.key");
+        expectedPublicKeys.put("key2", "file:///path/to/public2.key");
+        expectedPublicKeys.put("key4", "file:///path/to/public4.key");
+        expectedPublicKeys.put("key5", "file:///path/to/public5.key");
+
+        Map<String, String> expectedPrivateKeys = new HashMap<>();
+        expectedPrivateKeys.put("key3", "file:///path/to/private3.key");
+        expectedPrivateKeys.put("key6", "file:///path/to/private6.key");
+        expectedPrivateKeys.put("key7", "file:///path/to/private7.key");
+
+        assertEquals((String) defaultPublicKeyField.get(keyReader), 
"file:///path/to/default-public.key");
+        assertEquals((String) defaultPrivateKeyField.get(keyReader), 
"file:///path/to/default-private.key");
+        assertEquals((Map<String, String>) publicKeysField.get(keyReader), 
expectedPublicKeys);
+        assertEquals((Map<String, String>) privateKeysField.get(keyReader), 
expectedPrivateKeys);
+    }
+
+    @Test
+    public void testGetKeys() throws Exception { // covers: no keys configured, default keys, per-name keys, and unloadable keys
+        final String ecdsaPublicKey = 
"./src/test/resources/crypto_ecdsa_public.key";
+        final String ecdsaPrivateKey = 
"./src/test/resources/crypto_ecdsa_private.key";
+        final String rsaPublicKey = 
"./src/test/resources/crypto_rsa_public.key";
+        final String rsaPrivateKey = 
"./src/test/resources/crypto_rsa_private.key";
+
+        DefaultCryptoKeyReader keyReader1 = 
DefaultCryptoKeyReader.builder().build(); // reader with nothing configured
+        assertNull(keyReader1.getPublicKey("key0", null).getKey());
+        assertNull(keyReader1.getPrivateKey("key0", null).getKey());
+
+        DefaultCryptoKeyReader keyReader2 = 
DefaultCryptoKeyReader.builder().defaultPublicKey("file:" + ecdsaPublicKey)
+                .defaultPrivateKey("file:" + ecdsaPrivateKey)
+                .publicKey("key1",
+                        
"data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF6elRUenNTc1pGWWxXeWJack1OdwphRGpncWluSU5vNXlOa0h1UkJQZzJyNTZCRWFIb1U1eStjY0RoeXhCR0NLUFprVGNRYXN2WWdXSjNzSFJLQWxOCmRaTkc4R3QzazJTcmZEcnJ0ajFLTDNHNk5XUkE4VHF5Umt4eGw1dnBBTWM2OVVqWDlIUHdTemxtckM3WlhtMWUKU3dZVFY3Kzdxcy82OUpMQm5yTUpjc2wrSXlYVWFoaFJuOHcyRmtzOUpXcmlOS2kxUFNnQ1BqTWpnS0JGN3lhRQpBVEowR01TTWM4RnZYV3dGSnNXQldRa1V3Z3FsRXhSMU1EaVZW
 [...]
+                .privateKey("key1",
+                        
"data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFcEFJQkFBS0NBUUVBenpUVHpzU3NaRllsV3liWnJNTndhRGpncWluSU5vNXlOa0h1UkJQZzJyNTZCRWFICm9VNXkrY2NEaHl4QkdDS1Baa1RjUWFzdllnV0ozc0hSS0FsTmRaTkc4R3QzazJTcmZEcnJ0ajFLTDNHNk5XUkEKOFRxeVJreHhsNXZwQU1jNjlValg5SFB3U3psbXJDN1pYbTFlU3dZVFY3Kzdxcy82OUpMQm5yTUpjc2wrSXlYVQphaGhSbjh3MkZrczlKV3JpTktpMVBTZ0NQak1qZ0tCRjd5YUVBVEowR01TTWM4RnZYV3dGSnNXQldRa1V3Z3FsCkV4UjFNRGlWVkJ0dzlRdEpCMjlJTmkwTkRzMlBlYjYx
 [...]
+                .publicKey("key2", "file:invalid").privateKey("key2", 
"file:invalid").publicKey("key3", "data:invalid")
+                .privateKey("key3", "data:invalid").build(); // key2/key3 deliberately point at values that cannot be loaded
+
+        assertNotNull(keyReader2.getPublicKey("key0", null).getKey());
+        assertEquals(keyReader2.getPublicKey("key0", null).getKey(), 
Files.readAllBytes(Paths.get(ecdsaPublicKey))); // "key0" has no entry of its own, so the default key is served
+        assertNotNull(keyReader2.getPrivateKey("key0", null).getKey());
+        assertEquals(keyReader2.getPrivateKey("key0", null).getKey(), 
Files.readAllBytes(Paths.get(ecdsaPrivateKey)));
+
+        assertNotNull(keyReader2.getPublicKey("key1", null).getKey());
+        assertEquals(keyReader2.getPublicKey("key1", null).getKey(), 
Files.readAllBytes(Paths.get(rsaPublicKey))); // the inline data: entry for "key1" wins over the default
+        assertNotNull(keyReader2.getPrivateKey("key1", null).getKey());
+        assertEquals(keyReader2.getPrivateKey("key1", null).getKey(), 
Files.readAllBytes(Paths.get(rsaPrivateKey)));
+
+        assertNull(keyReader2.getPublicKey("key2", null).getKey()); // must query keyReader2: only it defines the invalid key2/key3 entries
+        assertNull(keyReader2.getPrivateKey("key2", null).getKey());
+        assertNull(keyReader2.getPublicKey("key3", null).getKey());
+        assertNull(keyReader2.getPrivateKey("key3", null).getKey());
+    }
+
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
index 1031f02..b3cda2e 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
@@ -195,6 +195,30 @@ public class ProducerBuilderImplTest {
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
+    public void 
testProducerBuilderImplWhenDefaultCryptoKeyReaderIsNullString() throws 
PulsarClientException { // a null key string must be rejected with IllegalArgumentException
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic(TOPIC_NAME).defaultCryptoKeyReader((String) 
null).create(); // cast selects the String overload over the Map one
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void 
testProducerBuilderImplWhenDefaultCryptoKeyReaderIsEmptyString() throws 
PulsarClientException { // an empty key string must be rejected with IllegalArgumentException
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        
producerBuilderImpl.topic(TOPIC_NAME).defaultCryptoKeyReader("").create();
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testProducerBuilderImplWhenDefaultCryptoKeyReaderIsNullMap() 
throws PulsarClientException { // a null map must be rejected with NullPointerException
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        
producerBuilderImpl.topic(TOPIC_NAME).defaultCryptoKeyReader((Map<String, 
String>) null).create(); // cast selects the Map overload over the String one
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testProducerBuilderImplWhenDefaultCryptoKeyReaderIsEmptyMap() 
throws PulsarClientException { // an empty map must be rejected with IllegalArgumentException
+        producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+        producerBuilderImpl.topic(TOPIC_NAME).defaultCryptoKeyReader(new 
HashMap<String, String>()).create();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
     public void testProducerBuilderImplWhenEncryptionKeyIsNull() throws 
PulsarClientException {
         producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
         producerBuilderImpl.topic(TOPIC_NAME)
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/DefaultCryptoKeyReaderConfigurationDataTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/DefaultCryptoKeyReaderConfigurationDataTest.java
new file mode 100644
index 0000000..cb38ea5
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/DefaultCryptoKeyReaderConfigurationDataTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.conf;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import org.testng.annotations.Test;
+
+public class DefaultCryptoKeyReaderConfigurationDataTest { // unit tests for clone() independence and toString() masking
+
+    @Test
+    public void testClone() throws Exception { // a clone must keep its values after the original is modified
+        DefaultCryptoKeyReaderConfigurationData conf = new 
DefaultCryptoKeyReaderConfigurationData();
+        conf.setDefaultPublicKey("file:///path/to/default-public.key");
+        conf.setDefaultPrivateKey("file:///path/to/default-private.key");
+        conf.setPublicKey("key1", "file:///path/to/public1.key");
+        conf.setPrivateKey("key2", "file:///path/to/private2.key");
+        DefaultCryptoKeyReaderConfigurationData clone = conf.clone(); // snapshot taken before the overwrites below
+
+        conf.setDefaultPublicKey("data:AAAAA");
+        conf.setDefaultPrivateKey("data:BBBBB");
+        conf.setPublicKey("key1", "data:CCCCC");
+        conf.setPrivateKey("key2", "data:DDDDD"); // overwrites the same map keys, which would show up in the clone if maps were shared
+
+        assertEquals(clone.getDefaultPublicKey(), 
"file:///path/to/default-public.key");
+        assertEquals(clone.getDefaultPrivateKey(), 
"file:///path/to/default-private.key");
+        assertEquals(clone.getPublicKeys().get("key1"), 
"file:///path/to/public1.key");
+        assertEquals(clone.getPrivateKeys().get("key2"), 
"file:///path/to/private2.key");
+    }
+
+    @Test
+    public void testToString() throws Exception { // data: values must be masked, other values printed verbatim
+        DefaultCryptoKeyReaderConfigurationData conf = new 
DefaultCryptoKeyReaderConfigurationData();
+        assertEquals(conf.toString(),
+                
"DefaultCryptoKeyReaderConfigurationData(defaultPublicKey=null, 
defaultPrivateKey=null, publicKeys={}, privateKeys={})"); // fresh instance: null defaults and empty maps
+
+        conf.setDefaultPublicKey("file:///path/to/default-public.key");
+        conf.setDefaultPrivateKey("data:AAAAA");
+        conf.setPublicKey("key1", "file:///path/to/public.key");
+        conf.setPrivateKey("key2", "file:///path/to/private.key");
+        assertEquals(conf.toString(),
+                
"DefaultCryptoKeyReaderConfigurationData(defaultPublicKey=file:///path/to/default-public.key,
 defaultPrivateKey=data:*****, publicKeys={key1=file:///path/to/public.key}, 
privateKeys={key2=file:///path/to/private.key})"); // one entry per map, so the whole string can be compared exactly
+
+        conf.setPublicKey("key3", "data:BBBBB");
+        conf.setPrivateKey("key4", "data:CCCCC");
+        assertTrue(conf.toString().startsWith(
+                
"DefaultCryptoKeyReaderConfigurationData(defaultPublicKey=file:///path/to/default-public.key,
 defaultPrivateKey=data:*****, publicKeys={")); // with two entries per map only fragments are checked, presumably to avoid depending on map iteration order
+        assertTrue(conf.toString().contains("key3=data:*****"));
+        assertFalse(conf.toString().contains("key3=data:BBBBB")); // the raw inline payload must never appear
+        assertTrue(conf.toString().contains("key4=data:*****"));
+        assertFalse(conf.toString().contains("key4=data:CCCCC"));
+        assertTrue(conf.toString().endsWith("})"));
+    }
+
+}
diff --git a/pulsar-client/src/test/resources/crypto_ecdsa_private.key 
b/pulsar-client/src/test/resources/crypto_ecdsa_private.key
new file mode 100644
index 0000000..036a1e7
--- /dev/null
+++ b/pulsar-client/src/test/resources/crypto_ecdsa_private.key
@@ -0,0 +1,29 @@
+-----BEGIN EC PARAMETERS-----
+MIIBwgIBATBNBgcqhkjOPQEBAkIB////////////////////////////////////
+//////////////////////////////////////////////////8wgZ4EQgH/////
+////////////////////////////////////////////////////////////////
+/////////////////ARBUZU+uWGOHJofkpohoLaFQO6i2nJbmbMV87i0iZGO8Qnh
+Vhk5Uex+k3sWUsC9O7G/BzVz34g9LDTx70Uf1GtQPwADFQDQnogAKRy4U5bMZxc5
+MoSqoNpkugSBhQQAxoWOBrcEBOnNnj7LZiOVtEKcZIE5BT+1Ifgor2BrTT26oUte
+d+/nWSj+HcEnov+o3jNIs8GFakKb+X5+McLlvWYBGDkpaniaO8AEXIpftCx9G9mY
+9URJV5tEaBevvRcnPmYsl+5ymV70JkDFULkBP60HYTU8cIaicsJAiL6Udp/RZlAC
+QgH///////////////////////////////////////////pRhoeDvy+Wa3/MAUj3
+CaXQO7XJuImcR667b7cekThkCQIBAQ==
+-----END EC PARAMETERS-----
+-----BEGIN EC PRIVATE KEY-----
+MIICnQIBAQRCAPvNzZPrKUZao8oLRet7MUTfa9TUEJYw2C4TyMSP54YWzyoQsGJJ
+FQVGsJJWBvmEDTfU5zU6d+vvbLVWy7R0aMDXoIIBxjCCAcICAQEwTQYHKoZIzj0B
+AQJCAf//////////////////////////////////////////////////////////
+////////////////////////////MIGeBEIB////////////////////////////
+//////////////////////////////////////////////////////////wEQVGV
+PrlhjhyaH5KaIaC2hUDuotpyW5mzFfO4tImRjvEJ4VYZOVHsfpN7FlLAvTuxvwc1
+c9+IPSw08e9FH9RrUD8AAxUA0J6IACkcuFOWzGcXOTKEqqDaZLoEgYUEAMaFjga3
+BATpzZ4+y2YjlbRCnGSBOQU/tSH4KK9ga009uqFLXnfv51ko/h3BJ6L/qN4zSLPB
+hWpCm/l+fjHC5b1mARg5KWp4mjvABFyKX7QsfRvZmPVESVebRGgXr70XJz5mLJfu
+cple9CZAxVC5AT+tB2E1PHCGonLCQIi+lHaf0WZQAkIB////////////////////
+///////////////////////6UYaHg78vlmt/zAFI9wml0Du1ybiJnEeuu2+3HpE4
+ZAkCAQGhgYkDgYYABAFqUEjls03bQowJQUSnTiqzTvdXE26NN891SHoDx2wqfRg2
+OSjzI1bQcBT2GARfut0/tVDbx3gX8qmaPEY9un3dugHSEgX209yLhHuXfIJKZm6Y
+indL6RxwL6Wdqv9bsHUi7dSXTBIc60C0a6gxzhX0I9P4g+Og8hrsW0Hmmm1zJQ8x
+gQ==
+-----END EC PRIVATE KEY-----
diff --git a/pulsar-client/src/test/resources/crypto_ecdsa_public.key 
b/pulsar-client/src/test/resources/crypto_ecdsa_public.key
new file mode 100644
index 0000000..941afeb
--- /dev/null
+++ b/pulsar-client/src/test/resources/crypto_ecdsa_public.key
@@ -0,0 +1,15 @@
+-----BEGIN PUBLIC KEY-----
+MIICXDCCAc8GByqGSM49AgEwggHCAgEBME0GByqGSM49AQECQgH/////////////
+////////////////////////////////////////////////////////////////
+/////////zCBngRCAf//////////////////////////////////////////////
+///////////////////////////////////////8BEFRlT65YY4cmh+SmiGgtoVA
+7qLacluZsxXzuLSJkY7xCeFWGTlR7H6TexZSwL07sb8HNXPfiD0sNPHvRR/Ua1A/
+AAMVANCeiAApHLhTlsxnFzkyhKqg2mS6BIGFBADGhY4GtwQE6c2ePstmI5W0Qpxk
+gTkFP7Uh+CivYGtNPbqhS1537+dZKP4dwSei/6jeM0izwYVqQpv5fn4xwuW9ZgEY
+OSlqeJo7wARcil+0LH0b2Zj1RElXm0RoF6+9Fyc+ZiyX7nKZXvQmQMVQuQE/rQdh
+NTxwhqJywkCIvpR2n9FmUAJCAf//////////////////////////////////////
+////+lGGh4O/L5Zrf8wBSPcJpdA7tcm4iZxHrrtvtx6ROGQJAgEBA4GGAAQBalBI
+5bNN20KMCUFEp04qs073VxNujTfPdUh6A8dsKn0YNjko8yNW0HAU9hgEX7rdP7VQ
+28d4F/KpmjxGPbp93boB0hIF9tPci4R7l3yCSmZumIp3S+kccC+lnar/W7B1Iu3U
+l0wSHOtAtGuoMc4V9CPT+IPjoPIa7FtB5pptcyUPMYE=
+-----END PUBLIC KEY-----
diff --git a/pulsar-client/src/test/resources/crypto_rsa_private.key 
b/pulsar-client/src/test/resources/crypto_rsa_private.key
new file mode 100644
index 0000000..5177bb9
--- /dev/null
+++ b/pulsar-client/src/test/resources/crypto_rsa_private.key
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEpAIBAAKCAQEAzzTTzsSsZFYlWybZrMNwaDjgqinINo5yNkHuRBPg2r56BEaH
+oU5y+ccDhyxBGCKPZkTcQasvYgWJ3sHRKAlNdZNG8Gt3k2SrfDrrtj1KL3G6NWRA
+8TqyRkxxl5vpAMc69UjX9HPwSzlmrC7ZXm1eSwYTV7+7qs/69JLBnrMJcsl+IyXU
+ahhRn8w2Fks9JWriNKi1PSgCPjMjgKBF7yaEATJ0GMSMc8FvXWwFJsWBWQkUwgql
+ExR1MDiVVBtw9QtJB29INi0NDs2Peb61Dt49NZlN/klJCXIUtBSIqg9o+iRKVwXL
+HnIM0WHVnmRn2MK0nf03/DwCIVnb5elToZ728wIDAQABAoIBAHWFvfiRnkGOhsOL
+ZnzRoMjMMcjxx8gBxQ+3F1/vcmI/FM+l/TllWFsJIJwjYoxA1dqohdCNOmO7RnZc
+sbemhxN/xAWKvpiPyZ+9f4Gug4wjUf0Ebr0jkIfExcy6tk4lse/7L9lLhOf1l6Fj
+NRCUsZ2VxZTIf7WjHvBm6ICNhXdfgc/TOX/H4BBMxyQkkmvS7yQHPmnekVpCjwXi
+RgdPOpBSHUAsuLc3cdO7DzSlXBy+R3UB5bC97efSTwxmMdcGUNQh17Cuw+ou2OLJ
+l/WyMBJgKP0zp8MI2YCP0toE1VV0FWisiNUdyw2mmdsKBPCtZW4Jf/avRLjCpy83
+gyeHi4kCgYEA7fac8u/WoUcRxfwbexxVNIb5jpVgQ22XXUvcOBA3145HFH4kD9Qs
+OlOM48iEX1Gq+FOb+TkVa3yeEbyEHVmNxm7ZqDElGlPnHHgWJfVo4ltemkNQ8cQI
+CiDhUH7D9iGd4TrLq+u8JY/oy0dpJyaJ/GsNPwjVzMiA9kDuI2KKRpcCgYEA3ulp
+sZy82EibT+YYNshfAyzPcQc+CVdcpS3yEUMcMrrGT2IHbAhldzAWvni7PF1CvYoB
+eoPSGiEGMAwAfaUI4psZvATZTf+GWKizlH820lw4tY2NW2tYFwDcZ6EPKdq9ZCOz
+Lfyw2NhLqi2FpFyApcXlA92UUI0FcXCCtAKJ2gUCgYEAsI5merVKiNTDMiNQfHJU
+Ian3pTvddYntYXJ0jUAzPoK46FKDDR9+RTRSd3sCA/sDIEZglnQtGVgXq880MtaM
+I2uBoJH+Fl+kPPI4dKd1z2S9dzV07DxnPqSQp/m2CXt9uWu3S/KW4UO6FIECWuL0
+RE1lQZybjNpDHPKl/akSMTcCgYBqC80WjCRjGJeauTJHzac10XmWogVnWEJg6qzA
+fZbKo4R4e4BgatYqj5wiXTlmDFAV77OoX1Hy0EcrUGpjW8IQXA0wH3Zp3uhBAXD9
+ck/YX7sy0/atyTGNMAGq4zpdhQyYuUsi05Ymcy/789AiU0d4ld7PqfhHIe+2+fmU
+PajrKQKBgQCjpj0Y+E/Hi0SJUKNVPl3u+SN1lLcR1/gMjJbsYFGua1MsLXBNXTSl
+QiYHiadT7BhPEkFdW7u+bFws2admW19BoUb+wgtZT/t7nTyoS4Lag4vyazNPZzdQ
+x6XPrwZimd0XDDitGLjcLf991EFsXSqPznDDfXtJ31+oU6ObnHSItA==
+-----END RSA PRIVATE KEY-----
diff --git a/pulsar-client/src/test/resources/crypto_rsa_public.key 
b/pulsar-client/src/test/resources/crypto_rsa_public.key
new file mode 100644
index 0000000..6982804
--- /dev/null
+++ b/pulsar-client/src/test/resources/crypto_rsa_public.key
@@ -0,0 +1,9 @@
+-----BEGIN PUBLIC KEY-----
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzzTTzsSsZFYlWybZrMNw
+aDjgqinINo5yNkHuRBPg2r56BEaHoU5y+ccDhyxBGCKPZkTcQasvYgWJ3sHRKAlN
+dZNG8Gt3k2SrfDrrtj1KL3G6NWRA8TqyRkxxl5vpAMc69UjX9HPwSzlmrC7ZXm1e
+SwYTV7+7qs/69JLBnrMJcsl+IyXUahhRn8w2Fks9JWriNKi1PSgCPjMjgKBF7yaE
+ATJ0GMSMc8FvXWwFJsWBWQkUwgqlExR1MDiVVBtw9QtJB29INi0NDs2Peb61Dt49
+NZlN/klJCXIUtBSIqg9o+iRKVwXLHnIM0WHVnmRn2MK0nf03/DwCIVnb5elToZ72
+8wIDAQAB
+-----END PUBLIC KEY-----

Reply via email to