jai1 closed pull request #3097: Redeliver messages that can't be decrypted.
URL: https://github.com/apache/pulsar/pull/3097
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index f6b8e1a518..fb47c5052d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -26,6 +26,7 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -34,6 +35,7 @@
import java.lang.reflect.Modifier;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -2305,6 +2307,137 @@ public EncryptionKeyInfo getPrivateKey(String keyName,
Map<String, String> keyMe
consumer.close();
log.info("-- Exiting {} test --", methodName);
}
+
+ @Test(groups = "encryption")
+ public void testRedeliveryOfFailedMessages() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ final String encryptionKeyName = "client-rsa.pem";
+ final String encryptionKeyVersion = "1.0";
+ Map<String, String> metadata = Maps.newHashMap();
+ metadata.put("version", encryptionKeyVersion);
+ class EncKeyReader implements CryptoKeyReader {
+ EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map<String,
String> keyMeta) {
+ String CERT_FILE_PATH =
"./src/test/resources/certificate/public-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ keyInfo.setMetadata(metadata);
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " +
CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is
not present or not readable.");
+ }
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String,
String> keyMeta) {
+ String CERT_FILE_PATH =
"./src/test/resources/certificate/private-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ keyInfo.setMetadata(metadata);
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " +
CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is
not present or not readable.");
+ }
+ return null;
+ }
+ }
+
+ class InvalidKeyReader implements CryptoKeyReader {
+ EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map<String,
String> keyMeta) {
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String,
String> metadata) {
+ return null;
+ }
+ }
+
+ /*
+ * Redelivery functionality guarantees that customer will get a chance
to process the message again.
+ * In case of shared subscription eventually every client will get a
chance to process the message, till one of them acks it.
+ *
+ * For client with Encryption enabled where in cases like a new
production rollout or a buggy client configuration, we might have a mismatch of
consumers
+ * - few which can decrypt, few which can't (due to errors or
cryptoReader not configured).
+ *
+ * In that case eventually all messages should be acked as long as
there is a single consumer who can decrypt the message.
+ *
+ * Consumer 1 - Can decrypt message
+ * Consumer 2 - Has invalid Reader configured.
+ * Consumer 3 - Has no reader configured.
+ *
+ */
+
+ String topicName = "persistent://my-property/my-ns/myrsa-topic1";
+
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+
.addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+ .cryptoKeyReader(new EncKeyReader()).create();
+
+ Consumer<byte[]> consumer1 =
pulsarClient.newConsumer().topicsPattern(topicName)
+ .subscriptionName("my-subscriber-name").cryptoKeyReader(new
EncKeyReader())
+ .subscriptionType(SubscriptionType.Shared).ackTimeout(1,
TimeUnit.SECONDS).subscribe();
+
+ Consumer<byte[]> consumer2 =
pulsarClient.newConsumer().topicsPattern(topicName)
+ .subscriptionName("my-subscriber-name").cryptoKeyReader(new
InvalidKeyReader())
+ .subscriptionType(SubscriptionType.Shared).ackTimeout(1,
TimeUnit.SECONDS).subscribe();
+
+ Consumer<byte[]> consumer3 =
pulsarClient.newConsumer().topicsPattern(topicName)
+
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1,
TimeUnit.SECONDS).subscribe();
+
+ int numberOfMessages = 100;
+ String message = "my-message";
+ Set<String> messages = new HashSet(); // Since messages are in random
order
+ for (int i = 0; i<numberOfMessages; i++) {
+ producer.send((message + i).getBytes());
+ }
+
+ // Consuming from consumer 2 and 3
+ // no message should be returned since they can't decrypt the message
+ Message m = consumer2.receive(3, TimeUnit.SECONDS);
+ assertNull(m);
+ m = consumer3.receive(3, TimeUnit.SECONDS);
+ assertNull(m);
+
+ for (int i = 0; i<numberOfMessages; i++) {
+ // All messages would be received by consumer 1
+ m = consumer1.receive();
+ messages.add(new String(m.getData()));
+ consumer1.acknowledge(m);
+ }
+
+ // Consuming from consumer 2 and 3 again just to be sure
+ // no message should be returned since they can't decrypt the message
+ m = consumer2.receive(3, TimeUnit.SECONDS);
+ assertNull(m);
+ m = consumer3.receive(3, TimeUnit.SECONDS);
+ assertNull(m);
+
+ // checking if all messages were received
+ for (int i = 0; i<numberOfMessages; i++) {
+ assertTrue(messages.contains((message + i)));
+ }
+
+ consumer1.close();
+ consumer2.close();
+ consumer3.close();
+ log.info("-- Exiting {} test --", methodName);
+ }
@Test(groups = "encryption")
public void testEncryptionFailure() throws Exception {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 76c4f53d8d..6ae925d21c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1107,44 +1107,52 @@ private ByteBuf decryptPayloadIfNeeded(MessageIdData
messageId, MessageMetadata
// If KeyReader is not configured throw exception based on config param
if (conf.getCryptoKeyReader() == null) {
-
- if (conf.getCryptoFailureAction() ==
ConsumerCryptoFailureAction.CONSUME) {
- log.warn("[{}][{}][{}] CryptoKeyReader interface is not
implemented. Consuming encrypted message.",
- topic, subscription, consumerName);
- return payload.retain();
- } else if (conf.getCryptoFailureAction() ==
ConsumerCryptoFailureAction.DISCARD) {
- log.warn(
- "[{}][{}][{}] Skipping decryption since
CryptoKeyReader interface is not implemented and config is set to discard",
- topic, subscription, consumerName);
- discardMessage(messageId, currentCnx,
ValidationError.DecryptionError);
- } else {
- log.error(
- "[{}][{}][{}] Message delivery failed since
CryptoKeyReader interface is not implemented to consume encrypted message",
- topic, subscription, consumerName);
+ switch (conf.getCryptoFailureAction()) {
+ case CONSUME:
+ log.warn("[{}][{}][{}] CryptoKeyReader interface is not
implemented. Consuming encrypted message.",
+ topic, subscription, consumerName);
+ return payload.retain();
+ case DISCARD:
+ log.warn(
+ "[{}][{}][{}] Skipping decryption since
CryptoKeyReader interface is not implemented and config is set to discard",
+ topic, subscription, consumerName);
+ discardMessage(messageId, currentCnx,
ValidationError.DecryptionError);
+ return null;
+ case FAIL:
+ MessageId m = new MessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), partitionIndex);
+ log.error(
+ "[{}][{}][{}][{}] Message delivery failed since
CryptoKeyReader interface is not implemented to consume encrypted message",
+ topic, subscription, consumerName, m);
+ unAckedMessageTracker.add(m);
+ return null;
}
- return null;
}
ByteBuf decryptedData = this.msgCrypto.decrypt(msgMetadata, payload,
conf.getCryptoKeyReader());
if (decryptedData != null) {
return decryptedData;
}
-
- if (conf.getCryptoFailureAction() ==
ConsumerCryptoFailureAction.CONSUME) {
- // Note, batch message will fail to consume even if config is set
to consume
- log.warn("[{}][{}][{}][{}] Decryption failed. Consuming encrypted
message since config is set to consume.",
- topic, subscription, consumerName, messageId);
- return payload.retain();
- } else if (conf.getCryptoFailureAction() ==
ConsumerCryptoFailureAction.DISCARD) {
- log.warn("[{}][{}][{}][{}] Discarding message since decryption
failed and config is set to discard", topic,
- subscription, consumerName, messageId);
- discardMessage(messageId, currentCnx,
ValidationError.DecryptionError);
- } else {
- log.error("[{}][{}][{}][{}] Message delivery failed since unable
to decrypt incoming message", topic,
- subscription, consumerName, messageId);
+
+ switch (conf.getCryptoFailureAction()) {
+ case CONSUME:
+ // Note, batch message will fail to consume even if config is
set to consume
+ log.warn("[{}][{}][{}][{}] Decryption failed. Consuming
encrypted message since config is set to consume.",
+ topic, subscription, consumerName, messageId);
+ return payload.retain();
+ case DISCARD:
+ log.warn("[{}][{}][{}][{}] Discarding message since decryption
failed and config is set to discard", topic,
+ subscription, consumerName, messageId);
+ discardMessage(messageId, currentCnx,
ValidationError.DecryptionError);
+ return null;
+ case FAIL:
+ MessageId m = new MessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), partitionIndex);
+ log.error(
+ "[{}][{}][{}][{}] Message delivery failed since unable
to decrypt incoming message",
+ topic, subscription, consumerName, m);
+ unAckedMessageTracker.add(m);
+ return null;
}
return null;
-
}
private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId,
MessageMetadata msgMetadata, ByteBuf payload,
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services