sijie closed pull request #1580: Modified functions producer consumers to use
new builder based api
URL: https://github.com/apache/incubator-pulsar/pull/1580
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index 073614623..c16d6767c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -21,6 +21,10 @@
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import java.nio.file.Paths;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -31,7 +35,7 @@
import org.apache.pulsar.broker.BookKeeperClientFactoryImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
@@ -40,12 +44,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import lombok.Cleanup;
-
public class CompactorTool {
private static class Arguments {
@@ -82,15 +80,16 @@ public static void main(String[] args) throws Exception {
}
String pulsarServiceUrl = PulsarService.brokerUrl(brokerConfig);
- ClientConfiguration clientConfig = new ClientConfiguration();
+ ClientBuilder clientBuilder = PulsarClient.builder();
if (isNotBlank(brokerConfig.getBrokerClientAuthenticationPlugin())) {
-
clientConfig.setAuthentication(brokerConfig.getBrokerClientAuthenticationPlugin(),
-
brokerConfig.getBrokerClientAuthenticationParameters());
+
clientBuilder.authentication(brokerConfig.getBrokerClientAuthenticationPlugin(),
+ brokerConfig.getBrokerClientAuthenticationParameters());
}
- clientConfig.setUseTls(brokerConfig.isTlsEnabled());
-
clientConfig.setTlsAllowInsecureConnection(brokerConfig.isTlsAllowInsecureConnection());
-
clientConfig.setTlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath());
+ clientBuilder.serviceUrl(pulsarServiceUrl)
+ .enableTls(brokerConfig.isTlsEnabled())
+
.allowTlsInsecureConnection(brokerConfig.isTlsAllowInsecureConnection())
+
.tlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath());
ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(
new
ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
@@ -103,7 +102,7 @@ public static void main(String[] args) throws Exception {
(int)brokerConfig.getZooKeeperSessionTimeoutMillis()).get();
BookKeeperClientFactory bkClientFactory = new
BookKeeperClientFactoryImpl();
BookKeeper bk = bkClientFactory.create(brokerConfig, zk);
- try (PulsarClient pulsar = PulsarClient.create(pulsarServiceUrl,
clientConfig)) {
+ try (PulsarClient pulsar = clientBuilder.build()) {
Compactor compactor = new TwoPhaseCompactor(brokerConfig, pulsar,
bk, scheduler);
long ledgerId = compactor.compact(arguments.topic).get();
log.info("Compaction of topic {} complete. Compacted to ledger
{}", arguments.topic, ledgerId);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 3213fd961..5db4f2e57 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -21,36 +21,33 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.common.collect.ImmutableMap;
+
import io.netty.buffer.ByteBuf;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.commons.lang3.tuple.Pair;
-
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.impl.RawBatchConverter;
-
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
index 32cb5c098..e3041ba05 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
@@ -93,10 +93,11 @@ public void updatePeerClusterNames(String cluster,
LinkedHashSet<String> peerClu
} catch (Exception e) {
throw getApiException(e);
}
-
+
}
@Override
+ @SuppressWarnings("unchecked")
public Set<String> getPeerClusterNames(String cluster) throws
PulsarAdminException {
try {
return
request(adminClusters.path(cluster).path("peers")).get(LinkedHashSet.class);
@@ -104,7 +105,7 @@ public void updatePeerClusterNames(String cluster,
LinkedHashSet<String> peerClu
throw getApiException(e);
}
}
-
+
@Override
public void deleteCluster(String cluster) throws PulsarAdminException {
try {
@@ -125,7 +126,7 @@ public void deleteCluster(String cluster) throws
PulsarAdminException {
}
}
-
+
@Override
public List<BrokerNamespaceIsolationData>
getBrokersWithNamespaceIsolationPolicy(String cluster)
throws PulsarAdminException {
@@ -223,7 +224,7 @@ public FailureDomain getFailureDomain(String cluster,
String domainName) throws
throw getApiException(e);
}
}
-
+
private void setDomain(String cluster, String domainName,
FailureDomain domain) throws PulsarAdminException {
try {
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
index 3121739fb..86f2c05b9 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
@@ -22,6 +22,7 @@
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -796,7 +797,7 @@ private TopicName validateTopic(String topic) {
}
}
- return Lists.newArrayList(new MessageImpl<byte[]>(msgId,
properties, data, Schema.IDENTITY));
+ return Collections.singletonList(new MessageImpl<byte[]>(msgId,
properties, data, Schema.IDENTITY));
} finally {
if (stream != null) {
stream.close();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index f024a4f7a..af11ef594 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -236,4 +236,8 @@ private ConsumerBuilderImpl(PulsarClientImpl client,
ConsumerConfigurationData<T
conf.setSubscriptionInitialPosition(subscriptionInitialPosition);
return this;
}
+
+ public ConsumerConfigurationData<T> getConf() {
+ return conf;
+ }
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
index 819c118cc..86aeb83ef 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
@@ -38,7 +38,7 @@
public class AtLeastOnceProcessor extends MessageProcessorBase {
@Getter
- private Producer producer;
+ private Producer<byte[]> producer;
AtLeastOnceProcessor(PulsarClient client,
FunctionDetails functionDetails,
@@ -52,13 +52,13 @@ protected void initializeOutputProducer(String outputTopic)
throws Exception {
}
@Override
- public void sendOutputMessage(InputMessage inputMsg, MessageBuilder
outputMsgBuilder) {
+ public void sendOutputMessage(InputMessage inputMsg,
MessageBuilder<byte[]> outputMsgBuilder) {
if (null == outputMsgBuilder || null == producer) {
inputMsg.ack();
return;
}
- Message outputMsg = outputMsgBuilder.build();
+ Message<byte[]> outputMsg = outputMsgBuilder.build();
producer.sendAsync(outputMsg)
.thenAccept(msgId -> inputMsg.ack());
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
index b6871fa8d..994be0d03 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
@@ -36,7 +36,7 @@
@Slf4j
class AtMostOnceProcessor extends MessageProcessorBase {
- private Producer producer;
+ private Producer<byte[]> producer;
AtMostOnceProcessor(PulsarClient client,
FunctionDetails functionDetails,
@@ -58,12 +58,12 @@ protected void initializeOutputProducer(String outputTopic)
throws Exception {
}
@Override
- public void sendOutputMessage(InputMessage inputMsg, MessageBuilder
outputMsgBuilder) {
+ public void sendOutputMessage(InputMessage inputMsg,
MessageBuilder<byte[]> outputMsgBuilder) {
if (null == outputMsgBuilder) {
return;
}
- Message outputMsg = outputMsgBuilder.build();
+ Message<byte[]> outputMsg = outputMsgBuilder.build();
producer.sendAsync(outputMsg);
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
index d31bb0619..59c7bd6f3 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
@@ -22,11 +22,13 @@
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
+
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+
import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
@@ -102,7 +104,7 @@ protected void initializeOutputProducer(String outputTopic)
throws Exception {
@Override
public void sendOutputMessage(InputMessage inputMsg,
- MessageBuilder outputMsgBuilder) throws
Exception {
+ MessageBuilder<byte[]> outputMsgBuilder)
throws Exception {
if (null == outputMsgBuilder) {
inputMsg.ackCumulative();
return;
@@ -113,15 +115,14 @@ public void sendOutputMessage(InputMessage inputMsg,
.setSequenceId(Utils.getSequenceId(inputMsg.getActualMessage().getMessageId()));
- Producer producer =
outputProducer.getProducer(inputMsg.getTopicName(),
inputMsg.getTopicPartition());
+ Producer<byte[]> producer =
outputProducer.getProducer(inputMsg.getTopicName(),
inputMsg.getTopicPartition());
- Message outputMsg = outputMsgBuilder.build();
+ Message<byte[]> outputMsg = outputMsgBuilder.build();
producer.sendAsync(outputMsg)
.thenAccept(messageId -> inputMsg.ackCumulative())
.join();
}
-
@Override
public void close() {
super.close();
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
index 33d4f77d0..97c971d0a 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
@@ -82,7 +82,7 @@ void setupInput(Map<String, SerDe> inputSerDe)
*
* @return the input consumer.
*/
- Consumer getInputConsumer();
+ Consumer<byte[]> getInputConsumer();
/**
* Setup the output with a provided <i>outputSerDe</i>. The implementation
of this processor is responsible for
@@ -103,7 +103,7 @@ void setupInput(Map<String, SerDe> inputSerDe)
* @param outputMsgBuilder output message builder. it can be null.
*/
void sendOutputMessage(InputMessage inputMsg,
- MessageBuilder outputMsgBuilder) throws
PulsarClientException, Exception;
+ MessageBuilder<byte[]> outputMsgBuilder) throws
PulsarClientException, Exception;
/**
* Get the next message to process
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
index d525a1aac..fc37b134e 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
@@ -24,7 +24,9 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -51,7 +53,7 @@
protected SerDe outputSerDe;
@Getter
- protected Consumer inputConsumer;
+ protected Consumer<byte[]> inputConsumer;
protected List<String> topics;
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
index bba9a7f86..7a561a17d 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
@@ -19,11 +19,12 @@
package org.apache.pulsar.functions.instance.producers;
import java.util.concurrent.TimeUnit;
+
import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.ProducerConfiguration.HashingScheme;
-import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.instance.FunctionResultRouter;
@@ -32,51 +33,43 @@
protected final PulsarClient client;
protected final String outputTopic;
- protected final ProducerConfiguration conf;
AbstractOneOuputTopicProducers(PulsarClient client,
String outputTopic)
throws PulsarClientException {
this.client = client;
this.outputTopic = outputTopic;
- this.conf = newProducerConfiguration();
}
- static ProducerConfiguration newProducerConfiguration() {
- ProducerConfiguration conf = new ProducerConfiguration();
- conf.setBlockIfQueueFull(true);
- conf.setBatchingEnabled(true);
- conf.setBatchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
- conf.setMaxPendingMessages(1000000);
- conf.setCompressionType(CompressionType.LZ4);
- conf.setHashingScheme(HashingScheme.Murmur3_32Hash);
- conf.setMessageRoutingMode(MessageRoutingMode.CustomPartition);
+ static ProducerBuilder<byte[]> newProducerBuilder(PulsarClient client) {
// use function result router to deal with different processing
guarantees.
- conf.setMessageRouter(FunctionResultRouter.of());
- return conf;
+ return client.newProducer() //
+ .blockIfQueueFull(true) //
+ .enableBatching(true) //
+ .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) //
+ .compressionType(CompressionType.LZ4) //
+ .hashingScheme(HashingScheme.Murmur3_32Hash) //
+ .messageRoutingMode(MessageRoutingMode.CustomPartition) //
+ .messageRouter(FunctionResultRouter.of());
}
- protected Producer createProducer(String topic)
+ protected Producer<byte[]> createProducer(String topic)
throws PulsarClientException {
return createProducer(client, topic);
}
- public static Producer createProducer(PulsarClient client, String topic)
+ public static Producer<byte[]> createProducer(PulsarClient client, String
topic)
throws PulsarClientException {
- return client.createProducer(topic, newProducerConfiguration());
+ return newProducerBuilder(client).topic(topic).create();
}
- protected Producer createProducer(String topic, String producerName)
+ protected Producer<byte[]> createProducer(String topic, String
producerName)
throws PulsarClientException {
return createProducer(client, topic, producerName);
}
- public static Producer createProducer(PulsarClient client, String topic,
String producerName)
- throws PulsarClientException {
- ProducerConfiguration newConf = newProducerConfiguration();
- newConf.setProducerName(producerName);
-
- return client.createProducer(topic, newConf);
+ public static Producer<byte[]> createProducer(PulsarClient client, String
topic, String producerName)
+ throws PulsarClientException {
+ return
newProducerBuilder(client).topic(topic).producerName(producerName).create();
}
-
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
index cf21a042b..3668311ed 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
@@ -37,7 +37,7 @@
public class MultiConsumersOneOuputTopicProducers extends
AbstractOneOuputTopicProducers {
@Getter(AccessLevel.PACKAGE)
- private final Map<String, IntObjectMap<Producer>> producers;
+ private final Map<String, IntObjectMap<Producer<byte[]>>> producers;
public MultiConsumersOneOuputTopicProducers(PulsarClient client,
String outputTopic)
@@ -56,14 +56,14 @@ static String makeProducerName(String srcTopicName, int
srcTopicPartition) {
}
@Override
- public synchronized Producer getProducer(String srcTopicName, int
srcTopicPartition) throws PulsarClientException {
- IntObjectMap<Producer> producerMap = producers.get(srcTopicName);
+ public synchronized Producer<byte[]> getProducer(String srcTopicName, int
srcTopicPartition) throws PulsarClientException {
+ IntObjectMap<Producer<byte[]>> producerMap =
producers.get(srcTopicName);
if (null == producerMap) {
producerMap = new IntObjectHashMap<>();
producers.put(srcTopicName, producerMap);
}
- Producer producer = producerMap.get(srcTopicPartition);
+ Producer<byte[]> producer = producerMap.get(srcTopicPartition);
if (null == producer) {
producer = createProducer(outputTopic,
makeProducerName(srcTopicName, srcTopicPartition));
producerMap.put(srcTopicPartition, producer);
@@ -73,10 +73,10 @@ public synchronized Producer getProducer(String
srcTopicName, int srcTopicPartit
@Override
public synchronized void closeProducer(String srcTopicName, int
srcTopicPartition) {
- IntObjectMap<Producer> producerMap = producers.get(srcTopicName);
+ IntObjectMap<Producer<byte[]>> producerMap =
producers.get(srcTopicName);
if (null != producerMap) {
- Producer producer = producerMap.remove(srcTopicPartition);
+ Producer<byte[]> producer = producerMap.remove(srcTopicPartition);
if (null != producer) {
producer.closeAsync();
}
@@ -89,8 +89,8 @@ public synchronized void closeProducer(String srcTopicName,
int srcTopicPartitio
@Override
public synchronized void close() {
List<CompletableFuture<Void>> closeFutures = new
ArrayList<>(producers.size());
- for (IntObjectMap<Producer> producerMap: producers.values()) {
- for (Producer producer : producerMap.values()) {
+ for (IntObjectMap<Producer<byte[]>> producerMap: producers.values()) {
+ for (Producer<byte[]> producer : producerMap.values()) {
closeFutures.add(producer.closeAsync());
}
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
index 9669bea2b..29cd96a1e 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
@@ -42,8 +42,7 @@
* src topic partition
* @return the producer instance to produce messages
*/
- Producer getProducer(String srcTopicName,
- int srcTopicPartition) throws PulsarClientException;
+ Producer<byte[]> getProducer(String srcTopicName, int srcTopicPartition)
throws PulsarClientException;
/**
* Close a producer specified by <tt>srcTopicName</tt> and
<tt>srcTopicPartition</tt>
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
index f98ac4497..a1ad2898d 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
@@ -45,7 +45,7 @@ public void setup() {
@Test
public void testChoosePartitionWithoutKeyWithoutSequenceId() {
- Message msg = mock(Message.class);
+ Message<?> msg = mock(Message.class);
when(msg.hasKey()).thenReturn(false);
when(msg.getKey()).thenReturn(null);
when(msg.getSequenceId()).thenReturn(-1L);
@@ -70,7 +70,7 @@ public void testChoosePartitionWithoutKeySequenceId() {
FunctionResultRouter router = new FunctionResultRouter(0, clock);
for (int i = 0; i < 10; i++) {
- Message msg = mock(Message.class);
+ Message<?> msg = mock(Message.class);
when(msg.hasKey()).thenReturn(false);
when(msg.getKey()).thenReturn(null);
when(msg.getSequenceId()).thenReturn((long) (2 * i));
@@ -82,11 +82,11 @@ public void testChoosePartitionWithoutKeySequenceId() {
public void testChoosePartitionWithKeyWithoutSequenceId() {
String key1 = "key1";
String key2 = "key2";
- Message msg1 = mock(Message.class);
+ Message<?> msg1 = mock(Message.class);
when(msg1.hasKey()).thenReturn(true);
when(msg1.getKey()).thenReturn(key1);
when(msg1.getSequenceId()).thenReturn(-1L);
- Message msg2 = mock(Message.class);
+ Message<?> msg2 = mock(Message.class);
when(msg2.hasKey()).thenReturn(true);
when(msg2.getKey()).thenReturn(key2);
when(msg1.getSequenceId()).thenReturn(-1L);
@@ -105,12 +105,12 @@ public void testChoosePartitionWithKeyWithoutSequenceId()
{
public void testChoosePartitionWithKeySequenceId() {
String key1 = "key1";
String key2 = "key2";
- Message msg1 = mock(Message.class);
+ Message<?> msg1 = mock(Message.class);
when(msg1.hasKey()).thenReturn(true);
when(msg1.getKey()).thenReturn(key1);
// make sure sequence id is different from hashcode, so the test can
be tested correctly.
when(msg1.getSequenceId()).thenReturn((long) ((key1.hashCode() % 100)
+ 1));
- Message msg2 = mock(Message.class);
+ Message<?> msg2 = mock(Message.class);
when(msg2.hasKey()).thenReturn(true);
when(msg2.getKey()).thenReturn(key2);
when(msg1.getSequenceId()).thenReturn((long) ((key2.hashCode() % 100)
+ 1));
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
index 5b463b642..a11062fb5 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
@@ -1009,4 +1009,4 @@
// assertTrue(mockProducers.isEmpty());
// }
// }
-//}
\ No newline at end of file
+//}
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
index 77de78b4f..b9e762a58 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
@@ -55,5 +55,6 @@ public void testLambda() {
JavaExecutionResult result =
instance.handleMessage(MessageId.earliest, "random", testString);
assertNotNull(result.getResult());
assertEquals(new String(testString + "-lambda"), result.getResult());
+ instance.close();
}
}
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
index 577015a2c..e6072c8d3 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
@@ -19,9 +19,6 @@
package org.apache.pulsar.functions.instance.producers;
import static
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers.makeProducerName;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -33,10 +30,19 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -48,34 +54,152 @@
private static final String TEST_OUTPUT_TOPIC = "test-output-topic";
private PulsarClient mockClient;
- private final Map<String, Producer> mockProducers = new HashMap<>();
+ private final Map<String, Producer<byte[]>> mockProducers = new
HashMap<>();
private MultiConsumersOneOuputTopicProducers producers;
+ private class MockProducerBuilder implements ProducerBuilder<byte[]> {
+
+ String producerName = "";
+
+ @Override
+ public Producer<byte[]> create() throws PulsarClientException {
+ Producer<byte[]> producer;
+ synchronized (mockProducers) {
+ producer = mockProducers.get(producerName);
+ if (null == producer) {
+ producer = createMockProducer(producerName);
+ mockProducers.put(producerName, producer);
+ }
+ }
+ return producer;
+ }
+
+ @Override
+ public CompletableFuture<Producer<byte[]>> createAsync() {
+ try {
+ return CompletableFuture.completedFuture(create());
+ } catch (PulsarClientException e) {
+ CompletableFuture<Producer<byte[]>> future = new
CompletableFuture<>();
+ future.completeExceptionally(e);
+ return future;
+ }
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> clone() {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> topic(String topicName) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> producerName(String producerName) {
+ this.producerName = producerName;
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> sendTimeout(int sendTimeout, TimeUnit
unit) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> maxPendingMessages(int
maxPendingMessages) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> maxPendingMessagesAcrossPartitions(int
maxPendingMessagesAcrossPartitions) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> blockIfQueueFull(boolean
blockIfQueueFull) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> messageRoutingMode(MessageRoutingMode
messageRoutingMode) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> hashingScheme(HashingScheme
hashingScheme) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> compressionType(CompressionType
compressionType) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> messageRouter(MessageRouter
messageRouter) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> enableBatching(boolean enableBatching) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> cryptoKeyReader(CryptoKeyReader
cryptoKeyReader) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> addEncryptionKey(String key) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]>
cryptoFailureAction(ProducerCryptoFailureAction action) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> batchingMaxPublishDelay(long
batchDelay, TimeUnit timeUnit) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> batchingMaxMessages(int
batchMessagesMaxMessagesPerBatch) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> initialSequenceId(long
initialSequenceId) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> property(String key, String value) {
+ return this;
+ }
+
+ @Override
+ public ProducerBuilder<byte[]> properties(Map<String, String>
properties) {
+ return this;
+ }
+ }
+
@BeforeMethod
public void setup() throws Exception {
this.mockClient = mock(PulsarClient.class);
- when(mockClient.createProducer(anyString(),
any(ProducerConfiguration.class)))
- .thenAnswer(invocationOnMock -> {
- ProducerConfiguration conf = invocationOnMock.getArgumentAt(1,
ProducerConfiguration.class);
- String producerName = conf.getProducerName();
-
- synchronized (mockProducers) {
- Producer producer = mockProducers.get(producerName);
- if (null == producer) {
- producer = createMockProducer(producerName);
- mockProducers.put(producerName, producer);
- }
- return producer;
- }
- });
+ when(mockClient.newProducer())
+ .thenReturn(new MockProducerBuilder());
producers = new MultiConsumersOneOuputTopicProducers(mockClient,
TEST_OUTPUT_TOPIC);
producers.initialize();
}
- private Producer createMockProducer(String topic) {
- Producer producer = mock(Producer.class);
+ private Producer<byte[]> createMockProducer(String topic) {
+ Producer<byte[]> producer = mock(Producer.class);
when(producer.closeAsync())
.thenAnswer(invocationOnMock -> {
synchronized (mockProducers) {
@@ -90,16 +214,13 @@ private Producer createMockProducer(String topic) {
public void testGetCloseProducer() throws Exception {
String srcTopic = "test-src-topic";
int ptnIdx = 1234;
- Producer producer = producers.getProducer(srcTopic, ptnIdx);
+ Producer<byte[]> producer = producers.getProducer(srcTopic, ptnIdx);
String producerName = makeProducerName(srcTopic, ptnIdx);
assertSame(mockProducers.get(producerName), producer);
verify(mockClient, times(1))
- .createProducer(
- eq(TEST_OUTPUT_TOPIC),
- any(ProducerConfiguration.class)
- );
+ .newProducer();
assertTrue(producers.getProducers().containsKey(srcTopic));
assertEquals(1, producers.getProducers().get(srcTopic).size());
assertTrue(producers.getProducers().get(srcTopic).containsKey(ptnIdx));
@@ -107,10 +228,7 @@ public void testGetCloseProducer() throws Exception {
// second get will not create a new producer
assertSame(mockProducers.get(producerName), producer);
verify(mockClient, times(1))
- .createProducer(
- eq(TEST_OUTPUT_TOPIC),
- any(ProducerConfiguration.class)
- );
+ .newProducer();
assertTrue(producers.getProducers().containsKey(srcTopic));
assertEquals(1, producers.getProducers().get(srcTopic).size());
assertTrue(producers.getProducers().get(srcTopic).containsKey(ptnIdx));
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
index 13d8e62e4..df89eed8e 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
@@ -20,8 +20,9 @@
package org.apache.pulsar.functions.runtime;
import com.google.common.annotations.VisibleForTesting;
+
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.ClientConfiguration;
+
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -46,7 +47,7 @@ public ThreadRuntimeFactory(String threadGroupName,
throws Exception {
this(
threadGroupName,
- pulsarServiceUrl != null ? PulsarClient.create(pulsarServiceUrl,
new ClientConfiguration()) : null,
+ pulsarServiceUrl != null ?
PulsarClient.builder().serviceUrl(pulsarServiceUrl).build() : null,
storageServiceUrl);
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index 0a4cdeb0b..1c045d48c 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -29,13 +29,13 @@
@Slf4j
public class FunctionAssignmentTailer
- implements java.util.function.Consumer<Message>, Function<Throwable,
Void>, AutoCloseable {
+ implements java.util.function.Consumer<Message<byte[]>>,
Function<Throwable, Void>, AutoCloseable {
private final FunctionRuntimeManager functionRuntimeManager;
- private final Reader reader;
+ private final Reader<byte[]> reader;
public FunctionAssignmentTailer(FunctionRuntimeManager
functionRuntimeManager,
- Reader reader)
+ Reader<byte[]> reader)
throws PulsarClientException {
this.functionRuntimeManager = functionRuntimeManager;
this.reader = reader;
@@ -64,7 +64,7 @@ public void close() {
}
@Override
- public void accept(Message msg) {
+ public void accept(Message<byte[]> msg) {
// check if latest
boolean hasMessageAvailable;
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index b5f975154..29a322043 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -88,7 +88,7 @@ public void initialize() {
log.info("/** Initializing Function Metadata Manager **/");
try {
- Reader reader = pulsarClient.newReader()
+ Reader<byte[]> reader = pulsarClient.newReader()
.topic(this.workerConfig.getFunctionMetadataTopic())
.startMessageId(MessageId.earliest)
.create();
@@ -410,6 +410,6 @@ public void close() throws Exception {
}
private ServiceRequestManager getServiceRequestManager(PulsarClient
pulsarClient, String functionMetadataTopic) throws PulsarClientException {
- return new
ServiceRequestManager(pulsarClient.createProducer(functionMetadataTopic));
+ return new
ServiceRequestManager(pulsarClient.newProducer().topic(functionMetadataTopic).create());
}
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
index a37315899..4bcaaf7e0 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java
@@ -29,13 +29,13 @@
@Slf4j
public class FunctionMetaDataTopicTailer
- implements java.util.function.Consumer<Message>, Function<Throwable,
Void>, AutoCloseable {
+ implements java.util.function.Consumer<Message<byte[]>>,
Function<Throwable, Void>, AutoCloseable {
private final FunctionMetaDataManager functionMetaDataManager;
- private final Reader reader;
+ private final Reader<byte[]> reader;
public FunctionMetaDataTopicTailer(FunctionMetaDataManager
functionMetaDataManager,
- Reader reader)
+ Reader<byte[]> reader)
throws PulsarClientException {
this.functionMetaDataManager = functionMetaDataManager;
this.reader = reader;
@@ -62,7 +62,7 @@ public void close() {
log.info("Stopped function state consumer");
}
- public void processRequest(Message msg) {
+ public void processRequest(Message<byte[]> msg) {
ServiceRequest serviceRequest;
try {
@@ -80,7 +80,7 @@ public void processRequest(Message msg) {
}
@Override
- public void accept(Message msg) {
+ public void accept(Message<byte[]> msg) {
processRequest(msg);
// receive next request
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index f7e793a3a..af2399230 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -85,7 +85,7 @@ public FunctionRuntimeManager(WorkerConfig workerConfig,
MembershipManager membershipManager) throws
Exception {
this.workerConfig = workerConfig;
- Reader reader = pulsarClient.newReader()
+ Reader<byte[]> reader = pulsarClient.newReader()
.topic(this.workerConfig.getFunctionAssignmentTopic())
.startMessageId(MessageId.earliest)
.create();
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
index 21dd69ff3..2a3dc0f44 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java
@@ -22,12 +22,10 @@
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
-import lombok.extern.slf4j.Slf4j;
/**
* A starter to start function worker.
*/
-@Slf4j
public class FunctionWorkerStarter {
private static class WorkerArguments {
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index 4e5fb2139..f43311d92 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.functions.worker;
+import com.google.common.annotations.VisibleForTesting;
+
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -30,16 +32,15 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
-import com.google.common.annotations.VisibleForTesting;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
+
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -56,7 +57,7 @@
public class MembershipManager implements AutoCloseable, ConsumerEventListener
{
private final String consumerName;
- private final Consumer consumer;
+ private final Consumer<byte[]> consumer;
private final WorkerConfig workerConfig;
private PulsarAdmin pulsarAdminClient;
private final CompletableFuture<Void> firstConsumerEventFuture;
@@ -85,18 +86,17 @@
// we don't produce any messages into this topic, we only use the
`failover` subscription
// to elect an active consumer as the leader worker. The leader worker
will be responsible
// for scheduling snapshots for FMT and doing task assignment.
- consumer = client.subscribe(
- workerConfig.getClusterCoordinationTopic(),
- COORDINATION_TOPIC_SUBSCRIPTION,
- new ConsumerConfiguration()
- .setSubscriptionType(SubscriptionType.Failover)
- .setConsumerEventListener(this)
- .setProperty(WORKER_IDENTIFIER, consumerName)
- );
+ consumer = client.newConsumer()
+ .topic(workerConfig.getClusterCoordinationTopic())
+ .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
+ .subscriptionType(SubscriptionType.Failover)
+ .consumerEventListener(this)
+ .property(WORKER_IDENTIFIER, consumerName)
+ .subscribe();
}
@Override
- public void becameActive(Consumer consumer, int partitionId) {
+ public void becameActive(Consumer<?> consumer, int partitionId) {
firstConsumerEventFuture.complete(null);
if (isLeader.compareAndSet(false, true)) {
log.info("Worker {} became the leader.", consumerName);
@@ -104,7 +104,7 @@ public void becameActive(Consumer consumer, int
partitionId) {
}
@Override
- public void becameInactive(Consumer consumer, int partitionId) {
+ public void becameInactive(Consumer<?> consumer, int partitionId) {
firstConsumerEventFuture.complete(null);
if (isLeader.compareAndSet(true, false)) {
log.info("Worker {} lost the leadership.", consumerName);
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index c6b72a543..ed00958d7 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -18,21 +18,6 @@
*/
package org.apache.pulsar.functions.worker;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.CompressionType;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
-import org.apache.pulsar.functions.proto.Function.Assignment;
-import org.apache.pulsar.functions.proto.Request;
-import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.worker.scheduler.IScheduler;
-
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -47,6 +32,21 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.Assignment;
+import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.proto.Request;
+import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.worker.scheduler.IScheduler;
+
@Slf4j
public class SchedulerManager implements AutoCloseable {
@@ -63,7 +63,7 @@
private final IScheduler scheduler;
- private final Producer producer;
+ private final Producer<byte[]> producer;
private final ExecutorService executorService;
@@ -72,14 +72,10 @@ public SchedulerManager(WorkerConfig workerConfig,
PulsarClient pulsarClient) {
this.scheduler =
Reflections.createInstance(workerConfig.getSchedulerClassName(),
IScheduler.class,
Thread.currentThread().getContextClassLoader());
- ProducerConfiguration producerConf = new ProducerConfiguration()
- .setBatchingEnabled(true)
- .setBlockIfQueueFull(true)
- .setCompressionType(CompressionType.LZ4)
- // retry until succeed
- .setSendTimeout(0, TimeUnit.MILLISECONDS);
try {
- this.producer =
pulsarClient.createProducer(this.workerConfig.getFunctionAssignmentTopic(),
producerConf);
+ this.producer =
pulsarClient.newProducer().topic(this.workerConfig.getFunctionAssignmentTopic())
+
.enableBatching(true).blockIfQueueFull(true).compressionType(CompressionType.LZ4).
+ sendTimeout(0, TimeUnit.MILLISECONDS).create();
} catch (PulsarClientException e) {
log.error("Failed to create producer to function assignment topic "
+ this.workerConfig.getFunctionAssignmentTopic(), e);
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index 5a9f3393d..071e946ed 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -18,8 +18,18 @@
*/
package org.apache.pulsar.functions.worker;
-import dlshade.org.apache.zookeeper.KeeperException.Code;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.UUID;
+
import lombok.extern.slf4j.Slf4j;
+
import org.apache.distributedlog.AppendOnlyStreamWriter;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.DistributedLogManager;
@@ -33,17 +43,7 @@
import org.apache.pulsar.functions.worker.dlog.DLInputStream;
import org.apache.pulsar.functions.worker.dlog.DLOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URL;
-import java.util.UUID;
+import dlshade.org.apache.zookeeper.KeeperException.Code;
@Slf4j
public final class Utils {
@@ -129,13 +129,14 @@ public static void downloadFromBookkeeper(Namespace
namespace,
OutputStream outputStream,
String packagePath) throws
IOException {
DistributedLogManager dlm = namespace.openLog(packagePath);
- InputStream in = new DLInputStream(dlm);
- int read = 0;
- byte[] bytes = new byte[1024];
- while ((read = in.read(bytes)) != -1) {
- outputStream.write(bytes, 0, read);
+ try (InputStream in = new DLInputStream(dlm)) {
+ int read = 0;
+ byte[] bytes = new byte[1024];
+ while ((read = in.read(bytes)) != -1) {
+ outputStream.write(bytes, 0, read);
+ }
+ outputStream.flush();
}
- outputStream.flush();
}
public static DistributedLogConfiguration getDlogConf(WorkerConfig
workerConfig) {
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index fb875b5ee..06f4e53dd 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -20,12 +20,16 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
-import java.util.Map;
-import lombok.*;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
import lombok.experimental.Accessors;
@Data
@@ -92,7 +96,7 @@ public String getClusterCoordinationTopic() {
public String getFunctionAssignmentTopic() {
return String.format("persistent://%s/%s", pulsarFunctionsNamespace,
functionAssignmentTopicName);
}
-
+
public static WorkerConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), WorkerConfig.class);
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 5df3c3ff0..e33dcbaa4 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -76,7 +76,7 @@ public void start(URI dlogUri) throws InterruptedException {
// initialize the function metadata manager
try {
- this.client =
PulsarClient.create(this.workerConfig.getPulsarServiceUrl());
+ this.client =
PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl()).build();
log.info("Created Pulsar client");
//create scheduler manager
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index 3d27fb498..ac99f9a6a 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -19,17 +19,21 @@
package org.apache.pulsar.functions.worker;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.proto.Function;
@@ -42,12 +46,24 @@
@Slf4j
public class FunctionMetaDataManagerTest {
+ private static PulsarClient mockPulsarClient() throws
PulsarClientException {
+ ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
+ when(builder.topic(anyString())).thenReturn(builder);
+
+ when(builder.create()).thenReturn(mock(Producer.class));
+
+ PulsarClient client = mock(PulsarClient.class);
+ when(client.newProducer()).thenReturn(builder);
+
+ return client;
+ }
+
@Test
public void testListFunctions() throws PulsarClientException {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(new WorkerConfig(),
mock(SchedulerManager.class),
- mock(PulsarClient.class)));
+ mockPulsarClient()));
Map<String, Function.FunctionMetaData> functionMetaDataMap1 = new
HashMap<>();
functionMetaDataMap1.put("func-1",
Function.FunctionMetaData.newBuilder().setFunctionDetails(
@@ -87,7 +103,7 @@ public void updateFunction() throws PulsarClientException {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
- mock(PulsarClient.class)));
+ mockPulsarClient()));
Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")).build();
@@ -123,7 +139,7 @@ public boolean matches(Object o) {
functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
- mock(PulsarClient.class)));
+ mockPulsarClient()));
Map<String, Function.FunctionMetaData> functionMetaDataMap = new
HashMap<>();
Function.FunctionMetaData m2 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -167,7 +183,7 @@ public void deregisterFunction() throws
PulsarClientException {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
- mock(PulsarClient.class)));
+ mockPulsarClient()));
Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
.setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
@@ -209,7 +225,7 @@ public void testProcessRequest() throws
PulsarClientException {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
- mock(PulsarClient.class)));
+ mockPulsarClient()));
Mockito.doNothing().when(functionMetaDataManager).processUpdate(any(Request.ServiceRequest.class));
Mockito.doNothing().when(functionMetaDataManager).proccessDeregister(any(Request.ServiceRequest.class));
@@ -247,7 +263,7 @@ public void processUpdateTest() throws
PulsarClientException {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
- mock(PulsarClient.class)));
+ mockPulsarClient()));
// worker has no record of function
Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
@@ -275,7 +291,7 @@ public void processUpdateTest() throws
PulsarClientException {
functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
- mock(PulsarClient.class)));
+ mockPulsarClient()));
Function.FunctionMetaData m3 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -324,7 +340,7 @@ public void processUpdateTest() throws
PulsarClientException {
functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
- mock(PulsarClient.class)));
+ mockPulsarClient()));
Function.FunctionMetaData m4 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -361,7 +377,7 @@ public void processDeregister() throws
PulsarClientException {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
- mock(PulsarClient.class)));
+ mockPulsarClient()));
// worker has no record of function
Function.FunctionMetaData test = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
@@ -388,7 +404,7 @@ public void processDeregister() throws
PulsarClientException {
functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
- mock(PulsarClient.class)));
+ mockPulsarClient()));
functionMetaDataManager.setFunctionMetaData(test);
Function.FunctionMetaData m2 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
@@ -415,7 +431,7 @@ public void processDeregister() throws
PulsarClientException {
functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
- mock(PulsarClient.class)));
+ mockPulsarClient()));
functionMetaDataManager.setFunctionMetaData(test);
Function.FunctionMetaData m3 = Function.FunctionMetaData.newBuilder()
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 14b56acd3..6dd3fa3c3 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -18,24 +18,6 @@
*/
package org.apache.pulsar.functions.worker;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.distributedlog.api.namespace.Namespace;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
-import org.apache.pulsar.client.api.ConsumerEventListener;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.functions.proto.Function;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Mockito;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
@@ -49,6 +31,28 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.functions.proto.Function;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
public class MembershipManagerTest {
private final WorkerConfig workerConfig;
@@ -63,29 +67,33 @@ public MembershipManagerTest() {
@Test
public void testConsumerEventListener() throws Exception {
- PulsarClient mockClient = mock(PulsarClient.class);
- Consumer mockConsumer = mock(Consumer.class);
+ PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+
+ Consumer<byte[]> mockConsumer = mock(Consumer.class);
+ ConsumerBuilder<byte[]> mockConsumerBuilder =
mock(ConsumerBuilder.class);
+
+
when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
+
when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder);
+
when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder);
+ when(mockConsumerBuilder.property(anyString(),
anyString())).thenReturn(mockConsumerBuilder);
+
+ when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
AtomicReference<ConsumerEventListener> listenerHolder = new
AtomicReference<>();
- when(mockClient.subscribe(
- eq(workerConfig.getClusterCoordinationTopic()),
- eq(MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION),
- any(ConsumerConfiguration.class)
- )).thenAnswer(invocationOnMock -> {
+
when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenAnswer(invocationOnMock
-> {
- ConsumerConfiguration conf = invocationOnMock.getArgumentAt(2,
ConsumerConfiguration.class);
- listenerHolder.set(conf.getConsumerEventListener());
+ ConsumerEventListener listener = invocationOnMock.getArgumentAt(0,
ConsumerEventListener.class);
+ listenerHolder.set(listener);
- return mockConsumer;
+ return mockConsumerBuilder;
});
+ when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
+
MembershipManager membershipManager = spy(new
MembershipManager(workerConfig, mockClient));
assertFalse(membershipManager.isLeader());
verify(mockClient, times(1))
- .subscribe(
- eq(workerConfig.getClusterCoordinationTopic()),
- eq(MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION),
- any(ConsumerConfiguration.class));
+ .newConsumer();
listenerHolder.get().becameActive(mockConsumer, 0);
assertTrue(membershipManager.isLeader());
@@ -94,11 +102,31 @@ public void testConsumerEventListener() throws Exception {
assertFalse(membershipManager.isLeader());
}
+ private static PulsarClient mockPulsarClient() throws
PulsarClientException {
+ PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
+
+ Consumer<byte[]> mockConsumer = mock(Consumer.class);
+ ConsumerBuilder<byte[]> mockConsumerBuilder =
mock(ConsumerBuilder.class);
+
+
when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
+
when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder);
+
when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder);
+ when(mockConsumerBuilder.property(anyString(),
anyString())).thenReturn(mockConsumerBuilder);
+
+ when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
+
+
when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenReturn(mockConsumerBuilder);
+
+ when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);
+
+ return mockClient;
+ }
+
@Test
public void testCheckFailuresNoFailures() throws Exception {
SchedulerManager schedulerManager = mock(SchedulerManager.class);
PulsarClient pulsarClient = mock(PulsarClient.class);
- ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
+ ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
@@ -110,7 +138,7 @@ public void testCheckFailuresNoFailures() throws Exception {
mock(MembershipManager.class)
));
FunctionMetaDataManager functionMetaDataManager =
mock(FunctionMetaDataManager.class);
- MembershipManager membershipManager = spy(new
MembershipManager(workerConfig, mock(PulsarClient.class)));
+ MembershipManager membershipManager = spy(new
MembershipManager(workerConfig, mockPulsarClient()));
List<MembershipManager.WorkerInfo> workerInfoList = new LinkedList<>();
workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1",
"host-1", 8000));
@@ -162,7 +190,7 @@ public void testCheckFailuresSomeFailures() throws
Exception {
workerConfig.setRescheduleTimeoutMs(30000);
SchedulerManager schedulerManager = mock(SchedulerManager.class);
PulsarClient pulsarClient = mock(PulsarClient.class);
- ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
+ ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
@@ -175,7 +203,7 @@ public void testCheckFailuresSomeFailures() throws
Exception {
));
FunctionMetaDataManager functionMetaDataManager =
mock(FunctionMetaDataManager.class);
- MembershipManager membershipManager = spy(new
MembershipManager(workerConfig, mock(PulsarClient.class)));
+ MembershipManager membershipManager = spy(new
MembershipManager(workerConfig, mockPulsarClient()));
List<MembershipManager.WorkerInfo> workerInfoList = new LinkedList<>();
workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1",
"host-1", 8000));
@@ -251,7 +279,7 @@ public void testCheckFailuresSomeUnassigned() throws
Exception {
workerConfig.setRescheduleTimeoutMs(30000);
SchedulerManager schedulerManager = mock(SchedulerManager.class);
PulsarClient pulsarClient = mock(PulsarClient.class);
- ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
+ ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
@@ -263,7 +291,7 @@ public void testCheckFailuresSomeUnassigned() throws
Exception {
mock(MembershipManager.class)
));
FunctionMetaDataManager functionMetaDataManager =
mock(FunctionMetaDataManager.class);
- MembershipManager membershipManager = spy(new
MembershipManager(workerConfig, mock(PulsarClient.class)));
+ MembershipManager membershipManager = spy(new
MembershipManager(workerConfig, mockPulsarClient()));
List<MembershipManager.WorkerInfo> workerInfoList = new LinkedList<>();
workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1",
"host-1", 8000));
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 64d693120..b5b4e981e 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -19,8 +19,10 @@
package org.apache.pulsar.functions.worker;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.proto.Function;
@@ -44,6 +46,9 @@
import java.util.concurrent.TimeoutException;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -61,6 +66,18 @@
private CompletableFuture<MessageId> completableFuture;
private Producer producer;
+ private static PulsarClient mockPulsarClient() throws
PulsarClientException {
+ ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
+ when(builder.topic(anyString())).thenReturn(builder);
+
+ when(builder.create()).thenReturn(mock(Producer.class));
+
+ PulsarClient client = mock(PulsarClient.class);
+ when(client.newProducer()).thenReturn(builder);
+
+ return client;
+ }
+
@BeforeMethod
public void setup() throws PulsarClientException {
WorkerConfig workerConfig = new WorkerConfig();
@@ -78,8 +95,17 @@ public void setup() throws PulsarClientException {
byte[] bytes = any();
when(producer.sendAsync(bytes)).thenReturn(completableFuture);
+ ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
+ when(builder.topic(anyString())).thenReturn(builder);
+ when(builder.enableBatching(anyBoolean())).thenReturn(builder);
+ when(builder.blockIfQueueFull(anyBoolean())).thenReturn(builder);
+
when(builder.compressionType(any(CompressionType.class))).thenReturn(builder);
+ when(builder.sendTimeout(anyInt(),
any(TimeUnit.class))).thenReturn(builder);
+
+ when(builder.create()).thenReturn(producer);
+
PulsarClient pulsarClient = mock(PulsarClient.class);
- doReturn(producer).when(pulsarClient).createProducer(any(), any());
+ when(pulsarClient.newProducer()).thenReturn(builder);
schedulerManager = spy(new SchedulerManager(workerConfig,
pulsarClient));
functionRuntimeManager = mock(FunctionRuntimeManager.class);
@@ -380,9 +406,6 @@ public void testScalingUp() throws Exception {
// scale up
- PulsarClient pulsarClient = mock(PulsarClient.class);
- doReturn(producer).when(pulsarClient).createProducer(any(), any());
-
Function.FunctionMetaData function2Scaled =
Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
.setNamespace("namespace-1").setTenant("tenant-1").setParallelism(3)).setVersion(version)
@@ -500,9 +523,6 @@ public void testScalingDown() throws Exception {
// scale down
- PulsarClient pulsarClient = mock(PulsarClient.class);
- doReturn(producer).when(pulsarClient).createProducer(any(), any());
-
Function.FunctionMetaData function2Scaled =
Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
.setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)).setVersion(version)
@@ -612,9 +632,6 @@ public void testUpdate() throws Exception {
// scale down
- PulsarClient pulsarClient = mock(PulsarClient.class);
- doReturn(producer).when(pulsarClient).createProducer(any(), any());
-
Function.FunctionMetaData function2Updated =
Function.FunctionMetaData.newBuilder()
.setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath("/foo/bar2"))
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 406a2c815..cb21f6f8c 100644
---
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Enums;
import com.google.common.base.Splitter;
+
import java.io.IOException;
import java.util.Base64;
import java.util.List;
@@ -30,15 +31,19 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
+
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
import
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
import
org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
-import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ConsumerAck;
@@ -63,10 +68,10 @@
public class ConsumerHandler extends AbstractWebSocketHandler {
private String subscription = null;
- private final ConsumerConfiguration conf;
+ private SubscriptionType subscriptionType;
private Consumer<byte[]> consumer;
- private final int maxPendingMessages;
+ private int maxPendingMessages;
private final AtomicInteger pendingMessages = new AtomicInteger();
private final LongAdder numMsgsDelivered;
@@ -78,20 +83,26 @@
public ConsumerHandler(WebSocketService service, HttpServletRequest
request, ServletUpgradeResponse response) {
super(service, request, response);
- this.conf = getConsumerConfiguration();
- this.maxPendingMessages = (conf.getReceiverQueueSize() == 0) ? 1 :
conf.getReceiverQueueSize();
+
+ ConsumerBuilderImpl<byte[]> builder;
+
this.numMsgsDelivered = new LongAdder();
this.numBytesDelivered = new LongAdder();
this.numMsgsAcked = new LongAdder();
try {
+ builder = (ConsumerBuilderImpl<byte[]>)
getConsumerConfiguration(service.getPulsarClient());
+ this.maxPendingMessages =
(builder.getConf().getReceiverQueueSize() == 0) ? 1
+ : builder.getConf().getReceiverQueueSize();
+ this.subscriptionType = builder.getConf().getSubscriptionType();
+
// checkAuth() should be called after assigning a value to
this.subscription
this.subscription = extractSubscription(request);
if (!checkAuth(response)) {
return;
}
- this.consumer =
service.getPulsarClient().subscribe(topic.toString(), subscription, conf);
+ this.consumer =
builder.topic(topic.toString()).subscriptionName(subscription).subscribe();
if (!this.service.addConsumer(this)) {
log.warn("[{}:{}] Failed to add consumer handler for topic
{}", request.getRemoteAddr(),
request.getRemotePort(), topic);
@@ -242,7 +253,7 @@ public void close() throws IOException {
}
}
- public Consumer getConsumer() {
+ public Consumer<byte[]> getConsumer() {
return this.consumer;
}
@@ -251,7 +262,7 @@ public String getSubscription() {
}
public SubscriptionType getSubscriptionType() {
- return conf.getSubscriptionType();
+ return subscriptionType;
}
public long getAndResetNumMsgsDelivered() {
@@ -267,7 +278,7 @@ public long getAndResetNumMsgsAcked() {
}
public long getMsgDeliveredCounter() {
- return MSG_DELIVERED_COUNTER_UPDATER.get(this);
+ return msgDeliveredCounter;
}
protected void updateDeliverMsgStat(long msgSize) {
@@ -276,32 +287,32 @@ protected void updateDeliverMsgStat(long msgSize) {
numBytesDelivered.add(msgSize);
}
- private ConsumerConfiguration getConsumerConfiguration() {
- ConsumerConfiguration conf = new ConsumerConfiguration();
+ private ConsumerBuilder<byte[]> getConsumerConfiguration(PulsarClient
client) {
+ ConsumerBuilder<byte[]> builder = client.newConsumer();
if (queryParams.containsKey("ackTimeoutMillis")) {
-
conf.setAckTimeout(Integer.parseInt(queryParams.get("ackTimeoutMillis")),
TimeUnit.MILLISECONDS);
+
builder.ackTimeout(Integer.parseInt(queryParams.get("ackTimeoutMillis")),
TimeUnit.MILLISECONDS);
}
if (queryParams.containsKey("subscriptionType")) {
checkArgument(Enums.getIfPresent(SubscriptionType.class,
queryParams.get("subscriptionType")).isPresent(),
"Invalid subscriptionType %s",
queryParams.get("subscriptionType"));
-
conf.setSubscriptionType(SubscriptionType.valueOf(queryParams.get("subscriptionType")));
+
builder.subscriptionType(SubscriptionType.valueOf(queryParams.get("subscriptionType")));
}
if (queryParams.containsKey("receiverQueueSize")) {
-
conf.setReceiverQueueSize(Math.min(Integer.parseInt(queryParams.get("receiverQueueSize")),
1000));
+
builder.receiverQueueSize(Math.min(Integer.parseInt(queryParams.get("receiverQueueSize")),
1000));
}
if (queryParams.containsKey("consumerName")) {
- conf.setConsumerName(queryParams.get("consumerName"));
+ builder.consumerName(queryParams.get("consumerName"));
}
if (queryParams.containsKey("priorityLevel")) {
-
conf.setPriorityLevel(Integer.parseInt(queryParams.get("priorityLevel")));
+
builder.priorityLevel(Integer.parseInt(queryParams.get("priorityLevel")));
}
- return conf;
+ return builder;
}
@Override
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index 5ed8adfcb..80545bff9 100644
---
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -24,6 +24,9 @@
import static org.apache.pulsar.websocket.WebSocketError.PayloadEncodingError;
import static org.apache.pulsar.websocket.WebSocketError.UnknownError;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Enums;
+
import java.io.IOException;
import java.util.Base64;
import java.util.concurrent.TimeUnit;
@@ -35,12 +38,13 @@
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.ProducerConfiguration.HashingScheme;
-import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
import
org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededError;
import
org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededException;
import
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
@@ -52,9 +56,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Enums;
-
/**
* Websocket end-point url handler to handle incoming message coming from
client. Websocket end-point url handler to
@@ -91,8 +92,7 @@ public ProducerHandler(WebSocketService service,
HttpServletRequest request, Ser
}
try {
- ProducerConfiguration conf = getProducerConfiguration();
- this.producer =
service.getPulsarClient().createProducer(topic.toString(), conf);
+ this.producer =
getProducerBuilder(service.getPulsarClient()).topic(topic.toString()).create();
if (!this.service.addProducer(this)) {
log.warn("[{}:{}] Failed to add producer handler for topic
{}", request.getRemoteAddr(),
request.getRemotePort(), topic);
@@ -166,7 +166,7 @@ public void onWebSocketText(String message) {
}
final long msgSize = rawPayload.length;
- MessageBuilder builder =
MessageBuilder.create().setContent(rawPayload);
+ MessageBuilder<byte[]> builder =
MessageBuilder.create().setContent(rawPayload);
if (sendRequest.properties != null) {
builder.setProperties(sendRequest.properties);
@@ -196,7 +196,7 @@ public void onWebSocketText(String message) {
});
}
- public Producer getProducer() {
+ public Producer<byte[]> getProducer() {
return this.producer;
}
@@ -222,7 +222,7 @@ public StatsBuckets getPublishLatencyStatsUSec() {
}
public long getMsgPublishedCounter() {
- return MSG_PUBLISHED_COUNTER_UPDATER.get(this);
+ return msgPublishedCounter;
}
@Override
@@ -248,45 +248,44 @@ private void updateSentMsgStats(long msgSize, long
latencyUsec) {
MSG_PUBLISHED_COUNTER_UPDATER.getAndIncrement(this);
}
- private ProducerConfiguration getProducerConfiguration() {
- ProducerConfiguration conf = new ProducerConfiguration();
-
- conf.setBatchingEnabled(false);
- conf.setMessageRoutingMode(MessageRoutingMode.SinglePartition);
+ private ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client) {
+ ProducerBuilder<byte[]> builder = client.newProducer()
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition);
// Set to false to prevent the server thread from being blocked if a
lot of messages are pending.
- conf.setBlockIfQueueFull(false);
+ builder.blockIfQueueFull(false);
if (queryParams.containsKey("producerName")) {
- conf.setProducerName(queryParams.get("producerName"));
+ builder.producerName(queryParams.get("producerName"));
}
if (queryParams.containsKey("initialSequenceId")) {
- conf.setInitialSequenceId(Long.parseLong("initialSequenceId"));
+ builder.initialSequenceId(Long.parseLong("initialSequenceId"));
}
if (queryParams.containsKey("hashingScheme")) {
-
conf.setHashingScheme(HashingScheme.valueOf(queryParams.get("hashingScheme")));
+
builder.hashingScheme(HashingScheme.valueOf(queryParams.get("hashingScheme")));
}
if (queryParams.containsKey("sendTimeoutMillis")) {
-
conf.setSendTimeout(Integer.parseInt(queryParams.get("sendTimeoutMillis")),
TimeUnit.MILLISECONDS);
+
builder.sendTimeout(Integer.parseInt(queryParams.get("sendTimeoutMillis")),
TimeUnit.MILLISECONDS);
}
if (queryParams.containsKey("batchingEnabled")) {
-
conf.setBatchingEnabled(Boolean.parseBoolean(queryParams.get("batchingEnabled")));
+
builder.enableBatching(Boolean.parseBoolean(queryParams.get("batchingEnabled")));
}
if (queryParams.containsKey("batchingMaxMessages")) {
-
conf.setBatchingMaxMessages(Integer.parseInt(queryParams.get("batchingMaxMessages")));
+
builder.batchingMaxMessages(Integer.parseInt(queryParams.get("batchingMaxMessages")));
}
if (queryParams.containsKey("maxPendingMessages")) {
-
conf.setMaxPendingMessages(Integer.parseInt(queryParams.get("maxPendingMessages")));
+
builder.maxPendingMessages(Integer.parseInt(queryParams.get("maxPendingMessages")));
}
if (queryParams.containsKey("batchingMaxPublishDelay")) {
-
conf.setBatchingMaxPublishDelay(Integer.parseInt(queryParams.get("batchingMaxPublishDelay")),
+
builder.batchingMaxPublishDelay(Integer.parseInt(queryParams.get("batchingMaxPublishDelay")),
TimeUnit.MILLISECONDS);
}
@@ -296,17 +295,17 @@ private ProducerConfiguration getProducerConfiguration() {
"Invalid messageRoutingMode %s",
queryParams.get("messageRoutingMode"));
MessageRoutingMode routingMode =
MessageRoutingMode.valueOf(queryParams.get("messageRoutingMode"));
if (!MessageRoutingMode.CustomPartition.equals(routingMode)) {
- conf.setMessageRoutingMode(routingMode);
+ builder.messageRoutingMode(routingMode);
}
}
if (queryParams.containsKey("compressionType")) {
checkArgument(Enums.getIfPresent(CompressionType.class,
queryParams.get("compressionType")).isPresent(),
"Invalid compressionType %s",
queryParams.get("compressionType"));
-
conf.setCompressionType(CompressionType.valueOf(queryParams.get("compressionType")));
+
builder.compressionType(CompressionType.valueOf(queryParams.get("compressionType")));
}
- return conf;
+ return builder;
}
private static final Logger log =
LoggerFactory.getLogger(ProducerHandler.class);
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index 54037fa85..82d7cff5e 100644
---
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -94,7 +94,7 @@ public ReaderHandler(WebSocketService service,
HttpServletRequest request, Servl
this.reader = builder.create();
- this.subscription =
((ReaderImpl)this.reader).getConsumer().getSubscription();
+ this.subscription = ((ReaderImpl<?>)
this.reader).getConsumer().getSubscription();
if (!this.service.addReader(this)) {
log.warn("[{}:{}] Failed to add reader handler for topic {}",
request.getRemoteAddr(),
request.getRemotePort(), topic);
@@ -214,8 +214,8 @@ public void close() throws IOException {
}
}
- public Consumer getConsumer() {
- return reader != null ? ((ReaderImpl)reader).getConsumer() : null;
+ public Consumer<?> getConsumer() {
+ return reader != null ? ((ReaderImpl<?>) reader).getConsumer() : null;
}
public String getSubscription() {
@@ -235,7 +235,7 @@ public long getAndResetNumBytesDelivered() {
}
public long getMsgDeliveredCounter() {
- return MSG_DELIVERED_COUNTER_UPDATER.get(this);
+ return msgDeliveredCounter;
}
protected void updateDeliverMsgStat(long msgSize) {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services