GitHub user olegz opened a pull request:
https://github.com/apache/nifi/pull/131
NIFI-1192 added support for dynamic properties to GetKafka
Because the current component uses artificial names for properties set via the
UI and then maps those properties to the actual names used by Kafka, we cannot
rely on the NiFi UI to display an error if a user attempts to set a dynamic
property that eventually maps to the same Kafka property. So I've decided that
any dynamic property will simply override an existing property, with a WARNING
message displayed. This is consistent with how Kafka itself handles overrides,
displaying them in the console. Updated the relevant annotation description.
It is also worth mentioning that the current code was using an old property
from Kafka 0.7 ("zk.connectiontimeout.ms") which is no longer present in
Kafka 0.8 (WARN Timer-Driven Process Thread-7 utils.VerifiableProperties:83 -
Property zk.connectiontimeout.ms is not valid). The add/override strategy
provides more flexibility for dealing with Kafka's volatile configuration
until things settle down and we can get some sensible defaults in place.
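To make the add/override behaviour concrete, here is a minimal, self-contained sketch (hypothetical names, not the actual patch code) of a dynamic property simply replacing an existing Kafka property with a WARN instead of a validation error:

```java
import java.util.Properties;

// Hypothetical sketch of the add/override strategy described above: any
// dynamic property replaces an existing Kafka property, and the override
// is logged as a WARN (mirroring Kafka's own console behaviour) rather
// than rejected by UI validation.
public class DynamicPropertySketch {
    public static void setDynamicProperty(Properties kafkaProps, String name, String value) {
        Object previous = kafkaProps.put(name, value);
        if (previous != null) {
            System.out.println("WARN: dynamic property '" + name + "' overrides existing value '"
                    + previous + "' with '" + value + "'");
        }
    }
}
```

The point is only that the last writer wins and the user is warned; the real processor does this when translating UI property names to Kafka names.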
While doing that, I addressed the following issues discovered while making the
modification and testing:
ISSUE: When GetKafka is started and there are no messages in the Kafka topic,
the onTrigger(..) method would block because Kafka's
ConsumerIterator.hasNext() blocks. When an attempt was made to stop GetKafka,
it would stop successfully due to the interrupt; however, in the UI it would
appear as an ERROR because the InterruptException was not handled.
RESOLUTION: After discussing it with @markap14, the general desire is to let
the task exit as quickly as possible; the whole thread-maintenance logic was
there initially because there was no way to tell the Kafka consumer to return
immediately when there are no events. In this patch we now use Kafka's
"consumer.timeout.ms" property and set its value to 1 millisecond (the default
is -1, which blocks indefinitely). This ensures that tasks attempting to read
an empty topic exit immediately, only to be rescheduled by NiFi based on the
user's configuration.
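A sketch of the consumer configuration this relies on (property names are real Kafka 0.8 consumer properties; the helper and its parameters are hypothetical):

```java
import java.util.Properties;

// Sketch of the non-blocking consumer configuration described above.
// With consumer.timeout.ms = 1, Kafka's ConsumerIterator.hasNext() throws
// ConsumerTimeoutException after ~1 ms on an empty topic instead of
// blocking forever (the -1 default).
public class ConsumerTimeoutSketch {
    public static Properties consumerProperties(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zkConnect);
        props.setProperty("group.id", groupId);
        props.setProperty("consumer.timeout.ms", "1"); // default -1 blocks indefinitely
        return props;
    }
}
```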
ISSUE: Kafka would not release a FlowFile with events if it didn't have enough
messages to complete the batch, since it would block waiting for more messages
(the blocking issue described above).
RESOLUTION: The invocation of hasNext() results in Kafka's
ConsumerTimeoutException, which is handled in the catch block where the
FlowFile with the partial batch is released to success. I'm not sure whether
we need to emit a WARN message; in my opinion we should not, as it may create
unnecessary confusion.
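The partial-batch behaviour can be simulated without a Kafka broker; in this self-contained sketch a stand-in TimeoutException plays the role of Kafka's ConsumerTimeoutException (all names here are hypothetical, not the patch code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Simulation of the partial-batch release described above.
public class PartialBatchSketch {
    // Stand-in for kafka.consumer.ConsumerTimeoutException.
    static class TimeoutException extends RuntimeException {}

    // Drains up to batchSize messages; a timeout releases whatever was
    // collected so far (the partial batch) instead of blocking forever.
    public static List<String> drainBatch(Iterator<String> messages, int batchSize) {
        List<String> batch = new ArrayList<>();
        try {
            while (batch.size() < batchSize && messages.hasNext()) {
                batch.add(messages.next());
            }
        } catch (TimeoutException e) {
            // Partial batch is still routed to success; no WARN emitted.
        }
        return batch;
    }

    // Test helper: yields the given messages, then times out like an idle topic.
    static Iterator<String> timingOutIterator(List<String> msgs) {
        Iterator<String> it = msgs.iterator();
        return new Iterator<String>() {
            public boolean hasNext() {
                if (it.hasNext()) return true;
                throw new TimeoutException();
            }
            public String next() { return it.next(); }
        };
    }
}
```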
ISSUE: When configuring a consumer for a topic and specifying multiple
concurrent consumers in 'topicCountMap' based on
'context.getMaxConcurrentTasks()', each consumer binds to a topic partition.
If you have fewer partitions than the value returned by
'context.getMaxConcurrentTasks()', you essentially allocate Kafka resources
that will never get a chance to receive a single message (see more here:
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example).
RESOLUTION: Logic was added to determine the number of partitions for a topic;
when the 'context.getMaxConcurrentTasks()' value is greater than the number of
partitions, the partition count is used when creating 'topicCountMap' and a
WARNING message is displayed (see code). Unfortunately we can't do anything
about the actual tasks, but given the current state of the code they will exit
immediately, only to be rescheduled, and the process repeats. NOTE: That is
not ideal, as it keeps rescheduling tasks that will never have a chance to do
anything, but at least it can be fixed on the user side after reading the
warning message.
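The capping logic itself reduces to a small, pure function; a hypothetical sketch (method and message wording are illustrative, not the patch code):

```java
// Sketch of the partition-cap logic: never request more consumer streams
// in topicCountMap than the topic has partitions, and warn when capping.
public class PartitionCapSketch {
    public static int effectiveStreamCount(int maxConcurrentTasks, int partitionCount) {
        if (maxConcurrentTasks > partitionCount) {
            System.out.println("WARN: " + maxConcurrentTasks
                    + " concurrent tasks requested but topic has only " + partitionCount
                    + " partitions; using " + partitionCount + " consumer streams");
            return partitionCount;
        }
        return maxConcurrentTasks;
    }
}
```

The returned value would be what goes into 'topicCountMap'; the surplus NiFi tasks still get scheduled, which is the residual inefficiency the NOTE above describes.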
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/olegz/nifi NIFI-1192B
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/nifi/pull/131.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #131
----
commit f9d1b2811a08c372baf660a8a5e8d0f73e1a23a2
Author: Oleg Zhurakousky <[email protected]>
Date: 2015-11-23T20:15:03Z
NIFI-1192 added support for dynamic properties to GetKafka
----