This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8e95ddc By default PulsarSource consume encrypted message (#2074)
8e95ddc is described below
commit 8e95ddc5f3103279a39bd98031b3d16fb17dbdb3
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Tue Jul 3 11:00:40 2018 -0700
By default PulsarSource consume encrypted message (#2074)
* By default PulsarSource consume encrypted message
* fix: mock test
---
.../src/main/java/org/apache/pulsar/functions/source/PulsarSource.java | 2 ++
.../test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java | 1 +
2 files changed, 3 insertions(+)
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index e5100c8..54373ba 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -67,6 +67,8 @@ public class PulsarSource<T> implements Source<T> {
// Setup pulsar consumer
ConsumerBuilder<byte[]> consumerBuilder =
this.pulsarClient.newConsumer()
+ //consume message even if can't decrypt and deliver it along
with encryption-ctx
+ .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
.subscriptionName(this.pulsarSourceConfig.getSubscriptionName())
.subscriptionType(this.pulsarSourceConfig.getSubscriptionType());
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 3c5e61b..a7e3610 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -75,6 +75,7 @@ public class PulsarSourceTest {
PulsarClient pulsarClient = mock(PulsarClient.class);
ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class);
doReturn(consumerBuilder).when(consumerBuilder).topics(anyList());
+
doReturn(consumerBuilder).when(consumerBuilder).cryptoFailureAction(any());
doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(anyString());
doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(any());
doReturn(consumerBuilder).when(consumerBuilder).ackTimeout(anyLong(),
any());