[
https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16693651#comment-16693651
]
ASF GitHub Bot commented on NIFI-4914:
--------------------------------------
Github user david-streamlio commented on a diff in the pull request:
https://github.com/apache/nifi/pull/3178#discussion_r235123261
--- Diff:
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java
---
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientService;
+import org.apache.nifi.pulsar.cache.LRUCache;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import
org.apache.pulsar.shade.org.apache.commons.collections.CollectionUtils;
+
+public abstract class AbstractPulsarProducerProcessor<T> extends
AbstractProcessor {
+
+ public static final String MSG_COUNT = "msg.count";
+ public static final String TOPIC_NAME = "topic.name";
+
+ static final AllowableValue COMPRESSION_TYPE_NONE = new
AllowableValue("NONE", "None", "No compression");
+ static final AllowableValue COMPRESSION_TYPE_LZ4 = new
AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm.");
+ static final AllowableValue COMPRESSION_TYPE_ZLIB = new
AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm");
+
+ static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION =
new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a
custom partition");
+ static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION
= new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route
messages to all "
+
+ "partitions in a round robin
manner");
+ static final AllowableValue MESSAGE_ROUTING_MODE_SINGLE_PARTITION =
new AllowableValue("SinglePartition", "Single Partition", "Route messages to a
single partition");
+
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("FlowFiles for which all content was sent to
Pulsar.")
+ .build();
+
+ public static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("failure")
+ .description("Any FlowFile that cannot be sent to Pulsar will
be routed to this Relationship")
+ .build();
+
+ public static final PropertyDescriptor PULSAR_CLIENT_SERVICE = new
PropertyDescriptor.Builder()
+ .name("PULSAR_CLIENT_SERVICE")
+ .displayName("Pulsar Client Service")
+ .description("Specified the Pulsar Client Service that can be
used to create Pulsar connections")
+ .required(true)
+ .identifiesControllerService(PulsarClientService.class)
+ .build();
+
+ public static final PropertyDescriptor TOPIC = new
PropertyDescriptor.Builder()
+ .name("TOPIC")
+ .displayName("Topic Name")
+ .description("The name of the Pulsar Topic.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor ASYNC_ENABLED = new
PropertyDescriptor.Builder()
+ .name("ASYNC_ENABLED")
+ .displayName("Async Enabled")
+ .description("Control whether the messages will be sent
asyncronously or not. Messages sent"
+ + " syncronously will be acknowledged immediately
before processing the next message, while"
+ + " asyncronous messages will be acknowledged after
the Pulsar broker responds. Running the"
+ + " processor with async enabled will result in
increased the throughput at the risk of potential"
+ + " duplicate data being sent to the Pulsar broker.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
+
+ public static final PropertyDescriptor MAX_ASYNC_REQUESTS = new
PropertyDescriptor.Builder()
+ .name("MAX_ASYNC_REQUESTS")
+ .displayName("Maximum Async Requests")
+ .description("The maximum number of outstanding asynchronous
publish requests for this processor. "
+ + "Each asynchronous call requires memory, so avoid
setting this value to high.")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("50")
+ .build();
+
+ public static final PropertyDescriptor BATCHING_ENABLED = new
PropertyDescriptor.Builder()
+ .name("BATCHING_ENABLED")
+ .displayName("Batching Enabled")
+ .description("Control whether automatic batching of messages
is enabled for the producer. "
+ + "default: false [No batching] When batching is
enabled, multiple calls to "
+ + "Producer.sendAsync can result in a single batch to
be sent to the broker, leading "
+ + "to better throughput, especially when publishing
small messages. If compression is "
+ + "enabled, messages will be compressed at the batch
level, leading to a much better "
+ + "compression ratio for similar headers or contents.
When enabled default batch delay "
+ + "is set to 10 ms and default batch size is 1000
messages")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
+
+ public static final PropertyDescriptor BATCHING_MAX_MESSAGES = new
PropertyDescriptor.Builder()
+ .name("BATCHING_MAX_MESSAGES")
+ .displayName("Batching Max Messages")
+ .description("Set the maximum number of messages permitted in
a batch. default: "
+ + "1000 If set to a value greater than 1, messages
will be queued until this "
--- End diff --
All of the "batching" specified by these properties actually occurs within
the Pulsar client itself; none of it is kept in NiFi. This property
merely exposes that client setting to end users who wish to use the feature to
achieve better write throughput to Pulsar.
http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerBuilder.html#enableBatching-boolean-
> Implement record model processor for Pulsar, i.e. ConsumePulsarRecord,
> PublishPulsarRecord
> ------------------------------------------------------------------------------------------
>
> Key: NIFI-4914
> URL: https://issues.apache.org/jira/browse/NIFI-4914
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Affects Versions: 1.6.0
> Reporter: David Kjerrumgaard
> Priority: Minor
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> Create record-based processors for Apache Pulsar
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)