[ 
https://issues.apache.org/jira/browse/NIFI-1192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15027284#comment-15027284
 ] 

ASF subversion and git services commented on NIFI-1192:
-------------------------------------------------------

Commit d949ee1a1ee529fbf0103b4620fef47832132a53 in nifi's branch 
refs/heads/master from [~ozhurakousky]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=d949ee1 ]

NIFI-1192 added support for dynamic properties to GetKafka
Due to the fact that the current component uses artificial names for properties set 
via the UI and then maps those properties to the actual names used by Kafka, we 
cannot rely on the NiFi UI to display an error if a user attempts to set a dynamic 
property that will eventually map to the same Kafka property. So, I've decided 
that any dynamic property will simply override an existing property, with a 
WARNING message displayed. This is actually consistent with how Kafka handles it, 
displaying the overrides in the console. Updated the relevant annotation 
description.
It is also worth mentioning that the current code was using an old property from 
Kafka 0.7 ("zk.connectiontimeout.ms") which is no longer present in Kafka 0.8 
(WARN Timer-Driven Process Thread-7 utils.VerifiableProperties:83 - Property 
zk.connectiontimeout.ms is not valid). The add/override strategy provides 
more flexibility when dealing with Kafka's volatile configuration until 
things settle down and we can get some sensible defaults in place.
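
The add/override behavior described above can be sketched as follows. This is a 
minimal, self-contained illustration, not the actual GetKafka code; the class 
name, method name, and WARN output are hypothetical stand-ins for the 
processor's logger:

```java
import java.util.Map;
import java.util.Properties;

public class KafkaPropsMerge {

    /**
     * Merges user-supplied dynamic properties into the base Kafka
     * configuration. A dynamic property that collides with an existing
     * key simply overrides it, and the override is reported as a
     * warning, mirroring how Kafka itself logs configuration overrides.
     */
    static Properties merge(Properties base, Map<String, String> dynamicProps) {
        Properties result = new Properties();
        result.putAll(base);
        for (Map.Entry<String, String> e : dynamicProps.entrySet()) {
            if (result.containsKey(e.getKey())) {
                System.err.println("WARN: dynamic property '" + e.getKey()
                        + "' overrides existing value '"
                        + result.getProperty(e.getKey()) + "'");
            }
            result.setProperty(e.getKey(), e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("zookeeper.connect", "localhost:2181");
        base.setProperty("consumer.timeout.ms", "1");
        // The dynamic property wins; a WARN line is printed for the collision.
        Properties merged = merge(base, Map.of("consumer.timeout.ms", "5000"));
        System.out.println(merged.getProperty("consumer.timeout.ms")); // prints 5000
    }
}
```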

While doing this I also addressed the following issues, which were discovered 
while making the modifications and testing:
ISSUE: When GetKafka is started and there are no messages in the Kafka topic, the 
onTrigger(..) method blocks because Kafka's ConsumerIterator.hasNext() blocks. 
When an attempt was made to stop GetKafka, it would stop successfully due to the 
interrupt; however, in the UI it would appear as an ERROR because the 
InterruptedException was not handled.
RESOLUTION: After discussing it with @markap14, the general desire is to let 
the task exit as quickly as possible; the whole thread-maintenance logic was 
there initially because there was no way to tell the Kafka consumer to return 
immediately if there are no events. In this patch we are now using Kafka's 
'consumer.timeout.ms' property and setting its value to 1 millisecond (the 
default is -1, i.e., always block indefinitely). This ensures that tasks that 
attempt to read an empty topic will exit immediately, only to be rescheduled by 
NiFi based on the user's configuration.

ISSUE: Kafka would not release a FlowFile with events if it didn't have enough 
messages to complete the batch, since it would block waiting for more messages 
(per the blocking issue described above).
RESOLUTION: The invocation of hasNext() results in Kafka's 
ConsumerTimeoutException, which is handled in the catch block, where the FlowFile 
with the partial batch is released to 'success'. I am not sure we need to log a 
WARN message here; in fact, in my opinion we should not, as it may create 
unnecessary confusion.
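
The timeout-and-release control flow from the two resolutions above can be 
sketched like this. It is a self-contained illustration only: the nested 
TimeoutException class is a stand-in for Kafka's 
kafka.consumer.ConsumerTimeoutException, a plain Iterator stands in for 
ConsumerIterator, and the method name drainBatch is hypothetical:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class PartialBatchDrain {

    /** Stand-in for kafka.consumer.ConsumerTimeoutException. */
    static class TimeoutException extends RuntimeException {}

    /**
     * Drains up to batchSize messages from the iterator. With
     * 'consumer.timeout.ms' set to 1, hasNext() throws a timeout
     * exception almost immediately when no message arrives, so the
     * partial batch collected so far is released instead of blocking
     * until a full batch accumulates.
     */
    static List<String> drainBatch(Iterator<String> messages, int batchSize) {
        List<String> batch = new ArrayList<>();
        try {
            while (batch.size() < batchSize && messages.hasNext()) {
                batch.add(messages.next());
            }
        } catch (TimeoutException e) {
            // No more messages within the timeout: release the partial
            // batch to 'success' rather than waiting for a full one.
        }
        return batch;
    }

    public static void main(String[] args) {
        // An iterator that yields two messages and then "times out",
        // simulating a near-empty topic under consumer.timeout.ms=1.
        Iterator<String> it = new Iterator<>() {
            private int i = 0;
            public boolean hasNext() {
                if (i >= 2) throw new TimeoutException();
                return true;
            }
            public String next() { i++; return "msg-" + i; }
        };
        System.out.println(drainBatch(it, 10)); // prints [msg-1, msg-2]
    }
}
```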

ISSUE: When configuring a consumer for a topic and specifying multiple concurrent 
consumers in 'topicCountMap' based on 'context.getMaxConcurrentTasks()', each 
consumer binds to a topic partition. If you have fewer partitions than the 
value returned by 'context.getMaxConcurrentTasks()', you essentially 
allocate Kafka resources that will never get a chance to receive a single 
message (see more here 
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example).
RESOLUTION: Logic was added to determine the number of partitions for a topic; 
in the event that the 'context.getMaxConcurrentTasks()' value is greater than 
the number of partitions, the partition count is used when creating 
'topicCountMap' and a WARNING message is displayed (see code). Unfortunately 
we can't do anything about the extra tasks themselves, but in the current state 
of the code they will exit immediately, only to be rescheduled, at which point 
the process repeats. NOTE: This is not ideal, as NiFi will keep rescheduling 
tasks that will never have a chance to do anything, but at least it can be 
corrected on the user's side after reading the warning message.
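
The partition-clamping logic can be sketched as follows. This is a minimal 
illustration with hypothetical names; the real code obtains the partition count 
from Kafka/ZooKeeper metadata, which is elided here:

```java
import java.util.HashMap;
import java.util.Map;

public class TopicCountMapSizing {

    /**
     * Chooses the number of consumer streams for a topic. If the
     * processor's max concurrent tasks exceeds the topic's partition
     * count, the partition count wins, since extra streams would bind
     * to no partition and never receive a message.
     */
    static Map<String, Integer> buildTopicCountMap(String topic,
            int maxConcurrentTasks, int partitionCount) {
        int streams = Math.min(maxConcurrentTasks, partitionCount);
        if (maxConcurrentTasks > partitionCount) {
            System.err.println("WARN: topic '" + topic + "' has only "
                    + partitionCount + " partitions; using " + streams
                    + " consumer streams instead of " + maxConcurrentTasks);
        }
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, streams);
        return topicCountMap;
    }

    public static void main(String[] args) {
        // 8 concurrent tasks but only 3 partitions: clamp to 3 and warn.
        System.out.println(buildTopicCountMap("events", 8, 3)); // prints {events=3}
    }
}
```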

NIFI-1192 added dynamic properties support for PutKafka

NIFI-1192 polishing

NIFI-1192 polished and addressed PR comments


> Rework Get/PutKafka for dynamic properties and improve performance under 
> certain conditions
> -------------------------------------------------------------------------------------------
>
>                 Key: NIFI-1192
>                 URL: https://issues.apache.org/jira/browse/NIFI-1192
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 0.3.0
>            Reporter: Oleg Zhurakousky
>            Assignee: Oleg Zhurakousky
>            Priority: Critical
>             Fix For: 0.4.0
>
>
> Currently the Kafka processors do not honor dynamic properties, which means 
> that aside from the 8 properties exposed, no others can be set



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
