jai1 closed pull request #3097: Redeliver messages that can't be decrypted.
URL: https://github.com/apache/pulsar/pull/3097
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork) pull request
once it is merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index f6b8e1a518..fb47c5052d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -26,6 +26,7 @@
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -34,6 +35,7 @@
 import java.lang.reflect.Modifier;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -2305,6 +2307,137 @@ public EncryptionKeyInfo getPrivateKey(String keyName, 
Map<String, String> keyMe
         consumer.close();
         log.info("-- Exiting {} test --", methodName);
     }
+    
+    @Test(groups = "encryption")
+    public void testRedeliveryOfFailedMessages() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+        }
+        
+        class InvalidKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> metadata) {
+                return null;
+            }
+        }
+        
+        /*
+         * Redelivery functionality guarantees that the customer will get a chance 
to process the message again.
+         * In case of a shared subscription, eventually every client will get a 
chance to process the message, until one of them acks it.
+         * 
+         * For clients with encryption enabled, in cases like a new 
production rollout or a buggy client configuration, we might have a mismatch of 
consumers 
+         * - a few which can decrypt, a few which can't (due to errors or 
cryptoReader not being configured).
+         * 
+         * In that case eventually all messages should be acked as long as 
there is a single consumer who can decrypt the message.
+         * 
+         * Consumer 1 - Can decrypt message
+         * Consumer 2 - Has invalid Reader configured.
+         * Consumer 3 - Has no reader configured.
+         * 
+         */
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1";
+        
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                
.addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
EncKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, 
TimeUnit.SECONDS).subscribe();
+        
+        Consumer<byte[]> consumer2 = 
pulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
InvalidKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, 
TimeUnit.SECONDS).subscribe();
+
+        Consumer<byte[]> consumer3 = 
pulsarClient.newConsumer().topicsPattern(topicName)
+                
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1,
 TimeUnit.SECONDS).subscribe();
+        
+        int numberOfMessages = 100;
+        String message = "my-message";
+        Set<String> messages = new HashSet(); // Since messages are in random 
order
+        for (int i = 0; i<numberOfMessages; i++) {
+            producer.send((message + i).getBytes());
+        }
+        
+        // Consuming from consumer 2 and 3 
+        // no message should be returned since they can't decrypt the message
+        Message m = consumer2.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        m = consumer3.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        
+        for (int i = 0; i<numberOfMessages; i++) {
+            // All messages would be received by consumer 1 
+            m = consumer1.receive();
+            messages.add(new String(m.getData()));
+            consumer1.acknowledge(m);
+        }
+        
+        // Consuming from consumer 2 and 3 again just to be sure 
+        // no message should be returned since they can't decrypt the message
+        m = consumer2.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        m = consumer3.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        
+        // checking if all messages were received
+        for (int i = 0; i<numberOfMessages; i++) {
+            assertTrue(messages.contains((message + i)));
+        }
+        
+        consumer1.close();
+        consumer2.close();
+        consumer3.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
 
     @Test(groups = "encryption")
     public void testEncryptionFailure() throws Exception {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 76c4f53d8d..6ae925d21c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1107,44 +1107,52 @@ private ByteBuf decryptPayloadIfNeeded(MessageIdData 
messageId, MessageMetadata
 
         // If KeyReader is not configured throw exception based on config param
         if (conf.getCryptoKeyReader() == null) {
-
-            if (conf.getCryptoFailureAction() == 
ConsumerCryptoFailureAction.CONSUME) {
-                log.warn("[{}][{}][{}] CryptoKeyReader interface is not 
implemented. Consuming encrypted message.",
-                        topic, subscription, consumerName);
-                return payload.retain();
-            } else if (conf.getCryptoFailureAction() == 
ConsumerCryptoFailureAction.DISCARD) {
-                log.warn(
-                        "[{}][{}][{}] Skipping decryption since 
CryptoKeyReader interface is not implemented and config is set to discard",
-                        topic, subscription, consumerName);
-                discardMessage(messageId, currentCnx, 
ValidationError.DecryptionError);
-            } else {
-                log.error(
-                        "[{}][{}][{}] Message delivery failed since 
CryptoKeyReader interface is not implemented to consume encrypted message",
-                        topic, subscription, consumerName);
+            switch (conf.getCryptoFailureAction()) {
+                case CONSUME:
+                    log.warn("[{}][{}][{}] CryptoKeyReader interface is not 
implemented. Consuming encrypted message.",
+                            topic, subscription, consumerName);
+                    return payload.retain();
+                case DISCARD:
+                    log.warn(
+                            "[{}][{}][{}] Skipping decryption since 
CryptoKeyReader interface is not implemented and config is set to discard",
+                            topic, subscription, consumerName);
+                    discardMessage(messageId, currentCnx, 
ValidationError.DecryptionError);
+                    return null;
+                case FAIL:
+                    MessageId m = new MessageIdImpl(messageId.getLedgerId(), 
messageId.getEntryId(), partitionIndex);
+                    log.error(
+                            "[{}][{}][{}][{}] Message delivery failed since 
CryptoKeyReader interface is not implemented to consume encrypted message",
+                             topic, subscription, consumerName, m);
+                    unAckedMessageTracker.add(m);
+                    return null;
             }
-            return null;
         }
 
         ByteBuf decryptedData = this.msgCrypto.decrypt(msgMetadata, payload, 
conf.getCryptoKeyReader());
         if (decryptedData != null) {
             return decryptedData;
         }
-
-        if (conf.getCryptoFailureAction() == 
ConsumerCryptoFailureAction.CONSUME) {
-            // Note, batch message will fail to consume even if config is set 
to consume
-            log.warn("[{}][{}][{}][{}] Decryption failed. Consuming encrypted 
message since config is set to consume.",
-                    topic, subscription, consumerName, messageId);
-            return payload.retain();
-        } else if (conf.getCryptoFailureAction() == 
ConsumerCryptoFailureAction.DISCARD) {
-            log.warn("[{}][{}][{}][{}] Discarding message since decryption 
failed and config is set to discard", topic,
-                    subscription, consumerName, messageId);
-            discardMessage(messageId, currentCnx, 
ValidationError.DecryptionError);
-        } else {
-            log.error("[{}][{}][{}][{}] Message delivery failed since unable 
to decrypt incoming message", topic,
-                    subscription, consumerName, messageId);
+        
+        switch (conf.getCryptoFailureAction()) {
+            case CONSUME:
+                // Note, batch message will fail to consume even if config is 
set to consume
+                log.warn("[{}][{}][{}][{}] Decryption failed. Consuming 
encrypted message since config is set to consume.",
+                        topic, subscription, consumerName, messageId);
+                return payload.retain();
+            case DISCARD:
+                log.warn("[{}][{}][{}][{}] Discarding message since decryption 
failed and config is set to discard", topic,
+                        subscription, consumerName, messageId);
+                discardMessage(messageId, currentCnx, 
ValidationError.DecryptionError);
+                return null;
+            case FAIL:
+                MessageId m = new MessageIdImpl(messageId.getLedgerId(), 
messageId.getEntryId(), partitionIndex);
+                log.error(
+                        "[{}][{}][{}][{}] Message delivery failed since unable 
to decrypt incoming message",
+                         topic, subscription, consumerName, m);
+                unAckedMessageTracker.add(m);
+                return null;
         }
         return null;
-
     }
 
     private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, 
MessageMetadata msgMetadata, ByteBuf payload,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to