sijie closed pull request #1845: Functions schema integration
URL: https://github.com/apache/incubator-pulsar/pull/1845
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
Because this is a foreign pull request (one opened from a fork), GitHub
does not show its diff after the merge, so the diff is supplied below:
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 4be8f58a8e..b392629b36 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -117,8 +118,11 @@
this.namespaceName = conf.getTopicNames().stream().findFirst()
.flatMap(s ->
Optional.of(TopicName.get(s).getNamespaceObject())).get();
- List<CompletableFuture<Void>> futures =
conf.getTopicNames().stream().map(t -> subscribeAsync(t))
+ List<CompletableFuture<Void>> futures =
+ conf.getTopicNames().stream()
+ .map(this::subscribeAsync)
.collect(Collectors.toList());
+
FutureUtil.waitForAll(futures)
.thenAccept(finalFuture -> {
try {
@@ -127,7 +131,7 @@
}
setState(State.Ready);
// We have successfully created N consumers, so we can
start receiving messages now
-
startReceivingMessages(consumers.values().stream().collect(Collectors.toList()));
+ startReceivingMessages(new
ArrayList<>(consumers.values()));
subscribeFuture().complete(MultiTopicsConsumerImpl.this);
log.info("[{}] [{}] Created topics consumer with {}
sub-consumers",
topic, subscription, allTopicPartitionsNumber.get());
diff --git a/pulsar-functions/api-java/pom.xml
b/pulsar-functions/api-java/pom.xml
index 43abbe3079..e9fdae85fd 100644
--- a/pulsar-functions/api-java/pom.xml
+++ b/pulsar-functions/api-java/pom.xml
@@ -37,6 +37,12 @@
<artifactId>slf4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>net.jodah</groupId>
<artifactId>typetools</artifactId>
diff --git
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java
index f9efa3d8f8..691f3de117 100644
---
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java
+++
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java
@@ -18,10 +18,32 @@
*/
package org.apache.pulsar.functions.api;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.shade.org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.shade.org.apache.pulsar.common.schema.SchemaType;
+
/**
* An interface for serializer/deserializer.
*/
-public interface SerDe<T> {
+public interface SerDe<T> extends Schema<T> {
T deserialize(byte[] input);
+
byte[] serialize(T input);
+
+ @Override
+ default SchemaInfo getSchemaInfo() {
+ SchemaInfo info = new SchemaInfo();
+ info.setType(SchemaType.NONE);
+ return info;
+ }
+
+ @Override
+ default byte[] encode(T message) {
+ return serialize(message);
+ }
+
+ @Override
+ default T decode(byte[] bytes) {
+ return deserialize(bytes);
+ }
}
\ No newline at end of file
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 9eddc690e4..b606402086 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -121,6 +121,15 @@ public ContextImpl(InstanceConfig config, Logger logger,
PulsarClient client,
}
}
+ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient
client,
+ ClassLoader classLoader) {
+ this(config, logger, client, classLoader, null);
+ }
+
+ public void setInputConsumer(Consumer inputConsumer) {
+ this.inputConsumer = inputConsumer;
+ }
+
public void setCurrentMessageContext(MessageId messageId, String
topicName) {
this.messageId = messageId;
this.currentTopicName = topicName;
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 5ab8d8548c..ec85261530 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -18,16 +18,19 @@
*/
package org.apache.pulsar.functions.instance;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import javax.swing.text.html.Option;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.io.core.Source;
-
+import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
import org.apache.pulsar.functions.source.PulsarSource;
+import org.apache.pulsar.io.core.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,11 +41,12 @@
*/
@Slf4j
public class JavaInstance implements AutoCloseable {
+ private ContextImpl context;
@Getter(AccessLevel.PACKAGE)
- private final ContextImpl context;
private Function function;
private java.util.function.Function javaUtilFunction;
+ private Optional<PulsarSource> optionalPulsarSource = Optional.empty();
public JavaInstance(InstanceConfig config, Object userClassObject,
ClassLoader clsLoader,
@@ -52,8 +56,8 @@ public JavaInstance(InstanceConfig config, Object
userClassObject,
Logger instanceLog = LoggerFactory.getLogger("function-" +
config.getFunctionDetails().getName());
if (source instanceof PulsarSource) {
- this.context = new ContextImpl(config, instanceLog, pulsarClient,
clsLoader,
- ((PulsarSource) source).getInputConsumer());
+ this.context = new ContextImpl(config, instanceLog, pulsarClient,
clsLoader);
+ optionalPulsarSource = Optional.of((PulsarSource) source);
} else {
this.context = null;
}
@@ -64,13 +68,17 @@ public JavaInstance(InstanceConfig config, Object
userClassObject,
} else {
this.javaUtilFunction = (java.util.function.Function)
userClassObject;
}
+
}
public JavaExecutionResult handleMessage(MessageId messageId, String
topicName, Object input) {
- if (context != null) {
- context.setCurrentMessageContext(messageId, topicName);
- }
+ optionalPulsarSource.ifPresent((pulsarSource) -> {
+
this.context.setInputConsumer(pulsarSource.getConsumerForTopic(topicName));
+ this.context.setCurrentMessageContext(messageId, topicName);
+ });
+
JavaExecutionResult executionResult = new JavaExecutionResult();
+
try {
Object output;
if (function != null) {
@@ -85,11 +93,15 @@ public JavaExecutionResult handleMessage(MessageId
messageId, String topicName,
return executionResult;
}
+ public ContextImpl getContext() {
+ return this.context;
+ }
+
@Override
public void close() {
}
- public InstanceCommunication.MetricsData getAndResetMetrics() {
+ public MetricsData getAndResetMetrics() {
return context.getAndResetMetrics();
}
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
index 7a561a17d6..73297c7b8d 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
@@ -27,23 +27,27 @@
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.instance.FunctionResultRouter;
-public abstract class AbstractOneOuputTopicProducers implements Producers {
+public abstract class AbstractOneOuputTopicProducers<T> implements
Producers<T> {
protected final PulsarClient client;
protected final String outputTopic;
+ protected final Schema<T> schema;
AbstractOneOuputTopicProducers(PulsarClient client,
- String outputTopic)
+ String outputTopic,
+ Schema<T> schema)
throws PulsarClientException {
this.client = client;
this.outputTopic = outputTopic;
+ this.schema = schema;
}
- static ProducerBuilder<byte[]> newProducerBuilder(PulsarClient client) {
+ static <U> ProducerBuilder<U> newProducerBuilder(PulsarClient client,
Schema<U> schema) {
// use function result router to deal with different processing
guarantees.
- return client.newProducer() //
+ return client.newProducer(schema) //
.blockIfQueueFull(true) //
.enableBatching(true) //
.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) //
@@ -53,23 +57,23 @@
.messageRouter(FunctionResultRouter.of());
}
- protected Producer<byte[]> createProducer(String topic)
+ protected Producer<T> createProducer(String topic, Schema<T> schema)
throws PulsarClientException {
- return createProducer(client, topic);
+ return createProducer(client, topic, schema);
}
- public static Producer<byte[]> createProducer(PulsarClient client, String
topic)
+ public static <T> Producer<T> createProducer(PulsarClient client, String
topic, Schema<T> schema)
throws PulsarClientException {
- return newProducerBuilder(client).topic(topic).create();
+ return newProducerBuilder(client, schema).topic(topic).create();
}
- protected Producer<byte[]> createProducer(String topic, String
producerName)
+ protected Producer<T> createProducer(String topic, String producerName,
Schema<T> schema)
throws PulsarClientException {
- return createProducer(client, topic, producerName);
+ return createProducer(client, topic, producerName, schema);
}
- public static Producer<byte[]> createProducer(PulsarClient client, String
topic, String producerName)
+ public static <T> Producer<T> createProducer(PulsarClient client, String
topic, String producerName, Schema<T> schema)
throws PulsarClientException {
- return
newProducerBuilder(client).topic(topic).producerName(producerName).create();
+ return newProducerBuilder(client,
schema).topic(topic).producerName(producerName).create();
}
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
index 12a639e574..48a8b2633f 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.functions.instance.producers;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -31,19 +30,21 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
@Slf4j
-public class MultiConsumersOneOuputTopicProducers extends
AbstractOneOuputTopicProducers {
+public class MultiConsumersOneOuputTopicProducers<T> extends
AbstractOneOuputTopicProducers<T> {
@Getter(AccessLevel.PACKAGE)
// PartitionId -> producer
- private final Map<String, Producer<byte[]>> producers;
+ private final Map<String, Producer<T>> producers;
public MultiConsumersOneOuputTopicProducers(PulsarClient client,
- String outputTopic)
+ String outputTopic,
+ Schema<T> schema)
throws PulsarClientException {
- super(client, outputTopic);
+ super(client, outputTopic, schema);
this.producers = new ConcurrentHashMap<>();
}
@@ -57,10 +58,10 @@ static String makeProducerName(String srcTopicName, String
srcTopicPartition) {
}
@Override
- public synchronized Producer<byte[]> getProducer(String srcPartitionId)
throws PulsarClientException {
- Producer<byte[]> producer = producers.get(srcPartitionId);
+ public synchronized Producer<T> getProducer(String srcPartitionId) throws
PulsarClientException {
+ Producer<T> producer = producers.get(srcPartitionId);
if (null == producer) {
- producer = createProducer(outputTopic, srcPartitionId);
+ producer = createProducer(outputTopic, srcPartitionId, schema);
producers.put(srcPartitionId, producer);
}
return producer;
@@ -68,7 +69,7 @@ static String makeProducerName(String srcTopicName, String
srcTopicPartition) {
@Override
public synchronized void closeProducer(String srcPartitionId) {
- Producer<byte[]> producer = producers.get(srcPartitionId);
+ Producer<T> producer = producers.get(srcPartitionId);
if (null != producer) {
producer.closeAsync();
producers.remove(srcPartitionId);
@@ -78,7 +79,7 @@ public synchronized void closeProducer(String srcPartitionId)
{
@Override
public synchronized void close() {
List<CompletableFuture<Void>> closeFutures = new
ArrayList<>(producers.size());
- for (Producer<byte[]> producer: producers.values()) {
+ for (Producer<T> producer: producers.values()) {
closeFutures.add(producer.closeAsync());
}
try {
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
index 4d026ee4dd..7892876c1b 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
@@ -24,7 +24,7 @@
/**
* An interface for managing publishers within a java instance.
*/
-public interface Producers extends AutoCloseable {
+public interface Producers<T> extends AutoCloseable {
/**
* Initialize all the producers.
@@ -40,7 +40,7 @@
* src partition Id
* @return the producer instance to produce messages
*/
- Producer<byte[]> getProducer(String srcPartitionId) throws
PulsarClientException;
+ Producer<T> getProducer(String srcPartitionId) throws
PulsarClientException;
/**
* Close a producer specified by <tt>srcPartitionId</tt>.
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 0356ab1d94..60a1589cb5 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -54,28 +54,28 @@
private PulsarSinkProcessor pulsarSinkProcessor;
- private interface PulsarSinkProcessor {
+ private interface PulsarSinkProcessor<T> {
void initializeOutputProducer(String outputTopic) throws Exception;
- void sendOutputMessage(MessageBuilder outputMsgBuilder,
+ void sendOutputMessage(MessageBuilder<T> outputMsgBuilder,
RecordContext recordContext) throws Exception;
void close() throws Exception;
}
- private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor
{
- private Producer<byte[]> producer;
+ private class PulsarSinkAtMostOnceProcessor implements
PulsarSinkProcessor<T> {
+ private Producer<T> producer;
@Override
public void initializeOutputProducer(String outputTopic) throws
Exception {
this.producer = AbstractOneOuputTopicProducers.createProducer(
- client, pulsarSinkConfig.getTopic());
+ client, pulsarSinkConfig.getTopic(), outputSerDe);
}
@Override
- public void sendOutputMessage(MessageBuilder outputMsgBuilder,
+ public void sendOutputMessage(MessageBuilder<T> outputMsgBuilder,
RecordContext recordContext) throws
Exception {
- Message<byte[]> outputMsg = outputMsgBuilder.build();
+ Message<T> outputMsg = outputMsgBuilder.build();
this.producer.sendAsync(outputMsg);
}
@@ -91,19 +91,19 @@ public void close() throws Exception {
}
}
- private class PulsarSinkAtLeastOnceProcessor implements
PulsarSinkProcessor {
- private Producer<byte[]> producer;
+ private class PulsarSinkAtLeastOnceProcessor implements
PulsarSinkProcessor<T> {
+ private Producer<T> producer;
@Override
public void initializeOutputProducer(String outputTopic) throws
Exception {
this.producer = AbstractOneOuputTopicProducers.createProducer(
- client, pulsarSinkConfig.getTopic());
+ client, pulsarSinkConfig.getTopic(), outputSerDe);
}
@Override
- public void sendOutputMessage(MessageBuilder outputMsgBuilder,
+ public void sendOutputMessage(MessageBuilder<T> outputMsgBuilder,
RecordContext recordContext) throws
Exception {
- Message<byte[]> outputMsg = outputMsgBuilder.build();
+ Message<T> outputMsg = outputMsgBuilder.build();
this.producer.sendAsync(outputMsg).thenAccept(messageId ->
recordContext.ack());
}
@@ -119,19 +119,19 @@ public void close() throws Exception {
}
}
- private class PulsarSinkEffectivelyOnceProcessor implements
PulsarSinkProcessor, ConsumerEventListener {
+ private class PulsarSinkEffectivelyOnceProcessor implements
PulsarSinkProcessor<T>, ConsumerEventListener {
@Getter(AccessLevel.PACKAGE)
- protected Producers outputProducer;
+ protected Producers<T> outputProducer;
@Override
public void initializeOutputProducer(String outputTopic) throws
Exception {
- outputProducer = new MultiConsumersOneOuputTopicProducers(client,
outputTopic);
+ outputProducer = new MultiConsumersOneOuputTopicProducers(client,
outputTopic, outputSerDe);
outputProducer.initialize();
}
@Override
- public void sendOutputMessage(MessageBuilder outputMsgBuilder,
RecordContext recordContext)
+ public void sendOutputMessage(MessageBuilder<T> outputMsgBuilder,
RecordContext recordContext)
throws Exception {
// assign sequence id to output message for idempotent producing
@@ -139,9 +139,9 @@ public void sendOutputMessage(MessageBuilder
outputMsgBuilder, RecordContext rec
.setSequenceId(recordContext.getRecordSequence());
// currently on PulsarRecord
- Producer producer =
outputProducer.getProducer(recordContext.getPartitionId());
+ Producer<T> producer =
outputProducer.getProducer(recordContext.getPartitionId());
- org.apache.pulsar.client.api.Message outputMsg =
outputMsgBuilder.build();
+ org.apache.pulsar.client.api.Message<T> outputMsg =
outputMsgBuilder.build();
producer.sendAsync(outputMsg)
.thenAccept(messageId -> recordContext.ack())
.join();
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 74f23663ef..117f16efab 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -19,40 +19,42 @@
package org.apache.pulsar.functions.source;
import com.google.common.annotations.VisibleForTesting;
-import lombok.Getter;
+import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageListener;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.io.core.PushSource;
+import org.apache.pulsar.shade.org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.utils.Utils;
-import org.apache.pulsar.io.core.Record;
-import org.apache.pulsar.io.core.Source;
import org.jboss.util.Classes;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
@Slf4j
-public class PulsarSource<T> implements Source<T> {
+public class PulsarSource<T> extends PushSource<T> implements
MessageListener<T> {
private PulsarClient pulsarClient;
private PulsarSourceConfig pulsarSourceConfig;
- private Map<String, SerDe> topicToSerDeMap = new HashMap<>();
private boolean isTopicsPattern;
+ private Map<String, SerDe<T>> topicToSerDeMap = new HashMap<>();
- @Getter
- private org.apache.pulsar.client.api.Consumer inputConsumer;
+ private Map<String, org.apache.pulsar.client.api.Consumer<T>>
inputConsumers;
public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig
pulsarConfig) {
this.pulsarClient = pulsarClient;
@@ -64,28 +66,31 @@ public void open(Map<String, Object> config) throws
Exception {
// Setup Serialization/Deserialization
setupSerDe();
- // Setup pulsar consumer
- ConsumerBuilder<byte[]> consumerBuilder =
this.pulsarClient.newConsumer()
+ inputConsumers = Maps.newHashMap();
+ for (Map.Entry<String, SerDe<T>> entry : topicToSerDeMap.entrySet()) {
+ ConsumerBuilder<T> consumerBuilder =
this.pulsarClient.newConsumer(entry.getValue())
.subscriptionName(this.pulsarSourceConfig.getSubscriptionName())
-
.subscriptionType(this.pulsarSourceConfig.getSubscriptionType());
+
.subscriptionType(this.pulsarSourceConfig.getSubscriptionType())
+ .messageListener(this);
- if(isNotBlank(this.pulsarSourceConfig.getTopicsPattern())) {
-
consumerBuilder.topicsPattern(this.pulsarSourceConfig.getTopicsPattern());
- isTopicsPattern = true;
- }else {
- consumerBuilder.topics(new
ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet()));
- }
-
- if (pulsarSourceConfig.getTimeoutMs() != null) {
- consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(),
TimeUnit.MILLISECONDS);
+ if (pulsarSourceConfig.getTimeoutMs() != null) {
+ consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(),
TimeUnit.MILLISECONDS);
+ }
+
+ if(isNotBlank(this.pulsarSourceConfig.getTopicsPattern())) {
+
consumerBuilder.topicsPattern(this.pulsarSourceConfig.getTopicsPattern());
+ isTopicsPattern = true;
+ }else {
+ consumerBuilder.topics(new
ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet()));
+ }
+
+ inputConsumers.put(entry.getKey(),consumerBuilder.subscribe());
}
- this.inputConsumer = consumerBuilder.subscribe();
+
}
@Override
- public Record<T> read() throws Exception {
- org.apache.pulsar.client.api.Message<T> message =
this.inputConsumer.receive();
-
+ public void received(Consumer<T> consumer, Message<T> message) {
String topicName;
String partitionId;
@@ -127,31 +132,44 @@ public void open(Map<String, Object> config) throws
Exception {
}
PulsarRecord<T> pulsarMessage = (PulsarRecord<T>)
PulsarRecord.builder()
- .value(input)
- .messageId(message.getMessageId())
- .partitionId(String.format("%s-%s", topicName, partitionId))
- .recordSequence(Utils.getSequenceId(message.getMessageId()))
- .topicName(topicName)
- .ackFunction(() -> {
- if (pulsarSourceConfig.getProcessingGuarantees() ==
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
- inputConsumer.acknowledgeCumulativeAsync(message);
- } else {
- inputConsumer.acknowledgeAsync(message);
- }
- }).failFunction(() -> {
- if (pulsarSourceConfig.getProcessingGuarantees() ==
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
- throw new RuntimeException("Failed to process message:
" + message.getMessageId());
- }
- })
- .build();
- return pulsarMessage;
+ .value(input)
+ .messageId(message.getMessageId())
+ .partitionId(String.format("%s-%s", topicName, partitionId))
+ .recordSequence(Utils.getSequenceId(message.getMessageId()))
+ .topicName(topicName)
+ .ackFunction(() -> {
+ if (pulsarSourceConfig.getProcessingGuarantees() ==
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+ consumer.acknowledgeCumulativeAsync(message);
+ } else {
+ consumer.acknowledgeAsync(message);
+ }
+ }).failFunction(() -> {
+ if (pulsarSourceConfig.getProcessingGuarantees() ==
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+ throw new RuntimeException("Failed to process message: " +
message.getMessageId());
+ }
+ })
+ .build();
+
+ consume(pulsarMessage);
+ }
+
+ public Consumer<T> getConsumerForTopic(String topic) {
+ return inputConsumers.get(topic);
+ }
+
+ @Override
+ public void reachedEndOfTopic(Consumer<T> consumer) {
+ //No-op
}
@Override
public void close() throws Exception {
- if (this.inputConsumer != null) {
- this.inputConsumer.close();
- }
+ inputConsumers.forEach((ignored, consumer) -> {
+ try {
+ consumer.close();
+ } catch (PulsarClientException e) {
+ }
+ });
}
@VisibleForTesting
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
index b2f2c4f8a9..81cbbda1d2 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.functions.instance.producers;
import static
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers.makeProducerName;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -43,6 +45,7 @@
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -191,10 +194,10 @@
public void setup() throws Exception {
this.mockClient = mock(PulsarClient.class);
- when(mockClient.newProducer())
+ when(mockClient.newProducer(any(Schema.class)))
.thenReturn(new MockProducerBuilder());
- producers = new MultiConsumersOneOuputTopicProducers(mockClient,
TEST_OUTPUT_TOPIC);
+ producers = new MultiConsumersOneOuputTopicProducers(mockClient,
TEST_OUTPUT_TOPIC, Schema.BYTES);
producers.initialize();
}
@@ -219,13 +222,13 @@ public void testGetCloseProducer() throws Exception {
assertSame(mockProducers.get(producerName), producer);
verify(mockClient, times(1))
- .newProducer();
+ .newProducer(Schema.BYTES);
assertTrue(producers.getProducers().containsKey(producerName));
// second get will not create a new producer
assertSame(mockProducers.get(producerName), producer);
verify(mockClient, times(1))
- .newProducer();
+ .newProducer(Schema.BYTES);
assertTrue(producers.getProducers().containsKey(producerName));
// close
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 3c5e61b0de..8ca94bb169 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -34,9 +34,11 @@
import java.util.HashMap;
import java.util.Map;
+import static java.util.Collections.emptyMap;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -78,9 +80,10 @@ private static PulsarClient getPulsarClient() throws
PulsarClientException {
doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(anyString());
doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(any());
doReturn(consumerBuilder).when(consumerBuilder).ackTimeout(anyLong(),
any());
+
doReturn(consumerBuilder).when(consumerBuilder).messageListener(anyObject());
Consumer consumer = mock(Consumer.class);
doReturn(consumer).when(consumerBuilder).subscribe();
- doReturn(consumerBuilder).when(pulsarClient).newConsumer();
+ doReturn(consumerBuilder).when(pulsarClient).newConsumer(anyObject());
return pulsarClient;
}
@@ -168,7 +171,7 @@ public void testDefaultSerDe() throws Exception {
pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
PulsarSource pulsarSource = new PulsarSource(getPulsarClient(),
pulsarConfig);
- pulsarSource.open(new HashMap<>());
+ pulsarSource.open(emptyMap());
}
/**
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services