sijie closed pull request #1845: Functions schema integration
URL: https://github.com/apache/incubator-pulsar/pull/1845
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 4be8f58a8e..b392629b36 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -21,6 +21,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -117,8 +118,11 @@
         this.namespaceName = conf.getTopicNames().stream().findFirst()
                 .flatMap(s -> 
Optional.of(TopicName.get(s).getNamespaceObject())).get();
 
-        List<CompletableFuture<Void>> futures = 
conf.getTopicNames().stream().map(t -> subscribeAsync(t))
+        List<CompletableFuture<Void>> futures =
+            conf.getTopicNames().stream()
+                .map(this::subscribeAsync)
                 .collect(Collectors.toList());
+
         FutureUtil.waitForAll(futures)
             .thenAccept(finalFuture -> {
                 try {
@@ -127,7 +131,7 @@
                     }
                     setState(State.Ready);
                     // We have successfully created N consumers, so we can 
start receiving messages now
-                    
startReceivingMessages(consumers.values().stream().collect(Collectors.toList()));
+                    startReceivingMessages(new 
ArrayList<>(consumers.values()));
                     subscribeFuture().complete(MultiTopicsConsumerImpl.this);
                     log.info("[{}] [{}] Created topics consumer with {} 
sub-consumers",
                         topic, subscription, allTopicPartitionsNumber.get());
diff --git a/pulsar-functions/api-java/pom.xml 
b/pulsar-functions/api-java/pom.xml
index 43abbe3079..e9fdae85fd 100644
--- a/pulsar-functions/api-java/pom.xml
+++ b/pulsar-functions/api-java/pom.xml
@@ -37,6 +37,12 @@
       <artifactId>slf4j-api</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>net.jodah</groupId>
       <artifactId>typetools</artifactId>
diff --git 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java
 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java
index f9efa3d8f8..691f3de117 100644
--- 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java
+++ 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java
@@ -18,10 +18,32 @@
  */
 package org.apache.pulsar.functions.api;
 
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.shade.org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.shade.org.apache.pulsar.common.schema.SchemaType;
+
 /**
  * An interface for serializer/deserializer.
  */
-public interface SerDe<T> {
+public interface SerDe<T> extends Schema<T> {
     T deserialize(byte[] input);
+
     byte[] serialize(T input);
+
+    @Override
+    default SchemaInfo getSchemaInfo() {
+        SchemaInfo info = new SchemaInfo();
+        info.setType(SchemaType.NONE);
+        return info;
+    }
+
+    @Override
+    default byte[] encode(T message) {
+        return serialize(message);
+    }
+
+    @Override
+    default T decode(byte[] bytes) {
+        return deserialize(bytes);
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 9eddc690e4..b606402086 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -121,6 +121,15 @@ public ContextImpl(InstanceConfig config, Logger logger, 
PulsarClient client,
         }
     }
 
+    public ContextImpl(InstanceConfig config, Logger logger, PulsarClient 
client,
+                       ClassLoader classLoader) {
+        this(config, logger, client, classLoader, null);
+    }
+
+    public void setInputConsumer(Consumer inputConsumer) {
+        this.inputConsumer = inputConsumer;
+    }
+
     public void setCurrentMessageContext(MessageId messageId, String 
topicName) {
         this.messageId = messageId;
         this.currentTopicName = topicName;
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 5ab8d8548c..ec85261530 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -18,16 +18,19 @@
  */
 package org.apache.pulsar.functions.instance;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import javax.swing.text.html.Option;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.io.core.Source;
-
+import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
 import org.apache.pulsar.functions.source.PulsarSource;
+import org.apache.pulsar.io.core.Source;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,11 +41,12 @@
  */
 @Slf4j
 public class JavaInstance implements AutoCloseable {
+    private ContextImpl context;
 
     @Getter(AccessLevel.PACKAGE)
-    private final ContextImpl context;
     private Function function;
     private java.util.function.Function javaUtilFunction;
+    private Optional<PulsarSource> optionalPulsarSource = Optional.empty();
 
     public JavaInstance(InstanceConfig config, Object userClassObject,
                  ClassLoader clsLoader,
@@ -52,8 +56,8 @@ public JavaInstance(InstanceConfig config, Object 
userClassObject,
         Logger instanceLog = LoggerFactory.getLogger("function-" + 
config.getFunctionDetails().getName());
 
         if (source instanceof PulsarSource) {
-            this.context = new ContextImpl(config, instanceLog, pulsarClient, 
clsLoader,
-                    ((PulsarSource) source).getInputConsumer());
+            this.context = new ContextImpl(config, instanceLog, pulsarClient, 
clsLoader);
+            optionalPulsarSource = Optional.of((PulsarSource) source);
         } else {
             this.context = null;
         }
@@ -64,13 +68,17 @@ public JavaInstance(InstanceConfig config, Object 
userClassObject,
         } else {
             this.javaUtilFunction = (java.util.function.Function) 
userClassObject;
         }
+
     }
 
     public JavaExecutionResult handleMessage(MessageId messageId, String 
topicName, Object input) {
-        if (context != null) {
-            context.setCurrentMessageContext(messageId, topicName);
-        }
+        optionalPulsarSource.ifPresent((pulsarSource) -> {
+            
this.context.setInputConsumer(pulsarSource.getConsumerForTopic(topicName));
+            this.context.setCurrentMessageContext(messageId, topicName);
+        });
+
         JavaExecutionResult executionResult = new JavaExecutionResult();
+
         try {
             Object output;
             if (function != null) {
@@ -85,11 +93,15 @@ public JavaExecutionResult handleMessage(MessageId 
messageId, String topicName,
         return executionResult;
     }
 
+    public ContextImpl getContext() {
+        return this.context;
+    }
+
     @Override
     public void close() {
     }
 
-    public InstanceCommunication.MetricsData getAndResetMetrics() {
+    public MetricsData getAndResetMetrics() {
         return context.getAndResetMetrics();
     }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
index 7a561a17d6..73297c7b8d 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
@@ -27,23 +27,27 @@
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.functions.instance.FunctionResultRouter;
 
-public abstract class AbstractOneOuputTopicProducers implements Producers {
+public abstract class AbstractOneOuputTopicProducers<T> implements 
Producers<T> {
 
     protected final PulsarClient client;
     protected final String outputTopic;
+    protected final Schema<T> schema;
 
     AbstractOneOuputTopicProducers(PulsarClient client,
-                                   String outputTopic)
+                                   String outputTopic,
+                                   Schema<T> schema)
             throws PulsarClientException {
         this.client = client;
         this.outputTopic = outputTopic;
+        this.schema = schema;
     }
 
-    static ProducerBuilder<byte[]> newProducerBuilder(PulsarClient client) {
+    static <U> ProducerBuilder<U> newProducerBuilder(PulsarClient client, 
Schema<U> schema) {
         // use function result router to deal with different processing 
guarantees.
-        return client.newProducer() //
+        return client.newProducer(schema) //
                 .blockIfQueueFull(true) //
                 .enableBatching(true) //
                 .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) //
@@ -53,23 +57,23 @@
                 .messageRouter(FunctionResultRouter.of());
     }
 
-    protected Producer<byte[]> createProducer(String topic)
+    protected Producer<T> createProducer(String topic, Schema<T> schema)
             throws PulsarClientException {
-        return createProducer(client, topic);
+        return createProducer(client, topic, schema);
     }
 
-    public static Producer<byte[]> createProducer(PulsarClient client, String 
topic)
+    public static <T> Producer<T> createProducer(PulsarClient client, String 
topic, Schema<T> schema)
             throws PulsarClientException {
-        return newProducerBuilder(client).topic(topic).create();
+        return newProducerBuilder(client, schema).topic(topic).create();
     }
 
-    protected Producer<byte[]> createProducer(String topic, String 
producerName)
+    protected Producer<T> createProducer(String topic, String producerName, 
Schema<T> schema)
             throws PulsarClientException {
-        return createProducer(client, topic, producerName);
+        return createProducer(client, topic, producerName, schema);
     }
 
-    public static Producer<byte[]> createProducer(PulsarClient client, String 
topic, String producerName)
+    public static <T> Producer<T> createProducer(PulsarClient client, String 
topic, String producerName, Schema<T> schema)
             throws PulsarClientException {
-        return 
newProducerBuilder(client).topic(topic).producerName(producerName).create();
+        return newProducerBuilder(client, 
schema).topic(topic).producerName(producerName).create();
     }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
index 12a639e574..48a8b2633f 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.functions.instance.producers;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -31,19 +30,21 @@
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 
 @Slf4j
-public class MultiConsumersOneOuputTopicProducers extends 
AbstractOneOuputTopicProducers {
+public class MultiConsumersOneOuputTopicProducers<T> extends 
AbstractOneOuputTopicProducers<T> {
 
     @Getter(AccessLevel.PACKAGE)
     // PartitionId -> producer
-    private final Map<String, Producer<byte[]>> producers;
+    private final Map<String, Producer<T>> producers;
 
 
     public MultiConsumersOneOuputTopicProducers(PulsarClient client,
-                                                String outputTopic)
+                                                String outputTopic,
+                                                Schema<T> schema)
             throws PulsarClientException {
-        super(client, outputTopic);
+        super(client, outputTopic, schema);
         this.producers = new ConcurrentHashMap<>();
     }
 
@@ -57,10 +58,10 @@ static String makeProducerName(String srcTopicName, String 
srcTopicPartition) {
     }
 
     @Override
-    public synchronized Producer<byte[]> getProducer(String srcPartitionId) 
throws PulsarClientException {
-        Producer<byte[]> producer = producers.get(srcPartitionId);
+    public synchronized Producer<T> getProducer(String srcPartitionId) throws 
PulsarClientException {
+        Producer<T> producer = producers.get(srcPartitionId);
         if (null == producer) {
-            producer = createProducer(outputTopic, srcPartitionId);
+            producer = createProducer(outputTopic, srcPartitionId, schema);
             producers.put(srcPartitionId, producer);
         }
         return producer;
@@ -68,7 +69,7 @@ static String makeProducerName(String srcTopicName, String 
srcTopicPartition) {
 
     @Override
     public synchronized void closeProducer(String srcPartitionId) {
-        Producer<byte[]> producer = producers.get(srcPartitionId);
+        Producer<T> producer = producers.get(srcPartitionId);
         if (null != producer) {
             producer.closeAsync();
             producers.remove(srcPartitionId);
@@ -78,7 +79,7 @@ public synchronized void closeProducer(String srcPartitionId) 
{
     @Override
     public synchronized void close() {
         List<CompletableFuture<Void>> closeFutures = new 
ArrayList<>(producers.size());
-        for (Producer<byte[]> producer: producers.values()) {
+        for (Producer<T> producer: producers.values()) {
             closeFutures.add(producer.closeAsync());
         }
         try {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
index 4d026ee4dd..7892876c1b 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
@@ -24,7 +24,7 @@
 /**
  * An interface for managing publishers within a java instance.
  */
-public interface Producers extends AutoCloseable {
+public interface Producers<T> extends AutoCloseable {
 
     /**
      * Initialize all the producers.
@@ -40,7 +40,7 @@
      *          src partition Id
      * @return the producer instance to produce messages
      */
-    Producer<byte[]> getProducer(String srcPartitionId) throws 
PulsarClientException;
+    Producer<T> getProducer(String srcPartitionId) throws 
PulsarClientException;
 
     /**
      * Close a producer specified by <tt>srcPartitionId</tt>.
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 0356ab1d94..60a1589cb5 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -54,28 +54,28 @@
 
     private PulsarSinkProcessor pulsarSinkProcessor;
 
-    private interface PulsarSinkProcessor {
+    private interface PulsarSinkProcessor<T> {
         void initializeOutputProducer(String outputTopic) throws Exception;
 
-        void sendOutputMessage(MessageBuilder outputMsgBuilder,
+        void sendOutputMessage(MessageBuilder<T> outputMsgBuilder,
                                RecordContext recordContext) throws Exception;
 
         void close() throws Exception;
     }
 
-    private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor 
{
-        private Producer<byte[]> producer;
+    private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor<T> {
+        private Producer<T> producer;
 
         @Override
         public void initializeOutputProducer(String outputTopic) throws 
Exception {
             this.producer = AbstractOneOuputTopicProducers.createProducer(
-                    client, pulsarSinkConfig.getTopic());
+                    client, pulsarSinkConfig.getTopic(), outputSerDe);
         }
 
         @Override
-        public void sendOutputMessage(MessageBuilder outputMsgBuilder,
+        public void sendOutputMessage(MessageBuilder<T> outputMsgBuilder,
                                       RecordContext recordContext) throws 
Exception {
-            Message<byte[]> outputMsg = outputMsgBuilder.build();
+            Message<T> outputMsg = outputMsgBuilder.build();
             this.producer.sendAsync(outputMsg);
         }
 
@@ -91,19 +91,19 @@ public void close() throws Exception {
         }
     }
 
-    private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
-        private Producer<byte[]> producer;
+    private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor<T> {
+        private Producer<T> producer;
 
         @Override
         public void initializeOutputProducer(String outputTopic) throws 
Exception {
             this.producer = AbstractOneOuputTopicProducers.createProducer(
-                    client, pulsarSinkConfig.getTopic());
+                    client, pulsarSinkConfig.getTopic(), outputSerDe);
         }
 
         @Override
-        public void sendOutputMessage(MessageBuilder outputMsgBuilder,
+        public void sendOutputMessage(MessageBuilder<T> outputMsgBuilder,
                                       RecordContext recordContext) throws 
Exception {
-            Message<byte[]> outputMsg = outputMsgBuilder.build();
+            Message<T> outputMsg = outputMsgBuilder.build();
             this.producer.sendAsync(outputMsg).thenAccept(messageId -> 
recordContext.ack());
         }
 
@@ -119,19 +119,19 @@ public void close() throws Exception {
         }
     }
 
-    private class PulsarSinkEffectivelyOnceProcessor implements 
PulsarSinkProcessor, ConsumerEventListener {
+    private class PulsarSinkEffectivelyOnceProcessor implements 
PulsarSinkProcessor<T>, ConsumerEventListener {
 
         @Getter(AccessLevel.PACKAGE)
-        protected Producers outputProducer;
+        protected Producers<T> outputProducer;
 
         @Override
         public void initializeOutputProducer(String outputTopic) throws 
Exception {
-            outputProducer = new MultiConsumersOneOuputTopicProducers(client, 
outputTopic);
+            outputProducer = new MultiConsumersOneOuputTopicProducers(client, 
outputTopic, outputSerDe);
             outputProducer.initialize();
         }
 
         @Override
-        public void sendOutputMessage(MessageBuilder outputMsgBuilder, 
RecordContext recordContext)
+        public void sendOutputMessage(MessageBuilder<T> outputMsgBuilder, 
RecordContext recordContext)
                 throws Exception {
 
             // assign sequence id to output message for idempotent producing
@@ -139,9 +139,9 @@ public void sendOutputMessage(MessageBuilder 
outputMsgBuilder, RecordContext rec
                     .setSequenceId(recordContext.getRecordSequence());
 
             // currently on PulsarRecord
-            Producer producer = 
outputProducer.getProducer(recordContext.getPartitionId());
+            Producer<T> producer = 
outputProducer.getProducer(recordContext.getPartitionId());
 
-            org.apache.pulsar.client.api.Message outputMsg = 
outputMsgBuilder.build();
+            org.apache.pulsar.client.api.Message<T> outputMsg = 
outputMsgBuilder.build();
             producer.sendAsync(outputMsg)
                     .thenAccept(messageId -> recordContext.ack())
                     .join();
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 74f23663ef..117f16efab 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -19,40 +19,42 @@
 package org.apache.pulsar.functions.source;
 
 import com.google.common.annotations.VisibleForTesting;
-import lombok.Getter;
+import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageListener;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.io.core.PushSource;
+import org.apache.pulsar.shade.org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Utils;
-import org.apache.pulsar.io.core.Record;
-import org.apache.pulsar.io.core.Source;
 import org.jboss.util.Classes;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 @Slf4j
-public class PulsarSource<T> implements Source<T> {
+public class PulsarSource<T> extends PushSource<T> implements 
MessageListener<T> {
 
     private PulsarClient pulsarClient;
     private PulsarSourceConfig pulsarSourceConfig;
-    private Map<String, SerDe> topicToSerDeMap = new HashMap<>();
     private boolean isTopicsPattern;
+    private Map<String, SerDe<T>> topicToSerDeMap = new HashMap<>();
 
-    @Getter
-    private org.apache.pulsar.client.api.Consumer inputConsumer;
+    private Map<String, org.apache.pulsar.client.api.Consumer<T>> 
inputConsumers;
 
     public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig 
pulsarConfig) {
         this.pulsarClient = pulsarClient;
@@ -64,28 +66,31 @@ public void open(Map<String, Object> config) throws 
Exception {
         // Setup Serialization/Deserialization
         setupSerDe();
 
-        // Setup pulsar consumer
-        ConsumerBuilder<byte[]> consumerBuilder = 
this.pulsarClient.newConsumer()
+        inputConsumers = Maps.newHashMap();
+        for (Map.Entry<String, SerDe<T>> entry : topicToSerDeMap.entrySet()) {
+            ConsumerBuilder<T> consumerBuilder = 
this.pulsarClient.newConsumer(entry.getValue())
                 
.subscriptionName(this.pulsarSourceConfig.getSubscriptionName())
-                
.subscriptionType(this.pulsarSourceConfig.getSubscriptionType());
+                
.subscriptionType(this.pulsarSourceConfig.getSubscriptionType())
+                .messageListener(this);
 
-        if(isNotBlank(this.pulsarSourceConfig.getTopicsPattern())) {
-            
consumerBuilder.topicsPattern(this.pulsarSourceConfig.getTopicsPattern());    
-            isTopicsPattern = true;
-        }else {
-            consumerBuilder.topics(new 
ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet()));    
-        }
-        
-        if (pulsarSourceConfig.getTimeoutMs() != null) {
-            consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(), 
TimeUnit.MILLISECONDS);
+            if (pulsarSourceConfig.getTimeoutMs() != null) {
+                consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(), 
TimeUnit.MILLISECONDS);
+            }
+
+            if(isNotBlank(this.pulsarSourceConfig.getTopicsPattern())) {
+                
consumerBuilder.topicsPattern(this.pulsarSourceConfig.getTopicsPattern());
+                isTopicsPattern = true;
+            }else {
+                consumerBuilder.topics(new 
ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet()));
+            }
+
+            inputConsumers.put(entry.getKey(),consumerBuilder.subscribe());
         }
-        this.inputConsumer = consumerBuilder.subscribe();
+
     }
 
     @Override
-    public Record<T> read() throws Exception {
-        org.apache.pulsar.client.api.Message<T> message = 
this.inputConsumer.receive();
-
+    public void received(Consumer<T> consumer, Message<T> message) {
         String topicName;
         String partitionId;
 
@@ -127,31 +132,44 @@ public void open(Map<String, Object> config) throws 
Exception {
         }
 
         PulsarRecord<T> pulsarMessage = (PulsarRecord<T>) 
PulsarRecord.builder()
-                .value(input)
-                .messageId(message.getMessageId())
-                .partitionId(String.format("%s-%s", topicName, partitionId))
-                .recordSequence(Utils.getSequenceId(message.getMessageId()))
-                .topicName(topicName)
-                .ackFunction(() -> {
-                    if (pulsarSourceConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
-                        inputConsumer.acknowledgeCumulativeAsync(message);
-                    } else {
-                        inputConsumer.acknowledgeAsync(message);
-                    }
-                }).failFunction(() -> {
-                    if (pulsarSourceConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
-                        throw new RuntimeException("Failed to process message: 
" + message.getMessageId());
-                    }
-                })
-                .build();
-        return pulsarMessage;
+            .value(input)
+            .messageId(message.getMessageId())
+            .partitionId(String.format("%s-%s", topicName, partitionId))
+            .recordSequence(Utils.getSequenceId(message.getMessageId()))
+            .topicName(topicName)
+            .ackFunction(() -> {
+                if (pulsarSourceConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                    consumer.acknowledgeCumulativeAsync(message);
+                } else {
+                    consumer.acknowledgeAsync(message);
+                }
+            }).failFunction(() -> {
+                if (pulsarSourceConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                    throw new RuntimeException("Failed to process message: " + 
message.getMessageId());
+                }
+            })
+            .build();
+
+        consume(pulsarMessage);
+    }
+
+    public Consumer<T> getConsumerForTopic(String topic) {
+        return inputConsumers.get(topic);
+    }
+
+    @Override
+    public void reachedEndOfTopic(Consumer<T> consumer) {
+        //No-op
     }
 
     @Override
     public void close() throws Exception {
-        if (this.inputConsumer != null) {
-            this.inputConsumer.close();
-        }
+        inputConsumers.forEach((ignored, consumer) -> {
+            try {
+                consumer.close();
+            } catch (PulsarClientException e) {
+            }
+        });
     }
 
     @VisibleForTesting
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
index b2f2c4f8a9..81cbbda1d2 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.functions.instance.producers;
 
 import static 
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers.makeProducerName;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -43,6 +45,7 @@
 import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -191,10 +194,10 @@
     public void setup() throws Exception {
         this.mockClient = mock(PulsarClient.class);
 
-        when(mockClient.newProducer())
+        when(mockClient.newProducer(any(Schema.class)))
             .thenReturn(new MockProducerBuilder());
 
-        producers = new MultiConsumersOneOuputTopicProducers(mockClient, 
TEST_OUTPUT_TOPIC);
+        producers = new MultiConsumersOneOuputTopicProducers(mockClient, 
TEST_OUTPUT_TOPIC, Schema.BYTES);
         producers.initialize();
     }
 
@@ -219,13 +222,13 @@ public void testGetCloseProducer() throws Exception {
 
         assertSame(mockProducers.get(producerName), producer);
         verify(mockClient, times(1))
-            .newProducer();
+            .newProducer(Schema.BYTES);
         assertTrue(producers.getProducers().containsKey(producerName));
 
         // second get will not create a new producer
         assertSame(mockProducers.get(producerName), producer);
         verify(mockClient, times(1))
-            .newProducer();
+            .newProducer(Schema.BYTES);
         assertTrue(producers.getProducers().containsKey(producerName));
 
         // close
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 3c5e61b0de..8ca94bb169 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -34,9 +34,11 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import static java.util.Collections.emptyMap;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -78,9 +80,10 @@ private static PulsarClient getPulsarClient() throws 
PulsarClientException {
         
doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(anyString());
         
doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(any());
         doReturn(consumerBuilder).when(consumerBuilder).ackTimeout(anyLong(), 
any());
+        
doReturn(consumerBuilder).when(consumerBuilder).messageListener(anyObject());
         Consumer consumer = mock(Consumer.class);
         doReturn(consumer).when(consumerBuilder).subscribe();
-        doReturn(consumerBuilder).when(pulsarClient).newConsumer();
+        doReturn(consumerBuilder).when(pulsarClient).newConsumer(anyObject());
         return pulsarClient;
     }
 
@@ -168,7 +171,7 @@ public void testDefaultSerDe() throws Exception {
         pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
         PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig);
 
-        pulsarSource.open(new HashMap<>());
+        pulsarSource.open(emptyMap());
     }
 
     /**


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to