This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 90f6b9e Fix: NPE in pulsar source close (#3246)
90f6b9e is described below
commit 90f6b9ed550347b1e3bfe07c67833987b27087e7
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Sat Dec 22 19:24:17 2018 -0800
Fix: NPE in pulsar source close (#3246)
---
.../org/apache/pulsar/functions/source/PulsarSource.java | 14 ++++++++------
1 file changed, 8 insertions(+), 6 deletions(-)
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 02e56ab..ff41dc8 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -142,12 +142,14 @@ public class PulsarSource<T> extends PushSource<T>
implements MessageListener<T>
@Override
public void close() throws Exception {
- inputConsumers.forEach(consumer -> {
- try {
- consumer.close();
- } catch (PulsarClientException e) {
- }
- });
+ if (inputConsumers != null ) {
+ inputConsumers.forEach(consumer -> {
+ try {
+ consumer.close();
+ } catch (PulsarClientException e) {
+ }
+ });
+ }
}
@SuppressWarnings("unchecked")