Github user david-streamlio commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2702#discussion_r197508790
  
    --- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java
 ---
    @@ -0,0 +1,296 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.    See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.    You may obtain a copy of the License at
    + *
    + *         http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.ExecutorCompletionService;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.pulsar.client.api.Consumer;
    +import org.apache.pulsar.client.api.ConsumerBuilder;
    +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
    +import org.apache.pulsar.client.api.Message;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +import org.apache.pulsar.client.api.SubscriptionType;
    +
    +public abstract class AbstractPulsarConsumerProcessor<T> extends 
AbstractPulsarProcessor {
    +
    +    static final AllowableValue EXCLUSIVE = new 
AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the 
same topic with the same subscription name");
    +    static final AllowableValue SHARED = new AllowableValue("Shared", 
"Shared", "Multiple consumer will be able to use the same subscription name and 
the messages");
    +    static final AllowableValue FAILOVER = new AllowableValue("Failover", 
"Failover", "Multiple consumer will be able to use the same subscription name 
but only 1 consumer "
    +                    + "will receive the messages. If that consumer 
disconnects, one of the other connected consumers will start receiving 
messages");
    +
    +    static final AllowableValue CONSUME = new 
AllowableValue(ConsumerCryptoFailureAction.CONSUME.name(), "Consume",
    +            "Mark the message as consumed despite being unable to decrypt 
the contents");
    +    static final AllowableValue DISCARD = new 
AllowableValue(ConsumerCryptoFailureAction.DISCARD.name(), "Discard",
    +            "Discard the message and don't perform any addtional 
processing on the message");
    +    static final AllowableValue FAIL = new 
AllowableValue(ConsumerCryptoFailureAction.FAIL.name(), "Fail",
    +            "Report a failure condition, and the route the message 
contents to the FAILED relationship.");
    +
    +    public static final PropertyDescriptor TOPICS = new 
PropertyDescriptor.Builder()
    +            .name("topics")
    +            .displayName("Topic Names")
    +            .description("Specify the topics this consumer will subscribe 
on. "
    +                    + "You can specify multiple topics in a 
comma-separated list."
    +                    + "E.g topicA, topicB, topicC ")
    +            .required(false)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TOPICS_PATTERN = new 
PropertyDescriptor.Builder()
    +            .name("Topics Pattern")
    +            .description("Specify a pattern for topics that this consumer 
will subscribe on. "
    +                    + "It accepts regular expression and will be compiled 
into a pattern internally. "
    +                    + "E.g. 
\"persistent://prop/use/ns-abc/pattern-topic-.*\"")
    +            .required(false)
    +            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor SUBSCRIPTION_NAME = new 
PropertyDescriptor.Builder()
    +            .name("Subscription Name")
    +            .description("Specify the subscription name for this 
consumer.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor ASYNC_ENABLED = new 
PropertyDescriptor.Builder()
    +            .name("Async Enabled")
    +            .description("Control whether the messages will be consumed 
asyncronously or not. Messages consumed"
    +                    + " syncronously will be acknowledged immediately 
before processing the next message, while"
    +                    + " asyncronous messages will be acknowledged after 
the Pulsar broker responds.")
    +            .required(true)
    +            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor MAX_ASYNC_REQUESTS = new 
PropertyDescriptor.Builder()
    +            .name("Maximum Async Requests")
    +            .description("The maximum number of outstanding asynchronous 
consumer requests for this processor. "
    +                    + "Each asynchronous call requires memory, so avoid 
setting this value to high.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("50")
    +            .build();
    +
    +    public static final PropertyDescriptor ACK_TIMEOUT = new 
PropertyDescriptor.Builder()
    +            .name("Acknowledgment Timeout")
    +            .description("Set the timeout (in milliseconds) for unacked 
messages, truncated to the "
    +                    + "nearest millisecond. A value of 0 means there is no 
timeout. If a non-zero value "
    +                    + "is sepcified, then messages that are not 
acknowledged within the configured"
    +                    + " timeout will be replayed.")
    +            .required(false)
    +            .build();
    +
    +    public static final PropertyDescriptor CONSUMER_NAME = new 
PropertyDescriptor.Builder()
    +            .name("Consumer Name")
    +            .description("Set the name of the consumer to uniquely 
identify this client on the Broker")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PRIORITY_LEVEL = new 
PropertyDescriptor.Builder()
    +            .name("Consumer Priority Level")
    +            .description("Sets priority level for the shared subscription 
consumers to which broker "
    +                    + "gives more priority while dispatching messages. 
Here, broker follows descending "
    +                    + "priorities. (eg: 0=max-priority, 1, 2,..) ")
    +            .required(false)
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .defaultValue("5")
    +            .build();
    +
    +    public static final PropertyDescriptor RECEIVER_QUEUE_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("Consumer receive queue size.")
    +            .description("The consumer receive queue controls how many 
messages can be accumulated "
    +                    + "by the Consumer before the application calls 
Consumer.receive(). Using a higher "
    +                    + "value could potentially increase the consumer 
throughput at the expense of bigger "
    +                    + "memory utilization. \n"
    +                    + "Setting the consumer queue size as zero, \n"
    +                    + "\t - Decreases the throughput of the consumer, by 
disabling pre-fetching of messages. \n"
    +                    + "\t - Doesn't support Batch-Message: if consumer 
receives any batch-message then it closes consumer "
    +                    + "connection with broker and consumer will not be 
able receive any further message unless batch-message "
    +                    + "in pipeline is removed")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("1000")
    +            .build();
    +
    +    public static final PropertyDescriptor SUBSCRIPTION_TYPE = new 
PropertyDescriptor.Builder()
    +            .name("Subscription Type")
    +            .description("Select the subscription type to be used when 
subscribing to the topic.")
    +            .required(false)
    +            .allowableValues(EXCLUSIVE, SHARED, FAILOVER)
    +            .defaultValue(SHARED.getValue())
    +            .build();
    +
    +    protected static final List<PropertyDescriptor> PROPERTIES;
    +    protected static final Set<Relationship> RELATIONSHIPS;
    +
    +    static {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(PULSAR_CLIENT_SERVICE);
    +        properties.add(TOPICS);
    +        properties.add(TOPICS_PATTERN);
    +        properties.add(SUBSCRIPTION_NAME);
    +        properties.add(CONSUMER_NAME);
    +        properties.add(ASYNC_ENABLED);
    +        properties.add(MAX_ASYNC_REQUESTS);
    +        properties.add(ACK_TIMEOUT);
    +        properties.add(PRIORITY_LEVEL);
    +        properties.add(RECEIVER_QUEUE_SIZE);
    +        properties.add(SUBSCRIPTION_TYPE);
    +
    +        PROPERTIES = Collections.unmodifiableList(properties);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    protected Consumer<T> consumer;
    +    protected ExecutorService consumerPool;
    +    protected ExecutorCompletionService<Message<T>> consumerService;
    +    protected ExecutorService ackPool;
    +    protected ExecutorCompletionService<Object> ackService;
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void init(ProcessContext context) {
    +       if (context.getProperty(ASYNC_ENABLED).isSet() && 
context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +           consumerPool = 
Executors.newFixedThreadPool(context.getProperty(MAX_ASYNC_REQUESTS).asInteger());
    +           consumerService = new ExecutorCompletionService<>(consumerPool);
    +           ackPool = 
Executors.newFixedThreadPool(context.getProperty(MAX_ASYNC_REQUESTS).asInteger()
 + 1);
    +           ackService = new ExecutorCompletionService<>(ackPool);
    +       }
    +    }
    +
    +    @OnUnscheduled
    +    public void shutDown(final ProcessContext context) {
    +        if (context.getProperty(ASYNC_ENABLED).isSet() && 
context.getProperty(ASYNC_ENABLED).asBoolean()) {
    +            try {
    +                consumerPool.shutdown();
    +                consumerPool.awaitTermination(10, TimeUnit.SECONDS);
    --- End diff --
    
    I added the wait after some of my unit tests showed that not all of the 
consumed messages were being acknowledged. Adding the delay gives the 
outstanding acknowledgements additional time to complete.  By ensuring that the 
messages are properly acked, we prevent re-processing the same messages in the 
event of a shutdown and restart of the processor, since any un-acked messages 
would be replayed on startup.


---

Reply via email to