GitHub user david-streamlio commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2702#discussion_r197578241
--- Diff:
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java
---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.cache.LRUCache;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+public abstract class AbstractPulsarProducerProcessor<T> extends
AbstractPulsarProcessor {
+
+    // Public string keys; presumably FlowFile attribute names written by subclasses (not visible here) — confirm.
+    public static final String TOPIC_NAME = "topic.name";
+    public static final String MSG_COUNT = "msg.count";
+
+    // Choices offered by the Compression Type property.
+    static final AllowableValue COMPRESSION_TYPE_NONE =
+            new AllowableValue("NONE", "None", "No compression");
+    static final AllowableValue COMPRESSION_TYPE_LZ4 =
+            new AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm.");
+    static final AllowableValue COMPRESSION_TYPE_ZLIB =
+            new AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm");
+
+    // Choices offered by the Message Routing Mode property (only meaningful for partitioned topics).
+    static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = new AllowableValue(
+            "CustomPartition",
+            "Custom Partition",
+            "Route messages to a custom partition");
+    static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION = new AllowableValue(
+            "RoundRobinPartition",
+            "Round Robin Partition",
+            "Route messages to all partitions in a round robin manner");
+    static final AllowableValue MESSAGE_ROUTING_MODE_SINGLE_PARTITION = new AllowableValue(
+            "SinglePartition",
+            "Single Partition",
+            "Route messages to a single partition");
+
+    /**
+     * Destination Pulsar topic. Required, must not be blank, and may be computed per FlowFile
+     * through Expression Language evaluated against FlowFile attributes.
+     */
+    public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+            .name("topic")
+            .displayName("Topic Name")
+            .description("The name of the Pulsar Topic.")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    /**
+     * Whether messages are published asynchronously. When {@code true}, {@code init} creates a
+     * fixed-size publisher thread pool whose size comes from {@link #MAX_ASYNC_REQUESTS}.
+     */
+    public static final PropertyDescriptor ASYNC_ENABLED = new PropertyDescriptor.Builder()
+            .name("Async Enabled")
+            .description("Control whether the messages will be sent asynchronously or not. Messages sent"
+                    + " synchronously will be acknowledged immediately before processing the next message, while"
+                    + " asynchronous messages will be acknowledged after the Pulsar broker responds.")
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue("false")
+            .build();
+
+    /**
+     * Size of the publisher thread pool created by {@code init} when async publishing is enabled.
+     * NOTE(review): {@code Executors.newFixedThreadPool} uses an unbounded work queue, so this value
+     * caps concurrently running publish tasks, not queued ones — confirm this matches the intent.
+     */
+    public static final PropertyDescriptor MAX_ASYNC_REQUESTS = new PropertyDescriptor.Builder()
+            .name("Maximum Async Requests")
+            .description("The maximum number of outstanding asynchronous publish requests for this processor. "
+                    + "Each asynchronous call requires memory, so avoid setting this value too high.")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("50")
+            .build();
+
+    /**
+     * Enables producer-side batching. Its description documents a 10 ms default batch delay and a
+     * 1000-message default batch size, matching the defaults of {@link #BATCH_INTERVAL} and
+     * {@link #BATCHING_MAX_MESSAGES}.
+     */
+    public static final PropertyDescriptor BATCHING_ENABLED = new PropertyDescriptor.Builder()
+            .name("Batching Enabled")
+            .description("Control whether automatic batching of messages is enabled for the producer. "
+                    + "Default: false (no batching). When batching is enabled, multiple calls to "
+                    + "Producer.sendAsync can result in a single batch being sent to the broker, leading "
+                    + "to better throughput, especially when publishing small messages. If compression is "
+                    + "enabled, messages will be compressed at the batch level, leading to a much better "
+                    + "compression ratio for similar headers or contents. When enabled, the default batch "
+                    + "delay is 10 ms and the default batch size is 1000 messages.")
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue("false")
+            .build();
+
+    /** Maximum number of messages collected into one batch when batching is enabled. */
+    public static final PropertyDescriptor BATCHING_MAX_MESSAGES = new PropertyDescriptor.Builder()
+            .name("Batching Max Messages")
+            .description("Set the maximum number of messages permitted in a batch. Default: 1000. "
+                    + "If set to a value greater than 1, messages will be queued until this "
+                    + "threshold is reached or the batch interval has elapsed.")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1000")
+            .build();
+
+    /** Batching delay; the default of "10" corresponds to the 10 ms documented in the description. */
+    public static final PropertyDescriptor BATCH_INTERVAL = new PropertyDescriptor.Builder()
+            .name("Batch Interval")
+            .description("Set the time period, in milliseconds, within which sent messages will be batched "
+                    + "when batching is enabled. Default: 10 ms. Messages will be queued until this time "
+                    + "interval has elapsed or until the Batching Max Messages threshold has been reached.")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .defaultValue("10")
+            .build();
+
+    /** Whether sends block, rather than fail, when the producer's pending-message queue is full. */
+    public static final PropertyDescriptor BLOCK_IF_QUEUE_FULL = new PropertyDescriptor.Builder()
+            .name("Block if Message Queue Full")
+            .description("Set whether the processor should block when the outgoing message queue is full. Default is false. "
+                    + "If set to false, send operations will immediately fail with ProducerQueueIsFullError "
+                    + "when there is no space left in pending queue.")
+            .defaultValue("false")
+            .required(false)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    /** Producer compression codec; one of NONE, LZ4 or ZLIB (default NONE). */
+    public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
+            .name("Compression Type")
+            .description("Set the compression type for the producer.")
+            .allowableValues(COMPRESSION_TYPE_NONE, COMPRESSION_TYPE_LZ4, COMPRESSION_TYPE_ZLIB)
+            .defaultValue(COMPRESSION_TYPE_NONE.getValue())
+            .required(false)
+            .build();
+
+    /** Partition routing strategy for partitioned topics (default round robin). */
+    public static final PropertyDescriptor MESSAGE_ROUTING_MODE = new PropertyDescriptor.Builder()
+            .name("Message Routing Mode")
+            .description("Set the message routing mode for the producer. "
+                    + "This applies only if the destination topic is partitioned")
+            .allowableValues(
+                    MESSAGE_ROUTING_MODE_CUSTOM_PARTITION,
+                    MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION,
+                    MESSAGE_ROUTING_MODE_SINGLE_PARTITION)
+            .defaultValue(MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION.getValue())
+            .required(false)
+            .build();
+
+    /** Capacity of the queue of messages still awaiting a broker acknowledgment. */
+    public static final PropertyDescriptor PENDING_MAX_MESSAGES = new PropertyDescriptor.Builder()
+            .name("Max Pending Messages")
+            .description("Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.")
+            .defaultValue("1000")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    /** Supported property descriptors, in declaration order; unmodifiable. */
+    protected static final List<PropertyDescriptor> PROPERTIES;
+    /** The success and failure relationships; unmodifiable. */
+    protected static final Set<Relationship> RELATIONSHIPS;
+
+    static {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        // Collections.addAll appends in argument order, so list order is the order written here.
+        Collections.addAll(descriptors,
+                PULSAR_CLIENT_SERVICE,
+                TOPIC,
+                ASYNC_ENABLED,
+                MAX_ASYNC_REQUESTS,
+                BATCHING_ENABLED,
+                BATCHING_MAX_MESSAGES,
+                BATCH_INTERVAL,
+                BLOCK_IF_QUEUE_FULL,
+                COMPRESSION_TYPE,
+                MESSAGE_ROUTING_MODE,
+                PENDING_MAX_MESSAGES);
+        PROPERTIES = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> rels = new HashSet<>();
+        Collections.addAll(rels, REL_SUCCESS, REL_FAILURE);
+        RELATIONSHIPS = Collections.unmodifiableSet(rels);
+    }
+
+    /**
+     * Returns the unmodifiable success/failure relationship set built in the static initializer.
+     *
+     * @return the relationships this processor routes to
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = AbstractPulsarProducerProcessor.RELATIONSHIPS;
+        return relationships;
+    }
+
+    /**
+     * Returns the unmodifiable, ordered list of property descriptors built in the static initializer.
+     *
+     * @return the supported property descriptors
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = AbstractPulsarProducerProcessor.PROPERTIES;
+        return descriptors;
+    }
+
+    // Async publishing machinery. init() assigns both only when Async Enabled is true; otherwise they stay null.
+    protected ExecutorService publisherPool;
+    protected ExecutorCompletionService<Object> publisherService;
+
+    // Cache of Pulsar producers keyed by a String (presumably the topic name — confirm against getProducerCache).
+    protected LRUCache<String, Producer<T>> producers;
+
+    /**
+     * Prepares the async publishing machinery when the processor is scheduled. If Async Enabled is
+     * true, creates a fixed thread pool sized by Maximum Async Requests and a completion service
+     * over it. For synchronous publishing nothing is created.
+     *
+     * @param context the process context supplying property values
+     */
+    @OnScheduled
+    public void init(ProcessContext context) {
+        final boolean asyncEnabled = context.getProperty(ASYNC_ENABLED).isSet()
+                && context.getProperty(ASYNC_ENABLED).asBoolean();
+        if (!asyncEnabled) {
+            return;
+        }
+
+        final int poolSize = context.getProperty(MAX_ASYNC_REQUESTS).asInteger();
+        publisherPool = Executors.newFixedThreadPool(poolSize);
+        publisherService = new ExecutorCompletionService<>(publisherPool);
+    }
+
+ @OnUnscheduled
+ public void shutDown(final ProcessContext context) {
+ if (context.getProperty(ASYNC_ENABLED).isSet() &&
context.getProperty(ASYNC_ENABLED).asBoolean()) {
+ // Stop all the async publishers
+ try {
+ publisherPool.shutdown();
+ publisherPool.awaitTermination(20, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ getLogger().error("Unable to stop all the Pulsar Producers",
e);
+ }
+ }
+ }
+
+    /**
+     * Clears the producer cache returned by {@code getProducerCache(context)} once the processor has stopped.
+     * NOTE(review): whether clearing the cache also closes the cached Producer instances depends on
+     * getProducerCache/LRUCache, which are not visible here — confirm producers are not leaked.
+     */
+    @OnStopped
+    public void cleanUp(final ProcessContext context) {
+        getProducerCache(context).clear();
+    }
+
+ @SuppressWarnings("rawtypes")
+ protected void sendAsync(Producer producer, final ProcessSession
session, final byte[] messageContent) {
+ try {
+ publisherService.submit(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ try {
+ return
producer.newMessage().value(messageContent).sendAsync().handle((msgId, ex) -> {
+ if (msgId != null) {
+ return msgId;
+ } else {
+ FlowFile flowFile = session.create();
--- End diff --
When I tried to use the original FlowFile, I got runtime exceptions saying that the FlowFile had NOT
been transferred to a relationship. Because this code runs inside an asynchronous callback, the
original session has already been closed by the time the callback is invoked. Therefore, my only
option for transferring out the failed messages is to create a new FlowFile specifically for failures.
---