This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e422a6f  fix: configure crypto-action for internal consumer independently of cryptoReader (#2073)
e422a6f is described below

commit e422a6f93d2e1eb9a3469401966e5c7c4a4a9bda
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Tue Jul 3 12:58:00 2018 -0700

    fix: configure crypto-action for internal consumer independently of cryptoReader (#2073)
    
    * fix: configure crypto-action for internal consumer independently of cryptoReader
    
    * add test-case for multi-topic + cryptoFailureAction
---
 .../pulsar/client/api/SimpleProducerConsumerTest.java       | 13 +++++++------
 .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java  |  4 ++--
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 50d9687..e3ea2a6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -60,6 +60,7 @@ import 
org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
 import org.apache.pulsar.client.impl.MessageCrypto;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarDecoder;
 import org.apache.pulsar.common.api.proto.PulsarApi;
@@ -2430,18 +2431,18 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
             }
         }
 
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
-                
.subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
-                .subscribe();
-
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
                 
.addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
                 .cryptoKeyReader(new EncKeyReader()).create();
+        
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topicsPattern("persistent://my-property/my-ns/myrsa-topic1")
+                
.subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
+                .subscribe();
 
         String message = "my-message";
         producer.send(message.getBytes());
 
-        MessageImpl<byte[]> msg = (MessageImpl<byte[]>) consumer.receive(5, 
TimeUnit.SECONDS);
+        TopicMessageImpl<byte[]> msg = (TopicMessageImpl<byte[]>) 
consumer.receive(5, TimeUnit.SECONDS);
 
         String receivedMessage = decryptMessage(msg, encryptionKeyName, new 
EncKeyReader());
         assertEquals(message, receivedMessage);
@@ -2450,7 +2451,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         log.info("-- Exiting {} test --", methodName);
     }
     
-    private String decryptMessage(MessageImpl<byte[]> msg, String 
encryptionKeyName, CryptoKeyReader reader)
+    private String decryptMessage(TopicMessageImpl<byte[]> msg, String 
encryptionKeyName, CryptoKeyReader reader)
             throws Exception {
         Optional<EncryptionContext> ctx = msg.getEncryptionCtx();
         Assert.assertTrue(ctx.isPresent());
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 4be8f58..5221b70 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -484,14 +484,14 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         internalConsumerConfig.setPriorityLevel(conf.getPriorityLevel());
         internalConsumerConfig.setProperties(conf.getProperties());
         internalConsumerConfig.setReadCompacted(conf.isReadCompacted());
-
+        
internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction());
+        
         if (null != conf.getConsumerEventListener()) {
             
internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener());
         }
 
         if (conf.getCryptoKeyReader() != null) {
             
internalConsumerConfig.setCryptoKeyReader(conf.getCryptoKeyReader());
-            
internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction());
         }
         if (conf.getAckTimeoutMillis() != 0) {
             
internalConsumerConfig.setAckTimeoutMillis(conf.getAckTimeoutMillis());

Reply via email to