Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2702#discussion_r197786403
--- Diff:
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java
---
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+public abstract class AbstractPulsarConsumerProcessor<T> extends
AbstractPulsarProcessor {
+
+ static final AllowableValue EXCLUSIVE = new
AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the
same topic with the same subscription name");
+ static final AllowableValue SHARED = new AllowableValue("Shared",
"Shared", "Multiple consumer will be able to use the same subscription name and
the messages");
+ static final AllowableValue FAILOVER = new AllowableValue("Failover",
"Failover", "Multiple consumer will be able to use the same subscription name
but only 1 consumer "
+ + "will receive the messages. If that consumer
disconnects, one of the other connected consumers will start receiving
messages");
+
+ static final AllowableValue CONSUME = new
AllowableValue(ConsumerCryptoFailureAction.CONSUME.name(), "Consume",
+ "Mark the message as consumed despite being unable to decrypt
the contents");
+ static final AllowableValue DISCARD = new
AllowableValue(ConsumerCryptoFailureAction.DISCARD.name(), "Discard",
+ "Discard the message and don't perform any addtional
processing on the message");
+ static final AllowableValue FAIL = new
AllowableValue(ConsumerCryptoFailureAction.FAIL.name(), "Fail",
+ "Report a failure condition, and the route the message
contents to the FAILED relationship.");
+
+ public static final PropertyDescriptor TOPICS = new
PropertyDescriptor.Builder()
+ .name("topics")
+ .displayName("Topic Names")
+ .description("Specify the topics this consumer will subscribe
on. "
+ + "You can specify multiple topics in a
comma-separated list."
+ + "E.g topicA, topicB, topicC ")
+ .required(false)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor TOPICS_PATTERN = new
PropertyDescriptor.Builder()
+ .name("Topics Pattern")
+ .description("Specify a pattern for topics that this consumer
will subscribe on. "
+ + "It accepts regular expression and will be compiled
into a pattern internally. "
+ + "E.g.
\"persistent://prop/use/ns-abc/pattern-topic-.*\"")
+ .required(false)
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor SUBSCRIPTION_NAME = new
PropertyDescriptor.Builder()
+ .name("Subscription Name")
+ .description("Specify the subscription name for this
consumer.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor ASYNC_ENABLED = new
PropertyDescriptor.Builder()
+ .name("Async Enabled")
+ .description("Control whether the messages will be consumed
asyncronously or not. Messages consumed"
+ + " syncronously will be acknowledged immediately
before processing the next message, while"
+ + " asyncronous messages will be acknowledged after
the Pulsar broker responds.")
+ .required(true)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .defaultValue("false")
+ .build();
+
+ public static final PropertyDescriptor MAX_ASYNC_REQUESTS = new
PropertyDescriptor.Builder()
+ .name("Maximum Async Requests")
+ .description("The maximum number of outstanding asynchronous
consumer requests for this processor. "
+ + "Each asynchronous call requires memory, so avoid
setting this value to high.")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("50")
+ .build();
+
+ public static final PropertyDescriptor ACK_TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("Acknowledgment Timeout")
+ .description("Set the timeout (in milliseconds) for unacked
messages, truncated to the "
+ + "nearest millisecond. A value of 0 means there is no
timeout. If a non-zero value "
+ + "is sepcified, then messages that are not
acknowledged within the configured"
+ + " timeout will be replayed.")
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor CONSUMER_NAME = new
PropertyDescriptor.Builder()
+ .name("Consumer Name")
+ .description("Set the name of the consumer to uniquely
identify this client on the Broker")
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor PRIORITY_LEVEL = new
PropertyDescriptor.Builder()
+ .name("Consumer Priority Level")
+ .description("Sets priority level for the shared subscription
consumers to which broker "
+ + "gives more priority while dispatching messages.
Here, broker follows descending "
+ + "priorities. (eg: 0=max-priority, 1, 2,..) ")
+ .required(false)
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .defaultValue("5")
+ .build();
+
+ public static final PropertyDescriptor RECEIVER_QUEUE_SIZE = new
PropertyDescriptor.Builder()
+ .name("Consumer receive queue size.")
+ .description("The consumer receive queue controls how many
messages can be accumulated "
+ + "by the Consumer before the application calls
Consumer.receive(). Using a higher "
+ + "value could potentially increase the consumer
throughput at the expense of bigger "
+ + "memory utilization. \n"
+ + "Setting the consumer queue size as zero, \n"
+ + "\t - Decreases the throughput of the consumer, by
disabling pre-fetching of messages. \n"
+ + "\t - Doesn't support Batch-Message: if consumer
receives any batch-message then it closes consumer "
+ + "connection with broker and consumer will not be
able receive any further message unless batch-message "
+ + "in pipeline is removed")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("1000")
+ .build();
+
+ public static final PropertyDescriptor SUBSCRIPTION_TYPE = new
PropertyDescriptor.Builder()
+ .name("Subscription Type")
+ .description("Select the subscription type to be used when
subscribing to the topic.")
+ .required(false)
+ .allowableValues(EXCLUSIVE, SHARED, FAILOVER)
+ .defaultValue(SHARED.getValue())
+ .build();
+
+ protected static final List<PropertyDescriptor> PROPERTIES;
+ protected static final Set<Relationship> RELATIONSHIPS;
+
+ static {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(PULSAR_CLIENT_SERVICE);
+ properties.add(TOPICS);
+ properties.add(TOPICS_PATTERN);
+ properties.add(SUBSCRIPTION_NAME);
+ properties.add(CONSUMER_NAME);
+ properties.add(ASYNC_ENABLED);
+ properties.add(MAX_ASYNC_REQUESTS);
+ properties.add(ACK_TIMEOUT);
+ properties.add(PRIORITY_LEVEL);
+ properties.add(RECEIVER_QUEUE_SIZE);
+ properties.add(SUBSCRIPTION_TYPE);
+
+ PROPERTIES = Collections.unmodifiableList(properties);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+ }
+
+ protected Consumer<T> consumer;
+ protected ExecutorService consumerPool;
+ protected ExecutorCompletionService<Message<T>> consumerService;
+ protected ExecutorService ackPool;
+ protected ExecutorCompletionService<Object> ackService;
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @OnScheduled
+ public void init(ProcessContext context) {
+ if (context.getProperty(ASYNC_ENABLED).isSet() &&
context.getProperty(ASYNC_ENABLED).asBoolean()) {
+ consumerPool =
Executors.newFixedThreadPool(context.getProperty(MAX_ASYNC_REQUESTS).asInteger());
+ consumerService = new ExecutorCompletionService<>(consumerPool);
+ ackPool =
Executors.newFixedThreadPool(context.getProperty(MAX_ASYNC_REQUESTS).asInteger()
+ 1);
+ ackService = new ExecutorCompletionService<>(ackPool);
+ }
+ }
+
+ @OnUnscheduled
+ public void shutDown(final ProcessContext context) {
+ if (context.getProperty(ASYNC_ENABLED).isSet() &&
context.getProperty(ASYNC_ENABLED).asBoolean()) {
+ try {
+ consumerPool.shutdown();
+ consumerPool.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ getLogger().error("Unable to stop all the Pulsar
Consumers", e);
+ }
+
+ try {
+ ackPool.shutdown();
+ ackPool.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ getLogger().error("Unable to wait for all of the message
acknowledgments to be sent", e);
+ }
+ }
+ close(context);
+ }
+
+ @OnStopped
+ public void close(final ProcessContext context) {
+ try {
+ getLogger().info("Disconnecting Pulsar Consumer");
+ if (consumer != null) {
+ consumer.close();
+ }
+ } catch (Exception e) {
+ getLogger().error("Unable to close Pulsar consumer", e);
+ } finally {
+ consumer = null;
+ }
+ }
+
+ protected void consumeAsync(ProcessContext context, ProcessSession
session) throws PulsarClientException {
+ Consumer<T> consumer = getConsumer(context);
+
+ try {
+ consumerService.submit(() -> {
+ return consumer.receive();
+ });
+ } catch (final RejectedExecutionException ex) {
+ getLogger().error("Unable to consume aany more Pulsar
messages", ex);
+ }
+ }
+
+ protected Consumer<T> getConsumer(ProcessContext context) throws
PulsarClientException {
+
+ if (consumer == null) {
+ ConsumerBuilder<T> builder = (ConsumerBuilder<T>)
getPulsarClient(context).newConsumer();
+
+ if (context.getProperty(TOPICS).isSet()) {
+ builder = builder.topic(context.getProperty(TOPICS)
+
.evaluateAttributeExpressions().getValue().split("[, ]"));
--- End diff --
@david-streamlio yes, that is true. However, it does not allow for commas
separating with spaces. I.e., 90% of the time users will enter "myTopic,
yourTopic" and that regex will split it into "myTopic" followed by "
yourTopic".... note the space before the 'y' in the second topic name. It is
best to trim these out. Otherwise, users end up getting back error messages
like "Could not find topic yourTopic" and then it's not clear that there's a
leading space and a lot of time is spent trying to debug.
---