jbampton commented on a change in pull request #8962:
URL: https://github.com/apache/pulsar/pull/8962#discussion_r543371155



##########
File path: 
tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
##########
@@ -0,0 +1,577 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shade.com.google.common.collect.Maps;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class SimpleProducerConsumerTest {
+    private static final Logger log = 
LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
+
+    private PulsarContainer pulsarContainer;
+    private URI lookupUrl;
+    private PulsarClient pulsarClient;
+
+    @BeforeClass
+    public void setup() throws PulsarClientException, URISyntaxException, 
PulsarAdminException {
+        Security.addProvider(new 
org.bouncycastle.jce.provider.BouncyCastleProvider());
+
+        pulsarContainer = new PulsarContainer();
+        pulsarContainer.start();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+        lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl());
+
+        PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build();
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("standalone")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+        
admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", 
Sets.newHashSet("standalone"));
+        admin.close();
+    }
+
+    @AfterClass
+    public void cleanup() throws PulsarClientException {
+        pulsarClient.close();
+        pulsarContainer.stop();
+        pulsarContainer.close();
+    }
+
+    private PulsarClient newPulsarClient(String url, int intervalInSecs) 
throws PulsarClientException {
+        return 
PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, 
TimeUnit.SECONDS).build();
+    }
+
+    @Test
+    public void testRSAEncryption() throws Exception {
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ 
System.currentTimeMillis();

Review comment:
       ```suggestion
           String topicName = "persistent://my-property/my-ns/myrsa-topic1-" + 
System.currentTimeMillis();
   ```

##########
File path: 
tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
##########
@@ -0,0 +1,577 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shade.com.google.common.collect.Maps;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class SimpleProducerConsumerTest {
+    private static final Logger log = 
LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
+
+    private PulsarContainer pulsarContainer;
+    private URI lookupUrl;
+    private PulsarClient pulsarClient;
+
+    @BeforeClass
+    public void setup() throws PulsarClientException, URISyntaxException, 
PulsarAdminException {
+        Security.addProvider(new 
org.bouncycastle.jce.provider.BouncyCastleProvider());
+
+        pulsarContainer = new PulsarContainer();
+        pulsarContainer.start();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+        lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl());
+
+        PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build();
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("standalone")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+        
admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", 
Sets.newHashSet("standalone"));
+        admin.close();
+    }
+
+    @AfterClass
+    public void cleanup() throws PulsarClientException {
+        pulsarClient.close();
+        pulsarContainer.stop();
+        pulsarContainer.close();
+    }
+
+    private PulsarClient newPulsarClient(String url, int intervalInSecs) 
throws PulsarClientException {
+        return 
PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, 
TimeUnit.SECONDS).build();
+    }
+
+    @Test
+    public void testRSAEncryption() throws Exception {
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ 
System.currentTimeMillis();
+
+        class EncKeyReader implements CryptoKeyReader {
+
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        final int totalMsg = 10;
+
+        Set<String> messageSet = Sets.newHashSet();
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
EncKeyReader()).subscribe();
+        Consumer<byte[]> normalConsumer = pulsarClient.newConsumer()
+                .topic(topicName).subscriptionName("my-subscriber-name-normal")
+                .subscribe();
+
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new 
EncKeyReader()).create();
+        Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new 
EncKeyReader()).create();
+
+        for (int i = 0; i < totalMsg; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+        for (int i = totalMsg; i < totalMsg * 2; i++) {
+            String message = "my-message-" + i;
+            producer2.send(message.getBytes());
+        }
+
+        MessageImpl<byte[]> msg = null;
+
+        msg = (MessageImpl<byte[]>) normalConsumer.receive(500, 
TimeUnit.MILLISECONDS);
+        // should not able to read message using normal message.
+        assertNull(msg);
+
+        for (int i = 0; i < totalMsg * 2; i++) {
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            // verify that encrypted message contains encryption-context
+            msg.getEncryptionCtx()
+                    .orElseThrow(() -> new 
IllegalStateException("encryption-ctx not present for encrypted message"));
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+    }
+
+    protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, 
T receivedMessage,
+                                                     T expectedMessage) {
+        // Make sure that messages are received in order
+        Assert.assertEquals(receivedMessage, expectedMessage,
+                "Received message " + receivedMessage + " did not match the 
expected message " + expectedMessage);
+
+        // Make sure that there are no duplicates
+        Assert.assertTrue(messagesReceived.add(receivedMessage), "Received 
duplicate message " + receivedMessage);
+    }
+
+    @Test
+    public void testRedeliveryOfFailedMessages() throws Exception {
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        class InvalidKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> metadata) {
+                return null;
+            }
+        }
+
+        /*
+         * Redelivery functionality guarantees that customer will get a chance 
to process the message again.
+         * In case of shared subscription eventually every client will get a 
chance to process the message, till one of them acks it.
+         *
+         * For client with Encryption enabled where in cases like a new 
production rollout or a buggy client configuration, we might have a mismatch of 
consumers
+         * - few which can decrypt, few which can't (due to errors or 
cryptoReader not configured).
+         *
+         * In that case eventually all messages should be acked as long as 
there is a single consumer who can decrypt the message.
+         *
+         * Consumer 1 - Can decrypt message
+         * Consumer 2 - Has invalid Reader configured.
+         * Consumer 3 - Has no reader configured.
+         *
+         */
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic2";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                
.addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer1 = 
newPulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
EncKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, 
TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer2 = 
newPulsarClient1.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
InvalidKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, 
TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer3 = 
newPulsarClient2.newConsumer().topicsPattern(topicName)
+                
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1,
 TimeUnit.SECONDS).subscribe();
+
+        int numberOfMessages = 100;
+        String message = "my-message";
+        Set<String> messages = new HashSet(); // Since messages are in random 
order

Review comment:
       ```suggestion
           Set<String> messages = new HashSet();// Since messages are in random 
order
   ```

##########
File path: 
tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
##########
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shade.com.google.common.collect.Maps;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class SimpleProducerConsumerTest {
+    private static final Logger log = 
LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
+
+    private PulsarContainer pulsarContainer;
+    private URI lookupUrl;
+    private PulsarClient pulsarClient;
+
+    @BeforeClass
+    public void setup() throws PulsarClientException, URISyntaxException, 
PulsarAdminException {
+        Security.addProvider(new 
org.bouncycastle.jce.provider.BouncyCastleProvider());
+        pulsarContainer = new PulsarContainer();
+        pulsarContainer.start();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+        lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl());
+
+        PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build();
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("standalone")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+        
admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", 
Sets.newHashSet("standalone"));
+        admin.close();
+    }
+
+    @AfterClass
+    public void cleanup() throws PulsarClientException {
+        pulsarClient.close();
+        pulsarContainer.stop();
+        pulsarContainer.close();
+    }
+
+    private PulsarClient newPulsarClient(String url, int intervalInSecs) 
throws PulsarClientException {
+        return 
PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, 
TimeUnit.SECONDS).build();
+    }
+
+    @Test
+    public void testRSAEncryption() throws Exception {
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ 
System.currentTimeMillis();
+
+        class EncKeyReader implements CryptoKeyReader {
+
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        final int totalMsg = 10;
+
+        Set<String> messageSet = Sets.newHashSet();
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
EncKeyReader()).subscribe();
+        Consumer<byte[]> normalConsumer = pulsarClient.newConsumer()
+                .topic(topicName).subscriptionName("my-subscriber-name-normal")
+                .subscribe();
+
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new 
EncKeyReader()).create();
+        Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new 
EncKeyReader()).create();
+
+        for (int i = 0; i < totalMsg; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+        for (int i = totalMsg; i < totalMsg * 2; i++) {
+            String message = "my-message-" + i;
+            producer2.send(message.getBytes());
+        }
+
+        MessageImpl<byte[]> msg = null;
+
+        msg = (MessageImpl<byte[]>) normalConsumer.receive(500, 
TimeUnit.MILLISECONDS);
+        // should not able to read message using normal message.
+        assertNull(msg);
+
+        for (int i = 0; i < totalMsg * 2; i++) {
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            // verify that encrypted message contains encryption-context
+            msg.getEncryptionCtx()
+                    .orElseThrow(() -> new 
IllegalStateException("encryption-ctx not present for encrypted message"));
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+    }
+
+    protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, 
T receivedMessage,
+                                                     T expectedMessage) {
+        // Make sure that messages are received in order
+        Assert.assertEquals(receivedMessage, expectedMessage,
+                "Received message " + receivedMessage + " did not match the 
expected message " + expectedMessage);
+
+        // Make sure that there are no duplicates
+        Assert.assertTrue(messagesReceived.add(receivedMessage), "Received 
duplicate message " + receivedMessage);
+    }
+
+    @Test
+    public void testRedeliveryOfFailedMessages() throws Exception {
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        class InvalidKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> metadata) {
+                return null;
+            }
+        }
+
+        /*
+         * Redelivery functionality guarantees that customer will get a chance 
to process the message again.
+         * In case of shared subscription eventually every client will get a 
chance to process the message, till one of them acks it.
+         *
+         * For client with Encryption enabled where in cases like a new 
production rollout or a buggy client configuration, we might have a mismatch of 
consumers
+         * - few which can decrypt, few which can't (due to errors or 
cryptoReader not configured).
+         *
+         * In that case eventually all messages should be acked as long as 
there is a single consumer who can decrypt the message.
+         *
+         * Consumer 1 - Can decrypt message
+         * Consumer 2 - Has invalid Reader configured.
+         * Consumer 3 - Has no reader configured.
+         *
+         */
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic2";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                
.addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer1 = 
newPulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
EncKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, 
TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer2 = 
newPulsarClient1.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
InvalidKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, 
TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer3 = 
newPulsarClient2.newConsumer().topicsPattern(topicName)
+                
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1,
 TimeUnit.SECONDS).subscribe();
+
+        int numberOfMessages = 100;
+        String message = "my-message";
+        Set<String> messages = new HashSet(); // Since messages are in random 
order
+        for (int i = 0; i<numberOfMessages; i++) {
+            producer.send((message + i).getBytes());
+        }
+
+        // Consuming from consumer 2 and 3
+        // no message should be returned since they can't decrypt the message
+        Message m = consumer2.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        m = consumer3.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+
+        for (int i = 0; i<numberOfMessages; i++) {

Review comment:
       ```suggestion
           for (int i = 0; i < numberOfMessages; i++) {
   ```

##########
File path: 
tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
##########
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shade.com.google.common.collect.Maps;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class SimpleProducerConsumerTest {
+    private static final Logger log = 
LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
+
+    private PulsarContainer pulsarContainer;
+    private URI lookupUrl;
+    private PulsarClient pulsarClient;
+
+    @BeforeClass
+    public void setup() throws PulsarClientException, URISyntaxException, 
PulsarAdminException {
+        Security.addProvider(new 
org.bouncycastle.jce.provider.BouncyCastleProvider());
+        pulsarContainer = new PulsarContainer();
+        pulsarContainer.start();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+        lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl());
+
+        PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build();
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("standalone")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+        
admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", 
Sets.newHashSet("standalone"));
+        admin.close();
+    }
+
+    @AfterClass
+    public void cleanup() throws PulsarClientException {
+        pulsarClient.close();
+        pulsarContainer.stop();
+        pulsarContainer.close();
+    }
+
+    private PulsarClient newPulsarClient(String url, int intervalInSecs) 
throws PulsarClientException {
+        return 
PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, 
TimeUnit.SECONDS).build();
+    }
+
+    @Test
+    public void testRSAEncryption() throws Exception {
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ 
System.currentTimeMillis();
+
+        class EncKeyReader implements CryptoKeyReader {
+
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        final int totalMsg = 10;
+
+        Set<String> messageSet = Sets.newHashSet();
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
EncKeyReader()).subscribe();
+        Consumer<byte[]> normalConsumer = pulsarClient.newConsumer()
+                .topic(topicName).subscriptionName("my-subscriber-name-normal")
+                .subscribe();
+
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new 
EncKeyReader()).create();
+        Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new 
EncKeyReader()).create();
+
+        for (int i = 0; i < totalMsg; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+        for (int i = totalMsg; i < totalMsg * 2; i++) {
+            String message = "my-message-" + i;
+            producer2.send(message.getBytes());
+        }
+
+        MessageImpl<byte[]> msg = null;
+
+        msg = (MessageImpl<byte[]>) normalConsumer.receive(500, 
TimeUnit.MILLISECONDS);
+        // should not able to read message using normal message.
+        assertNull(msg);
+
+        for (int i = 0; i < totalMsg * 2; i++) {
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            // verify that encrypted message contains encryption-context
+            msg.getEncryptionCtx()
+                    .orElseThrow(() -> new 
IllegalStateException("encryption-ctx not present for encrypted message"));
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+    }
+
+    protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, 
T receivedMessage,
+                                                     T expectedMessage) {
+        // Make sure that messages are received in order
+        Assert.assertEquals(receivedMessage, expectedMessage,
+                "Received message " + receivedMessage + " did not match the 
expected message " + expectedMessage);
+
+        // Make sure that there are no duplicates
+        Assert.assertTrue(messagesReceived.add(receivedMessage), "Received 
duplicate message " + receivedMessage);
+    }
+
+    @Test
+    public void testRedeliveryOfFailedMessages() throws Exception {
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        class InvalidKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> metadata) {
+                return null;
+            }
+        }
+
+        /*
+         * Redelivery functionality guarantees that customer will get a chance 
to process the message again.
+         * In case of shared subscription eventually every client will get a 
chance to process the message, till one of them acks it.
+         *
+         * For client with Encryption enabled where in cases like a new 
production rollout or a buggy client configuration, we might have a mismatch of 
consumers
+         * - few which can decrypt, few which can't (due to errors or 
cryptoReader not configured).
+         *
+         * In that case eventually all messages should be acked as long as 
there is a single consumer who can decrypt the message.
+         *
+         * Consumer 1 - Can decrypt message
+         * Consumer 2 - Has invalid Reader configured.
+         * Consumer 3 - Has no reader configured.
+         *
+         */
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic2";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                
.addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer1 = 
newPulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
EncKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, 
TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer2 = 
newPulsarClient1.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
InvalidKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, 
TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer3 = 
newPulsarClient2.newConsumer().topicsPattern(topicName)
+                
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1,
 TimeUnit.SECONDS).subscribe();
+
+        int numberOfMessages = 100;
+        String message = "my-message";
+        Set<String> messages = new HashSet(); // Since messages are in random 
order

Review comment:
       ```suggestion
           Set<String> messages = new HashSet();// Since messages are in random 
order
   ```

##########
File path: 
tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
##########
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shade.com.google.common.collect.Maps;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class SimpleProducerConsumerTest {
+    private static final Logger log = 
LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
+
+    private PulsarContainer pulsarContainer;
+    private URI lookupUrl;
+    private PulsarClient pulsarClient;
+
+    @BeforeClass
+    public void setup() throws PulsarClientException, URISyntaxException, 
PulsarAdminException {
+        Security.addProvider(new 
org.bouncycastle.jce.provider.BouncyCastleProvider());
+        pulsarContainer = new PulsarContainer();
+        pulsarContainer.start();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+        lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl());
+
+        PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build();
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("standalone")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+        
admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", 
Sets.newHashSet("standalone"));
+        admin.close();
+    }
+
+    @AfterClass
+    public void cleanup() throws PulsarClientException {
+        pulsarClient.close();
+        pulsarContainer.stop();
+        pulsarContainer.close();
+    }
+
+    private PulsarClient newPulsarClient(String url, int intervalInSecs) 
throws PulsarClientException {
+        return 
PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, 
TimeUnit.SECONDS).build();
+    }
+
+    @Test
+    public void testRSAEncryption() throws Exception {
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ 
System.currentTimeMillis();

Review comment:
       ```suggestion
           String topicName = "persistent://my-property/my-ns/myrsa-topic1-" + 
System.currentTimeMillis();
   ```

##########
File path: 
tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
##########
@@ -0,0 +1,577 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shade.com.google.common.collect.Maps;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class SimpleProducerConsumerTest {
+    private static final Logger log = 
LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
+
+    private PulsarContainer pulsarContainer;
+    private URI lookupUrl;
+    private PulsarClient pulsarClient;
+
+    @BeforeClass
+    public void setup() throws PulsarClientException, URISyntaxException, 
PulsarAdminException {
+        Security.addProvider(new 
org.bouncycastle.jce.provider.BouncyCastleProvider());
+
+        pulsarContainer = new PulsarContainer();
+        pulsarContainer.start();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+        lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl());
+
+        PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build();
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("standalone")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+        
admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", 
Sets.newHashSet("standalone"));
+        admin.close();
+    }
+
+    @AfterClass
+    public void cleanup() throws PulsarClientException {
+        pulsarClient.close();
+        pulsarContainer.stop();
+        pulsarContainer.close();
+    }
+
+    private PulsarClient newPulsarClient(String url, int intervalInSecs) 
throws PulsarClientException {
+        return 
PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, 
TimeUnit.SECONDS).build();
+    }
+
+    @Test
+    public void testRSAEncryption() throws Exception {
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ 
System.currentTimeMillis();
+
+        class EncKeyReader implements CryptoKeyReader {
+
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        final int totalMsg = 10;
+
+        Set<String> messageSet = Sets.newHashSet();
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
EncKeyReader()).subscribe();
+        Consumer<byte[]> normalConsumer = pulsarClient.newConsumer()
+                .topic(topicName).subscriptionName("my-subscriber-name-normal")
+                .subscribe();
+
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new 
EncKeyReader()).create();
+        Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new 
EncKeyReader()).create();
+
+        for (int i = 0; i < totalMsg; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+        for (int i = totalMsg; i < totalMsg * 2; i++) {
+            String message = "my-message-" + i;
+            producer2.send(message.getBytes());
+        }
+
+        MessageImpl<byte[]> msg = null;
+
+        msg = (MessageImpl<byte[]>) normalConsumer.receive(500, 
TimeUnit.MILLISECONDS);
+        // should not able to read message using normal message.
+        assertNull(msg);
+
+        for (int i = 0; i < totalMsg * 2; i++) {
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            // verify that encrypted message contains encryption-context
+            msg.getEncryptionCtx()
+                    .orElseThrow(() -> new 
IllegalStateException("encryption-ctx not present for encrypted message"));
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+    }
+
+    protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, 
T receivedMessage,
+                                                     T expectedMessage) {
+        // Make sure that messages are received in order
+        Assert.assertEquals(receivedMessage, expectedMessage,
+                "Received message " + receivedMessage + " did not match the 
expected message " + expectedMessage);
+
+        // Make sure that there are no duplicates
+        Assert.assertTrue(messagesReceived.add(receivedMessage), "Received 
duplicate message " + receivedMessage);
+    }
+
+    @Test
+    public void testRedeliveryOfFailedMessages() throws Exception {
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        class InvalidKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> metadata) {
+                return null;
+            }
+        }
+
+        /*
+         * Redelivery functionality guarantees that customer will get a chance 
to process the message again.
+         * In case of shared subscription eventually every client will get a 
chance to process the message, till one of them acks it.
+         *
+         * For client with Encryption enabled where in cases like a new 
production rollout or a buggy client configuration, we might have a mismatch of 
consumers
+         * - few which can decrypt, few which can't (due to errors or 
cryptoReader not configured).
+         *
+         * In that case eventually all messages should be acked as long as 
there is a single consumer who can decrypt the message.
+         *
+         * Consumer 1 - Can decrypt message
+         * Consumer 2 - Has invalid Reader configured.
+         * Consumer 3 - Has no reader configured.
+         *
+         */
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic2";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                
.addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer1 = 
newPulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
EncKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, 
TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer2 = 
newPulsarClient1.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
InvalidKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, 
TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer3 = 
newPulsarClient2.newConsumer().topicsPattern(topicName)
+                
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1,
 TimeUnit.SECONDS).subscribe();
+
+        int numberOfMessages = 100;
+        String message = "my-message";
+        Set<String> messages = new HashSet(); // Since messages are in random 
order
+        for (int i = 0; i<numberOfMessages; i++) {

Review comment:
       ```suggestion
           for (int i = 0; i < numberOfMessages; i++) {
   ```

##########
File path: 
tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
##########
@@ -0,0 +1,577 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shade.com.google.common.collect.Maps;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class SimpleProducerConsumerTest {
+    private static final Logger log = 
LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
+
+    private PulsarContainer pulsarContainer;
+    private URI lookupUrl;
+    private PulsarClient pulsarClient;
+
+    @BeforeClass
+    public void setup() throws PulsarClientException, URISyntaxException, 
PulsarAdminException {
+        Security.addProvider(new 
org.bouncycastle.jce.provider.BouncyCastleProvider());
+
+        pulsarContainer = new PulsarContainer();
+        pulsarContainer.start();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+        lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl());
+
+        PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build();
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("standalone")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+        
admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", 
Sets.newHashSet("standalone"));
+        admin.close();
+    }
+
+    @AfterClass
+    public void cleanup() throws PulsarClientException {
+        pulsarClient.close();
+        pulsarContainer.stop();
+        pulsarContainer.close();
+    }
+
+    private PulsarClient newPulsarClient(String url, int intervalInSecs) 
throws PulsarClientException {
+        return 
PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, 
TimeUnit.SECONDS).build();
+    }
+
+    @Test
+    public void testRSAEncryption() throws Exception {
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ 
System.currentTimeMillis();
+
+        class EncKeyReader implements CryptoKeyReader {
+
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        final int totalMsg = 10;
+
+        Set<String> messageSet = Sets.newHashSet();
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
EncKeyReader()).subscribe();
+        Consumer<byte[]> normalConsumer = pulsarClient.newConsumer()
+                .topic(topicName).subscriptionName("my-subscriber-name-normal")
+                .subscribe();
+
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new 
EncKeyReader()).create();
+        Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new 
EncKeyReader()).create();
+
+        for (int i = 0; i < totalMsg; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+        for (int i = totalMsg; i < totalMsg * 2; i++) {
+            String message = "my-message-" + i;
+            producer2.send(message.getBytes());
+        }
+
+        MessageImpl<byte[]> msg = null;
+
+        msg = (MessageImpl<byte[]>) normalConsumer.receive(500, 
TimeUnit.MILLISECONDS);
+        // should not able to read message using normal message.
+        assertNull(msg);
+
+        for (int i = 0; i < totalMsg * 2; i++) {
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            // verify that encrypted message contains encryption-context
+            msg.getEncryptionCtx()
+                    .orElseThrow(() -> new 
IllegalStateException("encryption-ctx not present for encrypted message"));
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+    }
+
+    protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, 
T receivedMessage,
+                                                     T expectedMessage) {
+        // Make sure that messages are received in order
+        Assert.assertEquals(receivedMessage, expectedMessage,
+                "Received message " + receivedMessage + " did not match the 
expected message " + expectedMessage);
+
+        // Make sure that there are no duplicates
+        Assert.assertTrue(messagesReceived.add(receivedMessage), "Received 
duplicate message " + receivedMessage);
+    }
+
+    @Test
+    public void testRedeliveryOfFailedMessages() throws Exception {
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        class InvalidKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> metadata) {
+                return null;
+            }
+        }
+
+        /*
+         * Redelivery functionality guarantees that customer will get a chance 
to process the message again.
+         * In case of shared subscription eventually every client will get a 
chance to process the message, till one of them acks it.
+         *
+         * For client with Encryption enabled where in cases like a new 
production rollout or a buggy client configuration, we might have a mismatch of 
consumers
+         * - few which can decrypt, few which can't (due to errors or 
cryptoReader not configured).
+         *
+         * In that case eventually all messages should be acked as long as 
there is a single consumer who can decrypt the message.
+         *
+         * Consumer 1 - Can decrypt message
+         * Consumer 2 - Has invalid Reader configured.
+         * Consumer 3 - Has no reader configured.
+         *
+         */
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic2";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                
.addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer1 = 
newPulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
EncKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, 
TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer2 = 
newPulsarClient1.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
InvalidKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, 
TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer3 = 
newPulsarClient2.newConsumer().topicsPattern(topicName)
+                
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1,
 TimeUnit.SECONDS).subscribe();
+
+        int numberOfMessages = 100;
+        String message = "my-message";
+        Set<String> messages = new HashSet(); // Since messages are in random 
order
+        for (int i = 0; i<numberOfMessages; i++) {
+            producer.send((message + i).getBytes());
+        }
+
+        // Consuming from consumer 2 and 3
+        // no message should be returned since they can't decrypt the message
+        Message m = consumer2.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        m = consumer3.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+
+        for (int i = 0; i<numberOfMessages; i++) {

Review comment:
       ```suggestion
           for (int i = 0; i < numberOfMessages; i++) {
   ```

##########
File path: 
tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
##########
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shade.com.google.common.collect.Maps;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class SimpleProducerConsumerTest {
+    private static final Logger log = 
LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
+
+    private PulsarContainer pulsarContainer;
+    private URI lookupUrl;
+    private PulsarClient pulsarClient;
+
+    @BeforeClass
+    public void setup() throws PulsarClientException, URISyntaxException, 
PulsarAdminException {
+        Security.addProvider(new 
org.bouncycastle.jce.provider.BouncyCastleProvider());
+        pulsarContainer = new PulsarContainer();
+        pulsarContainer.start();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+        lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl());
+
+        PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build();
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("standalone")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+        
admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", 
Sets.newHashSet("standalone"));
+        admin.close();
+    }
+
+    @AfterClass
+    public void cleanup() throws PulsarClientException {
+        pulsarClient.close();
+        pulsarContainer.stop();
+        pulsarContainer.close();
+    }
+
+    private PulsarClient newPulsarClient(String url, int intervalInSecs) 
throws PulsarClientException {
+        return 
PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, 
TimeUnit.SECONDS).build();
+    }
+
+    @Test
+    public void testRSAEncryption() throws Exception {
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ 
System.currentTimeMillis();
+
+        class EncKeyReader implements CryptoKeyReader {
+
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        final int totalMsg = 10;
+
+        Set<String> messageSet = Sets.newHashSet();
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
EncKeyReader()).subscribe();
+        Consumer<byte[]> normalConsumer = pulsarClient.newConsumer()
+                .topic(topicName).subscriptionName("my-subscriber-name-normal")
+                .subscribe();
+
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new 
EncKeyReader()).create();
+        Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new 
EncKeyReader()).create();
+
+        for (int i = 0; i < totalMsg; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+        for (int i = totalMsg; i < totalMsg * 2; i++) {
+            String message = "my-message-" + i;
+            producer2.send(message.getBytes());
+        }
+
+        MessageImpl<byte[]> msg = null;
+
+        msg = (MessageImpl<byte[]>) normalConsumer.receive(500, 
TimeUnit.MILLISECONDS);
+        // should not able to read message using normal message.
+        assertNull(msg);
+
+        for (int i = 0; i < totalMsg * 2; i++) {
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            // verify that encrypted message contains encryption-context
+            msg.getEncryptionCtx()
+                    .orElseThrow(() -> new 
IllegalStateException("encryption-ctx not present for encrypted message"));
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+    }
+
+    protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, 
T receivedMessage,
+                                                     T expectedMessage) {
+        // Make sure that messages are received in order
+        Assert.assertEquals(receivedMessage, expectedMessage,
+                "Received message " + receivedMessage + " did not match the 
expected message " + expectedMessage);
+
+        // Make sure that there are no duplicates
+        Assert.assertTrue(messagesReceived.add(receivedMessage), "Received 
duplicate message " + receivedMessage);
+    }
+
+    @Test
+    public void testRedeliveryOfFailedMessages() throws Exception {
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        class InvalidKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> metadata) {
+                return null;
+            }
+        }
+
+        /*
+         * Redelivery functionality guarantees that customer will get a chance 
to process the message again.
+         * In case of shared subscription eventually every client will get a 
chance to process the message, till one of them acks it.
+         *
+         * For client with Encryption enabled where in cases like a new 
production rollout or a buggy client configuration, we might have a mismatch of 
consumers
+         * - few which can decrypt, few which can't (due to errors or 
cryptoReader not configured).
+         *
+         * In that case eventually all messages should be acked as long as 
there is a single consumer who can decrypt the message.
+         *
+         * Consumer 1 - Can decrypt message
+         * Consumer 2 - Has invalid Reader configured.
+         * Consumer 3 - Has no reader configured.
+         *
+         */
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic2";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                
.addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer1 = 
newPulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
EncKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, 
TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer2 = 
newPulsarClient1.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
InvalidKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, 
TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer3 = 
newPulsarClient2.newConsumer().topicsPattern(topicName)
+                
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1,
 TimeUnit.SECONDS).subscribe();
+
+        int numberOfMessages = 100;
+        String message = "my-message";
+        Set<String> messages = new HashSet(); // Since messages are in random 
order
+        for (int i = 0; i<numberOfMessages; i++) {

Review comment:
       ```suggestion
           for (int i = 0; i < numberOfMessages; i++) {
   ```

##########
File path: 
tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
##########
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shade.com.google.common.collect.Maps;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class SimpleProducerConsumerTest {
+    private static final Logger log = 
LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
+
+    private PulsarContainer pulsarContainer;
+    private URI lookupUrl;
+    private PulsarClient pulsarClient;
+
+    @BeforeClass
+    public void setup() throws PulsarClientException, URISyntaxException, 
PulsarAdminException {
+        Security.addProvider(new 
org.bouncycastle.jce.provider.BouncyCastleProvider());
+        pulsarContainer = new PulsarContainer();
+        pulsarContainer.start();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+        lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl());
+
+        PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build();
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("standalone")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+        
admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", 
Sets.newHashSet("standalone"));
+        admin.close();
+    }
+
+    @AfterClass
+    public void cleanup() throws PulsarClientException {
+        pulsarClient.close();
+        pulsarContainer.stop();
+        pulsarContainer.close();
+    }
+
+    private PulsarClient newPulsarClient(String url, int intervalInSecs) 
throws PulsarClientException {
+        return 
PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, 
TimeUnit.SECONDS).build();
+    }
+
+    @Test
+    public void testRSAEncryption() throws Exception {
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ 
System.currentTimeMillis();
+
+        class EncKeyReader implements CryptoKeyReader {
+
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        final int totalMsg = 10;
+
+        Set<String> messageSet = Sets.newHashSet();
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
EncKeyReader()).subscribe();
+        Consumer<byte[]> normalConsumer = pulsarClient.newConsumer()
+                .topic(topicName).subscriptionName("my-subscriber-name-normal")
+                .subscribe();
+
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new 
EncKeyReader()).create();
+        Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new 
EncKeyReader()).create();
+
+        for (int i = 0; i < totalMsg; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+        for (int i = totalMsg; i < totalMsg * 2; i++) {
+            String message = "my-message-" + i;
+            producer2.send(message.getBytes());
+        }
+
+        MessageImpl<byte[]> msg = null;
+
+        msg = (MessageImpl<byte[]>) normalConsumer.receive(500, 
TimeUnit.MILLISECONDS);
+        // should not able to read message using normal message.
+        assertNull(msg);
+
+        for (int i = 0; i < totalMsg * 2; i++) {
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            // verify that encrypted message contains encryption-context
+            msg.getEncryptionCtx()
+                    .orElseThrow(() -> new 
IllegalStateException("encryption-ctx not present for encrypted message"));
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+    }
+
+    protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, 
T receivedMessage,
+                                                     T expectedMessage) {
+        // Make sure that messages are received in order
+        Assert.assertEquals(receivedMessage, expectedMessage,
+                "Received message " + receivedMessage + " did not match the 
expected message " + expectedMessage);
+
+        // Make sure that there are no duplicates
+        Assert.assertTrue(messagesReceived.add(receivedMessage), "Received 
duplicate message " + receivedMessage);
+    }
+
+    @Test
+    public void testRedeliveryOfFailedMessages() throws Exception {
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        class InvalidKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> metadata) {
+                return null;
+            }
+        }
+
+        /*
+         * Redelivery functionality guarantees that customer will get a chance 
to process the message again.
+         * In case of shared subscription eventually every client will get a 
chance to process the message, till one of them acks it.
+         *
+         * For client with Encryption enabled where in cases like a new 
production rollout or a buggy client configuration, we might have a mismatch of 
consumers
+         * - few which can decrypt, few which can't (due to errors or 
cryptoReader not configured).
+         *
+         * In that case eventually all messages should be acked as long as 
there is a single consumer who can decrypt the message.
+         *
+         * Consumer 1 - Can decrypt message
+         * Consumer 2 - Has invalid Reader configured.
+         * Consumer 3 - Has no reader configured.
+         *
+         */
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic2";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                
.addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer1 = 
newPulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
EncKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, 
TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer2 = 
newPulsarClient1.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
InvalidKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, 
TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer3 = 
newPulsarClient2.newConsumer().topicsPattern(topicName)
+                
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1,
 TimeUnit.SECONDS).subscribe();
+
+        int numberOfMessages = 100;
+        String message = "my-message";
+        Set<String> messages = new HashSet(); // Since messages are in random 
order
+        for (int i = 0; i<numberOfMessages; i++) {
+            producer.send((message + i).getBytes());
+        }
+
+        // Consuming from consumer 2 and 3
+        // no message should be returned since they can't decrypt the message
+        Message m = consumer2.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        m = consumer3.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+
+        for (int i = 0; i<numberOfMessages; i++) {
+            // All messages would be received by consumer 1
+            m = consumer1.receive();
+            messages.add(new String(m.getData()));
+            consumer1.acknowledge(m);
+        }
+
+        // Consuming from consumer 2 and 3 again just to be sure
+        // no message should be returned since they can't decrypt the message
+        m = consumer2.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        m = consumer3.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+
+        // checking if all messages were received
+        for (int i = 0; i<numberOfMessages; i++) {

Review comment:
       ```suggestion
           for (int i = 0; i < numberOfMessages; i++) {
   ```

##########
File path: 
tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
##########
@@ -0,0 +1,577 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shade.com.google.common.collect.Maps;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class SimpleProducerConsumerTest {
+    private static final Logger log = 
LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
+
+    private PulsarContainer pulsarContainer;
+    private URI lookupUrl;
+    private PulsarClient pulsarClient;
+
+    @BeforeClass
+    public void setup() throws PulsarClientException, URISyntaxException, 
PulsarAdminException {
+        Security.addProvider(new 
org.bouncycastle.jce.provider.BouncyCastleProvider());
+
+        pulsarContainer = new PulsarContainer();
+        pulsarContainer.start();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+        lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl());
+
+        PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build();
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("standalone")));
+        admin.namespaces().createNamespace("my-property/my-ns");
+        
admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", 
Sets.newHashSet("standalone"));
+        admin.close();
+    }
+
+    @AfterClass
+    public void cleanup() throws PulsarClientException {
+        pulsarClient.close();
+        pulsarContainer.stop();
+        pulsarContainer.close();
+    }
+
+    private PulsarClient newPulsarClient(String url, int intervalInSecs) 
throws PulsarClientException {
+        return 
PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, 
TimeUnit.SECONDS).build();
+    }
+
+    @Test
+    public void testRSAEncryption() throws Exception {
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ 
System.currentTimeMillis();
+
+        class EncKeyReader implements CryptoKeyReader {
+
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        final int totalMsg = 10;
+
+        Set<String> messageSet = Sets.newHashSet();
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
EncKeyReader()).subscribe();
+        Consumer<byte[]> normalConsumer = pulsarClient.newConsumer()
+                .topic(topicName).subscriptionName("my-subscriber-name-normal")
+                .subscribe();
+
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new 
EncKeyReader()).create();
+        Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new 
EncKeyReader()).create();
+
+        for (int i = 0; i < totalMsg; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+        for (int i = totalMsg; i < totalMsg * 2; i++) {
+            String message = "my-message-" + i;
+            producer2.send(message.getBytes());
+        }
+
+        MessageImpl<byte[]> msg = null;
+
+        msg = (MessageImpl<byte[]>) normalConsumer.receive(500, 
TimeUnit.MILLISECONDS);
+        // should not able to read message using normal message.
+        assertNull(msg);
+
+        for (int i = 0; i < totalMsg * 2; i++) {
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            // verify that encrypted message contains encryption-context
+            msg.getEncryptionCtx()
+                    .orElseThrow(() -> new 
IllegalStateException("encryption-ctx not present for encrypted message"));
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+    }
+
+    protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, 
T receivedMessage,
+                                                     T expectedMessage) {
+        // Make sure that messages are received in order
+        Assert.assertEquals(receivedMessage, expectedMessage,
+                "Received message " + receivedMessage + " did not match the 
expected message " + expectedMessage);
+
+        // Make sure that there are no duplicates
+        Assert.assertTrue(messagesReceived.add(receivedMessage), "Received 
duplicate message " + receivedMessage);
+    }
+
+    @Test
+    public void testRedeliveryOfFailedMessages() throws Exception {
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+                .build();
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        class InvalidKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> metadata) {
+                return null;
+            }
+        }
+
+        /*
+         * Redelivery functionality guarantees that customer will get a chance 
to process the message again.
+         * In case of shared subscription eventually every client will get a 
chance to process the message, till one of them acks it.
+         *
+         * For client with Encryption enabled where in cases like a new 
production rollout or a buggy client configuration, we might have a mismatch of 
consumers
+         * - few which can decrypt, few which can't (due to errors or 
cryptoReader not configured).
+         *
+         * In that case eventually all messages should be acked as long as 
there is a single consumer who can decrypt the message.
+         *
+         * Consumer 1 - Can decrypt message
+         * Consumer 2 - Has invalid Reader configured.
+         * Consumer 3 - Has no reader configured.
+         *
+         */
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic2";
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                
.addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer1 = 
newPulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
EncKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, 
TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer2 = 
newPulsarClient1.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
InvalidKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, 
TimeUnit.SECONDS).subscribe();
+
+        PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> consumer3 = 
newPulsarClient2.newConsumer().topicsPattern(topicName)
+                
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1,
 TimeUnit.SECONDS).subscribe();
+
+        int numberOfMessages = 100;
+        String message = "my-message";
+        Set<String> messages = new HashSet(); // Since messages are in random 
order
+        for (int i = 0; i<numberOfMessages; i++) {
+            producer.send((message + i).getBytes());
+        }
+
+        // Consuming from consumer 2 and 3
+        // no message should be returned since they can't decrypt the message
+        Message m = consumer2.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        m = consumer3.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+
+        for (int i = 0; i<numberOfMessages; i++) {
+            // All messages would be received by consumer 1
+            m = consumer1.receive();
+            messages.add(new String(m.getData()));
+            consumer1.acknowledge(m);
+        }
+
+        // Consuming from consumer 2 and 3 again just to be sure
+        // no message should be returned since they can't decrypt the message
+        m = consumer2.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        m = consumer3.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+
+        // checking if all messages were received
+        for (int i = 0; i<numberOfMessages; i++) {

Review comment:
       ```suggestion
           for (int i = 0; i < numberOfMessages; i++) {
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to