This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 4c5c3cf Revert "Functions schema integration (#1845)" (#2018) 4c5c3cf is described below commit 4c5c3cf2bd00801e0e19f2cf5fa09e2a82a1cf8f Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Thu Jun 21 17:28:40 2018 -0700 Revert "Functions schema integration (#1845)" (#2018) This reverts commit 97b56cf04dfca81c937a4b640bc8cfc5470d66cb. --- .../client/impl/MultiTopicsConsumerImpl.java | 8 +- pulsar-functions/api-java/pom.xml | 6 -- .../org/apache/pulsar/functions/api/SerDe.java | 28 +---- .../pulsar/functions/instance/ContextImpl.java | 9 -- .../pulsar/functions/instance/JavaInstance.java | 32 ++---- .../producers/AbstractOneOuputTopicProducers.java | 28 +++-- .../MultiConsumersOneOuputTopicProducers.java | 21 ++-- .../functions/instance/producers/Producers.java | 4 +- .../apache/pulsar/functions/sink/PulsarSink.java | 36 +++---- .../pulsar/functions/source/PulsarSource.java | 115 +++++++++------------ .../MultiConsumersOneOutputTopicProducersTest.java | 11 +- .../pulsar/functions/source/PulsarSourceTest.java | 7 +- 12 files changed, 110 insertions(+), 195 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index b392629..4be8f58 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -21,7 +21,6 @@ package org.apache.pulsar.client.impl; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -118,11 +117,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { this.namespaceName = 
conf.getTopicNames().stream().findFirst() .flatMap(s -> Optional.of(TopicName.get(s).getNamespaceObject())).get(); - List<CompletableFuture<Void>> futures = - conf.getTopicNames().stream() - .map(this::subscribeAsync) + List<CompletableFuture<Void>> futures = conf.getTopicNames().stream().map(t -> subscribeAsync(t)) .collect(Collectors.toList()); - FutureUtil.waitForAll(futures) .thenAccept(finalFuture -> { try { @@ -131,7 +127,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } setState(State.Ready); // We have successfully created N consumers, so we can start receiving messages now - startReceivingMessages(new ArrayList<>(consumers.values())); + startReceivingMessages(consumers.values().stream().collect(Collectors.toList())); subscribeFuture().complete(MultiTopicsConsumerImpl.this); log.info("[{}] [{}] Created topics consumer with {} sub-consumers", topic, subscription, allTopicPartitionsNumber.get()); diff --git a/pulsar-functions/api-java/pom.xml b/pulsar-functions/api-java/pom.xml index d7babec..43abbe3 100644 --- a/pulsar-functions/api-java/pom.xml +++ b/pulsar-functions/api-java/pom.xml @@ -38,12 +38,6 @@ </dependency> <dependency> - <groupId>org.apache.pulsar</groupId> - <artifactId>pulsar-client-original</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> <groupId>net.jodah</groupId> <artifactId>typetools</artifactId> <scope>test</scope> diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java index 2caee16..f9efa3d 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java @@ -18,36 +18,10 @@ */ package org.apache.pulsar.functions.api; -import java.util.Collections; -import org.apache.pulsar.client.api.Schema; -import 
org.apache.pulsar.common.schema.SchemaInfo; -import org.apache.pulsar.common.schema.SchemaType; - /** * An interface for serializer/deserializer. */ -public interface SerDe<T> extends Schema<T> { +public interface SerDe<T> { T deserialize(byte[] input); - byte[] serialize(T input); - - @Override - default SchemaInfo getSchemaInfo() { - SchemaInfo info = new SchemaInfo(); - info.setName(""); - info.setType(SchemaType.NONE); - info.setSchema(new byte[0]); - info.setProperties(Collections.emptyMap()); - return info; - } - - @Override - default byte[] encode(T message) { - return serialize(message); - } - - @Override - default T decode(byte[] bytes) { - return deserialize(bytes); - } } \ No newline at end of file diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index fce440d..5ca07d9 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -124,15 +124,6 @@ class ContextImpl implements Context { } } - public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, - ClassLoader classLoader) { - this(config, logger, client, classLoader, null); - } - - public void setInputConsumer(Consumer inputConsumer) { - this.inputConsumer = inputConsumer; - } - public void setCurrentMessageContext(MessageId messageId, String topicName) { this.messageId = messageId; this.currentTopicName = topicName; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java index ec85261..5ab8d85 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java +++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java @@ -18,19 +18,16 @@ */ package org.apache.pulsar.functions.instance; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import javax.swing.text.html.Option; import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.functions.api.Function; -import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData; -import org.apache.pulsar.functions.source.PulsarSource; +import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.io.core.Source; + +import org.apache.pulsar.functions.source.PulsarSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,12 +38,11 @@ import org.slf4j.LoggerFactory; */ @Slf4j public class JavaInstance implements AutoCloseable { - private ContextImpl context; @Getter(AccessLevel.PACKAGE) + private final ContextImpl context; private Function function; private java.util.function.Function javaUtilFunction; - private Optional<PulsarSource> optionalPulsarSource = Optional.empty(); public JavaInstance(InstanceConfig config, Object userClassObject, ClassLoader clsLoader, @@ -56,8 +52,8 @@ public class JavaInstance implements AutoCloseable { Logger instanceLog = LoggerFactory.getLogger("function-" + config.getFunctionDetails().getName()); if (source instanceof PulsarSource) { - this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader); - optionalPulsarSource = Optional.of((PulsarSource) source); + this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader, + ((PulsarSource) source).getInputConsumer()); } else { this.context = null; } @@ -68,17 +64,13 @@ public class JavaInstance implements AutoCloseable { } else { this.javaUtilFunction = (java.util.function.Function) userClassObject; } - } public 
JavaExecutionResult handleMessage(MessageId messageId, String topicName, Object input) { - optionalPulsarSource.ifPresent((pulsarSource) -> { - this.context.setInputConsumer(pulsarSource.getConsumerForTopic(topicName)); - this.context.setCurrentMessageContext(messageId, topicName); - }); - + if (context != null) { + context.setCurrentMessageContext(messageId, topicName); + } JavaExecutionResult executionResult = new JavaExecutionResult(); - try { Object output; if (function != null) { @@ -93,15 +85,11 @@ public class JavaInstance implements AutoCloseable { return executionResult; } - public ContextImpl getContext() { - return this.context; - } - @Override public void close() { } - public MetricsData getAndResetMetrics() { + public InstanceCommunication.MetricsData getAndResetMetrics() { return context.getAndResetMetrics(); } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java index 73297c7..7a561a1 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java @@ -27,27 +27,23 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.functions.instance.FunctionResultRouter; -public abstract class AbstractOneOuputTopicProducers<T> implements Producers<T> { +public abstract class AbstractOneOuputTopicProducers implements Producers { protected final PulsarClient client; protected final String outputTopic; - protected final Schema<T> schema; 
AbstractOneOuputTopicProducers(PulsarClient client, - String outputTopic, - Schema<T> schema) + String outputTopic) throws PulsarClientException { this.client = client; this.outputTopic = outputTopic; - this.schema = schema; } - static <U> ProducerBuilder<U> newProducerBuilder(PulsarClient client, Schema<U> schema) { + static ProducerBuilder<byte[]> newProducerBuilder(PulsarClient client) { // use function result router to deal with different processing guarantees. - return client.newProducer(schema) // + return client.newProducer() // .blockIfQueueFull(true) // .enableBatching(true) // .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) // @@ -57,23 +53,23 @@ public abstract class AbstractOneOuputTopicProducers<T> implements Producers<T> .messageRouter(FunctionResultRouter.of()); } - protected Producer<T> createProducer(String topic, Schema<T> schema) + protected Producer<byte[]> createProducer(String topic) throws PulsarClientException { - return createProducer(client, topic, schema); + return createProducer(client, topic); } - public static <T> Producer<T> createProducer(PulsarClient client, String topic, Schema<T> schema) + public static Producer<byte[]> createProducer(PulsarClient client, String topic) throws PulsarClientException { - return newProducerBuilder(client, schema).topic(topic).create(); + return newProducerBuilder(client).topic(topic).create(); } - protected Producer<T> createProducer(String topic, String producerName, Schema<T> schema) + protected Producer<byte[]> createProducer(String topic, String producerName) throws PulsarClientException { - return createProducer(client, topic, producerName, schema); + return createProducer(client, topic, producerName); } - public static <T> Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema) + public static Producer<byte[]> createProducer(PulsarClient client, String topic, String producerName) throws PulsarClientException { - return newProducerBuilder(client, 
schema).topic(topic).producerName(producerName).create(); + return newProducerBuilder(client).topic(topic).producerName(producerName).create(); } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java index 48a8b26..12a639e 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java @@ -19,6 +19,7 @@ package org.apache.pulsar.functions.instance.producers; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -30,21 +31,19 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Schema; @Slf4j -public class MultiConsumersOneOuputTopicProducers<T> extends AbstractOneOuputTopicProducers<T> { +public class MultiConsumersOneOuputTopicProducers extends AbstractOneOuputTopicProducers { @Getter(AccessLevel.PACKAGE) // PartitionId -> producer - private final Map<String, Producer<T>> producers; + private final Map<String, Producer<byte[]>> producers; public MultiConsumersOneOuputTopicProducers(PulsarClient client, - String outputTopic, - Schema<T> schema) + String outputTopic) throws PulsarClientException { - super(client, outputTopic, schema); + super(client, outputTopic); this.producers = new ConcurrentHashMap<>(); } @@ -58,10 +57,10 @@ public class MultiConsumersOneOuputTopicProducers<T> extends AbstractOneOuputTop } @Override - public synchronized 
Producer<T> getProducer(String srcPartitionId) throws PulsarClientException { - Producer<T> producer = producers.get(srcPartitionId); + public synchronized Producer<byte[]> getProducer(String srcPartitionId) throws PulsarClientException { + Producer<byte[]> producer = producers.get(srcPartitionId); if (null == producer) { - producer = createProducer(outputTopic, srcPartitionId, schema); + producer = createProducer(outputTopic, srcPartitionId); producers.put(srcPartitionId, producer); } return producer; @@ -69,7 +68,7 @@ public class MultiConsumersOneOuputTopicProducers<T> extends AbstractOneOuputTop @Override public synchronized void closeProducer(String srcPartitionId) { - Producer<T> producer = producers.get(srcPartitionId); + Producer<byte[]> producer = producers.get(srcPartitionId); if (null != producer) { producer.closeAsync(); producers.remove(srcPartitionId); @@ -79,7 +78,7 @@ public class MultiConsumersOneOuputTopicProducers<T> extends AbstractOneOuputTop @Override public synchronized void close() { List<CompletableFuture<Void>> closeFutures = new ArrayList<>(producers.size()); - for (Producer<T> producer: producers.values()) { + for (Producer<byte[]> producer: producers.values()) { closeFutures.add(producer.closeAsync()); } try { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java index 7892876..4d026ee 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java @@ -24,7 +24,7 @@ import org.apache.pulsar.client.api.PulsarClientException; /** * An interface for managing publishers within a java instance. 
*/ -public interface Producers<T> extends AutoCloseable { +public interface Producers extends AutoCloseable { /** * Initialize all the producers. @@ -40,7 +40,7 @@ public interface Producers<T> extends AutoCloseable { * src partition Id * @return the producer instance to produce messages */ - Producer<T> getProducer(String srcPartitionId) throws PulsarClientException; + Producer<byte[]> getProducer(String srcPartitionId) throws PulsarClientException; /** * Close a producer specified by <tt>srcPartitionId</tt>. diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index 60a1589..0356ab1 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -54,28 +54,28 @@ public class PulsarSink<T> implements Sink<T> { private PulsarSinkProcessor pulsarSinkProcessor; - private interface PulsarSinkProcessor<T> { + private interface PulsarSinkProcessor { void initializeOutputProducer(String outputTopic) throws Exception; - void sendOutputMessage(MessageBuilder<T> outputMsgBuilder, + void sendOutputMessage(MessageBuilder outputMsgBuilder, RecordContext recordContext) throws Exception; void close() throws Exception; } - private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor<T> { - private Producer<T> producer; + private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor { + private Producer<byte[]> producer; @Override public void initializeOutputProducer(String outputTopic) throws Exception { this.producer = AbstractOneOuputTopicProducers.createProducer( - client, pulsarSinkConfig.getTopic(), outputSerDe); + client, pulsarSinkConfig.getTopic()); } @Override - public void sendOutputMessage(MessageBuilder<T> outputMsgBuilder, + public void sendOutputMessage(MessageBuilder 
outputMsgBuilder, RecordContext recordContext) throws Exception { - Message<T> outputMsg = outputMsgBuilder.build(); + Message<byte[]> outputMsg = outputMsgBuilder.build(); this.producer.sendAsync(outputMsg); } @@ -91,19 +91,19 @@ public class PulsarSink<T> implements Sink<T> { } } - private class PulsarSinkAtLeastOnceProcessor implements PulsarSinkProcessor<T> { - private Producer<T> producer; + private class PulsarSinkAtLeastOnceProcessor implements PulsarSinkProcessor { + private Producer<byte[]> producer; @Override public void initializeOutputProducer(String outputTopic) throws Exception { this.producer = AbstractOneOuputTopicProducers.createProducer( - client, pulsarSinkConfig.getTopic(), outputSerDe); + client, pulsarSinkConfig.getTopic()); } @Override - public void sendOutputMessage(MessageBuilder<T> outputMsgBuilder, + public void sendOutputMessage(MessageBuilder outputMsgBuilder, RecordContext recordContext) throws Exception { - Message<T> outputMsg = outputMsgBuilder.build(); + Message<byte[]> outputMsg = outputMsgBuilder.build(); this.producer.sendAsync(outputMsg).thenAccept(messageId -> recordContext.ack()); } @@ -119,19 +119,19 @@ public class PulsarSink<T> implements Sink<T> { } } - private class PulsarSinkEffectivelyOnceProcessor implements PulsarSinkProcessor<T>, ConsumerEventListener { + private class PulsarSinkEffectivelyOnceProcessor implements PulsarSinkProcessor, ConsumerEventListener { @Getter(AccessLevel.PACKAGE) - protected Producers<T> outputProducer; + protected Producers outputProducer; @Override public void initializeOutputProducer(String outputTopic) throws Exception { - outputProducer = new MultiConsumersOneOuputTopicProducers(client, outputTopic, outputSerDe); + outputProducer = new MultiConsumersOneOuputTopicProducers(client, outputTopic); outputProducer.initialize(); } @Override - public void sendOutputMessage(MessageBuilder<T> outputMsgBuilder, RecordContext recordContext) + public void sendOutputMessage(MessageBuilder 
outputMsgBuilder, RecordContext recordContext) throws Exception { // assign sequence id to output message for idempotent producing @@ -139,9 +139,9 @@ public class PulsarSink<T> implements Sink<T> { .setSequenceId(recordContext.getRecordSequence()); // currently on PulsarRecord - Producer<T> producer = outputProducer.getProducer(recordContext.getPartitionId()); + Producer producer = outputProducer.getProducer(recordContext.getPartitionId()); - org.apache.pulsar.client.api.Message<T> outputMsg = outputMsgBuilder.build(); + org.apache.pulsar.client.api.Message outputMsg = outputMsgBuilder.build(); producer.sendAsync(outputMsg) .thenAccept(messageId -> recordContext.ack()) .join(); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index c1dddb0..74f2366 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -19,41 +19,40 @@ package org.apache.pulsar.functions.source; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Maps; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import net.jodah.typetools.TypeResolver; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageListener; import static org.apache.commons.lang3.StringUtils.isNotBlank; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.TopicMessageIdImpl; import 
org.apache.pulsar.client.impl.TopicMessageImpl; -import org.apache.pulsar.io.core.PushSource; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.instance.InstanceUtils; import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.functions.utils.Utils; +import org.apache.pulsar.io.core.Record; +import org.apache.pulsar.io.core.Source; import org.jboss.util.Classes; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + @Slf4j -public class PulsarSource<T> extends PushSource<T> implements MessageListener<T> { +public class PulsarSource<T> implements Source<T> { private PulsarClient pulsarClient; private PulsarSourceConfig pulsarSourceConfig; + private Map<String, SerDe> topicToSerDeMap = new HashMap<>(); private boolean isTopicsPattern; - private Map<String, SerDe<T>> topicToSerDeMap = new HashMap<>(); - private Map<String, org.apache.pulsar.client.api.Consumer<T>> inputConsumers; + @Getter + private org.apache.pulsar.client.api.Consumer inputConsumer; public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig) { this.pulsarClient = pulsarClient; @@ -65,31 +64,28 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T> // Setup Serialization/Deserialization setupSerDe(); - inputConsumers = Maps.newHashMap(); - for (Map.Entry<String, SerDe<T>> entry : topicToSerDeMap.entrySet()) { - ConsumerBuilder<T> consumerBuilder = this.pulsarClient.newConsumer(entry.getValue()) + // Setup pulsar consumer + ConsumerBuilder<byte[]> consumerBuilder = this.pulsarClient.newConsumer() .subscriptionName(this.pulsarSourceConfig.getSubscriptionName()) - .subscriptionType(this.pulsarSourceConfig.getSubscriptionType()) - .messageListener(this); - - if (pulsarSourceConfig.getTimeoutMs() != null) { - consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(), 
TimeUnit.MILLISECONDS); - } - - if(isNotBlank(this.pulsarSourceConfig.getTopicsPattern())) { - consumerBuilder.topicsPattern(this.pulsarSourceConfig.getTopicsPattern()); - isTopicsPattern = true; - }else { - consumerBuilder.topics(new ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet())); - } + .subscriptionType(this.pulsarSourceConfig.getSubscriptionType()); - inputConsumers.put(entry.getKey(),consumerBuilder.subscribe()); + if(isNotBlank(this.pulsarSourceConfig.getTopicsPattern())) { + consumerBuilder.topicsPattern(this.pulsarSourceConfig.getTopicsPattern()); + isTopicsPattern = true; + }else { + consumerBuilder.topics(new ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet())); } - + + if (pulsarSourceConfig.getTimeoutMs() != null) { + consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS); + } + this.inputConsumer = consumerBuilder.subscribe(); } @Override - public void received(Consumer<T> consumer, Message<T> message) { + public Record<T> read() throws Exception { + org.apache.pulsar.client.api.Message<T> message = this.inputConsumer.receive(); + String topicName; String partitionId; @@ -131,44 +127,31 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T> } PulsarRecord<T> pulsarMessage = (PulsarRecord<T>) PulsarRecord.builder() - .value(input) - .messageId(message.getMessageId()) - .partitionId(String.format("%s-%s", topicName, partitionId)) - .recordSequence(Utils.getSequenceId(message.getMessageId())) - .topicName(topicName) - .ackFunction(() -> { - if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { - consumer.acknowledgeCumulativeAsync(message); - } else { - consumer.acknowledgeAsync(message); - } - }).failFunction(() -> { - if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { - throw new RuntimeException("Failed to process message: " + 
message.getMessageId()); - } - }) - .build(); - - consume(pulsarMessage); - } - - public Consumer<T> getConsumerForTopic(String topic) { - return inputConsumers.get(topic); - } - - @Override - public void reachedEndOfTopic(Consumer<T> consumer) { - //No-op + .value(input) + .messageId(message.getMessageId()) + .partitionId(String.format("%s-%s", topicName, partitionId)) + .recordSequence(Utils.getSequenceId(message.getMessageId())) + .topicName(topicName) + .ackFunction(() -> { + if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { + inputConsumer.acknowledgeCumulativeAsync(message); + } else { + inputConsumer.acknowledgeAsync(message); + } + }).failFunction(() -> { + if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { + throw new RuntimeException("Failed to process message: " + message.getMessageId()); + } + }) + .build(); + return pulsarMessage; } @Override public void close() throws Exception { - inputConsumers.forEach((ignored, consumer) -> { - try { - consumer.close(); - } catch (PulsarClientException e) { - } - }); + if (this.inputConsumer != null) { + this.inputConsumer.close(); + } } @VisibleForTesting diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java index def0926..ce67442 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java @@ -19,8 +19,6 @@ package org.apache.pulsar.functions.instance.producers; import static 
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers.makeProducerName; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -45,7 +43,6 @@ import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.ProducerCryptoFailureAction; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Schema; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -199,10 +196,10 @@ public class MultiConsumersOneOutputTopicProducersTest { public void setup() throws Exception { this.mockClient = mock(PulsarClient.class); - when(mockClient.newProducer(any(Schema.class))) + when(mockClient.newProducer()) .thenReturn(new MockProducerBuilder()); - producers = new MultiConsumersOneOuputTopicProducers(mockClient, TEST_OUTPUT_TOPIC, Schema.BYTES); + producers = new MultiConsumersOneOuputTopicProducers(mockClient, TEST_OUTPUT_TOPIC); producers.initialize(); } @@ -227,13 +224,13 @@ public class MultiConsumersOneOutputTopicProducersTest { assertSame(mockProducers.get(producerName), producer); verify(mockClient, times(1)) - .newProducer(Schema.BYTES); + .newProducer(); assertTrue(producers.getProducers().containsKey(producerName)); // second get will not create a new producer assertSame(mockProducers.get(producerName), producer); verify(mockClient, times(1)) - .newProducer(Schema.BYTES); + .newProducer(); assertTrue(producers.getProducers().containsKey(producerName)); // close diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java index 8ca94bb..3c5e61b 100644 --- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java @@ -34,11 +34,9 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; -import static java.util.Collections.emptyMap; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyList; import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -80,10 +78,9 @@ public class PulsarSourceTest { doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(anyString()); doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(any()); doReturn(consumerBuilder).when(consumerBuilder).ackTimeout(anyLong(), any()); - doReturn(consumerBuilder).when(consumerBuilder).messageListener(anyObject()); Consumer consumer = mock(Consumer.class); doReturn(consumer).when(consumerBuilder).subscribe(); - doReturn(consumerBuilder).when(pulsarClient).newConsumer(anyObject()); + doReturn(consumerBuilder).when(pulsarClient).newConsumer(); return pulsarClient; } @@ -171,7 +168,7 @@ public class PulsarSourceTest { pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap); PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); - pulsarSource.open(emptyMap()); + pulsarSource.open(new HashMap<>()); } /**