saandrews commented on a change in pull request #3097: Redeliver messages that 
can't be decrypted.
URL: https://github.com/apache/pulsar/pull/3097#discussion_r237754716
 
 

 ##########
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 ##########
 @@ -2305,6 +2307,128 @@ public EncryptionKeyInfo getPrivateKey(String keyName, 
Map<String, String> keyMe
         consumer.close();
         log.info("-- Exiting {} test --", methodName);
     }
+    
+    @Test(groups = "encryption")
+    public void testRedeliveryOfFailedMessages() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+        }
+        
+        class InvalidKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> metadata) {
+                return null;
+            }
+        }
+        
+        /*
+         * Redelivery functionality guarantees that customer will get a chance 
to process the message again.
+         * In case of shared subscription eventually every client will get a 
chance to process the message, till one of them acks it.
+         * 
+         * For client with Encryption enabled where in cases like a new 
production rollout or a buggy client configuration, we might have a mismatch of 
consumers 
+         * - few which can decrypt, few which can't (due to errors or 
cryptoReader not configured).
+         * 
+         * In that case eventually all messages should be acked as long as 
there is a single consumer who can decrypt the message.
+         * 
+         * Consumer 1 - Can decrypt message
+         * Consumer 2 - Has invalid Reader configured.
+         * Consumer 3 - Has no reader configured.
+         * 
+         */
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1";
+        
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                
.addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
EncKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, 
TimeUnit.SECONDS).subscribe();
+        
+        Consumer<byte[]> consumer2 = 
pulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
InvalidKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, 
TimeUnit.SECONDS).subscribe();
+
+        Consumer<byte[]> consumer3 = 
pulsarClient.newConsumer().topicsPattern(topicName)
+                
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1,
 TimeUnit.SECONDS).subscribe();
+        
+        int numberOfMessages = 100;
+        String message = "my-message";
+        Set<String> messages = new HashSet(); // Since messages are in random 
order
+        for (int i = 0; i<numberOfMessages; i++) {
+            producer.send((message + i).getBytes());
+        }
+        
+        for (int i = 0; i<numberOfMessages; i++) {
+            // All messages would be received by consumer 2
+            Message m = consumer1.receive();
+            messages.add(new String(m.getData()));
+            consumer1.acknowledge(m);
+        }
+        
+        Message m = consumer2.receive(3, TimeUnit.SECONDS);
 
 Review comment:
   Shouldn't we call receive on consumer 2 and 3 first?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to