saandrews commented on a change in pull request #3097: Redeliver messages that
can't be decrypted.
URL: https://github.com/apache/pulsar/pull/3097#discussion_r237754599
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -2305,6 +2307,128 @@ public EncryptionKeyInfo getPrivateKey(String keyName,
Map<String, String> keyMe
consumer.close();
log.info("-- Exiting {} test --", methodName);
}
+
+ @Test(groups = "encryption")
+ public void testRedeliveryOfFailedMessages() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ final String encryptionKeyName = "client-rsa.pem";
+ final String encryptionKeyVersion = "1.0";
+ Map<String, String> metadata = Maps.newHashMap();
+ metadata.put("version", encryptionKeyVersion);
+ class EncKeyReader implements CryptoKeyReader {
+ EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map<String,
String> keyMeta) {
+ String CERT_FILE_PATH =
"./src/test/resources/certificate/public-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ keyInfo.setMetadata(metadata);
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " +
CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is
not present or not readable.");
+ }
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String,
String> keyMeta) {
+ String CERT_FILE_PATH =
"./src/test/resources/certificate/private-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ keyInfo.setMetadata(metadata);
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " +
CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is
not present or not readable.");
+ }
+ return null;
+ }
+ }
+
+ class InvalidKeyReader implements CryptoKeyReader {
+ EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map<String,
String> keyMeta) {
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String,
String> metadata) {
+ return null;
+ }
+ }
+
+ /*
+ * Redelivery functionality guarantees that customer will get a chance
to process the message again.
+ * In case of shared subscription eventually every client will get a
chance to process the message, till one of them acks it.
+ *
+ * For client with Encryption enabled where in cases like a new
production rollout or a buggy client configuration, we might have a mismatch of
consumers
+ * - few which can decrypt, few which can't (due to errors or
cryptoReader not configured).
+ *
+ * In that case eventually all messages should be acked as long as
there is a single consumer who can decrypt the message.
+ *
+ * Consumer 1 - Can decrypt message
+ * Consumer 2 - Has invalid Reader configured.
+ * Consumer 3 - Has no reader configured.
+ *
+ */
+
+ String topicName = "persistent://my-property/my-ns/myrsa-topic1";
+
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+
.addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+ .cryptoKeyReader(new EncKeyReader()).create();
+
+ Consumer<byte[]> consumer1 =
pulsarClient.newConsumer().topicsPattern(topicName)
+ .subscriptionName("my-subscriber-name").cryptoKeyReader(new
EncKeyReader())
+ .subscriptionType(SubscriptionType.Shared).ackTimeout(1,
TimeUnit.SECONDS).subscribe();
+
+ Consumer<byte[]> consumer2 =
pulsarClient.newConsumer().topicsPattern(topicName)
+ .subscriptionName("my-subscriber-name").cryptoKeyReader(new
InvalidKeyReader())
+ .subscriptionType(SubscriptionType.Shared).ackTimeout(1,
TimeUnit.SECONDS).subscribe();
+
+ Consumer<byte[]> consumer3 =
pulsarClient.newConsumer().topicsPattern(topicName)
+
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1,
TimeUnit.SECONDS).subscribe();
+
+ int numberOfMessages = 100;
+ String message = "my-message";
+ Set<String> messages = new HashSet(); // Since messages are in random
order
+ for (int i = 0; i<numberOfMessages; i++) {
+ producer.send((message + i).getBytes());
+ }
+
+ for (int i = 0; i<numberOfMessages; i++) {
+ // All messages would be received by consumer 2
Review comment:
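Should this say consumer 1 instead? Consumer 1 is the one that can decrypt the messages; consumer 2 has the invalid key reader.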
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services