[ https://issues.apache.org/jira/browse/NIFI-413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14592062#comment-14592062 ]
ASF GitHub Bot commented on NIFI-413:
-------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/incubator-nifi/pull/55#discussion_r32749706
--- Diff: nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---
@@ -136,6 +136,68 @@
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(false)
             .build();
+    public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder()
+            .name("Producer Type")
+            .description("This parameter specifies whether the messages are sent asynchronously in a background thread."
+                    + " Valid values are (1) async for asynchronous send and (2) sync for synchronous send."
+                    + " By setting the producer to async we allow batching together of requests (which is great for throughput)"
+                    + " but open the possibility of a failure of the client machine dropping unsent data.")
+            .required(true)
+            .allowableValues("sync", "async")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .defaultValue("sync")
+            .build();
+    public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder()
+            .name("Async Message Batch Size (batch.num.messages)")
+            .description("Used only if Producer Type is set to \"async\". The number of messages to send in one batch when using async mode."
+                    + " The producer will wait until either this number of messages are ready"
+                    + " to send or queue.buffer.max.ms is reached.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("200").build();
+    public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MS = new PropertyDescriptor.Builder()
+            .name("Queue Buffering Max Time (queue.buffering.max.ms)")
+            .description("Used only if Producer Type is set to \"async\". Maximum time to buffer data when using async mode. For example a setting of 100"
+                    + " will try to batch together 100ms of messages to send at once. This will improve"
+                    + " throughput but adds message delivery latency due to the buffering.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("5000").build();
+    public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new PropertyDescriptor.Builder()
+            .name("Queue Buffer Max Count (queue.buffering.max.messages)")
+            .description("Used only if Producer Type is set to \"async\". The maximum number of unsent messages that can be queued up the producer when"
+                    + " using async mode before either the producer must be blocked or data must be dropped.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("10000").build();
+    public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT_MS = new PropertyDescriptor.Builder()
+            .name("Queue Enqueue Timeout (queue.enqueue.timeout.ms)")
+            .description("Used only if Producer Type is set to \"async\". The amount of time to block before dropping messages when running in async mode"
+                    + " and the buffer has reached queue.buffering.max.messages. If set to 0 events will"
+                    + " be enqueued immediately or dropped if the queue is full (the producer send call will"
+                    + " never block). If set to -1 the producer will block indefinitely and never willingly"
+                    + " drop a send.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("-1").build();
+    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
+            .name("Compression Codec (compression.codec)")
+            .description("This parameter allows you to specify the compression codec for all"
+                    + " data generated by this producer. Valid values are \"none\", \"gzip\" and \"snappy\".")
--- End diff --
--- End diff --
I do not recommend calling out valid values in the description, since they
are already provided via .allowableValues(). The standard convention is also
to capitalize the first letters (this is not always adhered to, but we should
clean that up).
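One way the suggestion could be applied is sketched below. It is not code from the pull request: the constant names, the per-value descriptions, the capitalized display names, `.required(true)` and the "none" default are all assumptions made for illustration (the quoted diff is cut off before those settings). It relies on NiFi's AllowableValue class, which pairs the value handed to Kafka with a separate display name, and it needs nifi-api on the classpath to compile.

```java
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;

public class CompressionCodecPropertySketch {

    // AllowableValue(value, displayName, description):
    // "value" is what would be passed to the Kafka producer, "displayName" is what the user sees.
    // The names and descriptions here are illustrative, not taken from the pull request.
    static final AllowableValue CODEC_NONE =
            new AllowableValue("none", "None", "Messages are sent uncompressed");
    static final AllowableValue CODEC_GZIP =
            new AllowableValue("gzip", "GZIP", "Messages are compressed with GZIP");
    static final AllowableValue CODEC_SNAPPY =
            new AllowableValue("snappy", "Snappy", "Messages are compressed with Snappy");

    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
            .name("Compression Codec (compression.codec)")
            // The description no longer enumerates the valid values;
            // the allowable values below already carry that information.
            .description("Specifies the compression codec for all data generated by this producer.")
            .required(true)                        // assumption: mirrors the other new properties
            .allowableValues(CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY)
            .defaultValue(CODEC_NONE.getValue())   // assumption: the default is not visible in the diff
            .build();
}
```

Because the set of allowable values already constrains what can be entered, no additional NON_EMPTY_VALIDATOR is attached in this sketch.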
> KafkaPut should support compression option
> ------------------------------------------
>
> Key: NIFI-413
> URL: https://issues.apache.org/jira/browse/NIFI-413
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Michael Braun
> Priority: Minor
> Labels: beginner, newbie
> Fix For: 0.2.0
>
>
> Kafka supports compression out of the box (GZIP, Snappy with another
> dependency) but the KafkaPut operator does not expose the option. It would be
> nice if this was exposed in processor settings so processing could benefit
> from compression.