This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ab5834b  Fixed the null checking of retryDetails field (#2616)
ab5834b is described below

commit ab5834b3f4580f601abbd1fa37ce11dfb0683b3f
Author: Sanjeev Kulkarni <[email protected]>
AuthorDate: Wed Sep 19 19:09:57 2018 -0700

    Fixed the null checking of retryDetails field (#2616)
---
 .../java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java | 2 +-
 .../src/main/java/org/apache/pulsar/functions/source/PulsarSource.java  | 2 +-
 .../java/org/apache/pulsar/functions/source/PulsarSourceConfig.java     | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 1e07516..32e878d 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -548,7 +548,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
                 pulsarSourceConfig.setTimeoutMs(sourceSpec.getTimeoutMs());
             }
 
-            if (this.instanceConfig.getFunctionDetails().getRetryDetails() != 
null) {
+            if (this.instanceConfig.getFunctionDetails().hasRetryDetails()) {
                 
pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
                 
pulsarSourceConfig.setDeadLetterTopic(this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
             }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index afac782..0e840a2 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -90,7 +90,7 @@ public class PulsarSource<T> extends PushSource<T> implements 
MessageListener<T>
                 cb.ackTimeout(pulsarSourceConfig.getTimeoutMs(), 
TimeUnit.MILLISECONDS);
             }
 
-            if (pulsarSourceConfig.getMaxMessageRetries() >= 0) {
+            if (pulsarSourceConfig.getMaxMessageRetries() != null && 
pulsarSourceConfig.getMaxMessageRetries() >= 0) {
                 DeadLetterPolicy.DeadLetterPolicyBuilder 
deadLetterPolicyBuilder = DeadLetterPolicy.builder();
                 
deadLetterPolicyBuilder.maxRedeliverCount(pulsarSourceConfig.getMaxMessageRetries());
                 if (pulsarSourceConfig.getDeadLetterTopic() != null && 
!pulsarSourceConfig.getDeadLetterTopic().isEmpty()) {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
index 4e2afa7..65c5847 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
@@ -37,7 +37,7 @@ public class PulsarSourceConfig {
     private FunctionConfig.ProcessingGuarantees processingGuarantees;
     SubscriptionType subscriptionType;
     private String subscriptionName;
-    private int maxMessageRetries;
+    private Integer maxMessageRetries = -1;
     private String deadLetterTopic;
 
     private Map<String, ConsumerConfig> topicSchema = new TreeMap<>();

Reply via email to