This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ab5834b Fixed the null checking of retryDetails field (#2616)
ab5834b is described below
commit ab5834b3f4580f601abbd1fa37ce11dfb0683b3f
Author: Sanjeev Kulkarni <[email protected]>
AuthorDate: Wed Sep 19 19:09:57 2018 -0700
Fixed the null checking of retryDetails field (#2616)
---
.../java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java | 2 +-
.../src/main/java/org/apache/pulsar/functions/source/PulsarSource.java | 2 +-
.../java/org/apache/pulsar/functions/source/PulsarSourceConfig.java | 2 +-
3 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 1e07516..32e878d 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -548,7 +548,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
pulsarSourceConfig.setTimeoutMs(sourceSpec.getTimeoutMs());
}
- if (this.instanceConfig.getFunctionDetails().getRetryDetails() != null) {
+ if (this.instanceConfig.getFunctionDetails().hasRetryDetails()) {
pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
pulsarSourceConfig.setDeadLetterTopic(this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index afac782..0e840a2 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -90,7 +90,7 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
cb.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);
}
- if (pulsarSourceConfig.getMaxMessageRetries() >= 0) {
+ if (pulsarSourceConfig.getMaxMessageRetries() != null && pulsarSourceConfig.getMaxMessageRetries() >= 0) {
DeadLetterPolicy.DeadLetterPolicyBuilder deadLetterPolicyBuilder = DeadLetterPolicy.builder();
deadLetterPolicyBuilder.maxRedeliverCount(pulsarSourceConfig.getMaxMessageRetries());
if (pulsarSourceConfig.getDeadLetterTopic() != null && !pulsarSourceConfig.getDeadLetterTopic().isEmpty()) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
index 4e2afa7..65c5847 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
@@ -37,7 +37,7 @@ public class PulsarSourceConfig {
private FunctionConfig.ProcessingGuarantees processingGuarantees;
SubscriptionType subscriptionType;
private String subscriptionName;
- private int maxMessageRetries;
+ private Integer maxMessageRetries = -1;
private String deadLetterTopic;
private Map<String, ConsumerConfig> topicSchema = new TreeMap<>();