This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new e422a6f fix: configure crypto-action for internal consumer independently cryptoReader (#2073) e422a6f is described below commit e422a6f93d2e1eb9a3469401966e5c7c4a4a9bda Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Tue Jul 3 12:58:00 2018 -0700 fix: configure crypto-action for internal consumer independently cryptoReader (#2073) * fix: configure crypto-action for internal consumer independently cryptoReader * add test-case for multi-topic + cryptoFailureAction --- .../pulsar/client/api/SimpleProducerConsumerTest.java | 13 +++++++------ .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 4 ++-- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 50d9687..e3ea2a6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -60,6 +60,7 @@ import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey; import org.apache.pulsar.client.impl.MessageCrypto; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.TopicMessageImpl; import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.PulsarDecoder; import org.apache.pulsar.common.api.proto.PulsarApi; @@ -2430,18 +2431,18 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { } } - Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1") - .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME) - .subscribe(); - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1") .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4) .cryptoKeyReader(new EncKeyReader()).create(); + + Consumer<byte[]> consumer = pulsarClient.newConsumer().topicsPattern("persistent://my-property/my-ns/myrsa-topic1") + .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME) + .subscribe(); String message = "my-message"; producer.send(message.getBytes()); - MessageImpl<byte[]> msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS); + TopicMessageImpl<byte[]> msg = (TopicMessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS); String receivedMessage = decryptMessage(msg, encryptionKeyName, new EncKeyReader()); assertEquals(message, receivedMessage); @@ -2450,7 +2451,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { log.info("-- Exiting {} test --", methodName); } - private String decryptMessage(MessageImpl<byte[]> msg, String encryptionKeyName, CryptoKeyReader reader) + private String decryptMessage(TopicMessageImpl<byte[]> msg, String encryptionKeyName, CryptoKeyReader reader) throws Exception { Optional<EncryptionContext> ctx = msg.getEncryptionCtx(); Assert.assertTrue(ctx.isPresent()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 4be8f58..5221b70 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -484,14 +484,14 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { internalConsumerConfig.setPriorityLevel(conf.getPriorityLevel()); internalConsumerConfig.setProperties(conf.getProperties()); internalConsumerConfig.setReadCompacted(conf.isReadCompacted()); - + internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction()); + if (null != conf.getConsumerEventListener()) { internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener()); } if (conf.getCryptoKeyReader() != null) { internalConsumerConfig.setCryptoKeyReader(conf.getCryptoKeyReader()); - internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction()); } if (conf.getAckTimeoutMillis() != 0) { internalConsumerConfig.setAckTimeoutMillis(conf.getAckTimeoutMillis());