GitHub user olegz opened a pull request:

    https://github.com/apache/nifi/pull/131

    NIFI-1192 added support for dynamic properties to GetKafka

    Because the current component uses artificial names for properties set via the UI and then maps them to the actual names used by Kafka, we cannot rely on the NiFi UI to display an error when a user sets a dynamic property that ultimately maps to the same Kafka property. So I've decided that any dynamic property will simply override the existing property, with a WARNING message displayed. This is consistent with how Kafka itself handles overrides and logs them to the console. The relevant annotation description has been updated.
    It is also worth mentioning that the current code was using an old Kafka 0.7 property ("zk.connectiontimeout.ms") which is no longer present in Kafka 0.8 (WARN Timer-Driven Process Thread-7 utils.VerifiableProperties:83 - Property zk.connectiontimeout.ms is not valid). The add/override strategy provides more flexibility for dealing with Kafka's volatile configuration until things settle down and we can put sensible defaults in place.
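The add/override behavior can be sketched as follows. This is a minimal illustration of the idea, not the actual GetKafka code; the class and method names are hypothetical.

```java
import java.util.Properties;

public class DynamicPropertySketch {
    // Applies a dynamic (user-supplied) property on top of the properties the
    // processor has already mapped to Kafka names. If the key already exists,
    // the dynamic value wins and a warning is returned, mirroring how Kafka
    // itself logs configuration overrides to the console.
    static String applyDynamicProperty(Properties kafkaProps, String name, String value) {
        String warning = null;
        if (kafkaProps.containsKey(name)) {
            warning = "WARNING: overriding property '" + name + "' ('"
                    + kafkaProps.getProperty(name) + "' -> '" + value + "')";
        }
        kafkaProps.setProperty(name, value);
        return warning; // null when no override happened
    }
}
```

The caller would surface the returned warning through the processor's logger rather than rejecting the property in the UI.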
    
    While making these modifications and testing, the following issues were discovered and addressed:
    ISSUE: When GetKafka is started and there are no messages in the Kafka topic, the onTrigger(..) method would block because Kafka's ConsumerIterator.hasNext() blocks. When an attempt was made to stop GetKafka, it would stop successfully due to the interrupt; however, in the UI it would appear as an ERROR because the InterruptedException was not handled.
    RESOLUTION: After discussing it with @markap14, the general desire is to let the task exit as quickly as possible; the thread-maintenance logic was there initially only because there was no way to tell the Kafka consumer to return immediately when there are no events. This patch now uses Kafka's 'consumer.timeout.ms' property, setting its value to 1 millisecond (the default is -1, i.e. block indefinitely). This ensures that a task that attempts to read an empty topic exits immediately, to be rescheduled by NiFi based on the user's configuration.
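A sketch of the relevant consumer configuration, assuming the standard Kafka 0.8 high-level consumer property names; the helper itself is hypothetical:

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Builds the consumer configuration so that hasNext() no longer blocks
    // forever on an empty topic: consumer.timeout.ms=1 makes the iterator
    // throw ConsumerTimeoutException after 1 ms with no message, instead of
    // blocking indefinitely (the Kafka default of -1).
    static Properties buildConsumerConfig(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zkConnect);
        props.setProperty("group.id", groupId);
        props.setProperty("consumer.timeout.ms", "1");
        return props;
    }
}
```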
    
    ISSUE: GetKafka would not release a FlowFile with events if there were not enough messages to complete the batch, since it would block waiting for more messages (the blocking issue described above).
    RESOLUTION: The invocation of hasNext() now results in Kafka's ConsumerTimeoutException, which is handled in a catch block where the FlowFile with the partial batch is released to 'success'. I am not sure we need a WARN message here; in my opinion we should not, as it may create unnecessary confusion.
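The partial-batch handling can be sketched like this. A plain RuntimeException subclass stands in for kafka.consumer.ConsumerTimeoutException so the sketch is self-contained; the method and class names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class BatchDrainSketch {
    // Stand-in for kafka.consumer.ConsumerTimeoutException, which the real
    // iterator throws once consumer.timeout.ms elapses with no message.
    static class ConsumerTimeoutException extends RuntimeException {}

    // Drains up to batchSize messages from the consumer iterator. If the
    // iterator times out before the batch is full, the partial batch is
    // returned (to be released to 'success') instead of blocking forever
    // waiting for the remaining messages.
    static List<String> drainBatch(Iterator<String> it, int batchSize) {
        List<String> batch = new ArrayList<>();
        try {
            while (batch.size() < batchSize && it.hasNext()) {
                batch.add(it.next());
            }
        } catch (ConsumerTimeoutException e) {
            // Expected on an empty or slow topic: keep whatever we collected.
        }
        return batch;
    }
}
```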
    
    ISSUE: When configuring a consumer for a topic and requesting multiple concurrent consumers in 'topicCountMap' based on 'context.getMaxConcurrentTasks()', each consumer binds to a topic partition. If there are fewer partitions than the value returned by 'context.getMaxConcurrentTasks()', you essentially allocate Kafka resources that never get a chance to receive a single message (see https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example).
    RESOLUTION: Logic was added to determine the number of partitions for a topic; when the 'context.getMaxConcurrentTasks()' value is greater than the number of partitions, the partition count is used when creating 'topicCountMap' and a WARNING message is displayed (see code). Unfortunately we can't do anything about the actual tasks, but in the current state of the code they will exit immediately, only to be rescheduled, and the process will repeat. NOTE: This is not ideal, since it keeps rescheduling tasks that will never have a chance to do anything, but at least it can be fixed on the user side after reading the warning message.
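The capping logic amounts to the following. This is an illustrative sketch with hypothetical names, not the patch itself:

```java
import java.util.HashMap;
import java.util.Map;

public class TopicCountMapSketch {
    // Builds the topicCountMap handed to Kafka's high-level consumer. The
    // stream count is capped at the topic's partition count: any stream
    // beyond that would never be assigned a partition, so it could never
    // receive a message. A warning is surfaced when the cap is applied.
    static Map<String, Integer> buildTopicCountMap(String topic, int maxConcurrentTasks,
                                                   int partitionCount) {
        int streams = Math.min(maxConcurrentTasks, partitionCount);
        if (streams < maxConcurrentTasks) {
            System.err.println("WARNING: only " + streams + " of " + maxConcurrentTasks
                    + " concurrent tasks can consume from topic '" + topic + "' ("
                    + partitionCount + " partitions)");
        }
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, streams);
        return topicCountMap;
    }
}
```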

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/olegz/nifi NIFI-1192B

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/131.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #131
    
----
commit f9d1b2811a08c372baf660a8a5e8d0f73e1a23a2
Author: Oleg Zhurakousky <[email protected]>
Date:   2015-11-23T20:15:03Z

    NIFI-1192 added support for dynamic properties to GetKafka

----

