This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c5c3cf  Revert "Functions schema integration (#1845)" (#2018)
4c5c3cf is described below

commit 4c5c3cf2bd00801e0e19f2cf5fa09e2a82a1cf8f
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Thu Jun 21 17:28:40 2018 -0700

    Revert "Functions schema integration (#1845)" (#2018)
    
    This reverts commit 97b56cf04dfca81c937a4b640bc8cfc5470d66cb.
---
 .../client/impl/MultiTopicsConsumerImpl.java       |   8 +-
 pulsar-functions/api-java/pom.xml                  |   6 --
 .../org/apache/pulsar/functions/api/SerDe.java     |  28 +----
 .../pulsar/functions/instance/ContextImpl.java     |   9 --
 .../pulsar/functions/instance/JavaInstance.java    |  32 ++----
 .../producers/AbstractOneOuputTopicProducers.java  |  28 +++--
 .../MultiConsumersOneOuputTopicProducers.java      |  21 ++--
 .../functions/instance/producers/Producers.java    |   4 +-
 .../apache/pulsar/functions/sink/PulsarSink.java   |  36 +++----
 .../pulsar/functions/source/PulsarSource.java      | 115 +++++++++------------
 .../MultiConsumersOneOutputTopicProducersTest.java |  11 +-
 .../pulsar/functions/source/PulsarSourceTest.java  |   7 +-
 12 files changed, 110 insertions(+), 195 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index b392629..4be8f58 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.client.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -118,11 +117,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         this.namespaceName = conf.getTopicNames().stream().findFirst()
                 .flatMap(s -> 
Optional.of(TopicName.get(s).getNamespaceObject())).get();
 
-        List<CompletableFuture<Void>> futures =
-            conf.getTopicNames().stream()
-                .map(this::subscribeAsync)
+        List<CompletableFuture<Void>> futures = 
conf.getTopicNames().stream().map(t -> subscribeAsync(t))
                 .collect(Collectors.toList());
-
         FutureUtil.waitForAll(futures)
             .thenAccept(finalFuture -> {
                 try {
@@ -131,7 +127,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     }
                     setState(State.Ready);
                     // We have successfully created N consumers, so we can 
start receiving messages now
-                    startReceivingMessages(new 
ArrayList<>(consumers.values()));
+                    
startReceivingMessages(consumers.values().stream().collect(Collectors.toList()));
                     subscribeFuture().complete(MultiTopicsConsumerImpl.this);
                     log.info("[{}] [{}] Created topics consumer with {} 
sub-consumers",
                         topic, subscription, allTopicPartitionsNumber.get());
diff --git a/pulsar-functions/api-java/pom.xml 
b/pulsar-functions/api-java/pom.xml
index d7babec..43abbe3 100644
--- a/pulsar-functions/api-java/pom.xml
+++ b/pulsar-functions/api-java/pom.xml
@@ -38,12 +38,6 @@
     </dependency>
 
     <dependency>
-      <groupId>org.apache.pulsar</groupId>
-      <artifactId>pulsar-client-original</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
       <groupId>net.jodah</groupId>
       <artifactId>typetools</artifactId>
       <scope>test</scope>
diff --git 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java
 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java
index 2caee16..f9efa3d 100644
--- 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java
+++ 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java
@@ -18,36 +18,10 @@
  */
 package org.apache.pulsar.functions.api;
 
-import java.util.Collections;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.common.schema.SchemaInfo;
-import org.apache.pulsar.common.schema.SchemaType;
-
 /**
  * An interface for serializer/deserializer.
  */
-public interface SerDe<T> extends Schema<T> {
+public interface SerDe<T> {
     T deserialize(byte[] input);
-
     byte[] serialize(T input);
-
-    @Override
-    default SchemaInfo getSchemaInfo() {
-        SchemaInfo info = new SchemaInfo();
-        info.setName("");
-        info.setType(SchemaType.NONE);
-        info.setSchema(new byte[0]);
-        info.setProperties(Collections.emptyMap());
-        return info;
-    }
-
-    @Override
-    default byte[] encode(T message) {
-        return serialize(message);
-    }
-
-    @Override
-    default T decode(byte[] bytes) {
-        return deserialize(bytes);
-    }
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index fce440d..5ca07d9 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -124,15 +124,6 @@ class ContextImpl implements Context {
         }
     }
 
-    public ContextImpl(InstanceConfig config, Logger logger, PulsarClient 
client,
-                       ClassLoader classLoader) {
-        this(config, logger, client, classLoader, null);
-    }
-
-    public void setInputConsumer(Consumer inputConsumer) {
-        this.inputConsumer = inputConsumer;
-    }
-
     public void setCurrentMessageContext(MessageId messageId, String 
topicName) {
         this.messageId = messageId;
         this.currentTopicName = topicName;
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index ec85261..5ab8d85 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -18,19 +18,16 @@
  */
 package org.apache.pulsar.functions.instance;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import javax.swing.text.html.Option;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
-import org.apache.pulsar.functions.source.PulsarSource;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.io.core.Source;
+
+import org.apache.pulsar.functions.source.PulsarSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,12 +38,11 @@ import org.slf4j.LoggerFactory;
  */
 @Slf4j
 public class JavaInstance implements AutoCloseable {
-    private ContextImpl context;
 
     @Getter(AccessLevel.PACKAGE)
+    private final ContextImpl context;
     private Function function;
     private java.util.function.Function javaUtilFunction;
-    private Optional<PulsarSource> optionalPulsarSource = Optional.empty();
 
     public JavaInstance(InstanceConfig config, Object userClassObject,
                  ClassLoader clsLoader,
@@ -56,8 +52,8 @@ public class JavaInstance implements AutoCloseable {
         Logger instanceLog = LoggerFactory.getLogger("function-" + 
config.getFunctionDetails().getName());
 
         if (source instanceof PulsarSource) {
-            this.context = new ContextImpl(config, instanceLog, pulsarClient, 
clsLoader);
-            optionalPulsarSource = Optional.of((PulsarSource) source);
+            this.context = new ContextImpl(config, instanceLog, pulsarClient, 
clsLoader,
+                    ((PulsarSource) source).getInputConsumer());
         } else {
             this.context = null;
         }
@@ -68,17 +64,13 @@ public class JavaInstance implements AutoCloseable {
         } else {
             this.javaUtilFunction = (java.util.function.Function) 
userClassObject;
         }
-
     }
 
     public JavaExecutionResult handleMessage(MessageId messageId, String 
topicName, Object input) {
-        optionalPulsarSource.ifPresent((pulsarSource) -> {
-            
this.context.setInputConsumer(pulsarSource.getConsumerForTopic(topicName));
-            this.context.setCurrentMessageContext(messageId, topicName);
-        });
-
+        if (context != null) {
+            context.setCurrentMessageContext(messageId, topicName);
+        }
         JavaExecutionResult executionResult = new JavaExecutionResult();
-
         try {
             Object output;
             if (function != null) {
@@ -93,15 +85,11 @@ public class JavaInstance implements AutoCloseable {
         return executionResult;
     }
 
-    public ContextImpl getContext() {
-        return this.context;
-    }
-
     @Override
     public void close() {
     }
 
-    public MetricsData getAndResetMetrics() {
+    public InstanceCommunication.MetricsData getAndResetMetrics() {
         return context.getAndResetMetrics();
     }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
index 73297c7..7a561a1 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
@@ -27,27 +27,23 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.functions.instance.FunctionResultRouter;
 
-public abstract class AbstractOneOuputTopicProducers<T> implements 
Producers<T> {
+public abstract class AbstractOneOuputTopicProducers implements Producers {
 
     protected final PulsarClient client;
     protected final String outputTopic;
-    protected final Schema<T> schema;
 
     AbstractOneOuputTopicProducers(PulsarClient client,
-                                   String outputTopic,
-                                   Schema<T> schema)
+                                   String outputTopic)
             throws PulsarClientException {
         this.client = client;
         this.outputTopic = outputTopic;
-        this.schema = schema;
     }
 
-    static <U> ProducerBuilder<U> newProducerBuilder(PulsarClient client, 
Schema<U> schema) {
+    static ProducerBuilder<byte[]> newProducerBuilder(PulsarClient client) {
         // use function result router to deal with different processing 
guarantees.
-        return client.newProducer(schema) //
+        return client.newProducer() //
                 .blockIfQueueFull(true) //
                 .enableBatching(true) //
                 .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) //
@@ -57,23 +53,23 @@ public abstract class AbstractOneOuputTopicProducers<T> 
implements Producers<T>
                 .messageRouter(FunctionResultRouter.of());
     }
 
-    protected Producer<T> createProducer(String topic, Schema<T> schema)
+    protected Producer<byte[]> createProducer(String topic)
             throws PulsarClientException {
-        return createProducer(client, topic, schema);
+        return createProducer(client, topic);
     }
 
-    public static <T> Producer<T> createProducer(PulsarClient client, String 
topic, Schema<T> schema)
+    public static Producer<byte[]> createProducer(PulsarClient client, String 
topic)
             throws PulsarClientException {
-        return newProducerBuilder(client, schema).topic(topic).create();
+        return newProducerBuilder(client).topic(topic).create();
     }
 
-    protected Producer<T> createProducer(String topic, String producerName, 
Schema<T> schema)
+    protected Producer<byte[]> createProducer(String topic, String 
producerName)
             throws PulsarClientException {
-        return createProducer(client, topic, producerName, schema);
+        return createProducer(client, topic, producerName);
     }
 
-    public static <T> Producer<T> createProducer(PulsarClient client, String 
topic, String producerName, Schema<T> schema)
+    public static Producer<byte[]> createProducer(PulsarClient client, String 
topic, String producerName)
             throws PulsarClientException {
-        return newProducerBuilder(client, 
schema).topic(topic).producerName(producerName).create();
+        return 
newProducerBuilder(client).topic(topic).producerName(producerName).create();
     }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
index 48a8b26..12a639e 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.instance.producers;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -30,21 +31,19 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
 
 @Slf4j
-public class MultiConsumersOneOuputTopicProducers<T> extends 
AbstractOneOuputTopicProducers<T> {
+public class MultiConsumersOneOuputTopicProducers extends 
AbstractOneOuputTopicProducers {
 
     @Getter(AccessLevel.PACKAGE)
     // PartitionId -> producer
-    private final Map<String, Producer<T>> producers;
+    private final Map<String, Producer<byte[]>> producers;
 
 
     public MultiConsumersOneOuputTopicProducers(PulsarClient client,
-                                                String outputTopic,
-                                                Schema<T> schema)
+                                                String outputTopic)
             throws PulsarClientException {
-        super(client, outputTopic, schema);
+        super(client, outputTopic);
         this.producers = new ConcurrentHashMap<>();
     }
 
@@ -58,10 +57,10 @@ public class MultiConsumersOneOuputTopicProducers<T> 
extends AbstractOneOuputTop
     }
 
     @Override
-    public synchronized Producer<T> getProducer(String srcPartitionId) throws 
PulsarClientException {
-        Producer<T> producer = producers.get(srcPartitionId);
+    public synchronized Producer<byte[]> getProducer(String srcPartitionId) 
throws PulsarClientException {
+        Producer<byte[]> producer = producers.get(srcPartitionId);
         if (null == producer) {
-            producer = createProducer(outputTopic, srcPartitionId, schema);
+            producer = createProducer(outputTopic, srcPartitionId);
             producers.put(srcPartitionId, producer);
         }
         return producer;
@@ -69,7 +68,7 @@ public class MultiConsumersOneOuputTopicProducers<T> extends 
AbstractOneOuputTop
 
     @Override
     public synchronized void closeProducer(String srcPartitionId) {
-        Producer<T> producer = producers.get(srcPartitionId);
+        Producer<byte[]> producer = producers.get(srcPartitionId);
         if (null != producer) {
             producer.closeAsync();
             producers.remove(srcPartitionId);
@@ -79,7 +78,7 @@ public class MultiConsumersOneOuputTopicProducers<T> extends 
AbstractOneOuputTop
     @Override
     public synchronized void close() {
         List<CompletableFuture<Void>> closeFutures = new 
ArrayList<>(producers.size());
-        for (Producer<T> producer: producers.values()) {
+        for (Producer<byte[]> producer: producers.values()) {
             closeFutures.add(producer.closeAsync());
         }
         try {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
index 7892876..4d026ee 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 /**
  * An interface for managing publishers within a java instance.
  */
-public interface Producers<T> extends AutoCloseable {
+public interface Producers extends AutoCloseable {
 
     /**
      * Initialize all the producers.
@@ -40,7 +40,7 @@ public interface Producers<T> extends AutoCloseable {
      *          src partition Id
      * @return the producer instance to produce messages
      */
-    Producer<T> getProducer(String srcPartitionId) throws 
PulsarClientException;
+    Producer<byte[]> getProducer(String srcPartitionId) throws 
PulsarClientException;
 
     /**
      * Close a producer specified by <tt>srcPartitionId</tt>.
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 60a1589..0356ab1 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -54,28 +54,28 @@ public class PulsarSink<T> implements Sink<T> {
 
     private PulsarSinkProcessor pulsarSinkProcessor;
 
-    private interface PulsarSinkProcessor<T> {
+    private interface PulsarSinkProcessor {
         void initializeOutputProducer(String outputTopic) throws Exception;
 
-        void sendOutputMessage(MessageBuilder<T> outputMsgBuilder,
+        void sendOutputMessage(MessageBuilder outputMsgBuilder,
                                RecordContext recordContext) throws Exception;
 
         void close() throws Exception;
     }
 
-    private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor<T> {
-        private Producer<T> producer;
+    private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor 
{
+        private Producer<byte[]> producer;
 
         @Override
         public void initializeOutputProducer(String outputTopic) throws 
Exception {
             this.producer = AbstractOneOuputTopicProducers.createProducer(
-                    client, pulsarSinkConfig.getTopic(), outputSerDe);
+                    client, pulsarSinkConfig.getTopic());
         }
 
         @Override
-        public void sendOutputMessage(MessageBuilder<T> outputMsgBuilder,
+        public void sendOutputMessage(MessageBuilder outputMsgBuilder,
                                       RecordContext recordContext) throws 
Exception {
-            Message<T> outputMsg = outputMsgBuilder.build();
+            Message<byte[]> outputMsg = outputMsgBuilder.build();
             this.producer.sendAsync(outputMsg);
         }
 
@@ -91,19 +91,19 @@ public class PulsarSink<T> implements Sink<T> {
         }
     }
 
-    private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor<T> {
-        private Producer<T> producer;
+    private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
+        private Producer<byte[]> producer;
 
         @Override
         public void initializeOutputProducer(String outputTopic) throws 
Exception {
             this.producer = AbstractOneOuputTopicProducers.createProducer(
-                    client, pulsarSinkConfig.getTopic(), outputSerDe);
+                    client, pulsarSinkConfig.getTopic());
         }
 
         @Override
-        public void sendOutputMessage(MessageBuilder<T> outputMsgBuilder,
+        public void sendOutputMessage(MessageBuilder outputMsgBuilder,
                                       RecordContext recordContext) throws 
Exception {
-            Message<T> outputMsg = outputMsgBuilder.build();
+            Message<byte[]> outputMsg = outputMsgBuilder.build();
             this.producer.sendAsync(outputMsg).thenAccept(messageId -> 
recordContext.ack());
         }
 
@@ -119,19 +119,19 @@ public class PulsarSink<T> implements Sink<T> {
         }
     }
 
-    private class PulsarSinkEffectivelyOnceProcessor implements 
PulsarSinkProcessor<T>, ConsumerEventListener {
+    private class PulsarSinkEffectivelyOnceProcessor implements 
PulsarSinkProcessor, ConsumerEventListener {
 
         @Getter(AccessLevel.PACKAGE)
-        protected Producers<T> outputProducer;
+        protected Producers outputProducer;
 
         @Override
         public void initializeOutputProducer(String outputTopic) throws 
Exception {
-            outputProducer = new MultiConsumersOneOuputTopicProducers(client, 
outputTopic, outputSerDe);
+            outputProducer = new MultiConsumersOneOuputTopicProducers(client, 
outputTopic);
             outputProducer.initialize();
         }
 
         @Override
-        public void sendOutputMessage(MessageBuilder<T> outputMsgBuilder, 
RecordContext recordContext)
+        public void sendOutputMessage(MessageBuilder outputMsgBuilder, 
RecordContext recordContext)
                 throws Exception {
 
             // assign sequence id to output message for idempotent producing
@@ -139,9 +139,9 @@ public class PulsarSink<T> implements Sink<T> {
                     .setSequenceId(recordContext.getRecordSequence());
 
             // currently on PulsarRecord
-            Producer<T> producer = 
outputProducer.getProducer(recordContext.getPartitionId());
+            Producer producer = 
outputProducer.getProducer(recordContext.getPartitionId());
 
-            org.apache.pulsar.client.api.Message<T> outputMsg = 
outputMsgBuilder.build();
+            org.apache.pulsar.client.api.Message outputMsg = 
outputMsgBuilder.build();
             producer.sendAsync(outputMsg)
                     .thenAccept(messageId -> recordContext.ack())
                     .join();
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index c1dddb0..74f2366 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -19,41 +19,40 @@
 package org.apache.pulsar.functions.source;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageListener;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
-import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.io.core.Source;
 import org.jboss.util.Classes;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 @Slf4j
-public class PulsarSource<T> extends PushSource<T> implements 
MessageListener<T> {
+public class PulsarSource<T> implements Source<T> {
 
     private PulsarClient pulsarClient;
     private PulsarSourceConfig pulsarSourceConfig;
+    private Map<String, SerDe> topicToSerDeMap = new HashMap<>();
     private boolean isTopicsPattern;
-    private Map<String, SerDe<T>> topicToSerDeMap = new HashMap<>();
 
-    private Map<String, org.apache.pulsar.client.api.Consumer<T>> 
inputConsumers;
+    @Getter
+    private org.apache.pulsar.client.api.Consumer inputConsumer;
 
     public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig 
pulsarConfig) {
         this.pulsarClient = pulsarClient;
@@ -65,31 +64,28 @@ public class PulsarSource<T> extends PushSource<T> 
implements MessageListener<T>
         // Setup Serialization/Deserialization
         setupSerDe();
 
-        inputConsumers = Maps.newHashMap();
-        for (Map.Entry<String, SerDe<T>> entry : topicToSerDeMap.entrySet()) {
-            ConsumerBuilder<T> consumerBuilder = 
this.pulsarClient.newConsumer(entry.getValue())
+        // Setup pulsar consumer
+        ConsumerBuilder<byte[]> consumerBuilder = 
this.pulsarClient.newConsumer()
                 
.subscriptionName(this.pulsarSourceConfig.getSubscriptionName())
-                
.subscriptionType(this.pulsarSourceConfig.getSubscriptionType())
-                .messageListener(this);
-
-            if (pulsarSourceConfig.getTimeoutMs() != null) {
-                consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(), 
TimeUnit.MILLISECONDS);
-            }
-
-            if(isNotBlank(this.pulsarSourceConfig.getTopicsPattern())) {
-                
consumerBuilder.topicsPattern(this.pulsarSourceConfig.getTopicsPattern());
-                isTopicsPattern = true;
-            }else {
-                consumerBuilder.topics(new 
ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet()));
-            }
+                
.subscriptionType(this.pulsarSourceConfig.getSubscriptionType());
 
-            inputConsumers.put(entry.getKey(),consumerBuilder.subscribe());
+        if(isNotBlank(this.pulsarSourceConfig.getTopicsPattern())) {
+            
consumerBuilder.topicsPattern(this.pulsarSourceConfig.getTopicsPattern());    
+            isTopicsPattern = true;
+        }else {
+            consumerBuilder.topics(new 
ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet()));    
         }
-
+        
+        if (pulsarSourceConfig.getTimeoutMs() != null) {
+            consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(), 
TimeUnit.MILLISECONDS);
+        }
+        this.inputConsumer = consumerBuilder.subscribe();
     }
 
     @Override
-    public void received(Consumer<T> consumer, Message<T> message) {
+    public Record<T> read() throws Exception {
+        org.apache.pulsar.client.api.Message<T> message = 
this.inputConsumer.receive();
+
         String topicName;
         String partitionId;
 
@@ -131,44 +127,31 @@ public class PulsarSource<T> extends PushSource<T> 
implements MessageListener<T>
         }
 
         PulsarRecord<T> pulsarMessage = (PulsarRecord<T>) 
PulsarRecord.builder()
-            .value(input)
-            .messageId(message.getMessageId())
-            .partitionId(String.format("%s-%s", topicName, partitionId))
-            .recordSequence(Utils.getSequenceId(message.getMessageId()))
-            .topicName(topicName)
-            .ackFunction(() -> {
-                if (pulsarSourceConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
-                    consumer.acknowledgeCumulativeAsync(message);
-                } else {
-                    consumer.acknowledgeAsync(message);
-                }
-            }).failFunction(() -> {
-                if (pulsarSourceConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
-                    throw new RuntimeException("Failed to process message: " + 
message.getMessageId());
-                }
-            })
-            .build();
-
-        consume(pulsarMessage);
-    }
-
-    public Consumer<T> getConsumerForTopic(String topic) {
-        return inputConsumers.get(topic);
-    }
-
-    @Override
-    public void reachedEndOfTopic(Consumer<T> consumer) {
-        //No-op
+                .value(input)
+                .messageId(message.getMessageId())
+                .partitionId(String.format("%s-%s", topicName, partitionId))
+                .recordSequence(Utils.getSequenceId(message.getMessageId()))
+                .topicName(topicName)
+                .ackFunction(() -> {
+                    if (pulsarSourceConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                        inputConsumer.acknowledgeCumulativeAsync(message);
+                    } else {
+                        inputConsumer.acknowledgeAsync(message);
+                    }
+                }).failFunction(() -> {
+                    if (pulsarSourceConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                        throw new RuntimeException("Failed to process message: 
" + message.getMessageId());
+                    }
+                })
+                .build();
+        return pulsarMessage;
     }
 
     @Override
     public void close() throws Exception {
-        inputConsumers.forEach((ignored, consumer) -> {
-            try {
-                consumer.close();
-            } catch (PulsarClientException e) {
-            }
-        });
+        if (this.inputConsumer != null) {
+            this.inputConsumer.close();
+        }
     }
 
     @VisibleForTesting
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
index def0926..ce67442 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
@@ -19,8 +19,6 @@
 package org.apache.pulsar.functions.instance.producers;
 
 import static 
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers.makeProducerName;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -45,7 +43,6 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -199,10 +196,10 @@ public class MultiConsumersOneOutputTopicProducersTest {
     public void setup() throws Exception {
         this.mockClient = mock(PulsarClient.class);
 
-        when(mockClient.newProducer(any(Schema.class)))
+        when(mockClient.newProducer())
             .thenReturn(new MockProducerBuilder());
 
-        producers = new MultiConsumersOneOuputTopicProducers(mockClient, 
TEST_OUTPUT_TOPIC, Schema.BYTES);
+        producers = new MultiConsumersOneOuputTopicProducers(mockClient, 
TEST_OUTPUT_TOPIC);
         producers.initialize();
     }
 
@@ -227,13 +224,13 @@ public class MultiConsumersOneOutputTopicProducersTest {
 
         assertSame(mockProducers.get(producerName), producer);
         verify(mockClient, times(1))
-            .newProducer(Schema.BYTES);
+            .newProducer();
         assertTrue(producers.getProducers().containsKey(producerName));
 
         // second get will not create a new producer
         assertSame(mockProducers.get(producerName), producer);
         verify(mockClient, times(1))
-            .newProducer(Schema.BYTES);
+            .newProducer();
         assertTrue(producers.getProducers().containsKey(producerName));
 
         // close
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 8ca94bb..3c5e61b 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -34,11 +34,9 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-import static java.util.Collections.emptyMap;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -80,10 +78,9 @@ public class PulsarSourceTest {
         
doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(anyString());
         
doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(any());
         doReturn(consumerBuilder).when(consumerBuilder).ackTimeout(anyLong(), 
any());
-        
doReturn(consumerBuilder).when(consumerBuilder).messageListener(anyObject());
         Consumer consumer = mock(Consumer.class);
         doReturn(consumer).when(consumerBuilder).subscribe();
-        doReturn(consumerBuilder).when(pulsarClient).newConsumer(anyObject());
+        doReturn(consumerBuilder).when(pulsarClient).newConsumer();
         return pulsarClient;
     }
 
@@ -171,7 +168,7 @@ public class PulsarSourceTest {
         pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
         PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig);
 
-        pulsarSource.open(emptyMap());
+        pulsarSource.open(new HashMap<>());
     }
 
     /**

Reply via email to