This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cc5e479d631 [improve][client]PIP-436:Add decryptFailListener to 
Consumer (#24702)
cc5e479d631 is described below

commit cc5e479d63103f81e3af833e8b06227d1a6563e1
Author: Ruimin MA <[email protected]>
AuthorDate: Fri Oct 10 08:14:46 2025 +0800

    [improve][client]PIP-436:Add decryptFailListener to Consumer (#24702)
---
 .../pulsar/client/api/DeadLetterTopicTest.java     |   2 +-
 .../impl/ConsumerDecryptFailListenerTest.java      | 318 +++++++++++++++++++++
 .../org/apache/pulsar/client/impl/ReaderTest.java  | 169 ++++++++++-
 .../apache/pulsar/client/api/ConsumerBuilder.java  |  14 +
 .../pulsar/client/api/DecryptFailListener.java     |  56 ++++
 .../apache/pulsar/client/api/ProducerBuilder.java  |   2 +-
 .../apache/pulsar/client/api/ReaderBuilder.java    |  14 +
 .../client/api/ReaderDecryptFailListener.java      |  55 ++++
 .../apache/pulsar/client/impl/ConsumerBase.java    |  11 +-
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |  18 ++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   7 +-
 .../client/impl/MultiTopicsConsumerImpl.java       |   5 +
 .../pulsar/client/impl/MultiTopicsReaderImpl.java  |  27 +-
 .../pulsar/client/impl/ReaderBuilderImpl.java      |  17 ++
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |  27 +-
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |   9 +-
 .../impl/conf/ConsumerConfigurationData.java       |   6 +-
 .../client/impl/conf/ReaderConfigurationData.java  |  11 +-
 .../client/impl/ConsumerBuilderImplTest.java       |   2 +-
 19 files changed, 758 insertions(+), 12 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index f5bf31369d8..df7e42df80b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -1580,7 +1580,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
                         .maxRedeliverCount(maxRedeliverCount)
                         .build())
                 .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
-                .messageListener((Consumer::negativeAcknowledge))
+                .messageListener(Consumer::negativeAcknowledge)
                 .subscribe();
 
         Consumer<GenericRecord> deadLetterConsumer = 
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDecryptFailListenerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDecryptFailListenerTest.java
new file mode 100644
index 00000000000..fa8b3a7fcf0
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDecryptFailListenerTest.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+@Slf4j
+@Test(groups = "broker-api")
+public class ConsumerDecryptFailListenerTest extends ProducerConsumerBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test(timeOut = 10000)
+    public void testDecryptFailListenerException() {
+        final String topic = BrokerTestUtil.newUniqueName(
+                
"persistent://my-property/my-ns/testReaderBuildExceptionsWithSetReaderDecryptFailure"
+        );
+        // should throw exception if decryptFailListener is set without 
setting a messageListener
+        assertThatThrownBy(
+                () -> pulsarClient.newConsumer().topic(topic)
+                        .decryptFailListener(((reader, msg) -> {
+                        }))
+                        .subscriptionName("my-sub")
+                        .subscribe()
+        )
+                .isInstanceOf(PulsarClientException.class)
+                .hasMessageContaining("decryptFailListener must be set with 
messageListener");
+
+        // should throw exception if decryptFailListener was set with 
cryptoFailureAction
+        assertThatThrownBy(
+                () -> pulsarClient.newConsumer().topic(topic)
+                        .decryptFailListener(((reader, msg) -> {
+                        }))
+                        .messageListener((reader, msg) -> {
+                        })
+                        .subscriptionName("my-sub")
+                        .cryptoFailureAction(ConsumerCryptoFailureAction.FAIL)
+                        .subscribe()
+        )
+                .isInstanceOf(PulsarClientException.class)
+                .hasMessageContaining("decryptFailListener can't be set with 
cryptoFailureAction");
+    }
+
+    @Test(timeOut = 20000)
+    public void testDecryptFailListenerBehaviorWithConsumerImpl() throws 
Exception {
+        final String topic = BrokerTestUtil.newUniqueName(
+                
"persistent://my-property/my-ns/testDecryptFailListenerBehaviorWithConsumerImpl"
+        );
+        admin.topics().createNonPartitionedTopic(topic);
+        ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topic)
+                .decryptFailListener(((reader, msg) -> {
+                }))
+                .messageListener((reader, msg) -> {
+                })
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("my-sub")
+                .subscribe();
+        assertNull(consumer1.conf.getCryptoFailureAction());
+
+        ConsumerImpl<byte[]> consumer2 = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topic)
+                .messageListener((reader, msg) -> {
+                })
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("my-sub")
+                .subscribe();
+
+        // cryptoFailureAction should be null when decryptFailListener is set
+        assertNull(consumer1.conf.getCryptoFailureAction());
+        // cryptoFailureAction should be FAIL by default when 
decryptFailListener is not set
+        assertEquals(consumer2.conf.getCryptoFailureAction(), 
ConsumerCryptoFailureAction.FAIL);
+
+        consumer1.close();
+        consumer2.close();
+    }
+
+    @Test(timeOut = 20000)
+    public void testDecryptFailListenerBehaviorWithMultiConsumerImpl() throws 
Exception {
+        final String topic = BrokerTestUtil.newUniqueName(
+                
"persistent://my-property/my-ns/testDecryptFailListenerBehaviorWithMultiConsumerImpl"
+        );
+        admin.topics().createPartitionedTopic(topic, 3);
+        MultiTopicsConsumerImpl<byte[]> consumer1 = 
(MultiTopicsConsumerImpl<byte[]>) pulsarClient.newConsumer()
+                .topic(topic)
+                .decryptFailListener(((reader, msg) -> {
+                }))
+                .messageListener((reader, msg) -> {
+                })
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("my-sub")
+                .subscribe();
+        assertNull(consumer1.conf.getCryptoFailureAction());
+
+        MultiTopicsConsumerImpl<byte[]> consumer2 = 
(MultiTopicsConsumerImpl<byte[]>) pulsarClient.newConsumer()
+                .topic(topic)
+                .messageListener((reader, msg) -> {
+                })
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName("my-sub")
+                .subscribe();
+
+        // cryptoFailureAction should be null when decryptFailListener is set
+        assertNull(consumer1.conf.getCryptoFailureAction());
+        // cryptoFailureAction should be FAIL by default when 
decryptFailListener is not set
+        assertEquals(consumer2.conf.getCryptoFailureAction(), 
ConsumerCryptoFailureAction.FAIL);
+
+        consumer1.close();
+        consumer2.close();
+    }
+
+    @Test(timeOut = 30000)
+    public void testDecryptFailListenerReceiveMessage() throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName(
+                
"persistent://my-property/my-ns/testDecryptFailListenerReceiveMessage"
+        );
+        admin.topics().createNonPartitionedTopic(topic);
+        int totalMessages = 10;
+        CountDownLatch countDownLatch = new CountDownLatch(10);
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
+                .decryptFailListener(((c, msg) -> {
+                    // all messages should come into this listener due to no 
crypto key is set in this consumer
+                    assertTrue(msg.getEncryptionCtx().isPresent());
+                    assertTrue(msg.getEncryptionCtx().get().isEncrypted());
+                    countDownLatch.countDown();
+                }))
+                .messageListener((c, msg) -> {
+                })
+                .subscriptionName("my-sub")
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .addEncryptionKey("client-rsa.pem")
+                .cryptoKeyReader(new EncKeyReader1())
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        for (int i = 0; i < totalMessages; i++) {
+            producer.send(("msg-" + i).getBytes());
+        }
+        countDownLatch.await();
+
+        consumer.close();
+        producer.close();
+    }
+
+    /**
+     * Test both decryptFailListener and messageListener receive messages.
+     */
+    @Test(timeOut = 30000)
+    public void testBothDecryptFailListenerAndMessageListenerReceiveMessage() 
throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName(
+                
"persistent://my-property/my-ns/testBothDecryptFailListenerAndMessageListenerReceiveMessage"
+        );
+        int totalMessages = 10;
+        CountDownLatch decryptSuccessCount = new CountDownLatch(5);
+        CountDownLatch decryptFailCount = new CountDownLatch(5);
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
+                .decryptFailListener(((c, msg) -> {
+                    decryptFailCount.countDown();
+                }))
+                .cryptoKeyReader(new EncKeyReader1())
+                .messageListener((c, msg) -> {
+                    decryptSuccessCount.countDown();
+                })
+                .subscriptionName("my-sub")
+                .subscribe();
+
+        Producer<byte[]> producer1 = pulsarClient.newProducer()
+                .topic(topic)
+                .addEncryptionKey("client-rsa.pem")
+                .cryptoKeyReader(new EncKeyReader1())
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        Producer<byte[]> producer2 = pulsarClient.newProducer()
+                .topic(topic)
+                .addEncryptionKey("client-rsa.pem")
+                .cryptoKeyReader(new EncKeyReader2())
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        for (int i = 0; i < totalMessages; i++) {
+            if (i % 2 == 0) {
+                producer1.send(("msg-" + i).getBytes());
+            } else {
+                producer2.send(("msg-" + i).getBytes());
+            }
+        }
+
+        decryptSuccessCount.await();
+        decryptFailCount.await();
+
+        consumer.close();
+        producer1.close();
+        producer2.close();
+    }
+
+    static class EncKeyReader1 implements CryptoKeyReader {
+
+        final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+        @Override
+        public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+            String certFilePath = 
"./src/test/resources/certificate/public-key.client-rsa.pem";
+            if (Files.isReadable(Paths.get(certFilePath))) {
+                try {
+                    
keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
+                    return keyInfo;
+                } catch (IOException e) {
+                    log.error("Failed to read certificate from {}", 
certFilePath);
+                }
+            }
+            return null;
+        }
+
+        @Override
+        public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+            String certFilePath = 
"./src/test/resources/certificate/private-key.client-rsa.pem";
+            if (Files.isReadable(Paths.get(certFilePath))) {
+                try {
+                    
keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
+                    return keyInfo;
+                } catch (IOException e) {
+                    log.error("Failed to read certificate from {}", 
certFilePath);
+                }
+            }
+            return null;
+        }
+    }
+
+    static class EncKeyReader2 implements CryptoKeyReader {
+
+        final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+        @Override
+        public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+            String certFilePath = 
"./src/test/resources/certificate/public-key.client-ecdsa.pem";
+            if (Files.isReadable(Paths.get(certFilePath))) {
+                try {
+                    
keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
+                    return keyInfo;
+                } catch (IOException e) {
+                    log.error("Failed to read certificate from {}", 
certFilePath);
+                }
+            }
+            return null;
+        }
+
+        @Override
+        public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+            String certFilePath = 
"./src/test/resources/certificate/private-key.client-ecdsa.pem";
+            if (Files.isReadable(Paths.get(certFilePath))) {
+                try {
+                    
keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
+                    return keyInfo;
+                } catch (IOException e) {
+                    log.error("Failed to read certificate from {}", 
certFilePath);
+                }
+            }
+            return null;
+        }
+    }
+
+
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 2ffb7af5169..c0b4825e022 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
@@ -26,10 +27,13 @@ import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -39,10 +43,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageIdAdv;
@@ -958,7 +966,6 @@ public class ReaderTest extends MockedPulsarServiceBaseTest 
{
 
         ReaderBuilder<byte[]> readerBuilder = 
client.newReader().topic(topic).startMessageFromRollbackDuration(100,
                 TimeUnit.SECONDS);
-
         for (int i = 0; i < 3; i++) {
             try {
                 readerBuilder.createAsync().get(1, TimeUnit.SECONDS);
@@ -969,4 +976,164 @@ public class ReaderTest extends 
MockedPulsarServiceBaseTest {
             }
         }
     }
+
+    @Test(timeOut = 10000)
+    public void testReaderDecryptFailListenerException() {
+        final String topic = BrokerTestUtil.newUniqueName(
+                
"persistent://my-property/my-ns/testReaderDecryptFailListenerException-"
+        );
+        // should throw exception if readerDecryptFailListener is set without 
setting a readerListener
+        assertThatThrownBy(
+                () -> pulsarClient.newReader().topic(topic)
+                        .readerDecryptFailListener(((reader, msg) -> {
+                        }))
+                        .startMessageId(MessageId.earliest)
+                        .create()
+        )
+                .isInstanceOf(PulsarClientException.class)
+                .hasMessageContaining("readerDecryptFailListener must be set 
with readerListener");
+
+        // should throw exception if readerDecryptFailListener was set with 
cryptoFailureAction
+        assertThatThrownBy(
+                () -> pulsarClient.newReader().topic(topic)
+                        .readerDecryptFailListener(((reader, msg) -> {
+                        }))
+                        .readerListener((reader, msg) -> {
+                        })
+                        .startMessageId(MessageId.latest)
+                        .cryptoFailureAction(ConsumerCryptoFailureAction.FAIL)
+                        .create()
+        )
+                .isInstanceOf(PulsarClientException.class)
+                .hasMessageContaining("readerDecryptFailListener cannot set 
with cryptoFailureAction");
+    }
+
+    @Test(timeOut = 20000)
+    public void testReaderDecryptFailListenerBehaviorWithReaderImpl() throws 
Exception {
+        final String topic = BrokerTestUtil.newUniqueName(
+                
"persistent://my-property/my-ns/testDecryptFailListenerBehaviorWithConsumerImpl"
+        );
+        admin.topics().createNonPartitionedTopic(topic);
+        ReaderImpl<byte[]> reader1 = (ReaderImpl<byte[]>) 
pulsarClient.newReader().topic(topic)
+                .readerDecryptFailListener(((r, msg) -> {
+                }))
+                .readerListener((r, msg) -> {
+                })
+                .startMessageId(MessageId.earliest)
+                .create();
+
+        ReaderImpl<byte[]> reader2 = (ReaderImpl<byte[]>) 
pulsarClient.newReader().topic(topic)
+                .readerListener((reader, msg) -> {
+                })
+                .startMessageId(MessageId.earliest)
+                .create();
+
+        // cryptoFailureAction should be null when readerDecryptFailListener 
is set
+        assertNull(reader1.getConsumer().conf.getCryptoFailureAction());
+        // cryptoFailureAction should be FAIL by default when 
readerDecryptFailListener is not set
+        assertEquals(reader2.getConsumer().conf.getCryptoFailureAction(), 
ConsumerCryptoFailureAction.FAIL);
+
+        reader1.close();
+        reader2.close();
+    }
+
+    @Test(timeOut = 20000)
+    public void testReaderDecryptFailListenerBehaviorWithMultiReaderImpl() 
throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName(
+                
"persistent://my-property/my-ns/testDecryptFailListenerBehaviorWithConsumerImpl"
+        );
+        admin.topics().createPartitionedTopic(topic, 3);
+        MultiTopicsReaderImpl<byte[]> reader1 = 
(MultiTopicsReaderImpl<byte[]>) pulsarClient.newReader().topic(topic)
+                .readerDecryptFailListener(((r, msg) -> {
+                }))
+                .readerListener((r, msg) -> {
+                })
+                .startMessageId(MessageId.earliest)
+                .create();
+
+        MultiTopicsReaderImpl<byte[]> reader2 = 
(MultiTopicsReaderImpl<byte[]>) pulsarClient.newReader().topic(topic)
+                .readerListener((reader, msg) -> {
+                })
+                .startMessageId(MessageId.earliest)
+                .create();
+
+        // cryptoFailureAction should be null when readerDecryptFailListener 
is set
+        
assertNull(reader1.getMultiTopicsConsumer().conf.getCryptoFailureAction());
+        // cryptoFailureAction should be FAIL by default when 
readerDecryptFailListener is not set
+        
assertEquals(reader2.getMultiTopicsConsumer().conf.getCryptoFailureAction(), 
ConsumerCryptoFailureAction.FAIL);
+
+        reader1.close();
+        reader2.close();
+    }
+
+    @Test(timeOut = 30000)
+    public void testReaderDecryptFailListenerReceiveMessage() throws Exception 
{
+        final String topic = BrokerTestUtil.newUniqueName(
+                
"persistent://my-property/my-ns/testReaderDecryptFailListenerReceiveMessage"
+        );
+        admin.topics().createNonPartitionedTopic(topic);
+        int totalMessages = 10;
+        CountDownLatch countDownLatch = new CountDownLatch(10);
+        Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
+                .readerDecryptFailListener(((c, msg) -> {
+                    // all messages should come into this listener due to no 
crypto key is set in this consumer
+                    assertTrue(msg.getEncryptionCtx().isPresent());
+                    assertTrue(msg.getEncryptionCtx().get().isEncrypted());
+                    countDownLatch.countDown();
+                }))
+                .readerListener((c, msg) -> {
+                })
+                .startMessageId(MessageId.earliest)
+                .create();
+
+        class EncKeyReader implements CryptoKeyReader {
+
+            final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                String certFilePath = 
"./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(certFilePath))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        log.error("Failed to read certificate from {}", 
certFilePath);
+                    }
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+                String certFilePath = 
"./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(certFilePath))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        log.error("Failed to read certificate from {}", 
certFilePath);
+                    }
+                }
+                return null;
+            }
+        }
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .addEncryptionKey("client-rsa.pem")
+                .cryptoKeyReader(new EncKeyReader())
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        for (int i = 0; i < totalMessages; i++) {
+            producer.send(("msg-" + i).getBytes());
+        }
+        countDownLatch.await();
+
+        reader.close();
+        producer.close();
+    }
+
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 70904c0897a..843d5a18b65 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -304,6 +304,20 @@ public interface ConsumerBuilder<T> extends Cloneable {
      */
     ConsumerBuilder<T> messageListener(MessageListener<T> messageListener);
 
+    /**
+     * Sets a {@link DecryptFailListener} for the consumer.
+     *
+     * <p>The application receives encrypt messages and cannot decrypt 
successfully will through this listener.
+     * <p>Must set with {@link 
ConsumerBuilder#messageListener(MessageListener)}
+     * and calls to {@link Consumer#receive()} are not allowed.
+     * This listener cannot be used with {@link 
ConsumerBuilder#cryptoFailureAction(ConsumerCryptoFailureAction)}.
+     *
+     * @param decryptFailListener
+     *            the listener object
+     * @return the consumer builder instance
+     */
+    ConsumerBuilder<T> decryptFailListener(DecryptFailListener<T> 
decryptFailListener);
+
     /**
      * Set the {@link MessageListenerExecutor} to be used for message 
listeners of <b>current consumer</b>.
      * <i>(default: use executor from PulsarClient,
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DecryptFailListener.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DecryptFailListener.java
new file mode 100644
index 00000000000..3b5492987ec
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DecryptFailListener.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Serializable;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+
+/**
+ * A listener that is called when message decryption fails.
+ * <p>This listener is invoked when receives an encrypted message and cannot 
be decrypted successfully,
+ * either because no {@link CryptoKeyReader} is configured or the configured 
{@link CryptoKeyReader}
+ * cannot decrypt the message. This allows applications to handle decryption 
failures separately
+ * from normal message processing.
+ *
+ * <p>This listener must be used together with a {@link MessageListener} and 
cannot be used
+ * with {@link ConsumerCryptoFailureAction}.
+ */
[email protected]
[email protected]
+public interface DecryptFailListener<T> extends Serializable {
+    /**
+     * This method is called whenever a new encrypted message is received and 
cannot be decrypted successfully
+     * by {@link CryptoKeyReader}
+     * <p>Messages are guaranteed to be delivered in order and from the same 
thread for a single consumer
+     *
+     * <p>This method will only be called once for each encrypted message.
+     *
+     * <p>Application is responsible for acknowledging the message by calling 
any of the consumer
+     * acknowledgement methods if needed.
+     *
+     * <p>Application is responsible for handling any exception that could be 
thrown while
+     * processing the undecryptable message.
+     *
+     * @param consumer the consumer that received the undecryptable message
+     * @param msg the encrypted message object that failed decryption
+     */
+    void received(Consumer<T> consumer, Message<T> msg);
+}
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index 88eaa890a70..bf44ee92b66 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -246,7 +246,7 @@ public interface ProducerBuilder<T> extends Cloneable {
      * <p>Default routing mode is to round-robin across the available 
partitions.
      *
      * <p>This logic is applied when the application is not setting a key on a
-     * particular message. If the key is set with {@link 
MessageBuilder#setKey(String)},
+     * particular message. If the key is set with {@link 
TypedMessageBuilder#key(String)},
      * then the hash of the key will be used to select a partition for the 
message.
      *
      * @param messageRoutingMode
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
index 4a313feba43..ead36f13ea8 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
@@ -168,6 +168,20 @@ public interface ReaderBuilder<T> extends Cloneable {
      */
     ReaderBuilder<T> readerListener(ReaderListener<T> readerListener);
 
+    /**
+     * Sets a {@link ReaderDecryptFailListener} for the reader.
+     *
+     * <p>The application receives encrypt messages and cannot decrypt 
successfully will through this listener.
+     * <p>Must set with {@link ReaderBuilder#readerListener(ReaderListener)}
+     * and calls to {@link Reader#readNext()} are not allowed.
+     * This listener cannot be used with {@link 
ReaderBuilder#cryptoFailureAction(ConsumerCryptoFailureAction)}.
+     *
+     * @param readerDecryptFailListener
+     *            the listener object
+     * @return the reader builder instance
+     */
+    ReaderBuilder<T> readerDecryptFailListener(ReaderDecryptFailListener<T> 
readerDecryptFailListener);
+
     /**
      * Sets a {@link CryptoKeyReader} to decrypt the message payloads.
      *
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderDecryptFailListener.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderDecryptFailListener.java
new file mode 100644
index 00000000000..dfce5d482d4
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderDecryptFailListener.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Serializable;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+/**
+ * A listener that is called when message decryption fails.
+ * <p>This listener is invoked when receives an encrypted message and cannot 
be decrypted successfully,
+ * either because no {@link CryptoKeyReader} is configured or the configured 
{@link CryptoKeyReader}
+ * cannot decrypt the message. This allows applications to handle decryption 
failures separately
+ * from normal message processing.
+ *
+ * <p>This listener must be used together with a {@link 
ReaderBuilder#readerListener(ReaderListener)} and cannot be used
+ * with {@link ReaderBuilder#cryptoFailureAction(ConsumerCryptoFailureAction)}.
+ */
[email protected]
[email protected]
+public interface ReaderDecryptFailListener<T> extends Serializable {
+    /**
+     * This method is called whenever a new encrypted message is received and 
cannot be decrypted successfully
+     * by {@link CryptoKeyReader}
+     * <p>Messages are guaranteed to be delivered in order and from the same 
thread for a single reader
+     *
+     * <p>This method will only be called once for each encrypted message.
+     *
+     * <p>Application is responsible for acknowledging the message by calling 
any of the reader
+     * acknowledgement methods if needed.
+     *
+     * <p>Application is responsible for handling any exception that could be 
thrown while
+     * processing the undecryptable message.
+     *
+     * @param reader the reader that received the undecryptable message
+     * @param msg the encrypted message object that failed decryption
+     */
+    void received(Reader<T> reader, Message<T> msg);
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 27e2216e589..ec95f2c2f65 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.DecryptFailListener;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageIdAdv;
@@ -64,6 +65,7 @@ import 
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.client.util.NoOpLock;
+import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
@@ -82,6 +84,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     protected final String consumerName;
     protected final CompletableFuture<Consumer<T>> subscribeFuture;
     protected final MessageListener<T> listener;
+    protected final DecryptFailListener<T> decryptFailListener;
     protected final ConsumerEventListener consumerEventListener;
     protected final ExecutorProvider executorProvider;
     protected final MessageListenerExecutor messageListenerExecutor;
@@ -137,6 +140,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
                 conf.getConsumerName() == null ? 
RandomStringUtils.randomAlphanumeric(5) : conf.getConsumerName();
         this.subscribeFuture = subscribeFuture;
         this.listener = conf.getMessageListener();
+        this.decryptFailListener = conf.getDecryptFailListener();
         this.consumerEventListener = conf.getConsumerEventListener();
         // Always use growable queue since items can exceed the advertised size
         this.incomingMessages = new GrowableArrayBlockingQueue<>();
@@ -1190,7 +1194,12 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
             }
             unAckedMessageTracker.add(id, msg.getRedeliveryCount());
             beforeConsume(msg);
-            listener.received(ConsumerBase.this, msg);
+            Optional<EncryptionContext> encryptionCtx = msg.getEncryptionCtx();
+            if (decryptFailListener != null && encryptionCtx.isPresent() && 
encryptionCtx.get().isEncrypted()) {
+                decryptFailListener.received(ConsumerBase.this, msg);
+            } else {
+                listener.received(ConsumerBase.this, msg);
+            }
         } catch (Throwable t) {
             log.error("[{}][{}] Message listener error in processing message: 
{}", topic, subscription,
                     msg.getMessageId(), t);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index ab64119cb8f..dc2363c279f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.ConsumerInterceptor;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.DecryptFailListener;
 import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.MessageCrypto;
 import org.apache.pulsar.client.api.MessageListener;
@@ -148,6 +149,17 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
             return FutureUtil.failedFuture(
                     new InvalidConfigurationException("KeySharedPolicy must 
set with KeyShared subscription"));
         }
+        if (conf.getDecryptFailListener() != null && conf.getMessageListener() 
== null) {
+            return FutureUtil.failedFuture(
+                    new InvalidConfigurationException("decryptFailListener 
must be set with messageListener"));
+        }
+        if (conf.getDecryptFailListener() != null && 
conf.getCryptoFailureAction() != null) {
+            return FutureUtil.failedFuture(
+                    new InvalidConfigurationException("decryptFailListener 
can't be set with cryptoFailureAction"));
+        }
+        if (conf.getDecryptFailListener() == null && 
conf.getCryptoFailureAction() == null) {
+            conf.setCryptoFailureAction(ConsumerCryptoFailureAction.FAIL);
+        }
         if (conf.getBatchReceivePolicy() != null) {
             conf.setReceiverQueueSize(
                     Math.max(conf.getBatchReceivePolicy().getMaxNumMessages(), 
conf.getReceiverQueueSize()));
@@ -309,6 +321,12 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
         return this;
     }
 
+    @Override
+    public ConsumerBuilder<T> decryptFailListener(@NonNull 
DecryptFailListener<T> messageListener) {
+        conf.setDecryptFailListener(messageListener);
+        return this;
+    }
+
     @Override
     public ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor 
messageListenerExecutor) {
         checkArgument(messageListenerExecutor != null, 
"messageListenerExecutor needs to be not null");
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6d5305136b1..37e9f16fe02 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2028,7 +2028,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             return handleCryptoFailure(payload, messageId, currentCnx, 
redeliveryCount, batchSize, true);
         }
 
-
         int maxDecryptedSize = 
msgCrypto.getMaxOutputSize(payload.readableBytes());
         ByteBuf decryptedData = 
PulsarByteBufAllocator.DEFAULT.buffer(maxDecryptedSize);
         ByteBuffer nioDecryptedData = decryptedData.nioBuffer(0, 
maxDecryptedSize);
@@ -2045,6 +2044,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     private DecryptResult handleCryptoFailure(ByteBuf payload, MessageIdData 
messageId, ClientCnx currentCnx,
             int redeliveryCount, int batchSize, boolean cryptoReaderNotExist) {
 
+        if (conf.getDecryptFailListener() != null) {
+            return DecryptResult.failure(payload.retain());
+        }
         switch (conf.getCryptoFailureAction()) {
         case CONSUME:
             if (cryptoReaderNotExist) {
@@ -2922,7 +2924,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     private boolean isMessageUndecryptable(MessageMetadata msgMetadata) {
         return (msgMetadata.getEncryptionKeysCount() > 0 && 
conf.getCryptoKeyReader() == null
-                && conf.getCryptoFailureAction() == 
ConsumerCryptoFailureAction.CONSUME);
+                && (conf.getCryptoFailureAction() == 
ConsumerCryptoFailureAction.CONSUME
+                || conf.getDecryptFailListener() != null));
     }
 
     /**
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index a31b72c348c..998dc52951a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -56,6 +56,7 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.BatchReceivePolicy;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -706,6 +707,10 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         internalConsumerConfig.setSubscriptionName(subscription);
         internalConsumerConfig.setConsumerName(consumerName);
         internalConsumerConfig.setMessageListener(null);
+        internalConsumerConfig.setDecryptFailListener(null);
+        if (internalConsumerConfig.getCryptoFailureAction() == null) {
+            
internalConsumerConfig.setCryptoFailureAction(ConsumerCryptoFailureAction.FAIL);
+        }
         return internalConsumerConfig;
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index 86f3199f297..18c44122050 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -29,12 +29,15 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.DecryptFailListener;
 import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderDecryptFailListener;
 import org.apache.pulsar.client.api.ReaderListener;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionMode;
@@ -101,13 +104,35 @@ public class MultiTopicsReaderImpl<T> implements 
Reader<T> {
             });
         }
 
+        if (readerConfiguration.getReaderDecryptFailListener() != null) {
+            ReaderDecryptFailListener<T> readerDecryptFailListener = 
readerConfiguration.getReaderDecryptFailListener();
+            consumerConfiguration.setDecryptFailListener(new 
DecryptFailListener<T>() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public void received(Consumer<T> consumer, Message<T> msg) {
+                    final MessageId messageId = msg.getMessageId();
+                    
readerDecryptFailListener.received(MultiTopicsReaderImpl.this, msg);
+                    
consumer.acknowledgeCumulativeAsync(messageId).exceptionally(ex -> {
+                        log.error("[{}][{}] auto acknowledge decrypt fail 
message {} cumulative fail.", getTopic(),
+                                getMultiTopicsConsumer().getSubscription(), 
messageId, ex);
+                        return null;
+                    });
+                }
+            });
+        }
+
         if (readerConfiguration.getReaderName() != null) {
             
consumerConfiguration.setConsumerName(readerConfiguration.getReaderName());
         }
         if (readerConfiguration.isResetIncludeHead()) {
             consumerConfiguration.setResetIncludeHead(true);
         }
-        
consumerConfiguration.setCryptoFailureAction(readerConfiguration.getCryptoFailureAction());
+        if (readerConfiguration.getCryptoFailureAction() != null) {
+            
consumerConfiguration.setCryptoFailureAction(readerConfiguration.getCryptoFailureAction());
+        } else if (readerConfiguration.getReaderDecryptFailListener() == null) 
{
+            
consumerConfiguration.setCryptoFailureAction(ConsumerCryptoFailureAction.FAIL);
+        }
         if (readerConfiguration.getCryptoKeyReader() != null) {
             
consumerConfiguration.setCryptoKeyReader(readerConfiguration.getCryptoKeyReader());
         }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index 629a076c596..ed1169ce4db 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.ReaderDecryptFailListener;
 import org.apache.pulsar.client.api.ReaderInterceptor;
 import org.apache.pulsar.client.api.ReaderListener;
 import org.apache.pulsar.client.api.Schema;
@@ -100,6 +101,16 @@ public class ReaderBuilderImpl<T> implements 
ReaderBuilder<T> {
             conf.setStartMessageId(MessageId.earliest);
         }
 
+        if (conf.getReaderDecryptFailListener() != null && 
conf.getReaderListener() == null) {
+            return FutureUtil.failedFuture(new IllegalArgumentException(
+                    "readerDecryptFailListener must be set with readerListener"
+            ));
+        }
+        if (conf.getCryptoFailureAction() != null && 
conf.getReaderDecryptFailListener() != null) {
+            return FutureUtil.failedFuture(new IllegalArgumentException(
+                    "readerDecryptFailListener cannot set with 
cryptoFailureAction"
+            ));
+        }
         return client.createReaderAsync(conf, schema);
     }
 
@@ -152,6 +163,12 @@ public class ReaderBuilderImpl<T> implements 
ReaderBuilder<T> {
         return this;
     }
 
+    @Override
+    public ReaderBuilder<T> 
readerDecryptFailListener(ReaderDecryptFailListener<T> 
readerDecryptFailListener) {
+        conf.setReaderDecryptFailListener(readerDecryptFailListener);
+        return this;
+    }
+
     @Override
     public ReaderBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
         conf.setCryptoKeyReader(cryptoKeyReader);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 8760d69447a..779ccadbbd1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -29,12 +29,15 @@ import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.BatchReceivePolicy;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.DecryptFailListener;
 import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderDecryptFailListener;
 import org.apache.pulsar.client.api.ReaderListener;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -118,7 +121,29 @@ public class ReaderImpl<T> implements Reader<T> {
             });
         }
 
-        
consumerConfiguration.setCryptoFailureAction(readerConfiguration.getCryptoFailureAction());
+        if (readerConfiguration.getReaderDecryptFailListener() != null) {
+            ReaderDecryptFailListener<T> readerDecryptFailListener = 
readerConfiguration.getReaderDecryptFailListener();
+            consumerConfiguration.setDecryptFailListener(new 
DecryptFailListener<>() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public void received(Consumer<T> consumer, Message<T> msg) {
+                    final MessageId messageId = msg.getMessageId();
+                    readerDecryptFailListener.received(ReaderImpl.this, msg);
+                    
consumer.acknowledgeCumulativeAsync(messageId).exceptionally(ex -> {
+                        log.error("[{}][{}] auto acknowledge decrypt fail 
message {} cumulative fail.", getTopic(),
+                                getConsumer().getSubscription(), messageId, 
ex);
+                        return null;
+                    });
+                }
+            });
+        }
+
+        if (readerConfiguration.getCryptoFailureAction() != null) {
+            
consumerConfiguration.setCryptoFailureAction(readerConfiguration.getCryptoFailureAction());
+        } else if (readerConfiguration.getReaderDecryptFailListener() == null) 
{
+            
consumerConfiguration.setCryptoFailureAction(ConsumerCryptoFailureAction.FAIL);
+        }
         if (readerConfiguration.getCryptoKeyReader() != null) {
             
consumerConfiguration.setCryptoKeyReader(readerConfiguration.getCryptoKeyReader());
         }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index 01ff4d57782..02851ac36fc 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -176,7 +176,14 @@ public class ZeroQueueConsumerImpl<T> extends 
ConsumerImpl<T> {
                 trackMessage(message);
                 unAckedMessageTracker.add(
                         
MessageIdAdvUtils.discardBatch(message.getMessageId()), 
message.getRedeliveryCount());
-                listener.received(ZeroQueueConsumerImpl.this, 
beforeConsume(message));
+                if (decryptFailListener != null
+                        && message.getEncryptionCtx().isPresent()
+                        && message.getEncryptionCtx().get().isEncrypted()
+                ) {
+                    decryptFailListener.received(ZeroQueueConsumerImpl.this, 
beforeConsume(message));
+                } else {
+                    listener.received(ZeroQueueConsumerImpl.this, 
beforeConsume(message));
+                }
             } catch (Throwable t) {
                 log.error("[{}][{}] Message listener error in processing 
unqueued message: {}", topic, subscription,
                         message.getMessageId(), t);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 4be8c4ed73e..42fc2666573 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -41,6 +41,7 @@ import 
org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.DecryptFailListener;
 import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.MessageCrypto;
 import org.apache.pulsar.client.api.MessageListener;
@@ -97,6 +98,9 @@ public class ConsumerConfigurationData<T> implements 
Serializable, Cloneable {
     @JsonIgnore
     private MessageListener<T> messageListener;
 
+    @JsonIgnore
+    private DecryptFailListener<T> decryptFailListener;
+
     @JsonIgnore
     private ConsumerEventListener consumerEventListener;
 
@@ -284,7 +288,7 @@ public class ConsumerConfigurationData<T> implements 
Serializable, Cloneable {
                     + "Delivered encrypted message contains {@link 
EncryptionContext} which contains encryption and "
                     + "compression information in it using which application 
can decrypt consumed message payload."
     )
-    private ConsumerCryptoFailureAction cryptoFailureAction = 
ConsumerCryptoFailureAction.FAIL;
+    private ConsumerCryptoFailureAction cryptoFailureAction;
 
     @ApiModelProperty(
             name = "properties",
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
index cd5aa4c12f5..1fe39e6329c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.MessageCrypto;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Range;
+import org.apache.pulsar.client.api.ReaderDecryptFailListener;
 import org.apache.pulsar.client.api.ReaderInterceptor;
 import org.apache.pulsar.client.api.ReaderListener;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -73,6 +74,12 @@ public class ReaderConfigurationData<T> implements 
Serializable, Cloneable {
     )
     private ReaderListener<T> readerListener;
 
+    @ApiModelProperty(
+            name = "readerDecryptFailListener",
+            value = "A listener that is called for encrypted message received 
and decrypt fail."
+    )
+    private ReaderDecryptFailListener<T> readerDecryptFailListener;
+
     @ApiModelProperty(
             name = "readerName",
             value = "Reader name"
@@ -112,8 +119,10 @@ public class ReaderConfigurationData<T> implements 
Serializable, Cloneable {
                     + "\n"
                     + "Delivered encrypted message contains {@link 
EncryptionContext} which contains encryption and "
                     + "compression information in it using which application 
can decrypt consumed message payload."
+                    + "cannot set with {@link ReaderDecryptFailListener}, and 
if ReaderDecryptFailListener are set,\n"
+                    + "application should responsible for handling decryption 
failure."
     )
-    private ConsumerCryptoFailureAction cryptoFailureAction = 
ConsumerCryptoFailureAction.FAIL;
+    private ConsumerCryptoFailureAction cryptoFailureAction;
 
     @JsonIgnore
     private transient MessageCrypto messageCrypto = null;
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
index 6a8d8d9e86e..8b031fbd38b 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -552,7 +552,7 @@ public class ConsumerBuilderImplTest {
         assertEquals(configurationData.getMaxPendingChunkedMessage(), 10);
         
assertFalse(configurationData.isAutoAckOldestChunkedMessageOnQueueFull());
         
assertEquals(configurationData.getExpireTimeOfIncompleteChunkedMessageMillis(), 
TimeUnit.MINUTES.toMillis(1));
-        assertEquals(configurationData.getCryptoFailureAction(), 
ConsumerCryptoFailureAction.FAIL);
+        assertNull(configurationData.getCryptoFailureAction());
         assertThat(configurationData.getProperties()).hasSize(1)
             .hasFieldOrPropertyWithValue("prop", "prop-value");
         assertFalse(configurationData.isReadCompacted());

Reply via email to