sijie closed pull request #1580: Modified functions producer consumers to use 
new builder based api
URL: https://github.com/apache/incubator-pulsar/pull/1580
 
 
   

This is a pull request merged from a forked repository.
Because GitHub hides the original diff once such a pull request is merged,
the diff is reproduced below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, since GitHub would not otherwise display it after the merge:

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index 073614623..c16d6767c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -21,6 +21,10 @@
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.nio.file.Paths;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -31,7 +35,7 @@
 import org.apache.pulsar.broker.BookKeeperClientFactoryImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
@@ -40,12 +44,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import lombok.Cleanup;
-
 public class CompactorTool {
 
     private static class Arguments {
@@ -82,15 +80,16 @@ public static void main(String[] args) throws Exception {
         }
 
         String pulsarServiceUrl = PulsarService.brokerUrl(brokerConfig);
-        ClientConfiguration clientConfig = new ClientConfiguration();
+        ClientBuilder clientBuilder = PulsarClient.builder();
 
         if (isNotBlank(brokerConfig.getBrokerClientAuthenticationPlugin())) {
-            
clientConfig.setAuthentication(brokerConfig.getBrokerClientAuthenticationPlugin(),
-                                           
brokerConfig.getBrokerClientAuthenticationParameters());
+            
clientBuilder.authentication(brokerConfig.getBrokerClientAuthenticationPlugin(),
+                    brokerConfig.getBrokerClientAuthenticationParameters());
         }
-        clientConfig.setUseTls(brokerConfig.isTlsEnabled());
-        
clientConfig.setTlsAllowInsecureConnection(brokerConfig.isTlsAllowInsecureConnection());
-        
clientConfig.setTlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath());
+        clientBuilder.serviceUrl(pulsarServiceUrl)
+                .enableTls(brokerConfig.isTlsEnabled())
+                
.allowTlsInsecureConnection(brokerConfig.isTlsAllowInsecureConnection())
+                
.tlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath());
 
         ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor(
                 new 
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
@@ -103,7 +102,7 @@ public static void main(String[] args) throws Exception {
                                               
(int)brokerConfig.getZooKeeperSessionTimeoutMillis()).get();
         BookKeeperClientFactory bkClientFactory = new 
BookKeeperClientFactoryImpl();
         BookKeeper bk = bkClientFactory.create(brokerConfig, zk);
-        try (PulsarClient pulsar = PulsarClient.create(pulsarServiceUrl, 
clientConfig)) {
+        try (PulsarClient pulsar = clientBuilder.build()) {
             Compactor compactor = new TwoPhaseCompactor(brokerConfig, pulsar, 
bk, scheduler);
             long ledgerId = compactor.compact(arguments.topic).get();
             log.info("Compaction of topic {} complete. Compacted to ledger 
{}", arguments.topic, ledgerId);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 3213fd961..5db4f2e57 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -21,36 +21,33 @@
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import com.google.common.collect.ImmutableMap;
+
 import io.netty.buffer.ByteBuf;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.commons.lang3.tuple.Pair;
-
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.impl.RawBatchConverter;
-
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
index 32cb5c098..e3041ba05 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
@@ -93,10 +93,11 @@ public void updatePeerClusterNames(String cluster, 
LinkedHashSet<String> peerClu
         } catch (Exception e) {
             throw getApiException(e);
         }
-        
+
     }
 
        @Override
+    @SuppressWarnings("unchecked")
        public Set<String> getPeerClusterNames(String cluster) throws 
PulsarAdminException {
                try {
                        return 
request(adminClusters.path(cluster).path("peers")).get(LinkedHashSet.class);
@@ -104,7 +105,7 @@ public void updatePeerClusterNames(String cluster, 
LinkedHashSet<String> peerClu
                        throw getApiException(e);
                }
        }
-    
+
     @Override
     public void deleteCluster(String cluster) throws PulsarAdminException {
         try {
@@ -125,7 +126,7 @@ public void deleteCluster(String cluster) throws 
PulsarAdminException {
         }
     }
 
-    
+
     @Override
     public List<BrokerNamespaceIsolationData> 
getBrokersWithNamespaceIsolationPolicy(String cluster)
             throws PulsarAdminException {
@@ -223,7 +224,7 @@ public FailureDomain getFailureDomain(String cluster, 
String domainName) throws
             throw getApiException(e);
         }
     }
-    
+
     private void setDomain(String cluster, String domainName,
             FailureDomain domain) throws PulsarAdminException {
         try {
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
index 3121739fb..86f2c05b9 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
@@ -22,6 +22,7 @@
 
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -796,7 +797,7 @@ private TopicName validateTopic(String topic) {
                 }
             }
 
-            return Lists.newArrayList(new MessageImpl<byte[]>(msgId, 
properties, data, Schema.IDENTITY));
+            return Collections.singletonList(new MessageImpl<byte[]>(msgId, 
properties, data, Schema.IDENTITY));
         } finally {
             if (stream != null) {
                 stream.close();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index f024a4f7a..af11ef594 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -236,4 +236,8 @@ private ConsumerBuilderImpl(PulsarClientImpl client, 
ConsumerConfigurationData<T
         conf.setSubscriptionInitialPosition(subscriptionInitialPosition);
                return this;
        }
+
+       public ConsumerConfigurationData<T> getConf() {
+           return conf;
+       }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
index 819c118cc..86aeb83ef 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
@@ -38,7 +38,7 @@
 public class AtLeastOnceProcessor extends MessageProcessorBase {
 
     @Getter
-    private Producer producer;
+    private Producer<byte[]> producer;
 
     AtLeastOnceProcessor(PulsarClient client,
                          FunctionDetails functionDetails,
@@ -52,13 +52,13 @@ protected void initializeOutputProducer(String outputTopic) 
throws Exception {
     }
 
     @Override
-    public void sendOutputMessage(InputMessage inputMsg, MessageBuilder 
outputMsgBuilder) {
+    public void sendOutputMessage(InputMessage inputMsg, 
MessageBuilder<byte[]> outputMsgBuilder) {
         if (null == outputMsgBuilder || null == producer) {
             inputMsg.ack();
             return;
         }
 
-        Message outputMsg = outputMsgBuilder.build();
+        Message<byte[]> outputMsg = outputMsgBuilder.build();
         producer.sendAsync(outputMsg)
             .thenAccept(msgId -> inputMsg.ack());
     }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
index b6871fa8d..994be0d03 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
@@ -36,7 +36,7 @@
 @Slf4j
 class AtMostOnceProcessor extends MessageProcessorBase {
 
-    private Producer producer;
+    private Producer<byte[]> producer;
 
     AtMostOnceProcessor(PulsarClient client,
                         FunctionDetails functionDetails,
@@ -58,12 +58,12 @@ protected void initializeOutputProducer(String outputTopic) 
throws Exception {
     }
 
     @Override
-    public void sendOutputMessage(InputMessage inputMsg, MessageBuilder 
outputMsgBuilder) {
+    public void sendOutputMessage(InputMessage inputMsg, 
MessageBuilder<byte[]> outputMsgBuilder) {
         if (null == outputMsgBuilder) {
             return;
         }
 
-        Message outputMsg = outputMsgBuilder.build();
+        Message<byte[]> outputMsg = outputMsgBuilder.build();
         producer.sendAsync(outputMsg);
     }
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
index d31bb0619..59c7bd6f3 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
@@ -22,11 +22,13 @@
 import java.util.List;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
+
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
@@ -102,7 +104,7 @@ protected void initializeOutputProducer(String outputTopic) 
throws Exception {
 
     @Override
     public void sendOutputMessage(InputMessage inputMsg,
-                                  MessageBuilder outputMsgBuilder) throws 
Exception {
+                                  MessageBuilder<byte[]> outputMsgBuilder) 
throws Exception {
         if (null == outputMsgBuilder) {
             inputMsg.ackCumulative();
             return;
@@ -113,15 +115,14 @@ public void sendOutputMessage(InputMessage inputMsg,
             
.setSequenceId(Utils.getSequenceId(inputMsg.getActualMessage().getMessageId()));
 
 
-        Producer producer = 
outputProducer.getProducer(inputMsg.getTopicName(), 
inputMsg.getTopicPartition());
+        Producer<byte[]> producer = 
outputProducer.getProducer(inputMsg.getTopicName(), 
inputMsg.getTopicPartition());
 
-        Message outputMsg = outputMsgBuilder.build();
+        Message<byte[]> outputMsg = outputMsgBuilder.build();
         producer.sendAsync(outputMsg)
                 .thenAccept(messageId -> inputMsg.ackCumulative())
                 .join();
     }
 
-
     @Override
     public void close() {
         super.close();
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
index 33d4f77d0..97c971d0a 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
@@ -82,7 +82,7 @@ void setupInput(Map<String, SerDe> inputSerDe)
      *
      * @return the input consumer.
      */
-    Consumer getInputConsumer();
+    Consumer<byte[]> getInputConsumer();
 
     /**
      * Setup the output with a provided <i>outputSerDe</i>. The implementation 
of this processor is responsible for
@@ -103,7 +103,7 @@ void setupInput(Map<String, SerDe> inputSerDe)
      * @param outputMsgBuilder output message builder. it can be null.
      */
     void sendOutputMessage(InputMessage inputMsg,
-                           MessageBuilder outputMsgBuilder) throws 
PulsarClientException, Exception;
+                           MessageBuilder<byte[]> outputMsgBuilder) throws 
PulsarClientException, Exception;
 
     /**
      * Get the next message to process
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
index d525a1aac..fc37b134e 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
@@ -24,7 +24,9 @@
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -51,7 +53,7 @@
     protected SerDe outputSerDe;
 
     @Getter
-    protected Consumer inputConsumer;
+    protected Consumer<byte[]> inputConsumer;
 
     protected List<String> topics;
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
index bba9a7f86..7a561a17d 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
@@ -19,11 +19,12 @@
 package org.apache.pulsar.functions.instance.producers;
 
 import java.util.concurrent.TimeUnit;
+
 import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.ProducerConfiguration.HashingScheme;
-import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.instance.FunctionResultRouter;
@@ -32,51 +33,43 @@
 
     protected final PulsarClient client;
     protected final String outputTopic;
-    protected final ProducerConfiguration conf;
 
     AbstractOneOuputTopicProducers(PulsarClient client,
                                    String outputTopic)
             throws PulsarClientException {
         this.client = client;
         this.outputTopic = outputTopic;
-        this.conf = newProducerConfiguration();
     }
 
-    static ProducerConfiguration newProducerConfiguration() {
-        ProducerConfiguration conf = new ProducerConfiguration();
-        conf.setBlockIfQueueFull(true);
-        conf.setBatchingEnabled(true);
-        conf.setBatchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
-        conf.setMaxPendingMessages(1000000);
-        conf.setCompressionType(CompressionType.LZ4);
-        conf.setHashingScheme(HashingScheme.Murmur3_32Hash);
-        conf.setMessageRoutingMode(MessageRoutingMode.CustomPartition);
+    static ProducerBuilder<byte[]> newProducerBuilder(PulsarClient client) {
         // use function result router to deal with different processing 
guarantees.
-        conf.setMessageRouter(FunctionResultRouter.of());
-        return conf;
+        return client.newProducer() //
+                .blockIfQueueFull(true) //
+                .enableBatching(true) //
+                .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) //
+                .compressionType(CompressionType.LZ4) //
+                .hashingScheme(HashingScheme.Murmur3_32Hash) //
+                .messageRoutingMode(MessageRoutingMode.CustomPartition) //
+                .messageRouter(FunctionResultRouter.of());
     }
 
-    protected Producer createProducer(String topic)
+    protected Producer<byte[]> createProducer(String topic)
             throws PulsarClientException {
         return createProducer(client, topic);
     }
 
-    public static Producer createProducer(PulsarClient client, String topic)
+    public static Producer<byte[]> createProducer(PulsarClient client, String 
topic)
             throws PulsarClientException {
-        return client.createProducer(topic, newProducerConfiguration());
+        return newProducerBuilder(client).topic(topic).create();
     }
 
-    protected Producer createProducer(String topic, String producerName)
+    protected Producer<byte[]> createProducer(String topic, String 
producerName)
             throws PulsarClientException {
         return createProducer(client, topic, producerName);
     }
 
-    public static Producer createProducer(PulsarClient client, String topic, 
String producerName)
-        throws PulsarClientException {
-        ProducerConfiguration newConf = newProducerConfiguration();
-        newConf.setProducerName(producerName);
-
-        return client.createProducer(topic, newConf);
+    public static Producer<byte[]> createProducer(PulsarClient client, String 
topic, String producerName)
+            throws PulsarClientException {
+        return 
newProducerBuilder(client).topic(topic).producerName(producerName).create();
     }
-
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
index cf21a042b..3668311ed 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
@@ -37,7 +37,7 @@
 public class MultiConsumersOneOuputTopicProducers extends 
AbstractOneOuputTopicProducers {
 
     @Getter(AccessLevel.PACKAGE)
-    private final Map<String, IntObjectMap<Producer>> producers;
+    private final Map<String, IntObjectMap<Producer<byte[]>>> producers;
 
     public MultiConsumersOneOuputTopicProducers(PulsarClient client,
                                                 String outputTopic)
@@ -56,14 +56,14 @@ static String makeProducerName(String srcTopicName, int 
srcTopicPartition) {
     }
 
     @Override
-    public synchronized Producer getProducer(String srcTopicName, int 
srcTopicPartition) throws PulsarClientException {
-        IntObjectMap<Producer> producerMap = producers.get(srcTopicName);
+    public synchronized Producer<byte[]> getProducer(String srcTopicName, int 
srcTopicPartition) throws PulsarClientException {
+        IntObjectMap<Producer<byte[]>> producerMap = 
producers.get(srcTopicName);
         if (null == producerMap) {
             producerMap = new IntObjectHashMap<>();
             producers.put(srcTopicName, producerMap);
         }
 
-        Producer producer = producerMap.get(srcTopicPartition);
+        Producer<byte[]> producer = producerMap.get(srcTopicPartition);
         if (null == producer) {
             producer = createProducer(outputTopic, 
makeProducerName(srcTopicName, srcTopicPartition));
             producerMap.put(srcTopicPartition, producer);
@@ -73,10 +73,10 @@ public synchronized Producer getProducer(String 
srcTopicName, int srcTopicPartit
 
     @Override
     public synchronized void closeProducer(String srcTopicName, int 
srcTopicPartition) {
-        IntObjectMap<Producer> producerMap = producers.get(srcTopicName);
+        IntObjectMap<Producer<byte[]>> producerMap = 
producers.get(srcTopicName);
 
         if (null != producerMap) {
-            Producer producer = producerMap.remove(srcTopicPartition);
+            Producer<byte[]> producer = producerMap.remove(srcTopicPartition);
             if (null != producer) {
                 producer.closeAsync();
             }
@@ -89,8 +89,8 @@ public synchronized void closeProducer(String srcTopicName, 
int srcTopicPartitio
     @Override
     public synchronized void close() {
         List<CompletableFuture<Void>> closeFutures = new 
ArrayList<>(producers.size());
-        for (IntObjectMap<Producer> producerMap: producers.values()) {
-            for (Producer producer : producerMap.values()) {
+        for (IntObjectMap<Producer<byte[]>> producerMap: producers.values()) {
+            for (Producer<byte[]> producer : producerMap.values()) {
                 closeFutures.add(producer.closeAsync());
             }
         }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
index 9669bea2b..29cd96a1e 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
@@ -42,8 +42,7 @@
      *          src topic partition
      * @return the producer instance to produce messages
      */
-    Producer getProducer(String srcTopicName,
-                         int srcTopicPartition) throws PulsarClientException;
+    Producer<byte[]> getProducer(String srcTopicName, int srcTopicPartition) 
throws PulsarClientException;
 
     /**
      * Close a producer specified by <tt>srcTopicName</tt> and 
<tt>srcTopicPartition</tt>
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
index f98ac4497..a1ad2898d 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
@@ -45,7 +45,7 @@ public void setup() {
 
     @Test
     public void testChoosePartitionWithoutKeyWithoutSequenceId() {
-        Message msg = mock(Message.class);
+        Message<?> msg = mock(Message.class);
         when(msg.hasKey()).thenReturn(false);
         when(msg.getKey()).thenReturn(null);
         when(msg.getSequenceId()).thenReturn(-1L);
@@ -70,7 +70,7 @@ public void testChoosePartitionWithoutKeySequenceId() {
 
         FunctionResultRouter router = new FunctionResultRouter(0, clock);
         for (int i = 0; i < 10; i++) {
-            Message msg = mock(Message.class);
+            Message<?> msg = mock(Message.class);
             when(msg.hasKey()).thenReturn(false);
             when(msg.getKey()).thenReturn(null);
             when(msg.getSequenceId()).thenReturn((long) (2 * i));
@@ -82,11 +82,11 @@ public void testChoosePartitionWithoutKeySequenceId() {
     public void testChoosePartitionWithKeyWithoutSequenceId() {
         String key1 = "key1";
         String key2 = "key2";
-        Message msg1 = mock(Message.class);
+        Message<?> msg1 = mock(Message.class);
         when(msg1.hasKey()).thenReturn(true);
         when(msg1.getKey()).thenReturn(key1);
         when(msg1.getSequenceId()).thenReturn(-1L);
-        Message msg2 = mock(Message.class);
+        Message<?> msg2 = mock(Message.class);
         when(msg2.hasKey()).thenReturn(true);
         when(msg2.getKey()).thenReturn(key2);
         when(msg1.getSequenceId()).thenReturn(-1L);
@@ -105,12 +105,12 @@ public void testChoosePartitionWithKeyWithoutSequenceId() 
{
     public void testChoosePartitionWithKeySequenceId() {
         String key1 = "key1";
         String key2 = "key2";
-        Message msg1 = mock(Message.class);
+        Message<?> msg1 = mock(Message.class);
         when(msg1.hasKey()).thenReturn(true);
         when(msg1.getKey()).thenReturn(key1);
         // make sure sequence id is different from hashcode, so the test can 
be tested correctly.
         when(msg1.getSequenceId()).thenReturn((long) ((key1.hashCode() % 100) 
+ 1));
-        Message msg2 = mock(Message.class);
+        Message<?> msg2 = mock(Message.class);
         when(msg2.hasKey()).thenReturn(true);
         when(msg2.getKey()).thenReturn(key2);
         when(msg1.getSequenceId()).thenReturn((long) ((key2.hashCode() % 100) 
+ 1));
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
index 5b463b642..a11062fb5 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
@@ -1009,4 +1009,4 @@
 //            assertTrue(mockProducers.isEmpty());
 //        }
 //    }
-//}
\ No newline at end of file
+//}
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
index 77de78b4f..b9e762a58 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
@@ -55,5 +55,6 @@ public void testLambda() {
         JavaExecutionResult result = 
instance.handleMessage(MessageId.earliest, "random", testString);
         assertNotNull(result.getResult());
         assertEquals(new String(testString + "-lambda"), result.getResult());
+        instance.close();
     }
 }
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
index 577015a2c..e6072c8d3 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
@@ -19,9 +19,6 @@
 package org.apache.pulsar.functions.instance.producers;
 
 import static 
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers.makeProducerName;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -33,10 +30,19 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -48,34 +54,152 @@
     private static final String TEST_OUTPUT_TOPIC = "test-output-topic";
 
     private PulsarClient mockClient;
-    private final Map<String, Producer> mockProducers = new HashMap<>();
+    private final Map<String, Producer<byte[]>> mockProducers = new 
HashMap<>();
     private MultiConsumersOneOuputTopicProducers producers;
 
+    private class MockProducerBuilder implements ProducerBuilder<byte[]> {
+
+        String producerName = "";
+
+        @Override
+        public Producer<byte[]> create() throws PulsarClientException {
+            Producer<byte[]> producer;
+            synchronized (mockProducers) {
+                producer = mockProducers.get(producerName);
+                if (null == producer) {
+                    producer = createMockProducer(producerName);
+                    mockProducers.put(producerName, producer);
+                }
+            }
+            return producer;
+        }
+
+        @Override
+        public CompletableFuture<Producer<byte[]>> createAsync() {
+            try {
+                return CompletableFuture.completedFuture(create());
+            } catch (PulsarClientException e) {
+                CompletableFuture<Producer<byte[]>> future = new 
CompletableFuture<>();
+                future.completeExceptionally(e);
+                return future;
+            }
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> clone() {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> topic(String topicName) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> producerName(String producerName) {
+            this.producerName = producerName;
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> sendTimeout(int sendTimeout, TimeUnit 
unit) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> maxPendingMessages(int 
maxPendingMessages) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> maxPendingMessagesAcrossPartitions(int 
maxPendingMessagesAcrossPartitions) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> blockIfQueueFull(boolean 
blockIfQueueFull) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> messageRoutingMode(MessageRoutingMode 
messageRoutingMode) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> hashingScheme(HashingScheme 
hashingScheme) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> compressionType(CompressionType 
compressionType) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> messageRouter(MessageRouter 
messageRouter) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> enableBatching(boolean enableBatching) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> cryptoKeyReader(CryptoKeyReader 
cryptoKeyReader) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> addEncryptionKey(String key) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> 
cryptoFailureAction(ProducerCryptoFailureAction action) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> batchingMaxPublishDelay(long 
batchDelay, TimeUnit timeUnit) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> batchingMaxMessages(int 
batchMessagesMaxMessagesPerBatch) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> initialSequenceId(long 
initialSequenceId) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> property(String key, String value) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> properties(Map<String, String> 
properties) {
+            return this;
+        }
+    }
+
     @BeforeMethod
     public void setup() throws Exception {
         this.mockClient = mock(PulsarClient.class);
 
-        when(mockClient.createProducer(anyString(), 
any(ProducerConfiguration.class)))
-            .thenAnswer(invocationOnMock -> {
-                ProducerConfiguration conf = invocationOnMock.getArgumentAt(1, 
ProducerConfiguration.class);
-                String producerName = conf.getProducerName();
-
-                synchronized (mockProducers) {
-                    Producer producer = mockProducers.get(producerName);
-                    if (null == producer) {
-                        producer = createMockProducer(producerName);
-                        mockProducers.put(producerName, producer);
-                    }
-                    return producer;
-                }
-            });
+        when(mockClient.newProducer())
+            .thenReturn(new MockProducerBuilder());
 
         producers = new MultiConsumersOneOuputTopicProducers(mockClient, 
TEST_OUTPUT_TOPIC);
         producers.initialize();
     }
 
-    private Producer createMockProducer(String topic) {
-        Producer producer = mock(Producer.class);
+    private Producer<byte[]> createMockProducer(String topic) {
+        Producer<byte[]> producer = mock(Producer.class);
         when(producer.closeAsync())
             .thenAnswer(invocationOnMock -> {
                 synchronized (mockProducers) {
@@ -90,16 +214,13 @@ private Producer createMockProducer(String topic) {
     public void testGetCloseProducer() throws Exception {
         String srcTopic = "test-src-topic";
         int ptnIdx = 1234;
-        Producer producer = producers.getProducer(srcTopic, ptnIdx);
+        Producer<byte[]> producer = producers.getProducer(srcTopic, ptnIdx);
 
         String producerName = makeProducerName(srcTopic, ptnIdx);
 
         assertSame(mockProducers.get(producerName), producer);
         verify(mockClient, times(1))
-            .createProducer(
-                eq(TEST_OUTPUT_TOPIC),
-                any(ProducerConfiguration.class)
-            );
+            .newProducer();
         assertTrue(producers.getProducers().containsKey(srcTopic));
         assertEquals(1, producers.getProducers().get(srcTopic).size());
         assertTrue(producers.getProducers().get(srcTopic).containsKey(ptnIdx));
@@ -107,10 +228,7 @@ public void testGetCloseProducer() throws Exception {
         // second get will not create a new producer
         assertSame(mockProducers.get(producerName), producer);
         verify(mockClient, times(1))
-            .createProducer(
-                eq(TEST_OUTPUT_TOPIC),
-                any(ProducerConfiguration.class)
-            );
+            .newProducer();
         assertTrue(producers.getProducers().containsKey(srcTopic));
         assertEquals(1, producers.getProducers().get(srcTopic).size());
         assertTrue(producers.getProducers().get(srcTopic).containsKey(ptnIdx));
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
index 13d8e62e4..df89eed8e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
@@ -20,8 +20,9 @@
 package org.apache.pulsar.functions.runtime;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.ClientConfiguration;
+
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -46,7 +47,7 @@ public ThreadRuntimeFactory(String threadGroupName,
             throws Exception {
         this(
             threadGroupName,
-            pulsarServiceUrl != null ? PulsarClient.create(pulsarServiceUrl, 
new ClientConfiguration()) : null,
+            pulsarServiceUrl != null ? 
PulsarClient.builder().serviceUrl(pulsarServiceUrl).build() : null,
             storageServiceUrl);
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index 0a4cdeb0b..1c045d48c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -29,13 +29,13 @@
 
 @Slf4j
 public class FunctionAssignmentTailer
-    implements java.util.function.Consumer<Message>, Function<Throwable, 
Void>, AutoCloseable {
+    implements java.util.function.Consumer<Message<byte[]>>, 
Function<Throwable, Void>, AutoCloseable {
 
         private final FunctionRuntimeManager functionRuntimeManager;
-        private final Reader reader;
+        private final Reader<byte[]> reader;
 
     public FunctionAssignmentTailer(FunctionRuntimeManager 
functionRuntimeManager,
-                Reader reader)
+                Reader<byte[]> reader)
             throws PulsarClientException {
             this.functionRuntimeManager = functionRuntimeManager;
             this.reader = reader;
@@ -64,7 +64,7 @@ public void close() {
     }
 
     @Override
-    public void accept(Message msg) {
+    public void accept(Message<byte[]> msg) {
 
         // check if latest
         boolean hasMessageAvailable;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index b5f975154..29a322043 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -88,7 +88,7 @@ public void initialize() {
         log.info("/** Initializing Function Metadata Manager **/");
         try {
 
-            Reader reader = pulsarClient.newReader()
+            Reader<byte[]> reader = pulsarClient.newReader()
                     .topic(this.workerConfig.getFunctionMetadataTopic())
                     .startMessageId(MessageId.earliest)
                     .create();
@@ -410,6 +410,6 @@ public void close() throws Exception {
     }
 
     private ServiceRequestManager getServiceRequestManager(PulsarClient 
pulsarClient, String functionMetadataTopic) throws PulsarClientException {
-        return new 
ServiceRequestManager(pulsarClient.createProducer(functionMetadataTopic));
+        return new 
ServiceRequestManager(pulsarClient.newProducer().topic(functionMetadataTopic).create());
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
index a37315899..4bcaaf7e0 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
@@ -29,13 +29,13 @@
 
 @Slf4j
 public class FunctionMetaDataTopicTailer
-        implements java.util.function.Consumer<Message>, Function<Throwable, 
Void>, AutoCloseable {
+        implements java.util.function.Consumer<Message<byte[]>>, 
Function<Throwable, Void>, AutoCloseable {
 
     private final FunctionMetaDataManager functionMetaDataManager;
-    private final Reader reader;
+    private final Reader<byte[]> reader;
 
     public FunctionMetaDataTopicTailer(FunctionMetaDataManager 
functionMetaDataManager,
-                                       Reader reader)
+                                       Reader<byte[]> reader)
             throws PulsarClientException {
         this.functionMetaDataManager = functionMetaDataManager;
         this.reader = reader;
@@ -62,7 +62,7 @@ public void close() {
         log.info("Stopped function state consumer");
     }
 
-    public void processRequest(Message msg) {
+    public void processRequest(Message<byte[]> msg) {
         ServiceRequest serviceRequest;
 
         try {
@@ -80,7 +80,7 @@ public void processRequest(Message msg) {
     }
 
     @Override
-    public void accept(Message msg) {
+    public void accept(Message<byte[]> msg) {
 
         processRequest(msg);
         // receive next request
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index f7e793a3a..af2399230 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -85,7 +85,7 @@ public FunctionRuntimeManager(WorkerConfig workerConfig,
                                   MembershipManager membershipManager) throws 
Exception {
         this.workerConfig = workerConfig;
 
-        Reader reader = pulsarClient.newReader()
+        Reader<byte[]> reader = pulsarClient.newReader()
                 .topic(this.workerConfig.getFunctionAssignmentTopic())
                 .startMessageId(MessageId.earliest)
                 .create();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
index 21dd69ff3..2a3dc0f44 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
@@ -22,12 +22,10 @@
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
-import lombok.extern.slf4j.Slf4j;
 
 /**
  * A starter to start function worker.
  */
-@Slf4j
 public class FunctionWorkerStarter {
 
     private static class WorkerArguments {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index 4e5fb2139..f43311d92 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.functions.worker;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -30,16 +32,15 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
-import com.google.common.annotations.VisibleForTesting;
 import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -56,7 +57,7 @@
 public class MembershipManager implements AutoCloseable, ConsumerEventListener 
{
 
     private final String consumerName;
-    private final Consumer consumer;
+    private final Consumer<byte[]> consumer;
     private final WorkerConfig workerConfig;
     private PulsarAdmin pulsarAdminClient;
     private final CompletableFuture<Void> firstConsumerEventFuture;
@@ -85,18 +86,17 @@
         // we don't produce any messages into this topic, we only use the 
`failover` subscription
         // to elect an active consumer as the leader worker. The leader worker 
will be responsible
         // for scheduling snapshots for FMT and doing task assignment.
-        consumer = client.subscribe(
-                workerConfig.getClusterCoordinationTopic(),
-                COORDINATION_TOPIC_SUBSCRIPTION,
-                new ConsumerConfiguration()
-                        .setSubscriptionType(SubscriptionType.Failover)
-                        .setConsumerEventListener(this)
-                        .setProperty(WORKER_IDENTIFIER, consumerName)
-        );
+        consumer = client.newConsumer()
+                .topic(workerConfig.getClusterCoordinationTopic())
+                .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
+                .subscriptionType(SubscriptionType.Failover)
+                .consumerEventListener(this)
+                .property(WORKER_IDENTIFIER, consumerName)
+                .subscribe();
     }
 
     @Override
-    public void becameActive(Consumer consumer, int partitionId) {
+    public void becameActive(Consumer<?> consumer, int partitionId) {
         firstConsumerEventFuture.complete(null);
         if (isLeader.compareAndSet(false, true)) {
             log.info("Worker {} became the leader.", consumerName);
@@ -104,7 +104,7 @@ public void becameActive(Consumer consumer, int partitionId) {
     }
 
     @Override
-    public void becameInactive(Consumer consumer, int partitionId) {
+    public void becameInactive(Consumer<?> consumer, int partitionId) {
         firstConsumerEventFuture.complete(null);
         if (isLeader.compareAndSet(true, false)) {
             log.info("Worker {} lost the leadership.", consumerName);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index c6b72a543..ed00958d7 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -18,21 +18,6 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.CompressionType;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
-import org.apache.pulsar.functions.proto.Function.Assignment;
-import org.apache.pulsar.functions.proto.Request;
-import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.worker.scheduler.IScheduler;
-
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -47,6 +32,21 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.Assignment;
+import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.proto.Request;
+import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.worker.scheduler.IScheduler;
+
 @Slf4j
 public class SchedulerManager implements AutoCloseable {
 
@@ -63,7 +63,7 @@
 
     private final IScheduler scheduler;
 
-    private final Producer producer;
+    private final Producer<byte[]> producer;
 
     private final ExecutorService executorService;
 
@@ -72,14 +72,10 @@ public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient) {
         this.scheduler = 
Reflections.createInstance(workerConfig.getSchedulerClassName(), 
IScheduler.class,
                 Thread.currentThread().getContextClassLoader());
 
-        ProducerConfiguration producerConf = new ProducerConfiguration()
-            .setBatchingEnabled(true)
-            .setBlockIfQueueFull(true)
-            .setCompressionType(CompressionType.LZ4)
-            // retry until succeed
-            .setSendTimeout(0, TimeUnit.MILLISECONDS);
         try {
-            this.producer = 
pulsarClient.createProducer(this.workerConfig.getFunctionAssignmentTopic(), 
producerConf);
+            this.producer = 
pulsarClient.newProducer().topic(this.workerConfig.getFunctionAssignmentTopic())
+                    
.enableBatching(true).blockIfQueueFull(true).compressionType(CompressionType.LZ4).
+                    sendTimeout(0, TimeUnit.MILLISECONDS).create();
         } catch (PulsarClientException e) {
             log.error("Failed to create producer to function assignment topic "
                     + this.workerConfig.getFunctionAssignmentTopic(), e);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index 5a9f3393d..071e946ed 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -18,8 +18,18 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import dlshade.org.apache.zookeeper.KeeperException.Code;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.UUID;
+
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.distributedlog.AppendOnlyStreamWriter;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.api.DistributedLogManager;
@@ -33,17 +43,7 @@
 import org.apache.pulsar.functions.worker.dlog.DLInputStream;
 import org.apache.pulsar.functions.worker.dlog.DLOutputStream;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URL;
-import java.util.UUID;
+import dlshade.org.apache.zookeeper.KeeperException.Code;
 
 @Slf4j
 public final class Utils {
@@ -129,13 +129,14 @@ public static void downloadFromBookkeeper(Namespace namespace,
                                                  OutputStream outputStream,
                                                  String packagePath) throws 
IOException {
         DistributedLogManager dlm = namespace.openLog(packagePath);
-        InputStream in = new DLInputStream(dlm);
-        int read = 0;
-        byte[] bytes = new byte[1024];
-        while ((read = in.read(bytes)) != -1) {
-            outputStream.write(bytes, 0, read);
+        try (InputStream in = new DLInputStream(dlm)) {
+            int read = 0;
+            byte[] bytes = new byte[1024];
+            while ((read = in.read(bytes)) != -1) {
+                outputStream.write(bytes, 0, read);
+            }
+            outputStream.flush();
         }
-        outputStream.flush();
     }
 
     public static DistributedLogConfiguration getDlogConf(WorkerConfig 
workerConfig) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index fb875b5ee..06f4e53dd 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -20,12 +20,16 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Map;
 
-import lombok.*;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
 import lombok.experimental.Accessors;
 
 @Data
@@ -92,7 +96,7 @@ public String getClusterCoordinationTopic() {
     public String getFunctionAssignmentTopic() {
         return String.format("persistent://%s/%s", pulsarFunctionsNamespace, 
functionAssignmentTopicName);
     }
-    
+
     public static WorkerConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         return mapper.readValue(new File(yamlFile), WorkerConfig.class);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 5df3c3ff0..e33dcbaa4 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -76,7 +76,7 @@ public void start(URI dlogUri) throws InterruptedException {
         // initialize the function metadata manager
         try {
 
-            this.client = 
PulsarClient.create(this.workerConfig.getPulsarServiceUrl());
+            this.client = 
PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl()).build();
             log.info("Created Pulsar client");
 
             //create scheduler manager
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index 3d27fb498..ac99f9a6a 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -19,17 +19,21 @@
 package org.apache.pulsar.functions.worker;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.util.HashMap;
 import java.util.Map;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.proto.Function;
@@ -42,12 +46,24 @@
 @Slf4j
 public class FunctionMetaDataManagerTest {
 
+    private static PulsarClient mockPulsarClient() throws 
PulsarClientException {
+        ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
+        when(builder.topic(anyString())).thenReturn(builder);
+
+        when(builder.create()).thenReturn(mock(Producer.class));
+
+        PulsarClient client = mock(PulsarClient.class);
+        when(client.newProducer()).thenReturn(builder);
+
+        return client;
+    }
+
     @Test
     public void testListFunctions() throws PulsarClientException {
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(new WorkerConfig(),
                         mock(SchedulerManager.class),
-                        mock(PulsarClient.class)));
+                        mockPulsarClient()));
 
         Map<String, Function.FunctionMetaData> functionMetaDataMap1 = new 
HashMap<>();
         functionMetaDataMap1.put("func-1", 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
@@ -87,7 +103,7 @@ public void updateFunction() throws PulsarClientException {
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         mock(SchedulerManager.class),
-                        mock(PulsarClient.class)));
+                        mockPulsarClient()));
         Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")).build();
 
@@ -123,7 +139,7 @@ public boolean matches(Object o) {
         functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         mock(SchedulerManager.class),
-                        mock(PulsarClient.class)));
+                        mockPulsarClient()));
         Map<String, Function.FunctionMetaData> functionMetaDataMap = new 
HashMap<>();
         Function.FunctionMetaData m2 = Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -167,7 +183,7 @@ public void deregisterFunction() throws PulsarClientException {
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         mock(SchedulerManager.class),
-                        mock(PulsarClient.class)));
+                        mockPulsarClient()));
         Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
                         
.setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
@@ -209,7 +225,7 @@ public void testProcessRequest() throws PulsarClientException {
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         mock(SchedulerManager.class),
-                        mock(PulsarClient.class)));
+                        mockPulsarClient()));
 
         
Mockito.doNothing().when(functionMetaDataManager).processUpdate(any(Request.ServiceRequest.class));
         
Mockito.doNothing().when(functionMetaDataManager).proccessDeregister(any(Request.ServiceRequest.class));
@@ -247,7 +263,7 @@ public void processUpdateTest() throws PulsarClientException {
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         schedulerManager,
-                        mock(PulsarClient.class)));
+                        mockPulsarClient()));
 
         // worker has no record of function
         Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
@@ -275,7 +291,7 @@ public void processUpdateTest() throws 
PulsarClientException {
         functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         schedulerManager,
-                        mock(PulsarClient.class)));
+                        mockPulsarClient()));
 
         Function.FunctionMetaData m3 = Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -324,7 +340,7 @@ public void processUpdateTest() throws 
PulsarClientException {
         functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         schedulerManager,
-                        mock(PulsarClient.class)));
+                        mockPulsarClient()));
 
         Function.FunctionMetaData m4 = Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -361,7 +377,7 @@ public void processDeregister() throws 
PulsarClientException {
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         schedulerManager,
-                        mock(PulsarClient.class)));
+                        mockPulsarClient()));
         // worker has no record of function
         Function.FunctionMetaData test = Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
@@ -388,7 +404,7 @@ public void processDeregister() throws 
PulsarClientException {
         functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         schedulerManager,
-                        mock(PulsarClient.class)));
+                        mockPulsarClient()));
         functionMetaDataManager.setFunctionMetaData(test);
         Function.FunctionMetaData m2 = Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -415,7 +431,7 @@ public void processDeregister() throws 
PulsarClientException {
         functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         schedulerManager,
-                        mock(PulsarClient.class)));
+                        mockPulsarClient()));
         functionMetaDataManager.setFunctionMetaData(test);
 
         Function.FunctionMetaData m3 = Function.FunctionMetaData.newBuilder()
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 14b56acd3..6dd3fa3c3 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -18,24 +18,6 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.distributedlog.api.namespace.Namespace;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
-import org.apache.pulsar.client.api.ConsumerEventListener;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.functions.proto.Function;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Mockito;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.argThat;
@@ -49,6 +31,28 @@
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.functions.proto.Function;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
 public class MembershipManagerTest {
 
     private final WorkerConfig workerConfig;
@@ -63,29 +67,33 @@ public MembershipManagerTest() {
 
     @Test
     public void testConsumerEventListener() throws Exception {
-        PulsarClient mockClient = mock(PulsarClient.class);
-        Consumer mockConsumer = mock(Consumer.class);
+        PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+
+        Consumer<byte[]> mockConsumer = mock(Consumer.class);
+        ConsumerBuilder<byte[]> mockConsumerBuilder = 
mock(ConsumerBuilder.class);
+
+        
when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
+        
when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder);
+        
when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder);
+        when(mockConsumerBuilder.property(anyString(), 
anyString())).thenReturn(mockConsumerBuilder);
+
+        when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
 
         AtomicReference<ConsumerEventListener> listenerHolder = new 
AtomicReference<>();
-        when(mockClient.subscribe(
-            eq(workerConfig.getClusterCoordinationTopic()),
-            eq(MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION),
-            any(ConsumerConfiguration.class)
-        )).thenAnswer(invocationOnMock -> {
+        
when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenAnswer(invocationOnMock
 -> {
 
-            ConsumerConfiguration conf = invocationOnMock.getArgumentAt(2, 
ConsumerConfiguration.class);
-            listenerHolder.set(conf.getConsumerEventListener());
+            ConsumerEventListener listener = invocationOnMock.getArgumentAt(0, 
ConsumerEventListener.class);
+            listenerHolder.set(listener);
 
-            return mockConsumer;
+            return mockConsumerBuilder;
         });
 
+        when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
+
         MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mockClient));
         assertFalse(membershipManager.isLeader());
         verify(mockClient, times(1))
-            .subscribe(
-                eq(workerConfig.getClusterCoordinationTopic()),
-                eq(MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION),
-                any(ConsumerConfiguration.class));
+            .newConsumer();
 
         listenerHolder.get().becameActive(mockConsumer, 0);
         assertTrue(membershipManager.isLeader());
@@ -94,11 +102,31 @@ public void testConsumerEventListener() throws Exception {
         assertFalse(membershipManager.isLeader());
     }
 
+    private static PulsarClient mockPulsarClient() throws 
PulsarClientException {
+        PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+
+        Consumer<byte[]> mockConsumer = mock(Consumer.class);
+        ConsumerBuilder<byte[]> mockConsumerBuilder = 
mock(ConsumerBuilder.class);
+
+        
when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
+        
when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder);
+        
when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder);
+        when(mockConsumerBuilder.property(anyString(), 
anyString())).thenReturn(mockConsumerBuilder);
+
+        when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
+
+        
when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenReturn(mockConsumerBuilder);
+
+        when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
+
+        return mockClient;
+    }
+
     @Test
     public void testCheckFailuresNoFailures() throws Exception {
         SchedulerManager schedulerManager = mock(SchedulerManager.class);
         PulsarClient pulsarClient = mock(PulsarClient.class);
-        ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
+        ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
@@ -110,7 +138,7 @@ public void testCheckFailuresNoFailures() throws Exception {
                 mock(MembershipManager.class)
         ));
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
-        MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mock(PulsarClient.class)));
+        MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mockPulsarClient()));
 
         List<MembershipManager.WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", 
"host-1", 8000));
@@ -162,7 +190,7 @@ public void testCheckFailuresSomeFailures() throws 
Exception {
         workerConfig.setRescheduleTimeoutMs(30000);
         SchedulerManager schedulerManager = mock(SchedulerManager.class);
         PulsarClient pulsarClient = mock(PulsarClient.class);
-        ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
+        ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
@@ -175,7 +203,7 @@ public void testCheckFailuresSomeFailures() throws 
Exception {
         ));
 
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
-        MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mock(PulsarClient.class)));
+        MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mockPulsarClient()));
 
         List<MembershipManager.WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", 
"host-1", 8000));
@@ -251,7 +279,7 @@ public void testCheckFailuresSomeUnassigned() throws 
Exception {
         workerConfig.setRescheduleTimeoutMs(30000);
         SchedulerManager schedulerManager = mock(SchedulerManager.class);
         PulsarClient pulsarClient = mock(PulsarClient.class);
-        ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
+        ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
@@ -263,7 +291,7 @@ public void testCheckFailuresSomeUnassigned() throws 
Exception {
                 mock(MembershipManager.class)
         ));
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
-        MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mock(PulsarClient.class)));
+        MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mockPulsarClient()));
 
         List<MembershipManager.WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", 
"host-1", 8000));
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 64d693120..b5b4e981e 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -19,8 +19,10 @@
 package org.apache.pulsar.functions.worker;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.proto.Function;
@@ -44,6 +46,9 @@
 import java.util.concurrent.TimeoutException;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -61,6 +66,18 @@
     private CompletableFuture<MessageId> completableFuture;
     private Producer producer;
 
+    private static PulsarClient mockPulsarClient() throws 
PulsarClientException {
+        ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
+        when(builder.topic(anyString())).thenReturn(builder);
+
+        when(builder.create()).thenReturn(mock(Producer.class));
+
+        PulsarClient client = mock(PulsarClient.class);
+        when(client.newProducer()).thenReturn(builder);
+
+        return client;
+    }
+
     @BeforeMethod
     public void setup() throws PulsarClientException {
         WorkerConfig workerConfig = new WorkerConfig();
@@ -78,8 +95,17 @@ public void setup() throws PulsarClientException {
         byte[] bytes = any();
         when(producer.sendAsync(bytes)).thenReturn(completableFuture);
 
+        ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
+        when(builder.topic(anyString())).thenReturn(builder);
+        when(builder.enableBatching(anyBoolean())).thenReturn(builder);
+        when(builder.blockIfQueueFull(anyBoolean())).thenReturn(builder);
+        
when(builder.compressionType(any(CompressionType.class))).thenReturn(builder);
+        when(builder.sendTimeout(anyInt(), 
any(TimeUnit.class))).thenReturn(builder);
+
+        when(builder.create()).thenReturn(producer);
+
         PulsarClient pulsarClient = mock(PulsarClient.class);
-        doReturn(producer).when(pulsarClient).createProducer(any(), any());
+        when(pulsarClient.newProducer()).thenReturn(builder);
 
         schedulerManager = spy(new SchedulerManager(workerConfig, 
pulsarClient));
         functionRuntimeManager = mock(FunctionRuntimeManager.class);
@@ -380,9 +406,6 @@ public void testScalingUp() throws Exception {
 
         // scale up
 
-        PulsarClient pulsarClient = mock(PulsarClient.class);
-        doReturn(producer).when(pulsarClient).createProducer(any(), any());
-
         Function.FunctionMetaData function2Scaled = 
Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
                         
.setNamespace("namespace-1").setTenant("tenant-1").setParallelism(3)).setVersion(version)
@@ -500,9 +523,6 @@ public void testScalingDown() throws Exception {
 
         // scale down
 
-        PulsarClient pulsarClient = mock(PulsarClient.class);
-        doReturn(producer).when(pulsarClient).createProducer(any(), any());
-
         Function.FunctionMetaData function2Scaled = 
Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
                         
.setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)).setVersion(version)
@@ -612,9 +632,6 @@ public void testUpdate() throws Exception {
 
         // scale down
 
-        PulsarClient pulsarClient = mock(PulsarClient.class);
-        doReturn(producer).when(pulsarClient).createProducer(any(), any());
-
         Function.FunctionMetaData function2Updated = 
Function.FunctionMetaData.newBuilder()
                 
.setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath("/foo/bar2"))
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 406a2c815..cb21f6f8c 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -23,6 +23,7 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Enums;
 import com.google.common.base.Splitter;
+
 import java.io.IOException;
 import java.util.Base64;
 import java.util.List;
@@ -30,15 +31,19 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
+
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
 import 
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
 import 
org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ConsumerAck;
@@ -63,10 +68,10 @@
 public class ConsumerHandler extends AbstractWebSocketHandler {
 
     private String subscription = null;
-    private final ConsumerConfiguration conf;
+    private SubscriptionType subscriptionType;
     private Consumer<byte[]> consumer;
 
-    private final int maxPendingMessages;
+    private int maxPendingMessages;
     private final AtomicInteger pendingMessages = new AtomicInteger();
 
     private final LongAdder numMsgsDelivered;
@@ -78,20 +83,26 @@
 
     public ConsumerHandler(WebSocketService service, HttpServletRequest 
request, ServletUpgradeResponse response) {
         super(service, request, response);
-        this.conf = getConsumerConfiguration();
-        this.maxPendingMessages = (conf.getReceiverQueueSize() == 0) ? 1 : 
conf.getReceiverQueueSize();
+
+        ConsumerBuilderImpl<byte[]> builder;
+
         this.numMsgsDelivered = new LongAdder();
         this.numBytesDelivered = new LongAdder();
         this.numMsgsAcked = new LongAdder();
 
         try {
+            builder = (ConsumerBuilderImpl<byte[]>) 
getConsumerConfiguration(service.getPulsarClient());
+            this.maxPendingMessages = 
(builder.getConf().getReceiverQueueSize() == 0) ? 1
+                    : builder.getConf().getReceiverQueueSize();
+            this.subscriptionType = builder.getConf().getSubscriptionType();
+
             // checkAuth() should be called after assigning a value to 
this.subscription
             this.subscription = extractSubscription(request);
             if (!checkAuth(response)) {
                 return;
             }
 
-            this.consumer = 
service.getPulsarClient().subscribe(topic.toString(), subscription, conf);
+            this.consumer = 
builder.topic(topic.toString()).subscriptionName(subscription).subscribe();
             if (!this.service.addConsumer(this)) {
                 log.warn("[{}:{}] Failed to add consumer handler for topic 
{}", request.getRemoteAddr(),
                         request.getRemotePort(), topic);
@@ -242,7 +253,7 @@ public void close() throws IOException {
         }
     }
 
-    public Consumer getConsumer() {
+    public Consumer<byte[]> getConsumer() {
         return this.consumer;
     }
 
@@ -251,7 +262,7 @@ public String getSubscription() {
     }
 
     public SubscriptionType getSubscriptionType() {
-        return conf.getSubscriptionType();
+        return subscriptionType;
     }
 
     public long getAndResetNumMsgsDelivered() {
@@ -267,7 +278,7 @@ public long getAndResetNumMsgsAcked() {
     }
 
     public long getMsgDeliveredCounter() {
-        return MSG_DELIVERED_COUNTER_UPDATER.get(this);
+        return msgDeliveredCounter;
     }
 
     protected void updateDeliverMsgStat(long msgSize) {
@@ -276,32 +287,32 @@ protected void updateDeliverMsgStat(long msgSize) {
         numBytesDelivered.add(msgSize);
     }
 
-    private ConsumerConfiguration getConsumerConfiguration() {
-        ConsumerConfiguration conf = new ConsumerConfiguration();
+    private ConsumerBuilder<byte[]> getConsumerConfiguration(PulsarClient 
client) {
+        ConsumerBuilder<byte[]> builder = client.newConsumer();
 
         if (queryParams.containsKey("ackTimeoutMillis")) {
-            
conf.setAckTimeout(Integer.parseInt(queryParams.get("ackTimeoutMillis")), 
TimeUnit.MILLISECONDS);
+            
builder.ackTimeout(Integer.parseInt(queryParams.get("ackTimeoutMillis")), 
TimeUnit.MILLISECONDS);
         }
 
         if (queryParams.containsKey("subscriptionType")) {
             checkArgument(Enums.getIfPresent(SubscriptionType.class, 
queryParams.get("subscriptionType")).isPresent(),
                     "Invalid subscriptionType %s", 
queryParams.get("subscriptionType"));
-            
conf.setSubscriptionType(SubscriptionType.valueOf(queryParams.get("subscriptionType")));
+            
builder.subscriptionType(SubscriptionType.valueOf(queryParams.get("subscriptionType")));
         }
 
         if (queryParams.containsKey("receiverQueueSize")) {
-            
conf.setReceiverQueueSize(Math.min(Integer.parseInt(queryParams.get("receiverQueueSize")),
 1000));
+            
builder.receiverQueueSize(Math.min(Integer.parseInt(queryParams.get("receiverQueueSize")),
 1000));
         }
 
         if (queryParams.containsKey("consumerName")) {
-            conf.setConsumerName(queryParams.get("consumerName"));
+            builder.consumerName(queryParams.get("consumerName"));
         }
 
         if (queryParams.containsKey("priorityLevel")) {
-            
conf.setPriorityLevel(Integer.parseInt(queryParams.get("priorityLevel")));
+            
builder.priorityLevel(Integer.parseInt(queryParams.get("priorityLevel")));
         }
 
-        return conf;
+        return builder;
     }
 
     @Override
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index 5ed8adfcb..80545bff9 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -24,6 +24,9 @@
 import static org.apache.pulsar.websocket.WebSocketError.PayloadEncodingError;
 import static org.apache.pulsar.websocket.WebSocketError.UnknownError;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Enums;
+
 import java.io.IOException;
 import java.util.Base64;
 import java.util.concurrent.TimeUnit;
@@ -35,12 +38,13 @@
 
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.HashingScheme;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.ProducerConfiguration.HashingScheme;
-import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
 import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededError;
 import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededException;
 import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
@@ -52,9 +56,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Enums;
-
 
 /**
  * Websocket end-point url handler to handle incoming message coming from 
client. Websocket end-point url handler to
@@ -91,8 +92,7 @@ public ProducerHandler(WebSocketService service, 
HttpServletRequest request, Ser
         }
 
         try {
-            ProducerConfiguration conf = getProducerConfiguration();
-            this.producer = 
service.getPulsarClient().createProducer(topic.toString(), conf);
+            this.producer = 
getProducerBuilder(service.getPulsarClient()).topic(topic.toString()).create();
             if (!this.service.addProducer(this)) {
                 log.warn("[{}:{}] Failed to add producer handler for topic 
{}", request.getRemoteAddr(),
                         request.getRemotePort(), topic);
@@ -166,7 +166,7 @@ public void onWebSocketText(String message) {
         }
 
         final long msgSize = rawPayload.length;
-        MessageBuilder builder = 
MessageBuilder.create().setContent(rawPayload);
+        MessageBuilder<byte[]> builder = 
MessageBuilder.create().setContent(rawPayload);
 
         if (sendRequest.properties != null) {
             builder.setProperties(sendRequest.properties);
@@ -196,7 +196,7 @@ public void onWebSocketText(String message) {
         });
     }
 
-    public Producer getProducer() {
+    public Producer<byte[]> getProducer() {
         return this.producer;
     }
 
@@ -222,7 +222,7 @@ public StatsBuckets getPublishLatencyStatsUSec() {
     }
 
     public long getMsgPublishedCounter() {
-        return MSG_PUBLISHED_COUNTER_UPDATER.get(this);
+        return msgPublishedCounter;
     }
 
     @Override
@@ -248,45 +248,44 @@ private void updateSentMsgStats(long msgSize, long 
latencyUsec) {
         MSG_PUBLISHED_COUNTER_UPDATER.getAndIncrement(this);
     }
 
-    private ProducerConfiguration getProducerConfiguration() {
-        ProducerConfiguration conf = new ProducerConfiguration();
-
-        conf.setBatchingEnabled(false);
-        conf.setMessageRoutingMode(MessageRoutingMode.SinglePartition);
+    private ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client) {
+        ProducerBuilder<byte[]> builder = client.newProducer()
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition);
 
         // Set to false to prevent the server thread from being blocked if a 
lot of messages are pending.
-        conf.setBlockIfQueueFull(false);
+        builder.blockIfQueueFull(false);
 
         if (queryParams.containsKey("producerName")) {
-            conf.setProducerName(queryParams.get("producerName"));
+            builder.producerName(queryParams.get("producerName"));
         }
 
         if (queryParams.containsKey("initialSequenceId")) {
-            conf.setInitialSequenceId(Long.parseLong("initialSequenceId"));
+            builder.initialSequenceId(Long.parseLong("initialSequenceId"));
         }
 
         if (queryParams.containsKey("hashingScheme")) {
-            
conf.setHashingScheme(HashingScheme.valueOf(queryParams.get("hashingScheme")));
+            
builder.hashingScheme(HashingScheme.valueOf(queryParams.get("hashingScheme")));
         }
 
         if (queryParams.containsKey("sendTimeoutMillis")) {
-            
conf.setSendTimeout(Integer.parseInt(queryParams.get("sendTimeoutMillis")), 
TimeUnit.MILLISECONDS);
+            
builder.sendTimeout(Integer.parseInt(queryParams.get("sendTimeoutMillis")), 
TimeUnit.MILLISECONDS);
         }
 
         if (queryParams.containsKey("batchingEnabled")) {
-            
conf.setBatchingEnabled(Boolean.parseBoolean(queryParams.get("batchingEnabled")));
+            
builder.enableBatching(Boolean.parseBoolean(queryParams.get("batchingEnabled")));
         }
 
         if (queryParams.containsKey("batchingMaxMessages")) {
-            
conf.setBatchingMaxMessages(Integer.parseInt(queryParams.get("batchingMaxMessages")));
+            
builder.batchingMaxMessages(Integer.parseInt(queryParams.get("batchingMaxMessages")));
         }
 
         if (queryParams.containsKey("maxPendingMessages")) {
-            
conf.setMaxPendingMessages(Integer.parseInt(queryParams.get("maxPendingMessages")));
+            
builder.maxPendingMessages(Integer.parseInt(queryParams.get("maxPendingMessages")));
         }
 
         if (queryParams.containsKey("batchingMaxPublishDelay")) {
-            
conf.setBatchingMaxPublishDelay(Integer.parseInt(queryParams.get("batchingMaxPublishDelay")),
+            
builder.batchingMaxPublishDelay(Integer.parseInt(queryParams.get("batchingMaxPublishDelay")),
                     TimeUnit.MILLISECONDS);
         }
 
@@ -296,17 +295,17 @@ private ProducerConfiguration getProducerConfiguration() {
                     "Invalid messageRoutingMode %s", 
queryParams.get("messageRoutingMode"));
             MessageRoutingMode routingMode = 
MessageRoutingMode.valueOf(queryParams.get("messageRoutingMode"));
             if (!MessageRoutingMode.CustomPartition.equals(routingMode)) {
-                conf.setMessageRoutingMode(routingMode);
+                builder.messageRoutingMode(routingMode);
             }
         }
 
         if (queryParams.containsKey("compressionType")) {
             checkArgument(Enums.getIfPresent(CompressionType.class, 
queryParams.get("compressionType")).isPresent(),
                     "Invalid compressionType %s", 
queryParams.get("compressionType"));
-            
conf.setCompressionType(CompressionType.valueOf(queryParams.get("compressionType")));
+            
builder.compressionType(CompressionType.valueOf(queryParams.get("compressionType")));
         }
 
-        return conf;
+        return builder;
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(ProducerHandler.class);
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index 54037fa85..82d7cff5e 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -94,7 +94,7 @@ public ReaderHandler(WebSocketService service, 
HttpServletRequest request, Servl
 
             this.reader = builder.create();
 
-            this.subscription = 
((ReaderImpl)this.reader).getConsumer().getSubscription();
+            this.subscription = ((ReaderImpl<?>) 
this.reader).getConsumer().getSubscription();
             if (!this.service.addReader(this)) {
                 log.warn("[{}:{}] Failed to add reader handler for topic {}", 
request.getRemoteAddr(),
                         request.getRemotePort(), topic);
@@ -214,8 +214,8 @@ public void close() throws IOException {
         }
     }
 
-    public Consumer getConsumer() {
-        return reader != null ? ((ReaderImpl)reader).getConsumer() : null;
+    public Consumer<?> getConsumer() {
+        return reader != null ? ((ReaderImpl<?>) reader).getConsumer() : null;
     }
 
     public String getSubscription() {
@@ -235,7 +235,7 @@ public long getAndResetNumBytesDelivered() {
     }
 
     public long getMsgDeliveredCounter() {
-        return MSG_DELIVERED_COUNTER_UPDATER.get(this);
+        return msgDeliveredCounter;
     }
 
     protected void updateDeliverMsgStat(long msgSize) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to