This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e422a6f fix: configure crypto-action for internal consumer
independently of cryptoReader (#2073)
e422a6f is described below
commit e422a6f93d2e1eb9a3469401966e5c7c4a4a9bda
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Tue Jul 3 12:58:00 2018 -0700
fix: configure crypto-action for internal consumer independently
of cryptoReader (#2073)
* fix: configure crypto-action for internal consumer independently
of cryptoReader
* add test-case for multi-topic + cryptoFailureAction
---
.../pulsar/client/api/SimpleProducerConsumerTest.java | 13 +++++++------
.../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 4 ++--
2 files changed, 9 insertions(+), 8 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 50d9687..e3ea2a6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -60,6 +60,7 @@ import
org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.client.impl.MessageCrypto;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarDecoder;
import org.apache.pulsar.common.api.proto.PulsarApi;
@@ -2430,18 +2431,18 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
}
}
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
-
.subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
- .subscribe();
-
Producer<byte[]> producer =
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
.addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
.cryptoKeyReader(new EncKeyReader()).create();
+
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topicsPattern("persistent://my-property/my-ns/myrsa-topic1")
+
.subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
+ .subscribe();
String message = "my-message";
producer.send(message.getBytes());
- MessageImpl<byte[]> msg = (MessageImpl<byte[]>) consumer.receive(5,
TimeUnit.SECONDS);
+ TopicMessageImpl<byte[]> msg = (TopicMessageImpl<byte[]>)
consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = decryptMessage(msg, encryptionKeyName, new
EncKeyReader());
assertEquals(message, receivedMessage);
@@ -2450,7 +2451,7 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
- private String decryptMessage(MessageImpl<byte[]> msg, String
encryptionKeyName, CryptoKeyReader reader)
+ private String decryptMessage(TopicMessageImpl<byte[]> msg, String
encryptionKeyName, CryptoKeyReader reader)
throws Exception {
Optional<EncryptionContext> ctx = msg.getEncryptionCtx();
Assert.assertTrue(ctx.isPresent());
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 4be8f58..5221b70 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -484,14 +484,14 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
internalConsumerConfig.setPriorityLevel(conf.getPriorityLevel());
internalConsumerConfig.setProperties(conf.getProperties());
internalConsumerConfig.setReadCompacted(conf.isReadCompacted());
-
+
internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction());
+
if (null != conf.getConsumerEventListener()) {
internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener());
}
if (conf.getCryptoKeyReader() != null) {
internalConsumerConfig.setCryptoKeyReader(conf.getCryptoKeyReader());
-
internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction());
}
if (conf.getAckTimeoutMillis() != 0) {
internalConsumerConfig.setAckTimeoutMillis(conf.getAckTimeoutMillis());