[
https://issues.apache.org/jira/browse/NIFI-1192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15027283#comment-15027283
]
ASF subversion and git services commented on NIFI-1192:
-------------------------------------------------------
Commit d949ee1a1ee529fbf0103b4620fef47832132a53 in nifi's branch
refs/heads/master from [~ozhurakousky]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=d949ee1 ]
NIFI-1192 added support for dynamic properties to GetKafka
Because the current component uses artificial names for properties set via the
UI and then maps them to the actual names used by Kafka, we cannot rely on the
NiFi UI to display an error if a user attempts to set a dynamic property that
will eventually map to the same Kafka property. So, I've decided that any
dynamic property will simply override an existing property, with a WARNING
message displayed. This is consistent with how Kafka does it, displaying the
overrides in the console. Updated the relevant annotation description.
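The override-with-warning behavior described above could look roughly like the
following. This is only a sketch under stated assumptions, not the code from
the commit: the class KafkaPropertyOverrides, the method merge(..) and the
warn callback are hypothetical names, and the fixed properties are assumed to
have already been mapped to their real Kafka names.

```java
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;

// Hypothetical helper for illustration; not the actual GetKafka implementation.
final class KafkaPropertyOverrides {

    /**
     * Copies the fixed properties (assumed to be already mapped to real Kafka
     * names) into a Properties object, then applies the dynamic properties on
     * top. Every dynamic property that replaces an existing value is reported
     * through the warn callback instead of being rejected.
     */
    static Properties merge(Map<String, String> mappedFixed,
                            Map<String, String> dynamic,
                            Consumer<String> warn) {
        Properties props = new Properties();
        mappedFixed.forEach(props::setProperty);
        for (Map.Entry<String, String> e : dynamic.entrySet()) {
            String previous = props.getProperty(e.getKey());
            if (previous != null) {
                warn.accept("Overriding '" + e.getKey() + "' value '" + previous
                        + "' with dynamic property value '" + e.getValue() + "'");
            }
            props.setProperty(e.getKey(), e.getValue());
        }
        return props;
    }
}
```

In a processor the warn callback would presumably delegate to the component
logger; here it is left as a plain callback so the sketch stays free of NiFi
dependencies.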
It is also worth mentioning that the current code was using an old property
from Kafka 0.7 ("zk.connectiontimeout.ms") which is no longer present in Kafka
0.8 (WARN Timer-Driven Process Thread-7 utils.VerifiableProperties:83 - Property
zk.connectiontimeout.ms is not valid). The add/override strategy provides more
flexibility when dealing with Kafka's volatile configuration until things
settle down and we can get some sensible defaults in place.
While doing this, I addressed the following issues that were discovered while
making modifications and testing:
ISSUE: When GetKafka was started and there were no messages in the Kafka topic,
the onTrigger(..) method would block because Kafka's
ConsumerIterator.hasNext() blocks. When an attempt was made to stop GetKafka, it
would stop successfully due to the interrupt. However, in the UI it would
appear as an ERROR because the InterruptException was not handled.
RESOLUTION: After discussing it with @markap14, the general desire is to let
the task exit as quickly as possible; the whole thread maintenance logic was
there initially because there was no way to tell the Kafka consumer to return
immediately if there are no events. In this patch we now use the
'consumer.timeout.ms' property of Kafka and set its value to 1 millisecond
(the default is -1, which blocks indefinitely). This ensures that tasks that
attempt to read an empty topic exit immediately, only to be rescheduled by
NiFi based on user configuration.
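A minimal sketch of that consumer configuration, assuming the Kafka 0.8
high-level consumer property names 'zookeeper.connect' and 'group.id'
alongside 'consumer.timeout.ms'. The class and method names are hypothetical
and this is not the code from the commit.

```java
import java.util.Properties;

// Hypothetical helper for illustration only.
final class ConsumerTimeoutConfig {

    /**
     * Builds consumer properties in which 'consumer.timeout.ms' is set to 1,
     * so that a consumer reading an empty topic gives up after one millisecond
     * instead of blocking (the Kafka default of -1 blocks indefinitely).
     */
    static Properties consumerProperties(String zookeeperConnect, String groupId) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zookeeperConnect);
        props.setProperty("group.id", groupId);
        props.setProperty("consumer.timeout.ms", "1");
        return props;
    }
}
```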
ISSUE: Kafka would not release a FlowFile with events if it didn't have enough
messages to complete the batch, since it would block waiting for more messages
(based on the blocking issue described above).
RESOLUTION: The invocation of hasNext() now results in Kafka's
ConsumerTimeoutException, which is handled in the catch block where the
FlowFile with the partial batch is released to success. Not sure if we need to
log a WARN message. In my opinion we should not, as it may create unnecessary
confusion.
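The partial-batch handling can be sketched without a Kafka dependency as
follows. Everything here is an assumption made for illustration: TimeoutStandIn
stands in for Kafka's ConsumerTimeoutException, timingOutIterator(..) stands in
for a ConsumerIterator whose hasNext() times out on an empty topic, and
readBatch(..) is a hypothetical name for the loop inside onTrigger(..).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical simulation; the real processor works with Kafka's ConsumerIterator.
final class PartialBatchSketch {

    /** Stand-in for Kafka's ConsumerTimeoutException (assumed to be unchecked). */
    static final class TimeoutStandIn extends RuntimeException { }

    /** Yields the given messages, then throws from hasNext() to mimic a timeout. */
    static Iterator<String> timingOutIterator(List<String> messages) {
        Iterator<String> delegate = messages.iterator();
        return new Iterator<String>() {
            @Override
            public boolean hasNext() {
                if (!delegate.hasNext()) {
                    throw new TimeoutStandIn();
                }
                return true;
            }

            @Override
            public String next() {
                return delegate.next();
            }
        };
    }

    /**
     * Reads up to batchSize messages. If the iterator times out first, the
     * messages collected so far are returned as a partial batch rather than
     * waiting for the batch to fill up.
     */
    static List<String> readBatch(Iterator<String> it, int batchSize) {
        List<String> batch = new ArrayList<>();
        try {
            while (batch.size() < batchSize && it.hasNext()) {
                batch.add(it.next());
            }
        } catch (TimeoutStandIn e) {
            // No more messages right now: fall through and release what we have.
        }
        return batch;
    }
}
```

The catch block deliberately logs nothing, matching the opinion above that a
WARN message here would mostly cause confusion.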
ISSUE: When configuring a consumer for a topic and specifying multiple
concurrent consumers in 'topicCountMap' based on
'context.getMaxConcurrentTasks()', each consumer would bind to a topic
partition. If you have fewer partitions than the value returned by
'context.getMaxConcurrentTasks()', you would essentially allocate Kafka
resources that would never get a chance to receive a single message (see more
here https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example).
RESOLUTION: Logic was added to determine the number of partitions for a topic.
In the event that the 'context.getMaxConcurrentTasks()' value is greater than
the number of partitions, the partition count will be used when creating
'topicCountMap' and a WARNING message will be displayed (see code).
Unfortunately we can't do anything about the actual tasks, but based on the
current state of the code they will exit immediately, only to be rescheduled,
at which point the process will repeat. NOTE: That is not ideal, as it will
keep rescheduling tasks that will never have a chance to do anything, but at
least it can be fixed on the user side after reading the warning message.
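The capping logic can be sketched as below. This is an illustration under
assumptions, not the committed code: the partition count is taken as an input
(how it is looked up is not shown here), and the class, method and warn
callback names are hypothetical.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical helper for illustration only.
final class TopicCountMapSketch {

    /**
     * Returns a single-entry topicCountMap whose stream count never exceeds
     * the number of partitions. When the configured maximum number of
     * concurrent tasks is larger, a warning is emitted and the partition
     * count is used instead.
     */
    static Map<String, Integer> topicCountMap(String topic,
                                              int maxConcurrentTasks,
                                              int partitionCount,
                                              Consumer<String> warn) {
        int streams = maxConcurrentTasks;
        if (maxConcurrentTasks > partitionCount) {
            warn.accept("Concurrent tasks (" + maxConcurrentTasks
                    + ") exceed partitions (" + partitionCount + ") for topic '"
                    + topic + "'; using " + partitionCount + " consumer streams");
            streams = partitionCount;
        }
        return Collections.singletonMap(topic, streams);
    }
}
```

For example, with 8 concurrent tasks and a 3-partition topic the map would
request 3 streams and produce one warning; the remaining tasks still get
scheduled, as noted above.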
NIFI-1192 added dynamic properties support for PutKafka
NIFI-1192 polishing
NIFI-1192 polished and addressed PR comments
> Rework Get/PutKafka for dynamic properties and improve performance under
> certain conditions
> -------------------------------------------------------------------------------------------
>
> Key: NIFI-1192
> URL: https://issues.apache.org/jira/browse/NIFI-1192
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Affects Versions: 0.3.0
> Reporter: Oleg Zhurakousky
> Assignee: Oleg Zhurakousky
> Priority: Critical
> Fix For: 0.4.0
>
>
> Currently Kafka does not honor dynamic properties, which means that aside
> from the 8 properties exposed, no others can be set
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)