This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 064ba5a Modified functions producer consumers to use new builder
based api (#1580)
064ba5a is described below
commit 064ba5aa9ae8a3032d9993d1b145eb15c852e11f
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Apr 25 23:36:30 2018 -0700
Modified functions producer consumers to use new builder based api (#1580)
* Modified functions producer consumers to use new builder based api
* remove `@Test` on MultiConsumersOneOutputTopicProducersTest, since the
client creation is changed to builder and this code might eventually go away
with connect related changes.
* Fix MultiConsumersOneOutputTopicProducersTest
* Fix FunctionMetaDataManagerTest
* Fix MembershipManagerTest
* Fix SchedulerManagerTest.java
---
.../apache/pulsar/compaction/CompactorTool.java | 27 ++--
.../pulsar/compaction/TwoPhaseCompactor.java | 13 +-
.../pulsar/client/admin/internal/ClustersImpl.java | 9 +-
.../admin/internal/PersistentTopicsImpl.java | 3 +-
.../pulsar/client/impl/ConsumerBuilderImpl.java | 4 +
.../instance/processors/AtLeastOnceProcessor.java | 6 +-
.../instance/processors/AtMostOnceProcessor.java | 6 +-
.../processors/EffectivelyOnceProcessor.java | 11 +-
.../instance/processors/MessageProcessor.java | 4 +-
.../instance/processors/MessageProcessorBase.java | 4 +-
.../producers/AbstractOneOuputTopicProducers.java | 47 +++---
.../MultiConsumersOneOuputTopicProducers.java | 16 +-
.../functions/instance/producers/Producers.java | 3 +-
.../instance/FunctionResultRouterTest.java | 12 +-
.../instance/JavaInstanceRunnableProcessTest.java | 2 +-
.../functions/instance/JavaInstanceTest.java | 1 +
.../MultiConsumersOneOutputTopicProducersTest.java | 178 +++++++++++++++++----
.../functions/runtime/ThreadRuntimeFactory.java | 5 +-
.../functions/worker/FunctionAssignmentTailer.java | 8 +-
.../functions/worker/FunctionMetaDataManager.java | 4 +-
.../worker/FunctionMetaDataTopicTailer.java | 10 +-
.../functions/worker/FunctionRuntimeManager.java | 2 +-
.../functions/worker/FunctionWorkerStarter.java | 2 -
.../pulsar/functions/worker/MembershipManager.java | 26 +--
.../pulsar/functions/worker/SchedulerManager.java | 42 +++--
.../org/apache/pulsar/functions/worker/Utils.java | 37 ++---
.../pulsar/functions/worker/WorkerConfig.java | 10 +-
.../pulsar/functions/worker/WorkerService.java | 2 +-
.../worker/FunctionMetaDataManagerTest.java | 38 +++--
.../functions/worker/MembershipManagerTest.java | 104 +++++++-----
.../functions/worker/SchedulerManagerTest.java | 37 +++--
.../apache/pulsar/websocket/ConsumerHandler.java | 47 +++---
.../apache/pulsar/websocket/ProducerHandler.java | 55 ++++---
.../org/apache/pulsar/websocket/ReaderHandler.java | 8 +-
34 files changed, 485 insertions(+), 298 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index 0736146..c16d676 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -21,6 +21,10 @@ package org.apache.pulsar.compaction;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import java.nio.file.Paths;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -31,7 +35,7 @@ import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.BookKeeperClientFactoryImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
@@ -40,12 +44,6 @@ import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import lombok.Cleanup;
-
public class CompactorTool {
private static class Arguments {
@@ -82,15 +80,16 @@ public class CompactorTool {
}
String pulsarServiceUrl = PulsarService.brokerUrl(brokerConfig);
- ClientConfiguration clientConfig = new ClientConfiguration();
+ ClientBuilder clientBuilder = PulsarClient.builder();
if (isNotBlank(brokerConfig.getBrokerClientAuthenticationPlugin())) {
-
clientConfig.setAuthentication(brokerConfig.getBrokerClientAuthenticationPlugin(),
-
brokerConfig.getBrokerClientAuthenticationParameters());
+
clientBuilder.authentication(brokerConfig.getBrokerClientAuthenticationPlugin(),
+ brokerConfig.getBrokerClientAuthenticationParameters());
}
- clientConfig.setUseTls(brokerConfig.isTlsEnabled());
-
clientConfig.setTlsAllowInsecureConnection(brokerConfig.isTlsAllowInsecureConnection());
-
clientConfig.setTlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath());
+ clientBuilder.serviceUrl(pulsarServiceUrl)
+ .enableTls(brokerConfig.isTlsEnabled())
+
.allowTlsInsecureConnection(brokerConfig.isTlsAllowInsecureConnection())
+
.tlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath());
ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(
new
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
@@ -103,7 +102,7 @@ public class CompactorTool {
(int)brokerConfig.getZooKeeperSessionTimeoutMillis()).get();
BookKeeperClientFactory bkClientFactory = new
BookKeeperClientFactoryImpl();
BookKeeper bk = bkClientFactory.create(brokerConfig, zk);
- try (PulsarClient pulsar = PulsarClient.create(pulsarServiceUrl,
clientConfig)) {
+ try (PulsarClient pulsar = clientBuilder.build()) {
Compactor compactor = new TwoPhaseCompactor(brokerConfig, pulsar,
bk, scheduler);
long ledgerId = compactor.compact(arguments.topic).get();
log.info("Compaction of topic {} complete. Compacted to ledger
{}", arguments.topic, ledgerId);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 3213fd9..5db4f2e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -21,36 +21,33 @@ package org.apache.pulsar.compaction;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.common.collect.ImmutableMap;
+
import io.netty.buffer.ByteBuf;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.commons.lang3.tuple.Pair;
-
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.impl.RawBatchConverter;
-
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
index 32cb5c0..e3041ba 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
@@ -93,10 +93,11 @@ public class ClustersImpl extends BaseResource implements
Clusters {
} catch (Exception e) {
throw getApiException(e);
}
-
+
}
@Override
+ @SuppressWarnings("unchecked")
public Set<String> getPeerClusterNames(String cluster) throws
PulsarAdminException {
try {
return
request(adminClusters.path(cluster).path("peers")).get(LinkedHashSet.class);
@@ -104,7 +105,7 @@ public class ClustersImpl extends BaseResource implements
Clusters {
throw getApiException(e);
}
}
-
+
@Override
public void deleteCluster(String cluster) throws PulsarAdminException {
try {
@@ -125,7 +126,7 @@ public class ClustersImpl extends BaseResource implements
Clusters {
}
}
-
+
@Override
public List<BrokerNamespaceIsolationData>
getBrokersWithNamespaceIsolationPolicy(String cluster)
throws PulsarAdminException {
@@ -223,7 +224,7 @@ public class ClustersImpl extends BaseResource implements
Clusters {
throw getApiException(e);
}
}
-
+
private void setDomain(String cluster, String domainName,
FailureDomain domain) throws PulsarAdminException {
try {
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
index 3121739..86f2c05 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
@@ -22,6 +22,7 @@ import static
com.google.common.base.Preconditions.checkArgument;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -796,7 +797,7 @@ public class PersistentTopicsImpl extends BaseResource
implements PersistentTopi
}
}
- return Lists.newArrayList(new MessageImpl<byte[]>(msgId,
properties, data, Schema.IDENTITY));
+ return Collections.singletonList(new MessageImpl<byte[]>(msgId,
properties, data, Schema.IDENTITY));
} finally {
if (stream != null) {
stream.close();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index f024a4f..af11ef5 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -236,4 +236,8 @@ public class ConsumerBuilderImpl<T> implements
ConsumerBuilder<T> {
conf.setSubscriptionInitialPosition(subscriptionInitialPosition);
return this;
}
+
+ public ConsumerConfigurationData<T> getConf() {
+ return conf;
+ }
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
index 819c118..86aeb83 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
@@ -38,7 +38,7 @@ import
org.apache.pulsar.functions.proto.Function.FunctionDetails;
public class AtLeastOnceProcessor extends MessageProcessorBase {
@Getter
- private Producer producer;
+ private Producer<byte[]> producer;
AtLeastOnceProcessor(PulsarClient client,
FunctionDetails functionDetails,
@@ -52,13 +52,13 @@ public class AtLeastOnceProcessor extends
MessageProcessorBase {
}
@Override
- public void sendOutputMessage(InputMessage inputMsg, MessageBuilder
outputMsgBuilder) {
+ public void sendOutputMessage(InputMessage inputMsg,
MessageBuilder<byte[]> outputMsgBuilder) {
if (null == outputMsgBuilder || null == producer) {
inputMsg.ack();
return;
}
- Message outputMsg = outputMsgBuilder.build();
+ Message<byte[]> outputMsg = outputMsgBuilder.build();
producer.sendAsync(outputMsg)
.thenAccept(msgId -> inputMsg.ack());
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
index b6871fa..994be0d 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
@@ -36,7 +36,7 @@ import
org.apache.pulsar.functions.proto.Function.FunctionDetails;
@Slf4j
class AtMostOnceProcessor extends MessageProcessorBase {
- private Producer producer;
+ private Producer<byte[]> producer;
AtMostOnceProcessor(PulsarClient client,
FunctionDetails functionDetails,
@@ -58,12 +58,12 @@ class AtMostOnceProcessor extends MessageProcessorBase {
}
@Override
- public void sendOutputMessage(InputMessage inputMsg, MessageBuilder
outputMsgBuilder) {
+ public void sendOutputMessage(InputMessage inputMsg,
MessageBuilder<byte[]> outputMsgBuilder) {
if (null == outputMsgBuilder) {
return;
}
- Message outputMsg = outputMsgBuilder.build();
+ Message<byte[]> outputMsg = outputMsgBuilder.build();
producer.sendAsync(outputMsg);
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
index d31bb06..59c7bd6 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
@@ -22,11 +22,13 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
+
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+
import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
@@ -102,7 +104,7 @@ class EffectivelyOnceProcessor extends MessageProcessorBase
implements ConsumerE
@Override
public void sendOutputMessage(InputMessage inputMsg,
- MessageBuilder outputMsgBuilder) throws
Exception {
+ MessageBuilder<byte[]> outputMsgBuilder)
throws Exception {
if (null == outputMsgBuilder) {
inputMsg.ackCumulative();
return;
@@ -113,15 +115,14 @@ class EffectivelyOnceProcessor extends
MessageProcessorBase implements ConsumerE
.setSequenceId(Utils.getSequenceId(inputMsg.getActualMessage().getMessageId()));
- Producer producer =
outputProducer.getProducer(inputMsg.getTopicName(),
inputMsg.getTopicPartition());
+ Producer<byte[]> producer =
outputProducer.getProducer(inputMsg.getTopicName(),
inputMsg.getTopicPartition());
- Message outputMsg = outputMsgBuilder.build();
+ Message<byte[]> outputMsg = outputMsgBuilder.build();
producer.sendAsync(outputMsg)
.thenAccept(messageId -> inputMsg.ackCumulative())
.join();
}
-
@Override
public void close() {
super.close();
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
index 33d4f77..97c971d 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
@@ -82,7 +82,7 @@ public interface MessageProcessor extends AutoCloseable {
*
* @return the input consumer.
*/
- Consumer getInputConsumer();
+ Consumer<byte[]> getInputConsumer();
/**
* Setup the output with a provided <i>outputSerDe</i>. The implementation
of this processor is responsible for
@@ -103,7 +103,7 @@ public interface MessageProcessor extends AutoCloseable {
* @param outputMsgBuilder output message builder. it can be null.
*/
void sendOutputMessage(InputMessage inputMsg,
- MessageBuilder outputMsgBuilder) throws
PulsarClientException, Exception;
+ MessageBuilder<byte[]> outputMsgBuilder) throws
PulsarClientException, Exception;
/**
* Get the next message to process
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
index d525a1a..fc37b13 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
@@ -24,7 +24,9 @@ import java.util.Map;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -51,7 +53,7 @@ abstract class MessageProcessorBase implements
MessageProcessor {
protected SerDe outputSerDe;
@Getter
- protected Consumer inputConsumer;
+ protected Consumer<byte[]> inputConsumer;
protected List<String> topics;
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
index bba9a7f..7a561a1 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
@@ -19,11 +19,12 @@
package org.apache.pulsar.functions.instance.producers;
import java.util.concurrent.TimeUnit;
+
import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.ProducerConfiguration.HashingScheme;
-import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.instance.FunctionResultRouter;
@@ -32,51 +33,43 @@ public abstract class AbstractOneOuputTopicProducers
implements Producers {
protected final PulsarClient client;
protected final String outputTopic;
- protected final ProducerConfiguration conf;
AbstractOneOuputTopicProducers(PulsarClient client,
String outputTopic)
throws PulsarClientException {
this.client = client;
this.outputTopic = outputTopic;
- this.conf = newProducerConfiguration();
}
- static ProducerConfiguration newProducerConfiguration() {
- ProducerConfiguration conf = new ProducerConfiguration();
- conf.setBlockIfQueueFull(true);
- conf.setBatchingEnabled(true);
- conf.setBatchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
- conf.setMaxPendingMessages(1000000);
- conf.setCompressionType(CompressionType.LZ4);
- conf.setHashingScheme(HashingScheme.Murmur3_32Hash);
- conf.setMessageRoutingMode(MessageRoutingMode.CustomPartition);
+ static ProducerBuilder<byte[]> newProducerBuilder(PulsarClient client) {
// use function result router to deal with different processing
guarantees.
- conf.setMessageRouter(FunctionResultRouter.of());
- return conf;
+ return client.newProducer() //
+ .blockIfQueueFull(true) //
+ .enableBatching(true) //
+ .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) //
+ .compressionType(CompressionType.LZ4) //
+ .hashingScheme(HashingScheme.Murmur3_32Hash) //
+ .messageRoutingMode(MessageRoutingMode.CustomPartition) //
+ .messageRouter(FunctionResultRouter.of());
}
- protected Producer createProducer(String topic)
+ protected Producer<byte[]> createProducer(String topic)
throws PulsarClientException {
return createProducer(client, topic);
}
- public static Producer createProducer(PulsarClient client, String topic)
+ public static Producer<byte[]> createProducer(PulsarClient client, String
topic)
throws PulsarClientException {
- return client.createProducer(topic, newProducerConfiguration());
+ return newProducerBuilder(client).topic(topic).create();
}
- protected Producer createProducer(String topic, String producerName)
+ protected Producer<byte[]> createProducer(String topic, String
producerName)
throws PulsarClientException {
return createProducer(client, topic, producerName);
}
- public static Producer createProducer(PulsarClient client, String topic,
String producerName)
- throws PulsarClientException {
- ProducerConfiguration newConf = newProducerConfiguration();
- newConf.setProducerName(producerName);
-
- return client.createProducer(topic, newConf);
+ public static Producer<byte[]> createProducer(PulsarClient client, String
topic, String producerName)
+ throws PulsarClientException {
+ return
newProducerBuilder(client).topic(topic).producerName(producerName).create();
}
-
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
index cf21a04..3668311 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
@@ -37,7 +37,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
public class MultiConsumersOneOuputTopicProducers extends
AbstractOneOuputTopicProducers {
@Getter(AccessLevel.PACKAGE)
- private final Map<String, IntObjectMap<Producer>> producers;
+ private final Map<String, IntObjectMap<Producer<byte[]>>> producers;
public MultiConsumersOneOuputTopicProducers(PulsarClient client,
String outputTopic)
@@ -56,14 +56,14 @@ public class MultiConsumersOneOuputTopicProducers extends
AbstractOneOuputTopicP
}
@Override
- public synchronized Producer getProducer(String srcTopicName, int
srcTopicPartition) throws PulsarClientException {
- IntObjectMap<Producer> producerMap = producers.get(srcTopicName);
+ public synchronized Producer<byte[]> getProducer(String srcTopicName, int
srcTopicPartition) throws PulsarClientException {
+ IntObjectMap<Producer<byte[]>> producerMap =
producers.get(srcTopicName);
if (null == producerMap) {
producerMap = new IntObjectHashMap<>();
producers.put(srcTopicName, producerMap);
}
- Producer producer = producerMap.get(srcTopicPartition);
+ Producer<byte[]> producer = producerMap.get(srcTopicPartition);
if (null == producer) {
producer = createProducer(outputTopic,
makeProducerName(srcTopicName, srcTopicPartition));
producerMap.put(srcTopicPartition, producer);
@@ -73,10 +73,10 @@ public class MultiConsumersOneOuputTopicProducers extends
AbstractOneOuputTopicP
@Override
public synchronized void closeProducer(String srcTopicName, int
srcTopicPartition) {
- IntObjectMap<Producer> producerMap = producers.get(srcTopicName);
+ IntObjectMap<Producer<byte[]>> producerMap =
producers.get(srcTopicName);
if (null != producerMap) {
- Producer producer = producerMap.remove(srcTopicPartition);
+ Producer<byte[]> producer = producerMap.remove(srcTopicPartition);
if (null != producer) {
producer.closeAsync();
}
@@ -89,8 +89,8 @@ public class MultiConsumersOneOuputTopicProducers extends
AbstractOneOuputTopicP
@Override
public synchronized void close() {
List<CompletableFuture<Void>> closeFutures = new
ArrayList<>(producers.size());
- for (IntObjectMap<Producer> producerMap: producers.values()) {
- for (Producer producer : producerMap.values()) {
+ for (IntObjectMap<Producer<byte[]>> producerMap: producers.values()) {
+ for (Producer<byte[]> producer : producerMap.values()) {
closeFutures.add(producer.closeAsync());
}
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
index 9669bea..29cd96a 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
@@ -42,8 +42,7 @@ public interface Producers extends AutoCloseable {
* src topic partition
* @return the producer instance to produce messages
*/
- Producer getProducer(String srcTopicName,
- int srcTopicPartition) throws PulsarClientException;
+ Producer<byte[]> getProducer(String srcTopicName, int srcTopicPartition)
throws PulsarClientException;
/**
* Close a producer specified by <tt>srcTopicName</tt> and
<tt>srcTopicPartition</tt>
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
index f98ac44..a1ad289 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
@@ -45,7 +45,7 @@ public class FunctionResultRouterTest {
@Test
public void testChoosePartitionWithoutKeyWithoutSequenceId() {
- Message msg = mock(Message.class);
+ Message<?> msg = mock(Message.class);
when(msg.hasKey()).thenReturn(false);
when(msg.getKey()).thenReturn(null);
when(msg.getSequenceId()).thenReturn(-1L);
@@ -70,7 +70,7 @@ public class FunctionResultRouterTest {
FunctionResultRouter router = new FunctionResultRouter(0, clock);
for (int i = 0; i < 10; i++) {
- Message msg = mock(Message.class);
+ Message<?> msg = mock(Message.class);
when(msg.hasKey()).thenReturn(false);
when(msg.getKey()).thenReturn(null);
when(msg.getSequenceId()).thenReturn((long) (2 * i));
@@ -82,11 +82,11 @@ public class FunctionResultRouterTest {
public void testChoosePartitionWithKeyWithoutSequenceId() {
String key1 = "key1";
String key2 = "key2";
- Message msg1 = mock(Message.class);
+ Message<?> msg1 = mock(Message.class);
when(msg1.hasKey()).thenReturn(true);
when(msg1.getKey()).thenReturn(key1);
when(msg1.getSequenceId()).thenReturn(-1L);
- Message msg2 = mock(Message.class);
+ Message<?> msg2 = mock(Message.class);
when(msg2.hasKey()).thenReturn(true);
when(msg2.getKey()).thenReturn(key2);
when(msg1.getSequenceId()).thenReturn(-1L);
@@ -105,12 +105,12 @@ public class FunctionResultRouterTest {
public void testChoosePartitionWithKeySequenceId() {
String key1 = "key1";
String key2 = "key2";
- Message msg1 = mock(Message.class);
+ Message<?> msg1 = mock(Message.class);
when(msg1.hasKey()).thenReturn(true);
when(msg1.getKey()).thenReturn(key1);
// make sure sequence id is different from hashcode, so the test can
be tested correctly.
when(msg1.getSequenceId()).thenReturn((long) ((key1.hashCode() % 100)
+ 1));
- Message msg2 = mock(Message.class);
+ Message<?> msg2 = mock(Message.class);
when(msg2.hasKey()).thenReturn(true);
when(msg2.getKey()).thenReturn(key2);
when(msg1.getSequenceId()).thenReturn((long) ((key2.hashCode() % 100)
+ 1));
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
index 5b463b6..a11062f 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
@@ -1009,4 +1009,4 @@
// assertTrue(mockProducers.isEmpty());
// }
// }
-//}
\ No newline at end of file
+//}
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
index 77de78b..b9e762a 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
@@ -55,5 +55,6 @@ public class JavaInstanceTest {
JavaExecutionResult result =
instance.handleMessage(MessageId.earliest, "random", testString);
assertNotNull(result.getResult());
assertEquals(new String(testString + "-lambda"), result.getResult());
+ instance.close();
}
}
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
index 577015a..e6072c8 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
@@ -19,9 +19,6 @@
package org.apache.pulsar.functions.instance.producers;
import static
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers.makeProducerName;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -33,10 +30,19 @@ import static org.testng.Assert.assertTrue;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -48,34 +54,152 @@ public class MultiConsumersOneOutputTopicProducersTest {
private static final String TEST_OUTPUT_TOPIC = "test-output-topic";
private PulsarClient mockClient;
- private final Map<String, Producer> mockProducers = new HashMap<>();
+ private final Map<String, Producer<byte[]>> mockProducers = new
HashMap<>();
private MultiConsumersOneOuputTopicProducers producers;
+ private class MockProducerBuilder implements ProducerBuilder<byte[]> {
+
+ String producerName = "";
+
+ @Override
+ public Producer<byte[]> create() throws PulsarClientException {
+ Producer<byte[]> producer;
+ synchronized (mockProducers) {
+ producer = mockProducers.get(producerName);
+ if (null == producer) {
+ producer = createMockProducer(producerName);
+ mockProducers.put(producerName, producer);
+ }
+ }
+ return producer;
+ }
+
+ @Override
+ public CompletableFuture<Producer<byte[]>> createAsync() {
+ try {
+ return CompletableFuture.completedFuture(create());
+ } catch (PulsarClientException e) {
+ CompletableFuture<Producer<byte[]>> future = new
CompletableFuture<>();
+ future.completeExceptionally(e);
+ return future;
+ }
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> clone() {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> topic(String topicName) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> producerName(String producerName) {
+ this.producerName = producerName;
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> sendTimeout(int sendTimeout, TimeUnit
unit) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> maxPendingMessages(int
maxPendingMessages) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> maxPendingMessagesAcrossPartitions(int
maxPendingMessagesAcrossPartitions) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> blockIfQueueFull(boolean
blockIfQueueFull) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> messageRoutingMode(MessageRoutingMode
messageRoutingMode) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> hashingScheme(HashingScheme
hashingScheme) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> compressionType(CompressionType
compressionType) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> messageRouter(MessageRouter
messageRouter) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> enableBatching(boolean enableBatching) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> cryptoKeyReader(CryptoKeyReader
cryptoKeyReader) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> addEncryptionKey(String key) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]>
cryptoFailureAction(ProducerCryptoFailureAction action) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> batchingMaxPublishDelay(long
batchDelay, TimeUnit timeUnit) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> batchingMaxMessages(int
batchMessagesMaxMessagesPerBatch) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> initialSequenceId(long
initialSequenceId) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> property(String key, String value) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> properties(Map<String, String>
properties) {
+ return this;
+ }
+ }
+
@BeforeMethod
public void setup() throws Exception {
this.mockClient = mock(PulsarClient.class);
- when(mockClient.createProducer(anyString(),
any(ProducerConfiguration.class)))
- .thenAnswer(invocationOnMock -> {
- ProducerConfiguration conf = invocationOnMock.getArgumentAt(1,
ProducerConfiguration.class);
- String producerName = conf.getProducerName();
-
- synchronized (mockProducers) {
- Producer producer = mockProducers.get(producerName);
- if (null == producer) {
- producer = createMockProducer(producerName);
- mockProducers.put(producerName, producer);
- }
- return producer;
- }
- });
+ when(mockClient.newProducer())
+ .thenReturn(new MockProducerBuilder());
producers = new MultiConsumersOneOuputTopicProducers(mockClient,
TEST_OUTPUT_TOPIC);
producers.initialize();
}
- private Producer createMockProducer(String topic) {
- Producer producer = mock(Producer.class);
+ private Producer<byte[]> createMockProducer(String topic) {
+ Producer<byte[]> producer = mock(Producer.class);
when(producer.closeAsync())
.thenAnswer(invocationOnMock -> {
synchronized (mockProducers) {
@@ -90,16 +214,13 @@ public class MultiConsumersOneOutputTopicProducersTest {
public void testGetCloseProducer() throws Exception {
String srcTopic = "test-src-topic";
int ptnIdx = 1234;
- Producer producer = producers.getProducer(srcTopic, ptnIdx);
+ Producer<byte[]> producer = producers.getProducer(srcTopic, ptnIdx);
String producerName = makeProducerName(srcTopic, ptnIdx);
assertSame(mockProducers.get(producerName), producer);
verify(mockClient, times(1))
- .createProducer(
- eq(TEST_OUTPUT_TOPIC),
- any(ProducerConfiguration.class)
- );
+ .newProducer();
assertTrue(producers.getProducers().containsKey(srcTopic));
assertEquals(1, producers.getProducers().get(srcTopic).size());
assertTrue(producers.getProducers().get(srcTopic).containsKey(ptnIdx));
@@ -107,10 +228,7 @@ public class MultiConsumersOneOutputTopicProducersTest {
// second get will not create a new producer
assertSame(mockProducers.get(producerName), producer);
verify(mockClient, times(1))
- .createProducer(
- eq(TEST_OUTPUT_TOPIC),
- any(ProducerConfiguration.class)
- );
+ .newProducer();
assertTrue(producers.getProducers().containsKey(srcTopic));
assertEquals(1, producers.getProducers().get(srcTopic).size());
assertTrue(producers.getProducers().get(srcTopic).containsKey(ptnIdx));
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
index 13d8e62..df89eed 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
@@ -20,8 +20,9 @@
package org.apache.pulsar.functions.runtime;
import com.google.common.annotations.VisibleForTesting;
+
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.ClientConfiguration;
+
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -46,7 +47,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
throws Exception {
this(
threadGroupName,
- pulsarServiceUrl != null ? PulsarClient.create(pulsarServiceUrl,
new ClientConfiguration()) : null,
+ pulsarServiceUrl != null ?
PulsarClient.builder().serviceUrl(pulsarServiceUrl).build() : null,
storageServiceUrl);
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index 0a4cdeb..1c045d4 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -29,13 +29,13 @@ import java.util.function.Function;
@Slf4j
public class FunctionAssignmentTailer
- implements java.util.function.Consumer<Message>, Function<Throwable,
Void>, AutoCloseable {
+ implements java.util.function.Consumer<Message<byte[]>>,
Function<Throwable, Void>, AutoCloseable {
private final FunctionRuntimeManager functionRuntimeManager;
- private final Reader reader;
+ private final Reader<byte[]> reader;
public FunctionAssignmentTailer(FunctionRuntimeManager
functionRuntimeManager,
- Reader reader)
+ Reader<byte[]> reader)
throws PulsarClientException {
this.functionRuntimeManager = functionRuntimeManager;
this.reader = reader;
@@ -64,7 +64,7 @@ public class FunctionAssignmentTailer
}
@Override
- public void accept(Message msg) {
+ public void accept(Message<byte[]> msg) {
// check if latest
boolean hasMessageAvailable;
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index b5f9751..29a3220 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -88,7 +88,7 @@ public class FunctionMetaDataManager implements AutoCloseable
{
log.info("/** Initializing Function Metadata Manager **/");
try {
- Reader reader = pulsarClient.newReader()
+ Reader<byte[]> reader = pulsarClient.newReader()
.topic(this.workerConfig.getFunctionMetadataTopic())
.startMessageId(MessageId.earliest)
.create();
@@ -410,6 +410,6 @@ public class FunctionMetaDataManager implements
AutoCloseable {
}
private ServiceRequestManager getServiceRequestManager(PulsarClient
pulsarClient, String functionMetadataTopic) throws PulsarClientException {
- return new
ServiceRequestManager(pulsarClient.createProducer(functionMetadataTopic));
+ return new
ServiceRequestManager(pulsarClient.newProducer().topic(functionMetadataTopic).create());
}
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
index a373158..4bcaaf7 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
@@ -29,13 +29,13 @@ import
org.apache.pulsar.functions.proto.Request.ServiceRequest;
@Slf4j
public class FunctionMetaDataTopicTailer
- implements java.util.function.Consumer<Message>, Function<Throwable,
Void>, AutoCloseable {
+ implements java.util.function.Consumer<Message<byte[]>>,
Function<Throwable, Void>, AutoCloseable {
private final FunctionMetaDataManager functionMetaDataManager;
- private final Reader reader;
+ private final Reader<byte[]> reader;
public FunctionMetaDataTopicTailer(FunctionMetaDataManager
functionMetaDataManager,
- Reader reader)
+ Reader<byte[]> reader)
throws PulsarClientException {
this.functionMetaDataManager = functionMetaDataManager;
this.reader = reader;
@@ -62,7 +62,7 @@ public class FunctionMetaDataTopicTailer
log.info("Stopped function state consumer");
}
- public void processRequest(Message msg) {
+ public void processRequest(Message<byte[]> msg) {
ServiceRequest serviceRequest;
try {
@@ -80,7 +80,7 @@ public class FunctionMetaDataTopicTailer
}
@Override
- public void accept(Message msg) {
+ public void accept(Message<byte[]> msg) {
processRequest(msg);
// receive next request
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index f7e793a..af23992 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -85,7 +85,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
MembershipManager membershipManager) throws
Exception {
this.workerConfig = workerConfig;
- Reader reader = pulsarClient.newReader()
+ Reader<byte[]> reader = pulsarClient.newReader()
.topic(this.workerConfig.getFunctionAssignmentTopic())
.startMessageId(MessageId.earliest)
.create();
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
index 21dd69f..2a3dc0f 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
@@ -22,12 +22,10 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
-import lombok.extern.slf4j.Slf4j;
/**
* A starter to start function worker.
*/
-@Slf4j
public class FunctionWorkerStarter {
private static class WorkerArguments {
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index 4e5fb21..f43311d 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.functions.worker;
+import com.google.common.annotations.VisibleForTesting;
+
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -30,16 +32,15 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
-import com.google.common.annotations.VisibleForTesting;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
+
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -56,7 +57,7 @@ import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
public class MembershipManager implements AutoCloseable, ConsumerEventListener
{
private final String consumerName;
- private final Consumer consumer;
+ private final Consumer<byte[]> consumer;
private final WorkerConfig workerConfig;
private PulsarAdmin pulsarAdminClient;
private final CompletableFuture<Void> firstConsumerEventFuture;
@@ -85,18 +86,17 @@ public class MembershipManager implements AutoCloseable,
ConsumerEventListener {
// we don't produce any messages into this topic, we only use the
`failover` subscription
// to elect an active consumer as the leader worker. The leader worker
will be responsible
// for scheduling snapshots for FMT and doing task assignment.
- consumer = client.subscribe(
- workerConfig.getClusterCoordinationTopic(),
- COORDINATION_TOPIC_SUBSCRIPTION,
- new ConsumerConfiguration()
- .setSubscriptionType(SubscriptionType.Failover)
- .setConsumerEventListener(this)
- .setProperty(WORKER_IDENTIFIER, consumerName)
- );
+ consumer = client.newConsumer()
+ .topic(workerConfig.getClusterCoordinationTopic())
+ .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
+ .subscriptionType(SubscriptionType.Failover)
+ .consumerEventListener(this)
+ .property(WORKER_IDENTIFIER, consumerName)
+ .subscribe();
}
@Override
- public void becameActive(Consumer consumer, int partitionId) {
+ public void becameActive(Consumer<?> consumer, int partitionId) {
firstConsumerEventFuture.complete(null);
if (isLeader.compareAndSet(false, true)) {
log.info("Worker {} became the leader.", consumerName);
@@ -104,7 +104,7 @@ public class MembershipManager implements AutoCloseable,
ConsumerEventListener {
}
@Override
- public void becameInactive(Consumer consumer, int partitionId) {
+ public void becameInactive(Consumer<?> consumer, int partitionId) {
firstConsumerEventFuture.complete(null);
if (isLeader.compareAndSet(true, false)) {
log.info("Worker {} lost the leadership.", consumerName);
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index c6b72a5..ed00958 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -18,21 +18,6 @@
*/
package org.apache.pulsar.functions.worker;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.CompressionType;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
-import org.apache.pulsar.functions.proto.Function.Assignment;
-import org.apache.pulsar.functions.proto.Request;
-import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.worker.scheduler.IScheduler;
-
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -47,6 +32,21 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.Assignment;
+import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.proto.Request;
+import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.worker.scheduler.IScheduler;
+
@Slf4j
public class SchedulerManager implements AutoCloseable {
@@ -63,7 +63,7 @@ public class SchedulerManager implements AutoCloseable {
private final IScheduler scheduler;
- private final Producer producer;
+ private final Producer<byte[]> producer;
private final ExecutorService executorService;
@@ -72,14 +72,10 @@ public class SchedulerManager implements AutoCloseable {
this.scheduler =
Reflections.createInstance(workerConfig.getSchedulerClassName(),
IScheduler.class,
Thread.currentThread().getContextClassLoader());
- ProducerConfiguration producerConf = new ProducerConfiguration()
- .setBatchingEnabled(true)
- .setBlockIfQueueFull(true)
- .setCompressionType(CompressionType.LZ4)
- // retry until succeed
- .setSendTimeout(0, TimeUnit.MILLISECONDS);
try {
- this.producer =
pulsarClient.createProducer(this.workerConfig.getFunctionAssignmentTopic(),
producerConf);
+ this.producer =
pulsarClient.newProducer().topic(this.workerConfig.getFunctionAssignmentTopic())
+
.enableBatching(true).blockIfQueueFull(true).compressionType(CompressionType.LZ4).
+ sendTimeout(0, TimeUnit.MILLISECONDS).create();
} catch (PulsarClientException e) {
log.error("Failed to create producer to function assignment topic "
+ this.workerConfig.getFunctionAssignmentTopic(), e);
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index 5a9f339..071e946 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -18,8 +18,18 @@
*/
package org.apache.pulsar.functions.worker;
-import dlshade.org.apache.zookeeper.KeeperException.Code;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.UUID;
+
import lombok.extern.slf4j.Slf4j;
+
import org.apache.distributedlog.AppendOnlyStreamWriter;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.DistributedLogManager;
@@ -33,17 +43,7 @@ import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.worker.dlog.DLInputStream;
import org.apache.pulsar.functions.worker.dlog.DLOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URL;
-import java.util.UUID;
+import dlshade.org.apache.zookeeper.KeeperException.Code;
@Slf4j
public final class Utils {
@@ -129,13 +129,14 @@ public final class Utils {
OutputStream outputStream,
String packagePath) throws
IOException {
DistributedLogManager dlm = namespace.openLog(packagePath);
- InputStream in = new DLInputStream(dlm);
- int read = 0;
- byte[] bytes = new byte[1024];
- while ((read = in.read(bytes)) != -1) {
- outputStream.write(bytes, 0, read);
+ try (InputStream in = new DLInputStream(dlm)) {
+ int read = 0;
+ byte[] bytes = new byte[1024];
+ while ((read = in.read(bytes)) != -1) {
+ outputStream.write(bytes, 0, read);
+ }
+ outputStream.flush();
}
- outputStream.flush();
}
public static DistributedLogConfiguration getDlogConf(WorkerConfig
workerConfig) {
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index fb875b5..06f4e53 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -20,12 +20,16 @@ package org.apache.pulsar.functions.worker;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
-import java.util.Map;
-import lombok.*;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
import lombok.experimental.Accessors;
@Data
@@ -92,7 +96,7 @@ public class WorkerConfig implements Serializable {
public String getFunctionAssignmentTopic() {
return String.format("persistent://%s/%s", pulsarFunctionsNamespace,
functionAssignmentTopicName);
}
-
+
public static WorkerConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), WorkerConfig.class);
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 5df3c3f..e33dcba 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -76,7 +76,7 @@ public class WorkerService {
// initialize the function metadata manager
try {
- this.client =
PulsarClient.create(this.workerConfig.getPulsarServiceUrl());
+ this.client =
PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl()).build();
log.info("Created Pulsar client");
//create scheduler manager
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index 3d27fb4..ac99f9a 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -19,17 +19,21 @@
package org.apache.pulsar.functions.worker;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.proto.Function;
@@ -42,12 +46,24 @@ import org.testng.annotations.Test;
@Slf4j
public class FunctionMetaDataManagerTest {
+ private static PulsarClient mockPulsarClient() throws
PulsarClientException {
+ ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
+ when(builder.topic(anyString())).thenReturn(builder);
+
+ when(builder.create()).thenReturn(mock(Producer.class));
+
+ PulsarClient client = mock(PulsarClient.class);
+ when(client.newProducer()).thenReturn(builder);
+
+ return client;
+ }
+
@Test
public void testListFunctions() throws PulsarClientException {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(new WorkerConfig(),
mock(SchedulerManager.class),
- mock(PulsarClient.class)));
+ mockPulsarClient()));
Map<String, Function.FunctionMetaData> functionMetaDataMap1 = new
HashMap<>();
functionMetaDataMap1.put("func-1",
Function.FunctionMetaData.newBuilder().setFunctionDetails(
@@ -87,7 +103,7 @@ public class FunctionMetaDataManagerTest {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
- mock(PulsarClient.class)));
+ mockPulsarClient()));
Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")).build();
@@ -123,7 +139,7 @@ public class FunctionMetaDataManagerTest {
functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
- mock(PulsarClient.class)));
+ mockPulsarClient()));
Map<String, Function.FunctionMetaData> functionMetaDataMap = new
HashMap<>();
Function.FunctionMetaData m2 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -167,7 +183,7 @@ public class FunctionMetaDataManagerTest {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
- mock(PulsarClient.class)));
+ mockPulsarClient()));
Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
.setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
@@ -209,7 +225,7 @@ public class FunctionMetaDataManagerTest {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
- mock(PulsarClient.class)));
+ mockPulsarClient()));
Mockito.doNothing().when(functionMetaDataManager).processUpdate(any(Request.ServiceRequest.class));
Mockito.doNothing().when(functionMetaDataManager).proccessDeregister(any(Request.ServiceRequest.class));
@@ -247,7 +263,7 @@ public class FunctionMetaDataManagerTest {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
- mock(PulsarClient.class)));
+ mockPulsarClient()));
// worker has no record of function
Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
@@ -275,7 +291,7 @@ public class FunctionMetaDataManagerTest {
functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
- mock(PulsarClient.class)));
+ mockPulsarClient()));
Function.FunctionMetaData m3 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -324,7 +340,7 @@ public class FunctionMetaDataManagerTest {
functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
- mock(PulsarClient.class)));
+ mockPulsarClient()));
Function.FunctionMetaData m4 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -361,7 +377,7 @@ public class FunctionMetaDataManagerTest {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
- mock(PulsarClient.class)));
+ mockPulsarClient()));
// worker has no record of function
Function.FunctionMetaData test = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
@@ -388,7 +404,7 @@ public class FunctionMetaDataManagerTest {
functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
- mock(PulsarClient.class)));
+ mockPulsarClient()));
functionMetaDataManager.setFunctionMetaData(test);
Function.FunctionMetaData m2 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -415,7 +431,7 @@ public class FunctionMetaDataManagerTest {
functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
- mock(PulsarClient.class)));
+ mockPulsarClient()));
functionMetaDataManager.setFunctionMetaData(test);
Function.FunctionMetaData m3 = Function.FunctionMetaData.newBuilder()
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 14b56ac..6dd3fa3 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -18,24 +18,6 @@
*/
package org.apache.pulsar.functions.worker;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.distributedlog.api.namespace.Namespace;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
-import org.apache.pulsar.client.api.ConsumerEventListener;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.functions.proto.Function;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Mockito;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
@@ -49,6 +31,28 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.functions.proto.Function;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
public class MembershipManagerTest {
private final WorkerConfig workerConfig;
@@ -63,29 +67,33 @@ public class MembershipManagerTest {
@Test
public void testConsumerEventListener() throws Exception {
- PulsarClient mockClient = mock(PulsarClient.class);
- Consumer mockConsumer = mock(Consumer.class);
+ PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+
+ Consumer<byte[]> mockConsumer = mock(Consumer.class);
+ ConsumerBuilder<byte[]> mockConsumerBuilder =
mock(ConsumerBuilder.class);
+
+
when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
+
when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder);
+
when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder);
+ when(mockConsumerBuilder.property(anyString(),
anyString())).thenReturn(mockConsumerBuilder);
+
+ when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
AtomicReference<ConsumerEventListener> listenerHolder = new
AtomicReference<>();
- when(mockClient.subscribe(
- eq(workerConfig.getClusterCoordinationTopic()),
- eq(MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION),
- any(ConsumerConfiguration.class)
- )).thenAnswer(invocationOnMock -> {
+
when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenAnswer(invocationOnMock
-> {
- ConsumerConfiguration conf = invocationOnMock.getArgumentAt(2,
ConsumerConfiguration.class);
- listenerHolder.set(conf.getConsumerEventListener());
+ ConsumerEventListener listener = invocationOnMock.getArgumentAt(0,
ConsumerEventListener.class);
+ listenerHolder.set(listener);
- return mockConsumer;
+ return mockConsumerBuilder;
});
+ when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
+
MembershipManager membershipManager = spy(new
MembershipManager(workerConfig, mockClient));
assertFalse(membershipManager.isLeader());
verify(mockClient, times(1))
- .subscribe(
- eq(workerConfig.getClusterCoordinationTopic()),
- eq(MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION),
- any(ConsumerConfiguration.class));
+ .newConsumer();
listenerHolder.get().becameActive(mockConsumer, 0);
assertTrue(membershipManager.isLeader());
@@ -94,11 +102,31 @@ public class MembershipManagerTest {
assertFalse(membershipManager.isLeader());
}
+ private static PulsarClient mockPulsarClient() throws
PulsarClientException {
+ PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+
+ Consumer<byte[]> mockConsumer = mock(Consumer.class);
+ ConsumerBuilder<byte[]> mockConsumerBuilder =
mock(ConsumerBuilder.class);
+
+
when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
+
when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder);
+
when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder);
+ when(mockConsumerBuilder.property(anyString(),
anyString())).thenReturn(mockConsumerBuilder);
+
+ when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
+
+
when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenReturn(mockConsumerBuilder);
+
+ when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
+
+ return mockClient;
+ }
+
@Test
public void testCheckFailuresNoFailures() throws Exception {
SchedulerManager schedulerManager = mock(SchedulerManager.class);
PulsarClient pulsarClient = mock(PulsarClient.class);
- ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
+ ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
@@ -110,7 +138,7 @@ public class MembershipManagerTest {
mock(MembershipManager.class)
));
FunctionMetaDataManager functionMetaDataManager =
mock(FunctionMetaDataManager.class);
- MembershipManager membershipManager = spy(new
MembershipManager(workerConfig, mock(PulsarClient.class)));
+ MembershipManager membershipManager = spy(new
MembershipManager(workerConfig, mockPulsarClient()));
List<MembershipManager.WorkerInfo> workerInfoList = new LinkedList<>();
workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1",
"host-1", 8000));
@@ -162,7 +190,7 @@ public class MembershipManagerTest {
workerConfig.setRescheduleTimeoutMs(30000);
SchedulerManager schedulerManager = mock(SchedulerManager.class);
PulsarClient pulsarClient = mock(PulsarClient.class);
- ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
+ ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
@@ -175,7 +203,7 @@ public class MembershipManagerTest {
));
FunctionMetaDataManager functionMetaDataManager =
mock(FunctionMetaDataManager.class);
- MembershipManager membershipManager = spy(new
MembershipManager(workerConfig, mock(PulsarClient.class)));
+ MembershipManager membershipManager = spy(new
MembershipManager(workerConfig, mockPulsarClient()));
List<MembershipManager.WorkerInfo> workerInfoList = new LinkedList<>();
workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1",
"host-1", 8000));
@@ -251,7 +279,7 @@ public class MembershipManagerTest {
workerConfig.setRescheduleTimeoutMs(30000);
SchedulerManager schedulerManager = mock(SchedulerManager.class);
PulsarClient pulsarClient = mock(PulsarClient.class);
- ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
+ ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
@@ -263,7 +291,7 @@ public class MembershipManagerTest {
mock(MembershipManager.class)
));
FunctionMetaDataManager functionMetaDataManager =
mock(FunctionMetaDataManager.class);
- MembershipManager membershipManager = spy(new
MembershipManager(workerConfig, mock(PulsarClient.class)));
+ MembershipManager membershipManager = spy(new
MembershipManager(workerConfig, mockPulsarClient()));
List<MembershipManager.WorkerInfo> workerInfoList = new LinkedList<>();
workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1",
"host-1", 8000));
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 64d6931..b5b4e98 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -19,8 +19,10 @@
package org.apache.pulsar.functions.worker;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.proto.Function;
@@ -44,6 +46,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -61,6 +66,18 @@ public class SchedulerManagerTest {
private CompletableFuture<MessageId> completableFuture;
private Producer producer;
+ private static PulsarClient mockPulsarClient() throws
PulsarClientException {
+ ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
+ when(builder.topic(anyString())).thenReturn(builder);
+
+ when(builder.create()).thenReturn(mock(Producer.class));
+
+ PulsarClient client = mock(PulsarClient.class);
+ when(client.newProducer()).thenReturn(builder);
+
+ return client;
+ }
+
@BeforeMethod
public void setup() throws PulsarClientException {
WorkerConfig workerConfig = new WorkerConfig();
@@ -78,8 +95,17 @@ public class SchedulerManagerTest {
byte[] bytes = any();
when(producer.sendAsync(bytes)).thenReturn(completableFuture);
+ ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
+ when(builder.topic(anyString())).thenReturn(builder);
+ when(builder.enableBatching(anyBoolean())).thenReturn(builder);
+ when(builder.blockIfQueueFull(anyBoolean())).thenReturn(builder);
+
when(builder.compressionType(any(CompressionType.class))).thenReturn(builder);
+ when(builder.sendTimeout(anyInt(),
any(TimeUnit.class))).thenReturn(builder);
+
+ when(builder.create()).thenReturn(producer);
+
PulsarClient pulsarClient = mock(PulsarClient.class);
- doReturn(producer).when(pulsarClient).createProducer(any(), any());
+ when(pulsarClient.newProducer()).thenReturn(builder);
schedulerManager = spy(new SchedulerManager(workerConfig,
pulsarClient));
functionRuntimeManager = mock(FunctionRuntimeManager.class);
@@ -380,9 +406,6 @@ public class SchedulerManagerTest {
// scale up
- PulsarClient pulsarClient = mock(PulsarClient.class);
- doReturn(producer).when(pulsarClient).createProducer(any(), any());
-
Function.FunctionMetaData function2Scaled =
Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
.setNamespace("namespace-1").setTenant("tenant-1").setParallelism(3)).setVersion(version)
@@ -500,9 +523,6 @@ public class SchedulerManagerTest {
// scale down
- PulsarClient pulsarClient = mock(PulsarClient.class);
- doReturn(producer).when(pulsarClient).createProducer(any(), any());
-
Function.FunctionMetaData function2Scaled =
Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
.setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)).setVersion(version)
@@ -612,9 +632,6 @@ public class SchedulerManagerTest {
// scale down
- PulsarClient pulsarClient = mock(PulsarClient.class);
- doReturn(producer).when(pulsarClient).createProducer(any(), any());
-
Function.FunctionMetaData function2Updated =
Function.FunctionMetaData.newBuilder()
.setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath("/foo/bar2"))
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 406a2c8..cb21f6f 100644
---
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -23,6 +23,7 @@ import static
com.google.common.base.Preconditions.checkArgument;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Enums;
import com.google.common.base.Splitter;
+
import java.io.IOException;
import java.util.Base64;
import java.util.List;
@@ -30,15 +31,19 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
+
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
import
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
import
org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
-import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ConsumerAck;
@@ -63,10 +68,10 @@ import org.slf4j.LoggerFactory;
public class ConsumerHandler extends AbstractWebSocketHandler {
private String subscription = null;
- private final ConsumerConfiguration conf;
+ private SubscriptionType subscriptionType;
private Consumer<byte[]> consumer;
- private final int maxPendingMessages;
+ private int maxPendingMessages;
private final AtomicInteger pendingMessages = new AtomicInteger();
private final LongAdder numMsgsDelivered;
@@ -78,20 +83,26 @@ public class ConsumerHandler extends
AbstractWebSocketHandler {
public ConsumerHandler(WebSocketService service, HttpServletRequest
request, ServletUpgradeResponse response) {
super(service, request, response);
- this.conf = getConsumerConfiguration();
- this.maxPendingMessages = (conf.getReceiverQueueSize() == 0) ? 1 :
conf.getReceiverQueueSize();
+
+ ConsumerBuilderImpl<byte[]> builder;
+
this.numMsgsDelivered = new LongAdder();
this.numBytesDelivered = new LongAdder();
this.numMsgsAcked = new LongAdder();
try {
+ builder = (ConsumerBuilderImpl<byte[]>)
getConsumerConfiguration(service.getPulsarClient());
+ this.maxPendingMessages =
(builder.getConf().getReceiverQueueSize() == 0) ? 1
+ : builder.getConf().getReceiverQueueSize();
+ this.subscriptionType = builder.getConf().getSubscriptionType();
+
// checkAuth() should be called after assigning a value to
this.subscription
this.subscription = extractSubscription(request);
if (!checkAuth(response)) {
return;
}
- this.consumer =
service.getPulsarClient().subscribe(topic.toString(), subscription, conf);
+ this.consumer =
builder.topic(topic.toString()).subscriptionName(subscription).subscribe();
if (!this.service.addConsumer(this)) {
log.warn("[{}:{}] Failed to add consumer handler for topic
{}", request.getRemoteAddr(),
request.getRemotePort(), topic);
@@ -242,7 +253,7 @@ public class ConsumerHandler extends
AbstractWebSocketHandler {
}
}
- public Consumer getConsumer() {
+ public Consumer<byte[]> getConsumer() {
return this.consumer;
}
@@ -251,7 +262,7 @@ public class ConsumerHandler extends
AbstractWebSocketHandler {
}
public SubscriptionType getSubscriptionType() {
- return conf.getSubscriptionType();
+ return subscriptionType;
}
public long getAndResetNumMsgsDelivered() {
@@ -267,7 +278,7 @@ public class ConsumerHandler extends
AbstractWebSocketHandler {
}
public long getMsgDeliveredCounter() {
- return MSG_DELIVERED_COUNTER_UPDATER.get(this);
+ return msgDeliveredCounter;
}
protected void updateDeliverMsgStat(long msgSize) {
@@ -276,32 +287,32 @@ public class ConsumerHandler extends
AbstractWebSocketHandler {
numBytesDelivered.add(msgSize);
}
- private ConsumerConfiguration getConsumerConfiguration() {
- ConsumerConfiguration conf = new ConsumerConfiguration();
+ private ConsumerBuilder<byte[]> getConsumerConfiguration(PulsarClient
client) {
+ ConsumerBuilder<byte[]> builder = client.newConsumer();
if (queryParams.containsKey("ackTimeoutMillis")) {
-
conf.setAckTimeout(Integer.parseInt(queryParams.get("ackTimeoutMillis")),
TimeUnit.MILLISECONDS);
+
builder.ackTimeout(Integer.parseInt(queryParams.get("ackTimeoutMillis")),
TimeUnit.MILLISECONDS);
}
if (queryParams.containsKey("subscriptionType")) {
checkArgument(Enums.getIfPresent(SubscriptionType.class,
queryParams.get("subscriptionType")).isPresent(),
"Invalid subscriptionType %s",
queryParams.get("subscriptionType"));
-
conf.setSubscriptionType(SubscriptionType.valueOf(queryParams.get("subscriptionType")));
+
builder.subscriptionType(SubscriptionType.valueOf(queryParams.get("subscriptionType")));
}
if (queryParams.containsKey("receiverQueueSize")) {
-
conf.setReceiverQueueSize(Math.min(Integer.parseInt(queryParams.get("receiverQueueSize")),
1000));
+
builder.receiverQueueSize(Math.min(Integer.parseInt(queryParams.get("receiverQueueSize")),
1000));
}
if (queryParams.containsKey("consumerName")) {
- conf.setConsumerName(queryParams.get("consumerName"));
+ builder.consumerName(queryParams.get("consumerName"));
}
if (queryParams.containsKey("priorityLevel")) {
-
conf.setPriorityLevel(Integer.parseInt(queryParams.get("priorityLevel")));
+
builder.priorityLevel(Integer.parseInt(queryParams.get("priorityLevel")));
}
- return conf;
+ return builder;
}
@Override
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index 5ed8adf..80545bf 100644
---
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -24,6 +24,9 @@ import static
org.apache.pulsar.websocket.WebSocketError.FailedToDeserializeFrom
import static org.apache.pulsar.websocket.WebSocketError.PayloadEncodingError;
import static org.apache.pulsar.websocket.WebSocketError.UnknownError;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Enums;
+
import java.io.IOException;
import java.util.Base64;
import java.util.concurrent.TimeUnit;
@@ -35,12 +38,13 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.ProducerConfiguration.HashingScheme;
-import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
import
org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededError;
import
org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededException;
import
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
@@ -52,9 +56,6 @@ import
org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Enums;
-
/**
* Websocket end-point url handler to handle incoming message coming from
client. Websocket end-point url handler to
@@ -91,8 +92,7 @@ public class ProducerHandler extends AbstractWebSocketHandler
{
}
try {
- ProducerConfiguration conf = getProducerConfiguration();
- this.producer =
service.getPulsarClient().createProducer(topic.toString(), conf);
+ this.producer =
getProducerBuilder(service.getPulsarClient()).topic(topic.toString()).create();
if (!this.service.addProducer(this)) {
log.warn("[{}:{}] Failed to add producer handler for topic
{}", request.getRemoteAddr(),
request.getRemotePort(), topic);
@@ -166,7 +166,7 @@ public class ProducerHandler extends
AbstractWebSocketHandler {
}
final long msgSize = rawPayload.length;
- MessageBuilder builder =
MessageBuilder.create().setContent(rawPayload);
+ MessageBuilder<byte[]> builder =
MessageBuilder.create().setContent(rawPayload);
if (sendRequest.properties != null) {
builder.setProperties(sendRequest.properties);
@@ -196,7 +196,7 @@ public class ProducerHandler extends
AbstractWebSocketHandler {
});
}
- public Producer getProducer() {
+ public Producer<byte[]> getProducer() {
return this.producer;
}
@@ -222,7 +222,7 @@ public class ProducerHandler extends
AbstractWebSocketHandler {
}
public long getMsgPublishedCounter() {
- return MSG_PUBLISHED_COUNTER_UPDATER.get(this);
+ return msgPublishedCounter;
}
@Override
@@ -248,45 +248,44 @@ public class ProducerHandler extends
AbstractWebSocketHandler {
MSG_PUBLISHED_COUNTER_UPDATER.getAndIncrement(this);
}
- private ProducerConfiguration getProducerConfiguration() {
- ProducerConfiguration conf = new ProducerConfiguration();
-
- conf.setBatchingEnabled(false);
- conf.setMessageRoutingMode(MessageRoutingMode.SinglePartition);
+ private ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client) {
+ ProducerBuilder<byte[]> builder = client.newProducer()
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition);
// Set to false to prevent the server thread from being blocked if a
lot of messages are pending.
- conf.setBlockIfQueueFull(false);
+ builder.blockIfQueueFull(false);
if (queryParams.containsKey("producerName")) {
- conf.setProducerName(queryParams.get("producerName"));
+ builder.producerName(queryParams.get("producerName"));
}
if (queryParams.containsKey("initialSequenceId")) {
- conf.setInitialSequenceId(Long.parseLong("initialSequenceId"));
+ builder.initialSequenceId(Long.parseLong("initialSequenceId"));
}
if (queryParams.containsKey("hashingScheme")) {
-
conf.setHashingScheme(HashingScheme.valueOf(queryParams.get("hashingScheme")));
+
builder.hashingScheme(HashingScheme.valueOf(queryParams.get("hashingScheme")));
}
if (queryParams.containsKey("sendTimeoutMillis")) {
-
conf.setSendTimeout(Integer.parseInt(queryParams.get("sendTimeoutMillis")),
TimeUnit.MILLISECONDS);
+
builder.sendTimeout(Integer.parseInt(queryParams.get("sendTimeoutMillis")),
TimeUnit.MILLISECONDS);
}
if (queryParams.containsKey("batchingEnabled")) {
-
conf.setBatchingEnabled(Boolean.parseBoolean(queryParams.get("batchingEnabled")));
+
builder.enableBatching(Boolean.parseBoolean(queryParams.get("batchingEnabled")));
}
if (queryParams.containsKey("batchingMaxMessages")) {
-
conf.setBatchingMaxMessages(Integer.parseInt(queryParams.get("batchingMaxMessages")));
+
builder.batchingMaxMessages(Integer.parseInt(queryParams.get("batchingMaxMessages")));
}
if (queryParams.containsKey("maxPendingMessages")) {
-
conf.setMaxPendingMessages(Integer.parseInt(queryParams.get("maxPendingMessages")));
+
builder.maxPendingMessages(Integer.parseInt(queryParams.get("maxPendingMessages")));
}
if (queryParams.containsKey("batchingMaxPublishDelay")) {
-
conf.setBatchingMaxPublishDelay(Integer.parseInt(queryParams.get("batchingMaxPublishDelay")),
+
builder.batchingMaxPublishDelay(Integer.parseInt(queryParams.get("batchingMaxPublishDelay")),
TimeUnit.MILLISECONDS);
}
@@ -296,17 +295,17 @@ public class ProducerHandler extends
AbstractWebSocketHandler {
"Invalid messageRoutingMode %s",
queryParams.get("messageRoutingMode"));
MessageRoutingMode routingMode =
MessageRoutingMode.valueOf(queryParams.get("messageRoutingMode"));
if (!MessageRoutingMode.CustomPartition.equals(routingMode)) {
- conf.setMessageRoutingMode(routingMode);
+ builder.messageRoutingMode(routingMode);
}
}
if (queryParams.containsKey("compressionType")) {
checkArgument(Enums.getIfPresent(CompressionType.class,
queryParams.get("compressionType")).isPresent(),
"Invalid compressionType %s",
queryParams.get("compressionType"));
-
conf.setCompressionType(CompressionType.valueOf(queryParams.get("compressionType")));
+
builder.compressionType(CompressionType.valueOf(queryParams.get("compressionType")));
}
- return conf;
+ return builder;
}
private static final Logger log =
LoggerFactory.getLogger(ProducerHandler.class);
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index 54037fa..82d7cff 100644
---
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -94,7 +94,7 @@ public class ReaderHandler extends AbstractWebSocketHandler {
this.reader = builder.create();
- this.subscription =
((ReaderImpl)this.reader).getConsumer().getSubscription();
+ this.subscription = ((ReaderImpl<?>)
this.reader).getConsumer().getSubscription();
if (!this.service.addReader(this)) {
log.warn("[{}:{}] Failed to add reader handler for topic {}",
request.getRemoteAddr(),
request.getRemotePort(), topic);
@@ -214,8 +214,8 @@ public class ReaderHandler extends AbstractWebSocketHandler
{
}
}
- public Consumer getConsumer() {
- return reader != null ? ((ReaderImpl)reader).getConsumer() : null;
+ public Consumer<?> getConsumer() {
+ return reader != null ? ((ReaderImpl<?>) reader).getConsumer() : null;
}
public String getSubscription() {
@@ -235,7 +235,7 @@ public class ReaderHandler extends AbstractWebSocketHandler
{
}
public long getMsgDeliveredCounter() {
- return MSG_DELIVERED_COUNTER_UPDATER.get(this);
+ return msgDeliveredCounter;
}
protected void updateDeliverMsgStat(long msgSize) {
--
To stop receiving notification emails like this one, please contact
[email protected].