This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5766844 fix bug in concerning ContextImpl (#2052)
5766844 is described below
commit 57668440104fb56ce932287190f5203e2073bd62
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Mon Jul 2 00:18:28 2018 -0700
fix bug in concerning ContextImpl (#2052)
NPE thrown when submitting a source because context is null
---
.../apache/pulsar/functions/instance/ContextImpl.java | 16 ++++++++++++----
.../apache/pulsar/functions/instance/JavaInstance.java | 9 +++++----
2 files changed, 17 insertions(+), 8 deletions(-)
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 5ca07d9..e2887bf 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -18,11 +18,8 @@
*/
package org.apache.pulsar.functions.instance;
-import static com.google.common.base.Preconditions.checkState;
-
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
-import java.nio.ByteBuffer;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang.StringUtils;
@@ -36,15 +33,17 @@ import
org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
-import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
+import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
import org.apache.pulsar.functions.utils.Reflections;
import org.slf4j.Logger;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -52,6 +51,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import static com.google.common.base.Preconditions.checkState;
+
/**
* This class implements the Context interface exposed to the user.
*/
@@ -141,6 +142,9 @@ class ContextImpl implements Context {
@Override
public Collection<String> getInputTopics() {
+ if (inputConsumer == null) {
+ return new LinkedList<>();
+ }
if (inputConsumer instanceof MultiTopicsConsumerImpl) {
return ((MultiTopicsConsumerImpl) inputConsumer).getTopics();
} else {
@@ -302,6 +306,10 @@ class ContextImpl implements Context {
//TODO remove topic argument
@Override
public CompletableFuture<Void> ack(byte[] messageId) {
+ // if inputConsumer is null, then ack is a no-op
+ if (inputConsumer == null) {
+ return CompletableFuture.completedFuture(null);
+ }
MessageId actualMessageId = null;
try {
actualMessageId = MessageId.fromByteArray(messageId);
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 5ab8d85..3bd563e 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.instance;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.functions.api.Function;
@@ -51,13 +52,13 @@ public class JavaInstance implements AutoCloseable {
// TODO: cache logger instances by functions?
Logger instanceLog = LoggerFactory.getLogger("function-" +
config.getFunctionDetails().getName());
+ Consumer consumer = null;
if (source instanceof PulsarSource) {
- this.context = new ContextImpl(config, instanceLog, pulsarClient,
clsLoader,
- ((PulsarSource) source).getInputConsumer());
- } else {
- this.context = null;
+ consumer = ((PulsarSource) source).getInputConsumer();
}
+ this.context = new ContextImpl(config, instanceLog, pulsarClient,
clsLoader, consumer);
+
// create the functions
if (userClassObject instanceof Function) {
this.function = (Function) userClassObject;