This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 064ba5a  Modified functions producer consumers to use new builder 
based api (#1580)
064ba5a is described below

commit 064ba5aa9ae8a3032d9993d1b145eb15c852e11f
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Apr 25 23:36:30 2018 -0700

    Modified functions producer consumers to use new builder based api (#1580)
    
    * Modified functions producer consumers to use new builder based api
    
    * remove `@Test` on MultiConsumersOneOutputTopicProducersTest, since the 
client creation is changed to builder and this code might eventually go aways 
with connect related changes.
    
    * Fix MultiConsumersOneOutputTopicProducersTest
    
    * Fix FunctionMetaDataManagerTest
    
    * Fix MembershipManagerTest
    
    * Fix SchedulerManagerTest.java
---
 .../apache/pulsar/compaction/CompactorTool.java    |  27 ++--
 .../pulsar/compaction/TwoPhaseCompactor.java       |  13 +-
 .../pulsar/client/admin/internal/ClustersImpl.java |   9 +-
 .../admin/internal/PersistentTopicsImpl.java       |   3 +-
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |   4 +
 .../instance/processors/AtLeastOnceProcessor.java  |   6 +-
 .../instance/processors/AtMostOnceProcessor.java   |   6 +-
 .../processors/EffectivelyOnceProcessor.java       |  11 +-
 .../instance/processors/MessageProcessor.java      |   4 +-
 .../instance/processors/MessageProcessorBase.java  |   4 +-
 .../producers/AbstractOneOuputTopicProducers.java  |  47 +++---
 .../MultiConsumersOneOuputTopicProducers.java      |  16 +-
 .../functions/instance/producers/Producers.java    |   3 +-
 .../instance/FunctionResultRouterTest.java         |  12 +-
 .../instance/JavaInstanceRunnableProcessTest.java  |   2 +-
 .../functions/instance/JavaInstanceTest.java       |   1 +
 .../MultiConsumersOneOutputTopicProducersTest.java | 178 +++++++++++++++++----
 .../functions/runtime/ThreadRuntimeFactory.java    |   5 +-
 .../functions/worker/FunctionAssignmentTailer.java |   8 +-
 .../functions/worker/FunctionMetaDataManager.java  |   4 +-
 .../worker/FunctionMetaDataTopicTailer.java        |  10 +-
 .../functions/worker/FunctionRuntimeManager.java   |   2 +-
 .../functions/worker/FunctionWorkerStarter.java    |   2 -
 .../pulsar/functions/worker/MembershipManager.java |  26 +--
 .../pulsar/functions/worker/SchedulerManager.java  |  42 +++--
 .../org/apache/pulsar/functions/worker/Utils.java  |  37 ++---
 .../pulsar/functions/worker/WorkerConfig.java      |  10 +-
 .../pulsar/functions/worker/WorkerService.java     |   2 +-
 .../worker/FunctionMetaDataManagerTest.java        |  38 +++--
 .../functions/worker/MembershipManagerTest.java    | 104 +++++++-----
 .../functions/worker/SchedulerManagerTest.java     |  37 +++--
 .../apache/pulsar/websocket/ConsumerHandler.java   |  47 +++---
 .../apache/pulsar/websocket/ProducerHandler.java   |  55 ++++---
 .../org/apache/pulsar/websocket/ReaderHandler.java |   8 +-
 34 files changed, 485 insertions(+), 298 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index 0736146..c16d676 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -21,6 +21,10 @@ package org.apache.pulsar.compaction;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.nio.file.Paths;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -31,7 +35,7 @@ import org.apache.pulsar.broker.BookKeeperClientFactory;
 import org.apache.pulsar.broker.BookKeeperClientFactoryImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
@@ -40,12 +44,6 @@ import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import lombok.Cleanup;
-
 public class CompactorTool {
 
     private static class Arguments {
@@ -82,15 +80,16 @@ public class CompactorTool {
         }
 
         String pulsarServiceUrl = PulsarService.brokerUrl(brokerConfig);
-        ClientConfiguration clientConfig = new ClientConfiguration();
+        ClientBuilder clientBuilder = PulsarClient.builder();
 
         if (isNotBlank(brokerConfig.getBrokerClientAuthenticationPlugin())) {
-            
clientConfig.setAuthentication(brokerConfig.getBrokerClientAuthenticationPlugin(),
-                                           
brokerConfig.getBrokerClientAuthenticationParameters());
+            
clientBuilder.authentication(brokerConfig.getBrokerClientAuthenticationPlugin(),
+                    brokerConfig.getBrokerClientAuthenticationParameters());
         }
-        clientConfig.setUseTls(brokerConfig.isTlsEnabled());
-        
clientConfig.setTlsAllowInsecureConnection(brokerConfig.isTlsAllowInsecureConnection());
-        
clientConfig.setTlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath());
+        clientBuilder.serviceUrl(pulsarServiceUrl)
+                .enableTls(brokerConfig.isTlsEnabled())
+                
.allowTlsInsecureConnection(brokerConfig.isTlsAllowInsecureConnection())
+                
.tlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath());
 
         ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor(
                 new 
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
@@ -103,7 +102,7 @@ public class CompactorTool {
                                               
(int)brokerConfig.getZooKeeperSessionTimeoutMillis()).get();
         BookKeeperClientFactory bkClientFactory = new 
BookKeeperClientFactoryImpl();
         BookKeeper bk = bkClientFactory.create(brokerConfig, zk);
-        try (PulsarClient pulsar = PulsarClient.create(pulsarServiceUrl, 
clientConfig)) {
+        try (PulsarClient pulsar = clientBuilder.build()) {
             Compactor compactor = new TwoPhaseCompactor(brokerConfig, pulsar, 
bk, scheduler);
             long ledgerId = compactor.compact(arguments.topic).get();
             log.info("Compaction of topic {} complete. Compacted to ledger 
{}", arguments.topic, ledgerId);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 3213fd9..5db4f2e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -21,36 +21,33 @@ package org.apache.pulsar.compaction;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import com.google.common.collect.ImmutableMap;
+
 import io.netty.buffer.ByteBuf;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.commons.lang3.tuple.Pair;
-
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.impl.RawBatchConverter;
-
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
index 32cb5c0..e3041ba 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
@@ -93,10 +93,11 @@ public class ClustersImpl extends BaseResource implements 
Clusters {
         } catch (Exception e) {
             throw getApiException(e);
         }
-        
+
     }
 
        @Override
+    @SuppressWarnings("unchecked")
        public Set<String> getPeerClusterNames(String cluster) throws 
PulsarAdminException {
                try {
                        return 
request(adminClusters.path(cluster).path("peers")).get(LinkedHashSet.class);
@@ -104,7 +105,7 @@ public class ClustersImpl extends BaseResource implements 
Clusters {
                        throw getApiException(e);
                }
        }
-    
+
     @Override
     public void deleteCluster(String cluster) throws PulsarAdminException {
         try {
@@ -125,7 +126,7 @@ public class ClustersImpl extends BaseResource implements 
Clusters {
         }
     }
 
-    
+
     @Override
     public List<BrokerNamespaceIsolationData> 
getBrokersWithNamespaceIsolationPolicy(String cluster)
             throws PulsarAdminException {
@@ -223,7 +224,7 @@ public class ClustersImpl extends BaseResource implements 
Clusters {
             throw getApiException(e);
         }
     }
-    
+
     private void setDomain(String cluster, String domainName,
             FailureDomain domain) throws PulsarAdminException {
         try {
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
index 3121739..86f2c05 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
@@ -22,6 +22,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -796,7 +797,7 @@ public class PersistentTopicsImpl extends BaseResource 
implements PersistentTopi
                 }
             }
 
-            return Lists.newArrayList(new MessageImpl<byte[]>(msgId, 
properties, data, Schema.IDENTITY));
+            return Collections.singletonList(new MessageImpl<byte[]>(msgId, 
properties, data, Schema.IDENTITY));
         } finally {
             if (stream != null) {
                 stream.close();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index f024a4f..af11ef5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -236,4 +236,8 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
         conf.setSubscriptionInitialPosition(subscriptionInitialPosition);
                return this;
        }
+
+       public ConsumerConfigurationData<T> getConf() {
+           return conf;
+       }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
index 819c118..86aeb83 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
@@ -38,7 +38,7 @@ import 
org.apache.pulsar.functions.proto.Function.FunctionDetails;
 public class AtLeastOnceProcessor extends MessageProcessorBase {
 
     @Getter
-    private Producer producer;
+    private Producer<byte[]> producer;
 
     AtLeastOnceProcessor(PulsarClient client,
                          FunctionDetails functionDetails,
@@ -52,13 +52,13 @@ public class AtLeastOnceProcessor extends 
MessageProcessorBase {
     }
 
     @Override
-    public void sendOutputMessage(InputMessage inputMsg, MessageBuilder 
outputMsgBuilder) {
+    public void sendOutputMessage(InputMessage inputMsg, 
MessageBuilder<byte[]> outputMsgBuilder) {
         if (null == outputMsgBuilder || null == producer) {
             inputMsg.ack();
             return;
         }
 
-        Message outputMsg = outputMsgBuilder.build();
+        Message<byte[]> outputMsg = outputMsgBuilder.build();
         producer.sendAsync(outputMsg)
             .thenAccept(msgId -> inputMsg.ack());
     }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
index b6871fa..994be0d 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
@@ -36,7 +36,7 @@ import 
org.apache.pulsar.functions.proto.Function.FunctionDetails;
 @Slf4j
 class AtMostOnceProcessor extends MessageProcessorBase {
 
-    private Producer producer;
+    private Producer<byte[]> producer;
 
     AtMostOnceProcessor(PulsarClient client,
                         FunctionDetails functionDetails,
@@ -58,12 +58,12 @@ class AtMostOnceProcessor extends MessageProcessorBase {
     }
 
     @Override
-    public void sendOutputMessage(InputMessage inputMsg, MessageBuilder 
outputMsgBuilder) {
+    public void sendOutputMessage(InputMessage inputMsg, 
MessageBuilder<byte[]> outputMsgBuilder) {
         if (null == outputMsgBuilder) {
             return;
         }
 
-        Message outputMsg = outputMsgBuilder.build();
+        Message<byte[]> outputMsg = outputMsgBuilder.build();
         producer.sendAsync(outputMsg);
     }
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
index d31bb06..59c7bd6 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
@@ -22,11 +22,13 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
+
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
@@ -102,7 +104,7 @@ class EffectivelyOnceProcessor extends MessageProcessorBase 
implements ConsumerE
 
     @Override
     public void sendOutputMessage(InputMessage inputMsg,
-                                  MessageBuilder outputMsgBuilder) throws 
Exception {
+                                  MessageBuilder<byte[]> outputMsgBuilder) 
throws Exception {
         if (null == outputMsgBuilder) {
             inputMsg.ackCumulative();
             return;
@@ -113,15 +115,14 @@ class EffectivelyOnceProcessor extends 
MessageProcessorBase implements ConsumerE
             
.setSequenceId(Utils.getSequenceId(inputMsg.getActualMessage().getMessageId()));
 
 
-        Producer producer = 
outputProducer.getProducer(inputMsg.getTopicName(), 
inputMsg.getTopicPartition());
+        Producer<byte[]> producer = 
outputProducer.getProducer(inputMsg.getTopicName(), 
inputMsg.getTopicPartition());
 
-        Message outputMsg = outputMsgBuilder.build();
+        Message<byte[]> outputMsg = outputMsgBuilder.build();
         producer.sendAsync(outputMsg)
                 .thenAccept(messageId -> inputMsg.ackCumulative())
                 .join();
     }
 
-
     @Override
     public void close() {
         super.close();
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
index 33d4f77..97c971d 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
@@ -82,7 +82,7 @@ public interface MessageProcessor extends AutoCloseable {
      *
      * @return the input consumer.
      */
-    Consumer getInputConsumer();
+    Consumer<byte[]> getInputConsumer();
 
     /**
      * Setup the output with a provided <i>outputSerDe</i>. The implementation 
of this processor is responsible for
@@ -103,7 +103,7 @@ public interface MessageProcessor extends AutoCloseable {
      * @param outputMsgBuilder output message builder. it can be null.
      */
     void sendOutputMessage(InputMessage inputMsg,
-                           MessageBuilder outputMsgBuilder) throws 
PulsarClientException, Exception;
+                           MessageBuilder<byte[]> outputMsgBuilder) throws 
PulsarClientException, Exception;
 
     /**
      * Get the next message to process
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
index d525a1a..fc37b13 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
@@ -24,7 +24,9 @@ import java.util.Map;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -51,7 +53,7 @@ abstract class MessageProcessorBase implements 
MessageProcessor {
     protected SerDe outputSerDe;
 
     @Getter
-    protected Consumer inputConsumer;
+    protected Consumer<byte[]> inputConsumer;
 
     protected List<String> topics;
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
index bba9a7f..7a561a1 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
@@ -19,11 +19,12 @@
 package org.apache.pulsar.functions.instance.producers;
 
 import java.util.concurrent.TimeUnit;
+
 import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.ProducerConfiguration.HashingScheme;
-import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.instance.FunctionResultRouter;
@@ -32,51 +33,43 @@ public abstract class AbstractOneOuputTopicProducers 
implements Producers {
 
     protected final PulsarClient client;
     protected final String outputTopic;
-    protected final ProducerConfiguration conf;
 
     AbstractOneOuputTopicProducers(PulsarClient client,
                                    String outputTopic)
             throws PulsarClientException {
         this.client = client;
         this.outputTopic = outputTopic;
-        this.conf = newProducerConfiguration();
     }
 
-    static ProducerConfiguration newProducerConfiguration() {
-        ProducerConfiguration conf = new ProducerConfiguration();
-        conf.setBlockIfQueueFull(true);
-        conf.setBatchingEnabled(true);
-        conf.setBatchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
-        conf.setMaxPendingMessages(1000000);
-        conf.setCompressionType(CompressionType.LZ4);
-        conf.setHashingScheme(HashingScheme.Murmur3_32Hash);
-        conf.setMessageRoutingMode(MessageRoutingMode.CustomPartition);
+    static ProducerBuilder<byte[]> newProducerBuilder(PulsarClient client) {
         // use function result router to deal with different processing 
guarantees.
-        conf.setMessageRouter(FunctionResultRouter.of());
-        return conf;
+        return client.newProducer() //
+                .blockIfQueueFull(true) //
+                .enableBatching(true) //
+                .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) //
+                .compressionType(CompressionType.LZ4) //
+                .hashingScheme(HashingScheme.Murmur3_32Hash) //
+                .messageRoutingMode(MessageRoutingMode.CustomPartition) //
+                .messageRouter(FunctionResultRouter.of());
     }
 
-    protected Producer createProducer(String topic)
+    protected Producer<byte[]> createProducer(String topic)
             throws PulsarClientException {
         return createProducer(client, topic);
     }
 
-    public static Producer createProducer(PulsarClient client, String topic)
+    public static Producer<byte[]> createProducer(PulsarClient client, String 
topic)
             throws PulsarClientException {
-        return client.createProducer(topic, newProducerConfiguration());
+        return newProducerBuilder(client).topic(topic).create();
     }
 
-    protected Producer createProducer(String topic, String producerName)
+    protected Producer<byte[]> createProducer(String topic, String 
producerName)
             throws PulsarClientException {
         return createProducer(client, topic, producerName);
     }
 
-    public static Producer createProducer(PulsarClient client, String topic, 
String producerName)
-        throws PulsarClientException {
-        ProducerConfiguration newConf = newProducerConfiguration();
-        newConf.setProducerName(producerName);
-
-        return client.createProducer(topic, newConf);
+    public static Producer<byte[]> createProducer(PulsarClient client, String 
topic, String producerName)
+            throws PulsarClientException {
+        return 
newProducerBuilder(client).topic(topic).producerName(producerName).create();
     }
-
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
index cf21a04..3668311 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
@@ -37,7 +37,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 public class MultiConsumersOneOuputTopicProducers extends 
AbstractOneOuputTopicProducers {
 
     @Getter(AccessLevel.PACKAGE)
-    private final Map<String, IntObjectMap<Producer>> producers;
+    private final Map<String, IntObjectMap<Producer<byte[]>>> producers;
 
     public MultiConsumersOneOuputTopicProducers(PulsarClient client,
                                                 String outputTopic)
@@ -56,14 +56,14 @@ public class MultiConsumersOneOuputTopicProducers extends 
AbstractOneOuputTopicP
     }
 
     @Override
-    public synchronized Producer getProducer(String srcTopicName, int 
srcTopicPartition) throws PulsarClientException {
-        IntObjectMap<Producer> producerMap = producers.get(srcTopicName);
+    public synchronized Producer<byte[]> getProducer(String srcTopicName, int 
srcTopicPartition) throws PulsarClientException {
+        IntObjectMap<Producer<byte[]>> producerMap = 
producers.get(srcTopicName);
         if (null == producerMap) {
             producerMap = new IntObjectHashMap<>();
             producers.put(srcTopicName, producerMap);
         }
 
-        Producer producer = producerMap.get(srcTopicPartition);
+        Producer<byte[]> producer = producerMap.get(srcTopicPartition);
         if (null == producer) {
             producer = createProducer(outputTopic, 
makeProducerName(srcTopicName, srcTopicPartition));
             producerMap.put(srcTopicPartition, producer);
@@ -73,10 +73,10 @@ public class MultiConsumersOneOuputTopicProducers extends 
AbstractOneOuputTopicP
 
     @Override
     public synchronized void closeProducer(String srcTopicName, int 
srcTopicPartition) {
-        IntObjectMap<Producer> producerMap = producers.get(srcTopicName);
+        IntObjectMap<Producer<byte[]>> producerMap = 
producers.get(srcTopicName);
 
         if (null != producerMap) {
-            Producer producer = producerMap.remove(srcTopicPartition);
+            Producer<byte[]> producer = producerMap.remove(srcTopicPartition);
             if (null != producer) {
                 producer.closeAsync();
             }
@@ -89,8 +89,8 @@ public class MultiConsumersOneOuputTopicProducers extends 
AbstractOneOuputTopicP
     @Override
     public synchronized void close() {
         List<CompletableFuture<Void>> closeFutures = new 
ArrayList<>(producers.size());
-        for (IntObjectMap<Producer> producerMap: producers.values()) {
-            for (Producer producer : producerMap.values()) {
+        for (IntObjectMap<Producer<byte[]>> producerMap: producers.values()) {
+            for (Producer<byte[]> producer : producerMap.values()) {
                 closeFutures.add(producer.closeAsync());
             }
         }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
index 9669bea..29cd96a 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
@@ -42,8 +42,7 @@ public interface Producers extends AutoCloseable {
      *          src topic partition
      * @return the producer instance to produce messages
      */
-    Producer getProducer(String srcTopicName,
-                         int srcTopicPartition) throws PulsarClientException;
+    Producer<byte[]> getProducer(String srcTopicName, int srcTopicPartition) 
throws PulsarClientException;
 
     /**
      * Close a producer specified by <tt>srcTopicName</tt> and 
<tt>srcTopicPartition</tt>
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
index f98ac44..a1ad289 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
@@ -45,7 +45,7 @@ public class FunctionResultRouterTest {
 
     @Test
     public void testChoosePartitionWithoutKeyWithoutSequenceId() {
-        Message msg = mock(Message.class);
+        Message<?> msg = mock(Message.class);
         when(msg.hasKey()).thenReturn(false);
         when(msg.getKey()).thenReturn(null);
         when(msg.getSequenceId()).thenReturn(-1L);
@@ -70,7 +70,7 @@ public class FunctionResultRouterTest {
 
         FunctionResultRouter router = new FunctionResultRouter(0, clock);
         for (int i = 0; i < 10; i++) {
-            Message msg = mock(Message.class);
+            Message<?> msg = mock(Message.class);
             when(msg.hasKey()).thenReturn(false);
             when(msg.getKey()).thenReturn(null);
             when(msg.getSequenceId()).thenReturn((long) (2 * i));
@@ -82,11 +82,11 @@ public class FunctionResultRouterTest {
     public void testChoosePartitionWithKeyWithoutSequenceId() {
         String key1 = "key1";
         String key2 = "key2";
-        Message msg1 = mock(Message.class);
+        Message<?> msg1 = mock(Message.class);
         when(msg1.hasKey()).thenReturn(true);
         when(msg1.getKey()).thenReturn(key1);
         when(msg1.getSequenceId()).thenReturn(-1L);
-        Message msg2 = mock(Message.class);
+        Message<?> msg2 = mock(Message.class);
         when(msg2.hasKey()).thenReturn(true);
         when(msg2.getKey()).thenReturn(key2);
         when(msg1.getSequenceId()).thenReturn(-1L);
@@ -105,12 +105,12 @@ public class FunctionResultRouterTest {
     public void testChoosePartitionWithKeySequenceId() {
         String key1 = "key1";
         String key2 = "key2";
-        Message msg1 = mock(Message.class);
+        Message<?> msg1 = mock(Message.class);
         when(msg1.hasKey()).thenReturn(true);
         when(msg1.getKey()).thenReturn(key1);
         // make sure sequence id is different from hashcode, so the test can 
be tested correctly.
         when(msg1.getSequenceId()).thenReturn((long) ((key1.hashCode() % 100) 
+ 1));
-        Message msg2 = mock(Message.class);
+        Message<?> msg2 = mock(Message.class);
         when(msg2.hasKey()).thenReturn(true);
         when(msg2.getKey()).thenReturn(key2);
         when(msg1.getSequenceId()).thenReturn((long) ((key2.hashCode() % 100) 
+ 1));
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
index 5b463b6..a11062f 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
@@ -1009,4 +1009,4 @@
 //            assertTrue(mockProducers.isEmpty());
 //        }
 //    }
-//}
\ No newline at end of file
+//}
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
index 77de78b..b9e762a 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
@@ -55,5 +55,6 @@ public class JavaInstanceTest {
         JavaExecutionResult result = 
instance.handleMessage(MessageId.earliest, "random", testString);
         assertNotNull(result.getResult());
         assertEquals(new String(testString + "-lambda"), result.getResult());
+        instance.close();
     }
 }
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
index 577015a..e6072c8 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
@@ -19,9 +19,6 @@
 package org.apache.pulsar.functions.instance.producers;
 
 import static 
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers.makeProducerName;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -33,10 +30,19 @@ import static org.testng.Assert.assertTrue;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -48,34 +54,152 @@ public class MultiConsumersOneOutputTopicProducersTest {
     private static final String TEST_OUTPUT_TOPIC = "test-output-topic";
 
     private PulsarClient mockClient;
-    private final Map<String, Producer> mockProducers = new HashMap<>();
+    private final Map<String, Producer<byte[]>> mockProducers = new 
HashMap<>();
     private MultiConsumersOneOuputTopicProducers producers;
 
+    private class MockProducerBuilder implements ProducerBuilder<byte[]> {
+
+        String producerName = "";
+
+        @Override
+        public Producer<byte[]> create() throws PulsarClientException {
+            Producer<byte[]> producer;
+            synchronized (mockProducers) {
+                producer = mockProducers.get(producerName);
+                if (null == producer) {
+                    producer = createMockProducer(producerName);
+                    mockProducers.put(producerName, producer);
+                }
+            }
+            return producer;
+        }
+
+        @Override
+        public CompletableFuture<Producer<byte[]>> createAsync() {
+            try {
+                return CompletableFuture.completedFuture(create());
+            } catch (PulsarClientException e) {
+                CompletableFuture<Producer<byte[]>> future = new 
CompletableFuture<>();
+                future.completeExceptionally(e);
+                return future;
+            }
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> clone() {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> topic(String topicName) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> producerName(String producerName) {
+            this.producerName = producerName;
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> sendTimeout(int sendTimeout, TimeUnit 
unit) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> maxPendingMessages(int 
maxPendingMessages) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> maxPendingMessagesAcrossPartitions(int 
maxPendingMessagesAcrossPartitions) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> blockIfQueueFull(boolean 
blockIfQueueFull) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> messageRoutingMode(MessageRoutingMode 
messageRoutingMode) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> hashingScheme(HashingScheme 
hashingScheme) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> compressionType(CompressionType 
compressionType) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> messageRouter(MessageRouter 
messageRouter) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> enableBatching(boolean enableBatching) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> cryptoKeyReader(CryptoKeyReader 
cryptoKeyReader) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> addEncryptionKey(String key) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> 
cryptoFailureAction(ProducerCryptoFailureAction action) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> batchingMaxPublishDelay(long 
batchDelay, TimeUnit timeUnit) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> batchingMaxMessages(int 
batchMessagesMaxMessagesPerBatch) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> initialSequenceId(long 
initialSequenceId) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> property(String key, String value) {
+            return this;
+        }
+
+        @Override
+        public ProducerBuilder<byte[]> properties(Map<String, String> 
properties) {
+            return this;
+        }
+    }
+
     @BeforeMethod
     public void setup() throws Exception {
         this.mockClient = mock(PulsarClient.class);
 
-        when(mockClient.createProducer(anyString(), 
any(ProducerConfiguration.class)))
-            .thenAnswer(invocationOnMock -> {
-                ProducerConfiguration conf = invocationOnMock.getArgumentAt(1, 
ProducerConfiguration.class);
-                String producerName = conf.getProducerName();
-
-                synchronized (mockProducers) {
-                    Producer producer = mockProducers.get(producerName);
-                    if (null == producer) {
-                        producer = createMockProducer(producerName);
-                        mockProducers.put(producerName, producer);
-                    }
-                    return producer;
-                }
-            });
+        when(mockClient.newProducer())
+            .thenReturn(new MockProducerBuilder());
 
         producers = new MultiConsumersOneOuputTopicProducers(mockClient, 
TEST_OUTPUT_TOPIC);
         producers.initialize();
     }
 
-    private Producer createMockProducer(String topic) {
-        Producer producer = mock(Producer.class);
+    private Producer<byte[]> createMockProducer(String topic) {
+        Producer<byte[]> producer = mock(Producer.class);
         when(producer.closeAsync())
             .thenAnswer(invocationOnMock -> {
                 synchronized (mockProducers) {
@@ -90,16 +214,13 @@ public class MultiConsumersOneOutputTopicProducersTest {
     public void testGetCloseProducer() throws Exception {
         String srcTopic = "test-src-topic";
         int ptnIdx = 1234;
-        Producer producer = producers.getProducer(srcTopic, ptnIdx);
+        Producer<byte[]> producer = producers.getProducer(srcTopic, ptnIdx);
 
         String producerName = makeProducerName(srcTopic, ptnIdx);
 
         assertSame(mockProducers.get(producerName), producer);
         verify(mockClient, times(1))
-            .createProducer(
-                eq(TEST_OUTPUT_TOPIC),
-                any(ProducerConfiguration.class)
-            );
+            .newProducer();
         assertTrue(producers.getProducers().containsKey(srcTopic));
         assertEquals(1, producers.getProducers().get(srcTopic).size());
         assertTrue(producers.getProducers().get(srcTopic).containsKey(ptnIdx));
@@ -107,10 +228,7 @@ public class MultiConsumersOneOutputTopicProducersTest {
         // second get will not create a new producer
         assertSame(mockProducers.get(producerName), producer);
         verify(mockClient, times(1))
-            .createProducer(
-                eq(TEST_OUTPUT_TOPIC),
-                any(ProducerConfiguration.class)
-            );
+            .newProducer();
         assertTrue(producers.getProducers().containsKey(srcTopic));
         assertEquals(1, producers.getProducers().get(srcTopic).size());
         assertTrue(producers.getProducers().get(srcTopic).containsKey(ptnIdx));
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
index 13d8e62..df89eed 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
@@ -20,8 +20,9 @@
 package org.apache.pulsar.functions.runtime;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.ClientConfiguration;
+
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -46,7 +47,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
             throws Exception {
         this(
             threadGroupName,
-            pulsarServiceUrl != null ? PulsarClient.create(pulsarServiceUrl, 
new ClientConfiguration()) : null,
+            pulsarServiceUrl != null ? 
PulsarClient.builder().serviceUrl(pulsarServiceUrl).build() : null,
             storageServiceUrl);
     }
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index 0a4cdeb..1c045d4 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -29,13 +29,13 @@ import java.util.function.Function;
 
 @Slf4j
 public class FunctionAssignmentTailer
-    implements java.util.function.Consumer<Message>, Function<Throwable, 
Void>, AutoCloseable {
+    implements java.util.function.Consumer<Message<byte[]>>, 
Function<Throwable, Void>, AutoCloseable {
 
         private final FunctionRuntimeManager functionRuntimeManager;
-        private final Reader reader;
+        private final Reader<byte[]> reader;
 
     public FunctionAssignmentTailer(FunctionRuntimeManager 
functionRuntimeManager,
-                Reader reader)
+                Reader<byte[]> reader)
             throws PulsarClientException {
             this.functionRuntimeManager = functionRuntimeManager;
             this.reader = reader;
@@ -64,7 +64,7 @@ public class FunctionAssignmentTailer
     }
 
     @Override
-    public void accept(Message msg) {
+    public void accept(Message<byte[]> msg) {
 
         // check if latest
         boolean hasMessageAvailable;
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index b5f9751..29a3220 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -88,7 +88,7 @@ public class FunctionMetaDataManager implements AutoCloseable 
{
         log.info("/** Initializing Function Metadata Manager **/");
         try {
 
-            Reader reader = pulsarClient.newReader()
+            Reader<byte[]> reader = pulsarClient.newReader()
                     .topic(this.workerConfig.getFunctionMetadataTopic())
                     .startMessageId(MessageId.earliest)
                     .create();
@@ -410,6 +410,6 @@ public class FunctionMetaDataManager implements 
AutoCloseable {
     }
 
     private ServiceRequestManager getServiceRequestManager(PulsarClient 
pulsarClient, String functionMetadataTopic) throws PulsarClientException {
-        return new 
ServiceRequestManager(pulsarClient.createProducer(functionMetadataTopic));
+        return new 
ServiceRequestManager(pulsarClient.newProducer().topic(functionMetadataTopic).create());
     }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
index a373158..4bcaaf7 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
@@ -29,13 +29,13 @@ import 
org.apache.pulsar.functions.proto.Request.ServiceRequest;
 
 @Slf4j
 public class FunctionMetaDataTopicTailer
-        implements java.util.function.Consumer<Message>, Function<Throwable, 
Void>, AutoCloseable {
+        implements java.util.function.Consumer<Message<byte[]>>, 
Function<Throwable, Void>, AutoCloseable {
 
     private final FunctionMetaDataManager functionMetaDataManager;
-    private final Reader reader;
+    private final Reader<byte[]> reader;
 
     public FunctionMetaDataTopicTailer(FunctionMetaDataManager 
functionMetaDataManager,
-                                       Reader reader)
+                                       Reader<byte[]> reader)
             throws PulsarClientException {
         this.functionMetaDataManager = functionMetaDataManager;
         this.reader = reader;
@@ -62,7 +62,7 @@ public class FunctionMetaDataTopicTailer
         log.info("Stopped function state consumer");
     }
 
-    public void processRequest(Message msg) {
+    public void processRequest(Message<byte[]> msg) {
         ServiceRequest serviceRequest;
 
         try {
@@ -80,7 +80,7 @@ public class FunctionMetaDataTopicTailer
     }
 
     @Override
-    public void accept(Message msg) {
+    public void accept(Message<byte[]> msg) {
 
         processRequest(msg);
         // receive next request
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index f7e793a..af23992 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -85,7 +85,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
                                   MembershipManager membershipManager) throws 
Exception {
         this.workerConfig = workerConfig;
 
-        Reader reader = pulsarClient.newReader()
+        Reader<byte[]> reader = pulsarClient.newReader()
                 .topic(this.workerConfig.getFunctionAssignmentTopic())
                 .startMessageId(MessageId.earliest)
                 .create();
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
index 21dd69f..2a3dc0f 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
@@ -22,12 +22,10 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
-import lombok.extern.slf4j.Slf4j;
 
 /**
  * A starter to start function worker.
  */
-@Slf4j
 public class FunctionWorkerStarter {
 
     private static class WorkerArguments {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index 4e5fb21..f43311d 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.functions.worker;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -30,16 +32,15 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
-import com.google.common.annotations.VisibleForTesting;
 import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -56,7 +57,7 @@ import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 public class MembershipManager implements AutoCloseable, ConsumerEventListener 
{
 
     private final String consumerName;
-    private final Consumer consumer;
+    private final Consumer<byte[]> consumer;
     private final WorkerConfig workerConfig;
     private PulsarAdmin pulsarAdminClient;
     private final CompletableFuture<Void> firstConsumerEventFuture;
@@ -85,18 +86,17 @@ public class MembershipManager implements AutoCloseable, 
ConsumerEventListener {
         // we don't produce any messages into this topic, we only use the 
`failover` subscription
         // to elect an active consumer as the leader worker. The leader worker 
will be responsible
         // for scheduling snapshots for FMT and doing task assignment.
-        consumer = client.subscribe(
-                workerConfig.getClusterCoordinationTopic(),
-                COORDINATION_TOPIC_SUBSCRIPTION,
-                new ConsumerConfiguration()
-                        .setSubscriptionType(SubscriptionType.Failover)
-                        .setConsumerEventListener(this)
-                        .setProperty(WORKER_IDENTIFIER, consumerName)
-        );
+        consumer = client.newConsumer()
+                .topic(workerConfig.getClusterCoordinationTopic())
+                .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
+                .subscriptionType(SubscriptionType.Failover)
+                .consumerEventListener(this)
+                .property(WORKER_IDENTIFIER, consumerName)
+                .subscribe();
     }
 
     @Override
-    public void becameActive(Consumer consumer, int partitionId) {
+    public void becameActive(Consumer<?> consumer, int partitionId) {
         firstConsumerEventFuture.complete(null);
         if (isLeader.compareAndSet(false, true)) {
             log.info("Worker {} became the leader.", consumerName);
@@ -104,7 +104,7 @@ public class MembershipManager implements AutoCloseable, 
ConsumerEventListener {
     }
 
     @Override
-    public void becameInactive(Consumer consumer, int partitionId) {
+    public void becameInactive(Consumer<?> consumer, int partitionId) {
         firstConsumerEventFuture.complete(null);
         if (isLeader.compareAndSet(true, false)) {
             log.info("Worker {} lost the leadership.", consumerName);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index c6b72a5..ed00958 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -18,21 +18,6 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.CompressionType;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
-import org.apache.pulsar.functions.proto.Function.Assignment;
-import org.apache.pulsar.functions.proto.Request;
-import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.worker.scheduler.IScheduler;
-
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -47,6 +32,21 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.Assignment;
+import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.proto.Request;
+import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.worker.scheduler.IScheduler;
+
 @Slf4j
 public class SchedulerManager implements AutoCloseable {
 
@@ -63,7 +63,7 @@ public class SchedulerManager implements AutoCloseable {
 
     private final IScheduler scheduler;
 
-    private final Producer producer;
+    private final Producer<byte[]> producer;
 
     private final ExecutorService executorService;
 
@@ -72,14 +72,10 @@ public class SchedulerManager implements AutoCloseable {
         this.scheduler = 
Reflections.createInstance(workerConfig.getSchedulerClassName(), 
IScheduler.class,
                 Thread.currentThread().getContextClassLoader());
 
-        ProducerConfiguration producerConf = new ProducerConfiguration()
-            .setBatchingEnabled(true)
-            .setBlockIfQueueFull(true)
-            .setCompressionType(CompressionType.LZ4)
-            // retry until succeed
-            .setSendTimeout(0, TimeUnit.MILLISECONDS);
         try {
-            this.producer = 
pulsarClient.createProducer(this.workerConfig.getFunctionAssignmentTopic(), 
producerConf);
+            this.producer = 
pulsarClient.newProducer().topic(this.workerConfig.getFunctionAssignmentTopic())
+                    
.enableBatching(true).blockIfQueueFull(true).compressionType(CompressionType.LZ4).
+                    sendTimeout(0, TimeUnit.MILLISECONDS).create();
         } catch (PulsarClientException e) {
             log.error("Failed to create producer to function assignment topic "
                     + this.workerConfig.getFunctionAssignmentTopic(), e);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index 5a9f339..071e946 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -18,8 +18,18 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import dlshade.org.apache.zookeeper.KeeperException.Code;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.UUID;
+
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.distributedlog.AppendOnlyStreamWriter;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.api.DistributedLogManager;
@@ -33,17 +43,7 @@ import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.worker.dlog.DLInputStream;
 import org.apache.pulsar.functions.worker.dlog.DLOutputStream;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URL;
-import java.util.UUID;
+import dlshade.org.apache.zookeeper.KeeperException.Code;
 
 @Slf4j
 public final class Utils {
@@ -129,13 +129,14 @@ public final class Utils {
                                                  OutputStream outputStream,
                                                  String packagePath) throws 
IOException {
         DistributedLogManager dlm = namespace.openLog(packagePath);
-        InputStream in = new DLInputStream(dlm);
-        int read = 0;
-        byte[] bytes = new byte[1024];
-        while ((read = in.read(bytes)) != -1) {
-            outputStream.write(bytes, 0, read);
+        try (InputStream in = new DLInputStream(dlm)) {
+            int read = 0;
+            byte[] bytes = new byte[1024];
+            while ((read = in.read(bytes)) != -1) {
+                outputStream.write(bytes, 0, read);
+            }
+            outputStream.flush();
         }
-        outputStream.flush();
     }
 
     public static DistributedLogConfiguration getDlogConf(WorkerConfig 
workerConfig) {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index fb875b5..06f4e53 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -20,12 +20,16 @@ package org.apache.pulsar.functions.worker;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Map;
 
-import lombok.*;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
 import lombok.experimental.Accessors;
 
 @Data
@@ -92,7 +96,7 @@ public class WorkerConfig implements Serializable {
     public String getFunctionAssignmentTopic() {
         return String.format("persistent://%s/%s", pulsarFunctionsNamespace, 
functionAssignmentTopicName);
     }
-    
+
     public static WorkerConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         return mapper.readValue(new File(yamlFile), WorkerConfig.class);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 5df3c3f..e33dcba 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -76,7 +76,7 @@ public class WorkerService {
         // initialize the function metadata manager
         try {
 
-            this.client = 
PulsarClient.create(this.workerConfig.getPulsarServiceUrl());
+            this.client = 
PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl()).build();
             log.info("Created Pulsar client");
 
             //create scheduler manager
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index 3d27fb4..ac99f9a 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -19,17 +19,21 @@
 package org.apache.pulsar.functions.worker;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.util.HashMap;
 import java.util.Map;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.proto.Function;
@@ -42,12 +46,24 @@ import org.testng.annotations.Test;
 @Slf4j
 public class FunctionMetaDataManagerTest {
 
+    private static PulsarClient mockPulsarClient() throws 
PulsarClientException {
+        ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
+        when(builder.topic(anyString())).thenReturn(builder);
+
+        when(builder.create()).thenReturn(mock(Producer.class));
+
+        PulsarClient client = mock(PulsarClient.class);
+        when(client.newProducer()).thenReturn(builder);
+
+        return client;
+    }
+
     @Test
     public void testListFunctions() throws PulsarClientException {
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(new WorkerConfig(),
                         mock(SchedulerManager.class),
-                        mock(PulsarClient.class)));
+                        mockPulsarClient()));
 
         Map<String, Function.FunctionMetaData> functionMetaDataMap1 = new 
HashMap<>();
         functionMetaDataMap1.put("func-1", 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
@@ -87,7 +103,7 @@ public class FunctionMetaDataManagerTest {
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         mock(SchedulerManager.class),
-                        mock(PulsarClient.class)));
+                        mockPulsarClient()));
         Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")).build();
 
@@ -123,7 +139,7 @@ public class FunctionMetaDataManagerTest {
         functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         mock(SchedulerManager.class),
-                        mock(PulsarClient.class)));
+                        mockPulsarClient()));
         Map<String, Function.FunctionMetaData> functionMetaDataMap = new 
HashMap<>();
         Function.FunctionMetaData m2 = Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -167,7 +183,7 @@ public class FunctionMetaDataManagerTest {
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         mock(SchedulerManager.class),
-                        mock(PulsarClient.class)));
+                        mockPulsarClient()));
         Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
                         
.setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
@@ -209,7 +225,7 @@ public class FunctionMetaDataManagerTest {
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         mock(SchedulerManager.class),
-                        mock(PulsarClient.class)));
+                        mockPulsarClient()));
 
         
Mockito.doNothing().when(functionMetaDataManager).processUpdate(any(Request.ServiceRequest.class));
         
Mockito.doNothing().when(functionMetaDataManager).proccessDeregister(any(Request.ServiceRequest.class));
@@ -247,7 +263,7 @@ public class FunctionMetaDataManagerTest {
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         schedulerManager,
-                        mock(PulsarClient.class)));
+                        mockPulsarClient()));
 
         // worker has no record of function
         Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
@@ -275,7 +291,7 @@ public class FunctionMetaDataManagerTest {
         functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         schedulerManager,
-                        mock(PulsarClient.class)));
+                        mockPulsarClient()));
 
         Function.FunctionMetaData m3 = Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -324,7 +340,7 @@ public class FunctionMetaDataManagerTest {
         functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         schedulerManager,
-                        mock(PulsarClient.class)));
+                        mockPulsarClient()));
 
         Function.FunctionMetaData m4 = Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -361,7 +377,7 @@ public class FunctionMetaDataManagerTest {
         FunctionMetaDataManager functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         schedulerManager,
-                        mock(PulsarClient.class)));
+                        mockPulsarClient()));
         // worker has no record of function
         Function.FunctionMetaData test = Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
@@ -388,7 +404,7 @@ public class FunctionMetaDataManagerTest {
         functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         schedulerManager,
-                        mock(PulsarClient.class)));
+                        mockPulsarClient()));
         functionMetaDataManager.setFunctionMetaData(test);
         Function.FunctionMetaData m2 = Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -415,7 +431,7 @@ public class FunctionMetaDataManagerTest {
         functionMetaDataManager = spy(
                 new FunctionMetaDataManager(workerConfig,
                         schedulerManager,
-                        mock(PulsarClient.class)));
+                        mockPulsarClient()));
         functionMetaDataManager.setFunctionMetaData(test);
 
         Function.FunctionMetaData m3 = Function.FunctionMetaData.newBuilder()
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 14b56ac..6dd3fa3 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -18,24 +18,6 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.distributedlog.api.namespace.Namespace;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
-import org.apache.pulsar.client.api.ConsumerEventListener;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.functions.proto.Function;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Mockito;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.argThat;
@@ -49,6 +31,28 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.functions.proto.Function;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
 public class MembershipManagerTest {
 
     private final WorkerConfig workerConfig;
@@ -63,29 +67,33 @@ public class MembershipManagerTest {
 
     @Test
     public void testConsumerEventListener() throws Exception {
-        PulsarClient mockClient = mock(PulsarClient.class);
-        Consumer mockConsumer = mock(Consumer.class);
+        PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+
+        Consumer<byte[]> mockConsumer = mock(Consumer.class);
+        ConsumerBuilder<byte[]> mockConsumerBuilder = 
mock(ConsumerBuilder.class);
+
+        
when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
+        
when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder);
+        
when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder);
+        when(mockConsumerBuilder.property(anyString(), 
anyString())).thenReturn(mockConsumerBuilder);
+
+        when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
 
         AtomicReference<ConsumerEventListener> listenerHolder = new 
AtomicReference<>();
-        when(mockClient.subscribe(
-            eq(workerConfig.getClusterCoordinationTopic()),
-            eq(MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION),
-            any(ConsumerConfiguration.class)
-        )).thenAnswer(invocationOnMock -> {
+        
when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenAnswer(invocationOnMock
 -> {
 
-            ConsumerConfiguration conf = invocationOnMock.getArgumentAt(2, 
ConsumerConfiguration.class);
-            listenerHolder.set(conf.getConsumerEventListener());
+            ConsumerEventListener listener = invocationOnMock.getArgumentAt(0, 
ConsumerEventListener.class);
+            listenerHolder.set(listener);
 
-            return mockConsumer;
+            return mockConsumerBuilder;
         });
 
+        when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
+
         MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mockClient));
         assertFalse(membershipManager.isLeader());
         verify(mockClient, times(1))
-            .subscribe(
-                eq(workerConfig.getClusterCoordinationTopic()),
-                eq(MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION),
-                any(ConsumerConfiguration.class));
+            .newConsumer();
 
         listenerHolder.get().becameActive(mockConsumer, 0);
         assertTrue(membershipManager.isLeader());
@@ -94,11 +102,31 @@ public class MembershipManagerTest {
         assertFalse(membershipManager.isLeader());
     }
 
+    private static PulsarClient mockPulsarClient() throws 
PulsarClientException {
+        PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+
+        Consumer<byte[]> mockConsumer = mock(Consumer.class);
+        ConsumerBuilder<byte[]> mockConsumerBuilder = 
mock(ConsumerBuilder.class);
+
+        
when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
+        
when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder);
+        
when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder);
+        when(mockConsumerBuilder.property(anyString(), 
anyString())).thenReturn(mockConsumerBuilder);
+
+        when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
+
+        
when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenReturn(mockConsumerBuilder);
+
+        when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
+
+        return mockClient;
+    }
+
     @Test
     public void testCheckFailuresNoFailures() throws Exception {
         SchedulerManager schedulerManager = mock(SchedulerManager.class);
         PulsarClient pulsarClient = mock(PulsarClient.class);
-        ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
+        ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
@@ -110,7 +138,7 @@ public class MembershipManagerTest {
                 mock(MembershipManager.class)
         ));
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
-        MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mock(PulsarClient.class)));
+        MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mockPulsarClient()));
 
         List<MembershipManager.WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", 
"host-1", 8000));
@@ -162,7 +190,7 @@ public class MembershipManagerTest {
         workerConfig.setRescheduleTimeoutMs(30000);
         SchedulerManager schedulerManager = mock(SchedulerManager.class);
         PulsarClient pulsarClient = mock(PulsarClient.class);
-        ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
+        ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
@@ -175,7 +203,7 @@ public class MembershipManagerTest {
         ));
 
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
-        MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mock(PulsarClient.class)));
+        MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mockPulsarClient()));
 
         List<MembershipManager.WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", 
"host-1", 8000));
@@ -251,7 +279,7 @@ public class MembershipManagerTest {
         workerConfig.setRescheduleTimeoutMs(30000);
         SchedulerManager schedulerManager = mock(SchedulerManager.class);
         PulsarClient pulsarClient = mock(PulsarClient.class);
-        ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
+        ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
         doReturn(readerBuilder).when(pulsarClient).newReader();
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
@@ -263,7 +291,7 @@ public class MembershipManagerTest {
                 mock(MembershipManager.class)
         ));
         FunctionMetaDataManager functionMetaDataManager = 
mock(FunctionMetaDataManager.class);
-        MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mock(PulsarClient.class)));
+        MembershipManager membershipManager = spy(new 
MembershipManager(workerConfig, mockPulsarClient()));
 
         List<MembershipManager.WorkerInfo> workerInfoList = new LinkedList<>();
         workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", 
"host-1", 8000));
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 64d6931..b5b4e98 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -19,8 +19,10 @@
 package org.apache.pulsar.functions.worker;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.proto.Function;
@@ -44,6 +46,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -61,6 +66,18 @@ public class SchedulerManagerTest {
     private CompletableFuture<MessageId> completableFuture;
     private Producer producer;
 
+    private static PulsarClient mockPulsarClient() throws 
PulsarClientException {
+        ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
+        when(builder.topic(anyString())).thenReturn(builder);
+
+        when(builder.create()).thenReturn(mock(Producer.class));
+
+        PulsarClient client = mock(PulsarClient.class);
+        when(client.newProducer()).thenReturn(builder);
+
+        return client;
+    }
+
     @BeforeMethod
     public void setup() throws PulsarClientException {
         WorkerConfig workerConfig = new WorkerConfig();
@@ -78,8 +95,17 @@ public class SchedulerManagerTest {
         byte[] bytes = any();
         when(producer.sendAsync(bytes)).thenReturn(completableFuture);
 
+        ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
+        when(builder.topic(anyString())).thenReturn(builder);
+        when(builder.enableBatching(anyBoolean())).thenReturn(builder);
+        when(builder.blockIfQueueFull(anyBoolean())).thenReturn(builder);
+        
when(builder.compressionType(any(CompressionType.class))).thenReturn(builder);
+        when(builder.sendTimeout(anyInt(), 
any(TimeUnit.class))).thenReturn(builder);
+
+        when(builder.create()).thenReturn(producer);
+
         PulsarClient pulsarClient = mock(PulsarClient.class);
-        doReturn(producer).when(pulsarClient).createProducer(any(), any());
+        when(pulsarClient.newProducer()).thenReturn(builder);
 
         schedulerManager = spy(new SchedulerManager(workerConfig, 
pulsarClient));
         functionRuntimeManager = mock(FunctionRuntimeManager.class);
@@ -380,9 +406,6 @@ public class SchedulerManagerTest {
 
         // scale up
 
-        PulsarClient pulsarClient = mock(PulsarClient.class);
-        doReturn(producer).when(pulsarClient).createProducer(any(), any());
-
         Function.FunctionMetaData function2Scaled = 
Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
                         
.setNamespace("namespace-1").setTenant("tenant-1").setParallelism(3)).setVersion(version)
@@ -500,9 +523,6 @@ public class SchedulerManagerTest {
 
         // scale down
 
-        PulsarClient pulsarClient = mock(PulsarClient.class);
-        doReturn(producer).when(pulsarClient).createProducer(any(), any());
-
         Function.FunctionMetaData function2Scaled = 
Function.FunctionMetaData.newBuilder()
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
                         
.setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)).setVersion(version)
@@ -612,9 +632,6 @@ public class SchedulerManagerTest {
 
         // scale down
 
-        PulsarClient pulsarClient = mock(PulsarClient.class);
-        doReturn(producer).when(pulsarClient).createProducer(any(), any());
-
         Function.FunctionMetaData function2Updated = 
Function.FunctionMetaData.newBuilder()
                 
.setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath("/foo/bar2"))
                 
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 406a2c8..cb21f6f 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -23,6 +23,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Enums;
 import com.google.common.base.Splitter;
+
 import java.io.IOException;
 import java.util.Base64;
 import java.util.List;
@@ -30,15 +31,19 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
+
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
 import 
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
 import 
org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ConsumerAck;
@@ -63,10 +68,10 @@ import org.slf4j.LoggerFactory;
 public class ConsumerHandler extends AbstractWebSocketHandler {
 
     private String subscription = null;
-    private final ConsumerConfiguration conf;
+    private SubscriptionType subscriptionType;
     private Consumer<byte[]> consumer;
 
-    private final int maxPendingMessages;
+    private int maxPendingMessages;
     private final AtomicInteger pendingMessages = new AtomicInteger();
 
     private final LongAdder numMsgsDelivered;
@@ -78,20 +83,26 @@ public class ConsumerHandler extends 
AbstractWebSocketHandler {
 
     public ConsumerHandler(WebSocketService service, HttpServletRequest 
request, ServletUpgradeResponse response) {
         super(service, request, response);
-        this.conf = getConsumerConfiguration();
-        this.maxPendingMessages = (conf.getReceiverQueueSize() == 0) ? 1 : 
conf.getReceiverQueueSize();
+
+        ConsumerBuilderImpl<byte[]> builder;
+
         this.numMsgsDelivered = new LongAdder();
         this.numBytesDelivered = new LongAdder();
         this.numMsgsAcked = new LongAdder();
 
         try {
+            builder = (ConsumerBuilderImpl<byte[]>) 
getConsumerConfiguration(service.getPulsarClient());
+            this.maxPendingMessages = 
(builder.getConf().getReceiverQueueSize() == 0) ? 1
+                    : builder.getConf().getReceiverQueueSize();
+            this.subscriptionType = builder.getConf().getSubscriptionType();
+
             // checkAuth() should be called after assigning a value to 
this.subscription
             this.subscription = extractSubscription(request);
             if (!checkAuth(response)) {
                 return;
             }
 
-            this.consumer = 
service.getPulsarClient().subscribe(topic.toString(), subscription, conf);
+            this.consumer = 
builder.topic(topic.toString()).subscriptionName(subscription).subscribe();
             if (!this.service.addConsumer(this)) {
                 log.warn("[{}:{}] Failed to add consumer handler for topic 
{}", request.getRemoteAddr(),
                         request.getRemotePort(), topic);
@@ -242,7 +253,7 @@ public class ConsumerHandler extends 
AbstractWebSocketHandler {
         }
     }
 
-    public Consumer getConsumer() {
+    public Consumer<byte[]> getConsumer() {
         return this.consumer;
     }
 
@@ -251,7 +262,7 @@ public class ConsumerHandler extends 
AbstractWebSocketHandler {
     }
 
     public SubscriptionType getSubscriptionType() {
-        return conf.getSubscriptionType();
+        return subscriptionType;
     }
 
     public long getAndResetNumMsgsDelivered() {
@@ -267,7 +278,7 @@ public class ConsumerHandler extends 
AbstractWebSocketHandler {
     }
 
     public long getMsgDeliveredCounter() {
-        return MSG_DELIVERED_COUNTER_UPDATER.get(this);
+        return msgDeliveredCounter;
     }
 
     protected void updateDeliverMsgStat(long msgSize) {
@@ -276,32 +287,32 @@ public class ConsumerHandler extends 
AbstractWebSocketHandler {
         numBytesDelivered.add(msgSize);
     }
 
-    private ConsumerConfiguration getConsumerConfiguration() {
-        ConsumerConfiguration conf = new ConsumerConfiguration();
+    private ConsumerBuilder<byte[]> getConsumerConfiguration(PulsarClient 
client) {
+        ConsumerBuilder<byte[]> builder = client.newConsumer();
 
         if (queryParams.containsKey("ackTimeoutMillis")) {
-            
conf.setAckTimeout(Integer.parseInt(queryParams.get("ackTimeoutMillis")), 
TimeUnit.MILLISECONDS);
+            
builder.ackTimeout(Integer.parseInt(queryParams.get("ackTimeoutMillis")), 
TimeUnit.MILLISECONDS);
         }
 
         if (queryParams.containsKey("subscriptionType")) {
             checkArgument(Enums.getIfPresent(SubscriptionType.class, 
queryParams.get("subscriptionType")).isPresent(),
                     "Invalid subscriptionType %s", 
queryParams.get("subscriptionType"));
-            
conf.setSubscriptionType(SubscriptionType.valueOf(queryParams.get("subscriptionType")));
+            
builder.subscriptionType(SubscriptionType.valueOf(queryParams.get("subscriptionType")));
         }
 
         if (queryParams.containsKey("receiverQueueSize")) {
-            
conf.setReceiverQueueSize(Math.min(Integer.parseInt(queryParams.get("receiverQueueSize")),
 1000));
+            
builder.receiverQueueSize(Math.min(Integer.parseInt(queryParams.get("receiverQueueSize")),
 1000));
         }
 
         if (queryParams.containsKey("consumerName")) {
-            conf.setConsumerName(queryParams.get("consumerName"));
+            builder.consumerName(queryParams.get("consumerName"));
         }
 
         if (queryParams.containsKey("priorityLevel")) {
-            
conf.setPriorityLevel(Integer.parseInt(queryParams.get("priorityLevel")));
+            
builder.priorityLevel(Integer.parseInt(queryParams.get("priorityLevel")));
         }
 
-        return conf;
+        return builder;
     }
 
     @Override
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index 5ed8adf..80545bf 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -24,6 +24,9 @@ import static 
org.apache.pulsar.websocket.WebSocketError.FailedToDeserializeFrom
 import static org.apache.pulsar.websocket.WebSocketError.PayloadEncodingError;
 import static org.apache.pulsar.websocket.WebSocketError.UnknownError;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Enums;
+
 import java.io.IOException;
 import java.util.Base64;
 import java.util.concurrent.TimeUnit;
@@ -35,12 +38,13 @@ import javax.servlet.http.HttpServletResponse;
 
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.HashingScheme;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.ProducerConfiguration.HashingScheme;
-import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
 import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededError;
 import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededException;
 import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
@@ -52,9 +56,6 @@ import 
org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Enums;
-
 
 /**
  * Websocket end-point url handler to handle incoming message coming from 
client. Websocket end-point url handler to
@@ -91,8 +92,7 @@ public class ProducerHandler extends AbstractWebSocketHandler 
{
         }
 
         try {
-            ProducerConfiguration conf = getProducerConfiguration();
-            this.producer = 
service.getPulsarClient().createProducer(topic.toString(), conf);
+            this.producer = 
getProducerBuilder(service.getPulsarClient()).topic(topic.toString()).create();
             if (!this.service.addProducer(this)) {
                 log.warn("[{}:{}] Failed to add producer handler for topic 
{}", request.getRemoteAddr(),
                         request.getRemotePort(), topic);
@@ -166,7 +166,7 @@ public class ProducerHandler extends 
AbstractWebSocketHandler {
         }
 
         final long msgSize = rawPayload.length;
-        MessageBuilder builder = 
MessageBuilder.create().setContent(rawPayload);
+        MessageBuilder<byte[]> builder = 
MessageBuilder.create().setContent(rawPayload);
 
         if (sendRequest.properties != null) {
             builder.setProperties(sendRequest.properties);
@@ -196,7 +196,7 @@ public class ProducerHandler extends 
AbstractWebSocketHandler {
         });
     }
 
-    public Producer getProducer() {
+    public Producer<byte[]> getProducer() {
         return this.producer;
     }
 
@@ -222,7 +222,7 @@ public class ProducerHandler extends 
AbstractWebSocketHandler {
     }
 
     public long getMsgPublishedCounter() {
-        return MSG_PUBLISHED_COUNTER_UPDATER.get(this);
+        return msgPublishedCounter;
     }
 
     @Override
@@ -248,45 +248,44 @@ public class ProducerHandler extends 
AbstractWebSocketHandler {
         MSG_PUBLISHED_COUNTER_UPDATER.getAndIncrement(this);
     }
 
-    private ProducerConfiguration getProducerConfiguration() {
-        ProducerConfiguration conf = new ProducerConfiguration();
-
-        conf.setBatchingEnabled(false);
-        conf.setMessageRoutingMode(MessageRoutingMode.SinglePartition);
+    private ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client) {
+        ProducerBuilder<byte[]> builder = client.newProducer()
+            .enableBatching(false)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition);
 
         // Set to false to prevent the server thread from being blocked if a 
lot of messages are pending.
-        conf.setBlockIfQueueFull(false);
+        builder.blockIfQueueFull(false);
 
         if (queryParams.containsKey("producerName")) {
-            conf.setProducerName(queryParams.get("producerName"));
+            builder.producerName(queryParams.get("producerName"));
         }
 
         if (queryParams.containsKey("initialSequenceId")) {
-            conf.setInitialSequenceId(Long.parseLong("initialSequenceId"));
+            builder.initialSequenceId(Long.parseLong("initialSequenceId"));
         }
 
         if (queryParams.containsKey("hashingScheme")) {
-            
conf.setHashingScheme(HashingScheme.valueOf(queryParams.get("hashingScheme")));
+            
builder.hashingScheme(HashingScheme.valueOf(queryParams.get("hashingScheme")));
         }
 
         if (queryParams.containsKey("sendTimeoutMillis")) {
-            
conf.setSendTimeout(Integer.parseInt(queryParams.get("sendTimeoutMillis")), 
TimeUnit.MILLISECONDS);
+            
builder.sendTimeout(Integer.parseInt(queryParams.get("sendTimeoutMillis")), 
TimeUnit.MILLISECONDS);
         }
 
         if (queryParams.containsKey("batchingEnabled")) {
-            
conf.setBatchingEnabled(Boolean.parseBoolean(queryParams.get("batchingEnabled")));
+            
builder.enableBatching(Boolean.parseBoolean(queryParams.get("batchingEnabled")));
         }
 
         if (queryParams.containsKey("batchingMaxMessages")) {
-            
conf.setBatchingMaxMessages(Integer.parseInt(queryParams.get("batchingMaxMessages")));
+            
builder.batchingMaxMessages(Integer.parseInt(queryParams.get("batchingMaxMessages")));
         }
 
         if (queryParams.containsKey("maxPendingMessages")) {
-            
conf.setMaxPendingMessages(Integer.parseInt(queryParams.get("maxPendingMessages")));
+            
builder.maxPendingMessages(Integer.parseInt(queryParams.get("maxPendingMessages")));
         }
 
         if (queryParams.containsKey("batchingMaxPublishDelay")) {
-            
conf.setBatchingMaxPublishDelay(Integer.parseInt(queryParams.get("batchingMaxPublishDelay")),
+            
builder.batchingMaxPublishDelay(Integer.parseInt(queryParams.get("batchingMaxPublishDelay")),
                     TimeUnit.MILLISECONDS);
         }
 
@@ -296,17 +295,17 @@ public class ProducerHandler extends 
AbstractWebSocketHandler {
                     "Invalid messageRoutingMode %s", 
queryParams.get("messageRoutingMode"));
             MessageRoutingMode routingMode = 
MessageRoutingMode.valueOf(queryParams.get("messageRoutingMode"));
             if (!MessageRoutingMode.CustomPartition.equals(routingMode)) {
-                conf.setMessageRoutingMode(routingMode);
+                builder.messageRoutingMode(routingMode);
             }
         }
 
         if (queryParams.containsKey("compressionType")) {
             checkArgument(Enums.getIfPresent(CompressionType.class, 
queryParams.get("compressionType")).isPresent(),
                     "Invalid compressionType %s", 
queryParams.get("compressionType"));
-            
conf.setCompressionType(CompressionType.valueOf(queryParams.get("compressionType")));
+            
builder.compressionType(CompressionType.valueOf(queryParams.get("compressionType")));
         }
 
-        return conf;
+        return builder;
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(ProducerHandler.class);
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index 54037fa..82d7cff 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -94,7 +94,7 @@ public class ReaderHandler extends AbstractWebSocketHandler {
 
             this.reader = builder.create();
 
-            this.subscription = 
((ReaderImpl)this.reader).getConsumer().getSubscription();
+            this.subscription = ((ReaderImpl<?>) 
this.reader).getConsumer().getSubscription();
             if (!this.service.addReader(this)) {
                 log.warn("[{}:{}] Failed to add reader handler for topic {}", 
request.getRemoteAddr(),
                         request.getRemotePort(), topic);
@@ -214,8 +214,8 @@ public class ReaderHandler extends AbstractWebSocketHandler 
{
         }
     }
 
-    public Consumer getConsumer() {
-        return reader != null ? ((ReaderImpl)reader).getConsumer() : null;
+    public Consumer<?> getConsumer() {
+        return reader != null ? ((ReaderImpl<?>) reader).getConsumer() : null;
     }
 
     public String getSubscription() {
@@ -235,7 +235,7 @@ public class ReaderHandler extends AbstractWebSocketHandler 
{
     }
 
     public long getMsgDeliveredCounter() {
-        return MSG_DELIVERED_COUNTER_UPDATER.get(this);
+        return msgDeliveredCounter;
     }
 
     protected void updateDeliverMsgStat(long msgSize) {

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to