This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 8e95ddc By default PulsarSource consume encrypted message (#2074) 8e95ddc is described below commit 8e95ddc5f3103279a39bd98031b3d16fb17dbdb3 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Tue Jul 3 11:00:40 2018 -0700 By default PulsarSource consume encrypted message (#2074) * By default PulsarSource consume encrypted message * fix: mock test --- .../src/main/java/org/apache/pulsar/functions/source/PulsarSource.java | 2 ++ .../test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java | 1 + 2 files changed, 3 insertions(+) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index e5100c8..54373ba 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -67,6 +67,8 @@ public class PulsarSource<T> implements Source<T> { // Setup pulsar consumer ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer() + //consume message even if can't decrypt and deliver it along with encryption-ctx + .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME) .subscriptionName(this.pulsarSourceConfig.getSubscriptionName()) .subscriptionType(this.pulsarSourceConfig.getSubscriptionType()); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java index 3c5e61b..a7e3610 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java @@ -75,6 +75,7 @@ public class PulsarSourceTest { PulsarClient pulsarClient = mock(PulsarClient.class); ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class); doReturn(consumerBuilder).when(consumerBuilder).topics(anyList()); + doReturn(consumerBuilder).when(consumerBuilder).cryptoFailureAction(any()); doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(anyString()); doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(any()); doReturn(consumerBuilder).when(consumerBuilder).ackTimeout(anyLong(), any());