This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cc5e479d631 [improve][client]PIP-436:Add decryptFailListener to
Consumer (#24702)
cc5e479d631 is described below
commit cc5e479d63103f81e3af833e8b06227d1a6563e1
Author: Ruimin MA <[email protected]>
AuthorDate: Fri Oct 10 08:14:46 2025 +0800
[improve][client]PIP-436:Add decryptFailListener to Consumer (#24702)
---
.../pulsar/client/api/DeadLetterTopicTest.java | 2 +-
.../impl/ConsumerDecryptFailListenerTest.java | 318 +++++++++++++++++++++
.../org/apache/pulsar/client/impl/ReaderTest.java | 169 ++++++++++-
.../apache/pulsar/client/api/ConsumerBuilder.java | 14 +
.../pulsar/client/api/DecryptFailListener.java | 56 ++++
.../apache/pulsar/client/api/ProducerBuilder.java | 2 +-
.../apache/pulsar/client/api/ReaderBuilder.java | 14 +
.../client/api/ReaderDecryptFailListener.java | 55 ++++
.../apache/pulsar/client/impl/ConsumerBase.java | 11 +-
.../pulsar/client/impl/ConsumerBuilderImpl.java | 18 ++
.../apache/pulsar/client/impl/ConsumerImpl.java | 7 +-
.../client/impl/MultiTopicsConsumerImpl.java | 5 +
.../pulsar/client/impl/MultiTopicsReaderImpl.java | 27 +-
.../pulsar/client/impl/ReaderBuilderImpl.java | 17 ++
.../org/apache/pulsar/client/impl/ReaderImpl.java | 27 +-
.../pulsar/client/impl/ZeroQueueConsumerImpl.java | 9 +-
.../impl/conf/ConsumerConfigurationData.java | 6 +-
.../client/impl/conf/ReaderConfigurationData.java | 11 +-
.../client/impl/ConsumerBuilderImplTest.java | 2 +-
19 files changed, 758 insertions(+), 12 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index f5bf31369d8..df7e42df80b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -1580,7 +1580,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
.maxRedeliverCount(maxRedeliverCount)
.build())
.negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
- .messageListener((Consumer::negativeAcknowledge))
+ .messageListener(Consumer::negativeAcknowledge)
.subscribe();
Consumer<GenericRecord> deadLetterConsumer =
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDecryptFailListenerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDecryptFailListenerTest.java
new file mode 100644
index 00000000000..fa8b3a7fcf0
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDecryptFailListenerTest.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+@Slf4j
+@Test(groups = "broker-api")
+public class ConsumerDecryptFailListenerTest extends ProducerConsumerBase {
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test(timeOut = 10000)
+ public void testDecryptFailListenerException() {
+ final String topic = BrokerTestUtil.newUniqueName(
+
"persistent://my-property/my-ns/testReaderBuildExceptionsWithSetReaderDecryptFailure"
+ );
+ // should throw exception if decryptFailListener is set without
setting a messageListener
+ assertThatThrownBy(
+ () -> pulsarClient.newConsumer().topic(topic)
+ .decryptFailListener(((reader, msg) -> {
+ }))
+ .subscriptionName("my-sub")
+ .subscribe()
+ )
+ .isInstanceOf(PulsarClientException.class)
+ .hasMessageContaining("decryptFailListener must be set with
messageListener");
+
+ // should throw exception if decryptFailListener was set with
cryptoFailureAction
+ assertThatThrownBy(
+ () -> pulsarClient.newConsumer().topic(topic)
+ .decryptFailListener(((reader, msg) -> {
+ }))
+ .messageListener((reader, msg) -> {
+ })
+ .subscriptionName("my-sub")
+ .cryptoFailureAction(ConsumerCryptoFailureAction.FAIL)
+ .subscribe()
+ )
+ .isInstanceOf(PulsarClientException.class)
+ .hasMessageContaining("decryptFailListener can't be set with
cryptoFailureAction");
+ }
+
+ @Test(timeOut = 20000)
+ public void testDecryptFailListenerBehaviorWithConsumerImpl() throws
Exception {
+ final String topic = BrokerTestUtil.newUniqueName(
+
"persistent://my-property/my-ns/testDecryptFailListenerBehaviorWithConsumerImpl"
+ );
+ admin.topics().createNonPartitionedTopic(topic);
+ ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>)
pulsarClient.newConsumer().topic(topic)
+ .decryptFailListener(((reader, msg) -> {
+ }))
+ .messageListener((reader, msg) -> {
+ })
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName("my-sub")
+ .subscribe();
+ assertNull(consumer1.conf.getCryptoFailureAction());
+
+ ConsumerImpl<byte[]> consumer2 = (ConsumerImpl<byte[]>)
pulsarClient.newConsumer().topic(topic)
+ .messageListener((reader, msg) -> {
+ })
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName("my-sub")
+ .subscribe();
+
+ // cryptoFailureAction should be null when decryptFailListener is set
+ assertNull(consumer1.conf.getCryptoFailureAction());
+ // cryptoFailureAction should be FAIL by default when
decryptFailListener is not set
+ assertEquals(consumer2.conf.getCryptoFailureAction(),
ConsumerCryptoFailureAction.FAIL);
+
+ consumer1.close();
+ consumer2.close();
+ }
+
+ @Test(timeOut = 20000)
+ public void testDecryptFailListenerBehaviorWithMultiConsumerImpl() throws
Exception {
+ final String topic = BrokerTestUtil.newUniqueName(
+
"persistent://my-property/my-ns/testDecryptFailListenerBehaviorWithMultiConsumerImpl"
+ );
+ admin.topics().createPartitionedTopic(topic, 3);
+ MultiTopicsConsumerImpl<byte[]> consumer1 =
(MultiTopicsConsumerImpl<byte[]>) pulsarClient.newConsumer()
+ .topic(topic)
+ .decryptFailListener(((reader, msg) -> {
+ }))
+ .messageListener((reader, msg) -> {
+ })
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName("my-sub")
+ .subscribe();
+ assertNull(consumer1.conf.getCryptoFailureAction());
+
+ MultiTopicsConsumerImpl<byte[]> consumer2 =
(MultiTopicsConsumerImpl<byte[]>) pulsarClient.newConsumer()
+ .topic(topic)
+ .messageListener((reader, msg) -> {
+ })
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName("my-sub")
+ .subscribe();
+
+ // cryptoFailureAction should be null when decryptFailListener is set
+ assertNull(consumer1.conf.getCryptoFailureAction());
+ // cryptoFailureAction should be FAIL by default when
decryptFailListener is not set
+ assertEquals(consumer2.conf.getCryptoFailureAction(),
ConsumerCryptoFailureAction.FAIL);
+
+ consumer1.close();
+ consumer2.close();
+ }
+
+ @Test(timeOut = 30000)
+ public void testDecryptFailListenerReceiveMessage() throws Exception {
+ final String topic = BrokerTestUtil.newUniqueName(
+
"persistent://my-property/my-ns/testDecryptFailListenerReceiveMessage"
+ );
+ admin.topics().createNonPartitionedTopic(topic);
+ int totalMessages = 10;
+ CountDownLatch countDownLatch = new CountDownLatch(10);
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
+ .decryptFailListener(((c, msg) -> {
+ // all messages should come into this listener because no
crypto key is set in this consumer
+ assertTrue(msg.getEncryptionCtx().isPresent());
+ assertTrue(msg.getEncryptionCtx().get().isEncrypted());
+ countDownLatch.countDown();
+ }))
+ .messageListener((c, msg) -> {
+ })
+ .subscriptionName("my-sub")
+ .subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .addEncryptionKey("client-rsa.pem")
+ .cryptoKeyReader(new EncKeyReader1())
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ for (int i = 0; i < totalMessages; i++) {
+ producer.send(("msg-" + i).getBytes());
+ }
+ countDownLatch.await();
+
+ consumer.close();
+ producer.close();
+ }
+
+ /**
+ * Test both decryptFailListener and messageListener receive messages.
+ */
+ @Test(timeOut = 30000)
+ public void testBothDecryptFailListenerAndMessageListenerReceiveMessage()
throws Exception {
+ final String topic = BrokerTestUtil.newUniqueName(
+
"persistent://my-property/my-ns/testBothDecryptFailListenerAndMessageListenerReceiveMessage"
+ );
+ int totalMessages = 10;
+ CountDownLatch decryptSuccessCount = new CountDownLatch(5);
+ CountDownLatch decryptFailCount = new CountDownLatch(5);
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
+ .decryptFailListener(((c, msg) -> {
+ decryptFailCount.countDown();
+ }))
+ .cryptoKeyReader(new EncKeyReader1())
+ .messageListener((c, msg) -> {
+ decryptSuccessCount.countDown();
+ })
+ .subscriptionName("my-sub")
+ .subscribe();
+
+ Producer<byte[]> producer1 = pulsarClient.newProducer()
+ .topic(topic)
+ .addEncryptionKey("client-rsa.pem")
+ .cryptoKeyReader(new EncKeyReader1())
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ Producer<byte[]> producer2 = pulsarClient.newProducer()
+ .topic(topic)
+ .addEncryptionKey("client-rsa.pem")
+ .cryptoKeyReader(new EncKeyReader2())
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ for (int i = 0; i < totalMessages; i++) {
+ if (i % 2 == 0) {
+ producer1.send(("msg-" + i).getBytes());
+ } else {
+ producer2.send(("msg-" + i).getBytes());
+ }
+ }
+
+ decryptSuccessCount.await();
+ decryptFailCount.await();
+
+ consumer.close();
+ producer1.close();
+ producer2.close();
+ }
+
+ static class EncKeyReader1 implements CryptoKeyReader {
+
+ final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map<String,
String> keyMeta) {
+ String certFilePath =
"./src/test/resources/certificate/public-key.client-rsa.pem";
+ if (Files.isReadable(Paths.get(certFilePath))) {
+ try {
+
keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
+ return keyInfo;
+ } catch (IOException e) {
+ log.error("Failed to read certificate from {}",
certFilePath);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String,
String> keyMeta) {
+ String certFilePath =
"./src/test/resources/certificate/private-key.client-rsa.pem";
+ if (Files.isReadable(Paths.get(certFilePath))) {
+ try {
+
keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
+ return keyInfo;
+ } catch (IOException e) {
+ log.error("Failed to read certificate from {}",
certFilePath);
+ }
+ }
+ return null;
+ }
+ }
+
+ static class EncKeyReader2 implements CryptoKeyReader {
+
+ final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map<String,
String> keyMeta) {
+ String certFilePath =
"./src/test/resources/certificate/public-key.client-ecdsa.pem";
+ if (Files.isReadable(Paths.get(certFilePath))) {
+ try {
+
keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
+ return keyInfo;
+ } catch (IOException e) {
+ log.error("Failed to read certificate from {}",
certFilePath);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String,
String> keyMeta) {
+ String certFilePath =
"./src/test/resources/certificate/private-key.client-ecdsa.pem";
+ if (Files.isReadable(Paths.get(certFilePath))) {
+ try {
+
keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
+ return keyInfo;
+ } catch (IOException e) {
+ log.error("Failed to read certificate from {}",
certFilePath);
+ }
+ }
+ return null;
+ }
+ }
+
+
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 2ffb7af5169..c0b4825e022 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
@@ -26,10 +27,13 @@ import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -39,10 +43,14 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
@@ -958,7 +966,6 @@ public class ReaderTest extends MockedPulsarServiceBaseTest
{
ReaderBuilder<byte[]> readerBuilder =
client.newReader().topic(topic).startMessageFromRollbackDuration(100,
TimeUnit.SECONDS);
-
for (int i = 0; i < 3; i++) {
try {
readerBuilder.createAsync().get(1, TimeUnit.SECONDS);
@@ -969,4 +976,164 @@ public class ReaderTest extends
MockedPulsarServiceBaseTest {
}
}
}
+
+ @Test(timeOut = 10000)
+ public void testReaderDecryptFailListenerException() {
+ final String topic = BrokerTestUtil.newUniqueName(
+
"persistent://my-property/my-ns/testReaderDecryptFailListenerException-"
+ );
+ // should throw exception if readerDecryptFailListener is set without
setting a readerListener
+ assertThatThrownBy(
+ () -> pulsarClient.newReader().topic(topic)
+ .readerDecryptFailListener(((reader, msg) -> {
+ }))
+ .startMessageId(MessageId.earliest)
+ .create()
+ )
+ .isInstanceOf(PulsarClientException.class)
+ .hasMessageContaining("readerDecryptFailListener must be set
with readerListener");
+
+ // should throw exception if readerDecryptFailListener was set with
cryptoFailureAction
+ assertThatThrownBy(
+ () -> pulsarClient.newReader().topic(topic)
+ .readerDecryptFailListener(((reader, msg) -> {
+ }))
+ .readerListener((reader, msg) -> {
+ })
+ .startMessageId(MessageId.latest)
+ .cryptoFailureAction(ConsumerCryptoFailureAction.FAIL)
+ .create()
+ )
+ .isInstanceOf(PulsarClientException.class)
+ .hasMessageContaining("readerDecryptFailListener cannot set
with cryptoFailureAction");
+ }
+
+ @Test(timeOut = 20000)
+ public void testReaderDecryptFailListenerBehaviorWithReaderImpl() throws
Exception {
+ final String topic = BrokerTestUtil.newUniqueName(
+
"persistent://my-property/my-ns/testDecryptFailListenerBehaviorWithConsumerImpl"
+ );
+ admin.topics().createNonPartitionedTopic(topic);
+ ReaderImpl<byte[]> reader1 = (ReaderImpl<byte[]>)
pulsarClient.newReader().topic(topic)
+ .readerDecryptFailListener(((r, msg) -> {
+ }))
+ .readerListener((r, msg) -> {
+ })
+ .startMessageId(MessageId.earliest)
+ .create();
+
+ ReaderImpl<byte[]> reader2 = (ReaderImpl<byte[]>)
pulsarClient.newReader().topic(topic)
+ .readerListener((reader, msg) -> {
+ })
+ .startMessageId(MessageId.earliest)
+ .create();
+
+ // cryptoFailureAction should be null when readerDecryptFailListener
is set
+ assertNull(reader1.getConsumer().conf.getCryptoFailureAction());
+ // cryptoFailureAction should be FAIL by default when
readerDecryptFailListener is not set
+ assertEquals(reader2.getConsumer().conf.getCryptoFailureAction(),
ConsumerCryptoFailureAction.FAIL);
+
+ reader1.close();
+ reader2.close();
+ }
+
+ @Test(timeOut = 20000)
+ public void testReaderDecryptFailListenerBehaviorWithMultiReaderImpl()
throws Exception {
+ final String topic = BrokerTestUtil.newUniqueName(
+
"persistent://my-property/my-ns/testDecryptFailListenerBehaviorWithConsumerImpl"
+ );
+ admin.topics().createPartitionedTopic(topic, 3);
+ MultiTopicsReaderImpl<byte[]> reader1 =
(MultiTopicsReaderImpl<byte[]>) pulsarClient.newReader().topic(topic)
+ .readerDecryptFailListener(((r, msg) -> {
+ }))
+ .readerListener((r, msg) -> {
+ })
+ .startMessageId(MessageId.earliest)
+ .create();
+
+ MultiTopicsReaderImpl<byte[]> reader2 =
(MultiTopicsReaderImpl<byte[]>) pulsarClient.newReader().topic(topic)
+ .readerListener((reader, msg) -> {
+ })
+ .startMessageId(MessageId.earliest)
+ .create();
+
+ // cryptoFailureAction should be null when readerDecryptFailListener
is set
+
assertNull(reader1.getMultiTopicsConsumer().conf.getCryptoFailureAction());
+ // cryptoFailureAction should be FAIL by default when
readerDecryptFailListener is not set
+
assertEquals(reader2.getMultiTopicsConsumer().conf.getCryptoFailureAction(),
ConsumerCryptoFailureAction.FAIL);
+
+ reader1.close();
+ reader2.close();
+ }
+
+ @Test(timeOut = 30000)
+ public void testReaderDecryptFailListenerReceiveMessage() throws Exception
{
+ final String topic = BrokerTestUtil.newUniqueName(
+
"persistent://my-property/my-ns/testReaderDecryptFailListenerReceiveMessage"
+ );
+ admin.topics().createNonPartitionedTopic(topic);
+ int totalMessages = 10;
+ CountDownLatch countDownLatch = new CountDownLatch(10);
+ Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
+ .readerDecryptFailListener(((c, msg) -> {
+ // all messages should come into this listener because no
crypto key is set in this reader
+ assertTrue(msg.getEncryptionCtx().isPresent());
+ assertTrue(msg.getEncryptionCtx().get().isEncrypted());
+ countDownLatch.countDown();
+ }))
+ .readerListener((c, msg) -> {
+ })
+ .startMessageId(MessageId.earliest)
+ .create();
+
+ class EncKeyReader implements CryptoKeyReader {
+
+ final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map<String,
String> keyMeta) {
+ String certFilePath =
"./src/test/resources/certificate/public-key." + keyName;
+ if (Files.isReadable(Paths.get(certFilePath))) {
+ try {
+
keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
+ return keyInfo;
+ } catch (IOException e) {
+ log.error("Failed to read certificate from {}",
certFilePath);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String,
String> keyMeta) {
+ String certFilePath =
"./src/test/resources/certificate/private-key." + keyName;
+ if (Files.isReadable(Paths.get(certFilePath))) {
+ try {
+
keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
+ return keyInfo;
+ } catch (IOException e) {
+ log.error("Failed to read certificate from {}",
certFilePath);
+ }
+ }
+ return null;
+ }
+ }
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .addEncryptionKey("client-rsa.pem")
+ .cryptoKeyReader(new EncKeyReader())
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ for (int i = 0; i < totalMessages; i++) {
+ producer.send(("msg-" + i).getBytes());
+ }
+ countDownLatch.await();
+
+ reader.close();
+ producer.close();
+ }
+
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 70904c0897a..843d5a18b65 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -304,6 +304,20 @@ public interface ConsumerBuilder<T> extends Cloneable {
*/
ConsumerBuilder<T> messageListener(MessageListener<T> messageListener);
+ /**
+ * Sets a {@link DecryptFailListener} for the consumer.
+ *
+ * <p>Encrypted messages that the application receives but cannot decrypt
successfully are delivered through this listener.
+ * <p>Must be set together with {@link
ConsumerBuilder#messageListener(MessageListener)},
+ * and calls to {@link Consumer#receive()} are not allowed.
+ * This listener cannot be used with {@link
ConsumerBuilder#cryptoFailureAction(ConsumerCryptoFailureAction)}.
+ *
+ * @param decryptFailListener
+ * the listener object
+ * @return the consumer builder instance
+ */
+ ConsumerBuilder<T> decryptFailListener(DecryptFailListener<T>
decryptFailListener);
+
/**
* Set the {@link MessageListenerExecutor} to be used for message
listeners of <b>current consumer</b>.
* <i>(default: use executor from PulsarClient,
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DecryptFailListener.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DecryptFailListener.java
new file mode 100644
index 00000000000..3b5492987ec
--- /dev/null
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DecryptFailListener.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Serializable;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+
+/**
+ * A listener that is called when message decryption fails.
+ * <p>This listener is invoked when the consumer receives an encrypted message
that cannot be decrypted successfully,
+ * either because no {@link CryptoKeyReader} is configured or the configured
{@link CryptoKeyReader}
+ * cannot decrypt the message. This allows applications to handle decryption
failures separately
+ * from normal message processing.
+ *
+ * <p>This listener must be used together with a {@link MessageListener} and
cannot be used
+ * with {@link ConsumerCryptoFailureAction}.
+ */
[email protected]
[email protected]
+public interface DecryptFailListener<T> extends Serializable {
+ /**
+ * This method is called whenever a new encrypted message is received and
cannot be decrypted successfully
+ * by {@link CryptoKeyReader}.
+ * <p>Messages are guaranteed to be delivered in order and from the same
thread for a single consumer.
+ *
+ * <p>This method will only be called once for each encrypted message.
+ *
+ * <p>Application is responsible for acknowledging the message by calling
any of the consumer
+ * acknowledgement methods if needed.
+ *
+ * <p>Application is responsible for handling any exception that could be
thrown while
+ * processing the undecryptable message.
+ *
+ * @param consumer the consumer that received the undecryptable message
+ * @param msg the encrypted message object that failed decryption
+ */
+ void received(Consumer<T> consumer, Message<T> msg);
+}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index 88eaa890a70..bf44ee92b66 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -246,7 +246,7 @@ public interface ProducerBuilder<T> extends Cloneable {
* <p>Default routing mode is to round-robin across the available
partitions.
*
* <p>This logic is applied when the application is not setting a key on a
- * particular message. If the key is set with {@link
MessageBuilder#setKey(String)},
+ * particular message. If the key is set with {@link
TypedMessageBuilder#key(String)},
* then the hash of the key will be used to select a partition for the
message.
*
* @param messageRoutingMode
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
index 4a313feba43..ead36f13ea8 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
@@ -168,6 +168,20 @@ public interface ReaderBuilder<T> extends Cloneable {
*/
ReaderBuilder<T> readerListener(ReaderListener<T> readerListener);
+ /**
+ * Sets a {@link ReaderDecryptFailListener} for the reader.
+ *
+ * <p>Encrypted messages that the application receives but cannot decrypt
successfully are delivered through this listener.
+ * <p>Must be set together with {@link ReaderBuilder#readerListener(ReaderListener)},
+ * and calls to {@link Reader#readNext()} are not allowed.
+ * This listener cannot be used with {@link
ReaderBuilder#cryptoFailureAction(ConsumerCryptoFailureAction)}.
+ *
+ * @param readerDecryptFailListener
+ * the listener object
+ * @return the reader builder instance
+ */
+ ReaderBuilder<T> readerDecryptFailListener(ReaderDecryptFailListener<T>
readerDecryptFailListener);
+
/**
* Sets a {@link CryptoKeyReader} to decrypt the message payloads.
*
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderDecryptFailListener.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderDecryptFailListener.java
new file mode 100644
index 00000000000..dfce5d482d4
--- /dev/null
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderDecryptFailListener.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.io.Serializable;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+/**
+ * A listener that is called when message decryption fails.
+ * <p>This listener is invoked when the reader receives an encrypted message
that cannot be decrypted successfully,
+ * either because no {@link CryptoKeyReader} is configured or the configured
{@link CryptoKeyReader}
+ * cannot decrypt the message. This allows applications to handle decryption
failures separately
+ * from normal message processing.
+ *
+ * <p>This listener must be used together with a {@link
ReaderBuilder#readerListener(ReaderListener)} and cannot be used
+ * with {@link ReaderBuilder#cryptoFailureAction(ConsumerCryptoFailureAction)}.
+ */
[email protected]
[email protected]
+public interface ReaderDecryptFailListener<T> extends Serializable {
+ /**
+ * This method is called whenever a new encrypted message is received and
cannot be decrypted successfully
+ * by {@link CryptoKeyReader}.
+ * <p>Messages are guaranteed to be delivered in order and from the same
thread for a single reader.
+ *
+ * <p>This method will only be called once for each encrypted message.
+ *
+ * <p>Application is responsible for acknowledging the message by calling
any of the reader
+ * acknowledgement methods if needed.
+ *
+ * <p>Application is responsible for handling any exception that could be
thrown while
+ * processing the undecryptable message.
+ *
+ * @param reader the reader that received the undecryptable message
+ * @param msg the encrypted message object that failed decryption
+ */
+ void received(Reader<T> reader, Message<T> msg);
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 27e2216e589..ec95f2c2f65 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.DecryptFailListener;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
@@ -64,6 +65,7 @@ import
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.NoOpLock;
+import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
@@ -82,6 +84,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
protected final String consumerName;
protected final CompletableFuture<Consumer<T>> subscribeFuture;
protected final MessageListener<T> listener;
+ protected final DecryptFailListener<T> decryptFailListener;
protected final ConsumerEventListener consumerEventListener;
protected final ExecutorProvider executorProvider;
protected final MessageListenerExecutor messageListenerExecutor;
@@ -137,6 +140,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
conf.getConsumerName() == null ?
RandomStringUtils.randomAlphanumeric(5) : conf.getConsumerName();
this.subscribeFuture = subscribeFuture;
this.listener = conf.getMessageListener();
+ this.decryptFailListener = conf.getDecryptFailListener();
this.consumerEventListener = conf.getConsumerEventListener();
// Always use growable queue since items can exceed the advertised size
this.incomingMessages = new GrowableArrayBlockingQueue<>();
@@ -1190,7 +1194,12 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
}
unAckedMessageTracker.add(id, msg.getRedeliveryCount());
beforeConsume(msg);
- listener.received(ConsumerBase.this, msg);
+ Optional<EncryptionContext> encryptionCtx = msg.getEncryptionCtx();
+ if (decryptFailListener != null && encryptionCtx.isPresent() &&
encryptionCtx.get().isEncrypted()) {
+ decryptFailListener.received(ConsumerBase.this, msg);
+ } else {
+ listener.received(ConsumerBase.this, msg);
+ }
} catch (Throwable t) {
log.error("[{}][{}] Message listener error in processing message:
{}", topic, subscription,
msg.getMessageId(), t);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index ab64119cb8f..dc2363c279f 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.ConsumerInterceptor;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.DecryptFailListener;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageListener;
@@ -148,6 +149,17 @@ public class ConsumerBuilderImpl<T> implements
ConsumerBuilder<T> {
return FutureUtil.failedFuture(
new InvalidConfigurationException("KeySharedPolicy must
set with KeyShared subscription"));
}
+ if (conf.getDecryptFailListener() != null && conf.getMessageListener()
== null) {
+ return FutureUtil.failedFuture(
+ new InvalidConfigurationException("decryptFailListener
must be set with messageListener"));
+ }
+ if (conf.getDecryptFailListener() != null &&
conf.getCryptoFailureAction() != null) {
+ return FutureUtil.failedFuture(
+ new InvalidConfigurationException("decryptFailListener
can't be set with cryptoFailureAction"));
+ }
+ if (conf.getDecryptFailListener() == null &&
conf.getCryptoFailureAction() == null) {
+ conf.setCryptoFailureAction(ConsumerCryptoFailureAction.FAIL);
+ }
if (conf.getBatchReceivePolicy() != null) {
conf.setReceiverQueueSize(
Math.max(conf.getBatchReceivePolicy().getMaxNumMessages(),
conf.getReceiverQueueSize()));
@@ -309,6 +321,12 @@ public class ConsumerBuilderImpl<T> implements
ConsumerBuilder<T> {
return this;
}
+ // Stores the decrypt-fail listener in the builder configuration and returns this
+ // builder for chaining. The listener's compatibility with messageListener and
+ // cryptoFailureAction is validated elsewhere in this class, not here.
+ // NOTE(review): the parameter is named messageListener although it carries the
+ // DecryptFailListener - consider renaming it.
+ @Override
+ public ConsumerBuilder<T> decryptFailListener(@NonNull
DecryptFailListener<T> messageListener) {
+ conf.setDecryptFailListener(messageListener);
+ return this;
+ }
+
@Override
public ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor
messageListenerExecutor) {
checkArgument(messageListenerExecutor != null,
"messageListenerExecutor needs to be not null");
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6d5305136b1..37e9f16fe02 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2028,7 +2028,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return handleCryptoFailure(payload, messageId, currentCnx,
redeliveryCount, batchSize, true);
}
-
int maxDecryptedSize =
msgCrypto.getMaxOutputSize(payload.readableBytes());
ByteBuf decryptedData =
PulsarByteBufAllocator.DEFAULT.buffer(maxDecryptedSize);
ByteBuffer nioDecryptedData = decryptedData.nioBuffer(0,
maxDecryptedSize);
@@ -2045,6 +2044,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private DecryptResult handleCryptoFailure(ByteBuf payload, MessageIdData
messageId, ClientCnx currentCnx,
int redeliveryCount, int batchSize, boolean cryptoReaderNotExist) {
+ if (conf.getDecryptFailListener() != null) {
+ return DecryptResult.failure(payload.retain());
+ }
switch (conf.getCryptoFailureAction()) {
case CONSUME:
if (cryptoReaderNotExist) {
@@ -2922,7 +2924,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private boolean isMessageUndecryptable(MessageMetadata msgMetadata) {
+ // True when the message carries encryption keys, no CryptoKeyReader is configured,
+ // and the consumer either uses the CONSUME failure action or has a
+ // decryptFailListener registered.
return (msgMetadata.getEncryptionKeysCount() > 0 &&
conf.getCryptoKeyReader() == null
- && conf.getCryptoFailureAction() ==
ConsumerCryptoFailureAction.CONSUME);
+ && (conf.getCryptoFailureAction() ==
ConsumerCryptoFailureAction.CONSUME
+ || conf.getDecryptFailListener() != null));
}
/**
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index a31b72c348c..998dc52951a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -56,6 +56,7 @@ import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -706,6 +707,10 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
internalConsumerConfig.setSubscriptionName(subscription);
internalConsumerConfig.setConsumerName(consumerName);
internalConsumerConfig.setMessageListener(null);
+ internalConsumerConfig.setDecryptFailListener(null);
+ if (internalConsumerConfig.getCryptoFailureAction() == null) {
+
internalConsumerConfig.setCryptoFailureAction(ConsumerCryptoFailureAction.FAIL);
+ }
return internalConsumerConfig;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index 86f3199f297..18c44122050 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -29,12 +29,15 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.DecryptFailListener;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderDecryptFailListener;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionMode;
@@ -101,13 +104,35 @@ public class MultiTopicsReaderImpl<T> implements
Reader<T> {
});
}
+ if (readerConfiguration.getReaderDecryptFailListener() != null) {
+ ReaderDecryptFailListener<T> readerDecryptFailListener =
readerConfiguration.getReaderDecryptFailListener();
+ consumerConfiguration.setDecryptFailListener(new
DecryptFailListener<T>() {
+ private static final long serialVersionUID = 1L;
+
+ // Adapts the consumer-level callback to the reader-level listener: the
+ // application's ReaderDecryptFailListener is invoked first, then the message is
+ // acknowledged cumulatively on the underlying consumer.
+ @Override
+ public void received(Consumer<T> consumer, Message<T> msg) {
+ final MessageId messageId = msg.getMessageId();
+ // If the application listener throws, the acknowledgement below is not reached.
+
readerDecryptFailListener.received(MultiTopicsReaderImpl.this, msg);
+ // The ack is asynchronous; a failure is only logged and not propagated.
+
consumer.acknowledgeCumulativeAsync(messageId).exceptionally(ex -> {
+ log.error("[{}][{}] auto acknowledge decrypt fail
message {} cumulative fail.", getTopic(),
+ getMultiTopicsConsumer().getSubscription(),
messageId, ex);
+ return null;
+ });
+ }
+ });
+ }
+
if (readerConfiguration.getReaderName() != null) {
consumerConfiguration.setConsumerName(readerConfiguration.getReaderName());
}
if (readerConfiguration.isResetIncludeHead()) {
consumerConfiguration.setResetIncludeHead(true);
}
-
consumerConfiguration.setCryptoFailureAction(readerConfiguration.getCryptoFailureAction());
+ if (readerConfiguration.getCryptoFailureAction() != null) {
+
consumerConfiguration.setCryptoFailureAction(readerConfiguration.getCryptoFailureAction());
+ } else if (readerConfiguration.getReaderDecryptFailListener() == null)
{
+
consumerConfiguration.setCryptoFailureAction(ConsumerCryptoFailureAction.FAIL);
+ }
if (readerConfiguration.getCryptoKeyReader() != null) {
consumerConfiguration.setCryptoKeyReader(readerConfiguration.getCryptoKeyReader());
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index 629a076c596..ed1169ce4db 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.ReaderDecryptFailListener;
import org.apache.pulsar.client.api.ReaderInterceptor;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.api.Schema;
@@ -100,6 +101,16 @@ public class ReaderBuilderImpl<T> implements
ReaderBuilder<T> {
conf.setStartMessageId(MessageId.earliest);
}
+ if (conf.getReaderDecryptFailListener() != null &&
conf.getReaderListener() == null) {
+ return FutureUtil.failedFuture(new IllegalArgumentException(
+ "readerDecryptFailListener must be set with readerListener"
+ ));
+ }
+ if (conf.getCryptoFailureAction() != null &&
conf.getReaderDecryptFailListener() != null) {
+ return FutureUtil.failedFuture(new IllegalArgumentException(
+ "readerDecryptFailListener cannot set with
cryptoFailureAction"
+ ));
+ }
return client.createReaderAsync(conf, schema);
}
@@ -152,6 +163,12 @@ public class ReaderBuilderImpl<T> implements
ReaderBuilder<T> {
return this;
}
+ // Stores the reader decrypt-fail listener in the builder configuration and returns
+ // this builder for chaining. Its compatibility with readerListener and
+ // cryptoFailureAction is validated elsewhere in this class, not here.
+ // NOTE(review): no null check is visible on the argument here - confirm that
+ // passing null (leaving the listener unset) is intended.
+ @Override
+ public ReaderBuilder<T>
readerDecryptFailListener(ReaderDecryptFailListener<T>
readerDecryptFailListener) {
+ conf.setReaderDecryptFailListener(readerDecryptFailListener);
+ return this;
+ }
+
@Override
public ReaderBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
conf.setCryptoKeyReader(cryptoKeyReader);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 8760d69447a..779ccadbbd1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -29,12 +29,15 @@ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.DecryptFailListener;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderDecryptFailListener;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
@@ -118,7 +121,29 @@ public class ReaderImpl<T> implements Reader<T> {
});
}
-
consumerConfiguration.setCryptoFailureAction(readerConfiguration.getCryptoFailureAction());
+ if (readerConfiguration.getReaderDecryptFailListener() != null) {
+ ReaderDecryptFailListener<T> readerDecryptFailListener =
readerConfiguration.getReaderDecryptFailListener();
+ consumerConfiguration.setDecryptFailListener(new
DecryptFailListener<>() {
+ private static final long serialVersionUID = 1L;
+
+ // Adapts the consumer-level callback to the reader-level listener: the
+ // application's ReaderDecryptFailListener is invoked first, then the message is
+ // acknowledged cumulatively on the underlying consumer.
+ @Override
+ public void received(Consumer<T> consumer, Message<T> msg) {
+ final MessageId messageId = msg.getMessageId();
+ // If the application listener throws, the acknowledgement below is not reached.
+ readerDecryptFailListener.received(ReaderImpl.this, msg);
+ // The ack is asynchronous; a failure is only logged and not propagated.
+
consumer.acknowledgeCumulativeAsync(messageId).exceptionally(ex -> {
+ log.error("[{}][{}] auto acknowledge decrypt fail
message {} cumulative fail.", getTopic(),
+ getConsumer().getSubscription(), messageId,
ex);
+ return null;
+ });
+ }
+ });
+ }
+
+ if (readerConfiguration.getCryptoFailureAction() != null) {
+
consumerConfiguration.setCryptoFailureAction(readerConfiguration.getCryptoFailureAction());
+ } else if (readerConfiguration.getReaderDecryptFailListener() == null)
{
+
consumerConfiguration.setCryptoFailureAction(ConsumerCryptoFailureAction.FAIL);
+ }
if (readerConfiguration.getCryptoKeyReader() != null) {
consumerConfiguration.setCryptoKeyReader(readerConfiguration.getCryptoKeyReader());
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index 01ff4d57782..02851ac36fc 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -176,7 +176,14 @@ public class ZeroQueueConsumerImpl<T> extends
ConsumerImpl<T> {
trackMessage(message);
unAckedMessageTracker.add(
MessageIdAdvUtils.discardBatch(message.getMessageId()),
message.getRedeliveryCount());
- listener.received(ZeroQueueConsumerImpl.this,
beforeConsume(message));
+ if (decryptFailListener != null
+ && message.getEncryptionCtx().isPresent()
+ && message.getEncryptionCtx().get().isEncrypted()
+ ) {
+ decryptFailListener.received(ZeroQueueConsumerImpl.this,
beforeConsume(message));
+ } else {
+ listener.received(ZeroQueueConsumerImpl.this,
beforeConsume(message));
+ }
} catch (Throwable t) {
log.error("[{}][{}] Message listener error in processing
unqueued message: {}", topic, subscription,
message.getMessageId(), t);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 4be8c4ed73e..42fc2666573 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -41,6 +41,7 @@ import
org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.DecryptFailListener;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageListener;
@@ -97,6 +98,9 @@ public class ConsumerConfigurationData<T> implements
Serializable, Cloneable {
@JsonIgnore
private MessageListener<T> messageListener;
+ @JsonIgnore
+ private DecryptFailListener<T> decryptFailListener;
+
@JsonIgnore
private ConsumerEventListener consumerEventListener;
@@ -284,7 +288,7 @@ public class ConsumerConfigurationData<T> implements
Serializable, Cloneable {
+ "Delivered encrypted message contains {@link
EncryptionContext} which contains encryption and "
+ "compression information in it using which application
can decrypt consumed message payload."
)
- private ConsumerCryptoFailureAction cryptoFailureAction =
ConsumerCryptoFailureAction.FAIL;
+ private ConsumerCryptoFailureAction cryptoFailureAction;
@ApiModelProperty(
name = "properties",
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
index cd5aa4c12f5..1fe39e6329c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Range;
+import org.apache.pulsar.client.api.ReaderDecryptFailListener;
import org.apache.pulsar.client.api.ReaderInterceptor;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -73,6 +74,12 @@ public class ReaderConfigurationData<T> implements
Serializable, Cloneable {
)
private ReaderListener<T> readerListener;
+ @ApiModelProperty(
+ name = "readerDecryptFailListener",
+ value = "A listener that is called for encrypted message received
and decrypt fail."
+ )
+ private ReaderDecryptFailListener<T> readerDecryptFailListener;
+
@ApiModelProperty(
name = "readerName",
value = "Reader name"
@@ -112,8 +119,10 @@ public class ReaderConfigurationData<T> implements
Serializable, Cloneable {
+ "\n"
+ "Delivered encrypted message contains {@link
EncryptionContext} which contains encryption and "
+ "compression information in it using which application
can decrypt consumed message payload."
+ + "cannot set with {@link ReaderDecryptFailListener}, and
if ReaderDecryptFailListener are set,\n"
+ + "application should responsible for handling decryption
failure."
)
- private ConsumerCryptoFailureAction cryptoFailureAction =
ConsumerCryptoFailureAction.FAIL;
+ private ConsumerCryptoFailureAction cryptoFailureAction;
@JsonIgnore
private transient MessageCrypto messageCrypto = null;
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
index 6a8d8d9e86e..8b031fbd38b 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -552,7 +552,7 @@ public class ConsumerBuilderImplTest {
assertEquals(configurationData.getMaxPendingChunkedMessage(), 10);
assertFalse(configurationData.isAutoAckOldestChunkedMessageOnQueueFull());
assertEquals(configurationData.getExpireTimeOfIncompleteChunkedMessageMillis(),
TimeUnit.MINUTES.toMillis(1));
- assertEquals(configurationData.getCryptoFailureAction(),
ConsumerCryptoFailureAction.FAIL);
+ assertNull(configurationData.getCryptoFailureAction());
assertThat(configurationData.getProperties()).hasSize(1)
.hasFieldOrPropertyWithValue("prop", "prop-value");
assertFalse(configurationData.isReadCompacted());