[
https://issues.apache.org/jira/browse/NIFI-1192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15022904#comment-15022904
]
ASF GitHub Bot commented on NIFI-1192:
--------------------------------------
GitHub user olegz opened a pull request:
https://github.com/apache/nifi/pull/131
NIFI-1192 added support for dynamic properties to GetKafka
Because the current component uses artificial names for properties set via
the UI and then maps those properties to the actual names used by Kafka, we
cannot rely on the NiFi UI to display an error if a user attempts to set a
dynamic property that will eventually map to the same Kafka property. So I’ve
decided that any dynamic property will simply override an existing property,
with a WARNING message displayed. This is consistent with how Kafka itself
behaves, since it displays the overrides in the console. I also updated the
relevant annotation description.
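A minimal sketch of the add/override strategy, assuming the processor has
already translated its UI property names into Kafka keys and collected the
user’s dynamic properties separately. ‘KafkaPropertyMerger’ and ‘merge’ are
hypothetical names, not the actual GetKafka code, and java.util.logging stands
in for NiFi’s component logger.

```java
import java.util.Map;
import java.util.Properties;
import java.util.logging.Logger;

// Hypothetical helper illustrating the add/override strategy; not GetKafka code.
class KafkaPropertyMerger {
    private static final Logger LOG = Logger.getLogger(KafkaPropertyMerger.class.getName());

    /**
     * Starts from the Kafka properties derived from the processor's named
     * properties, then applies every dynamic property on top. A dynamic
     * property whose key already exists replaces the old value and a warning
     * is logged; keys that do not exist yet are simply added.
     */
    static Properties merge(Map<String, String> mappedProps, Map<String, String> dynamicProps) {
        Properties result = new Properties();
        result.putAll(mappedProps);
        for (Map.Entry<String, String> entry : dynamicProps.entrySet()) {
            String key = entry.getKey();
            if (result.containsKey(key)) {
                LOG.warning("Dynamic property '" + key + "' overrides existing value '"
                        + result.getProperty(key) + "' with '" + entry.getValue() + "'");
            }
            result.setProperty(key, entry.getValue());
        }
        return result;
    }
}
```

Applying the dynamic properties last is what lets a user supply any Kafka
setting, including ones the processor does not expose, while the warning makes
an accidental clash with a mapped property visible.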
It is also worth mentioning that the current code was using an old property
from Kafka 0.7 (“zk.connectiontimeout.ms”), which is no longer present in Kafka
0.8 (WARN Timer-Driven Process Thread-7 utils.VerifiableProperties:83 -
Property zk.connectiontimeout.ms is not valid). The add/override strategy
provides more flexibility when dealing with Kafka’s volatile configuration
until things settle down and we can get some sensible defaults in place.
While doing this, I also addressed the following issues, which were discovered
while making the modifications and testing:
ISSUE: When GetKafka was started and there were no messages in the Kafka
topic, the onTrigger(..) method would block because Kafka’s
ConsumerIterator.hasNext() blocks. When an attempt was made to stop GetKafka,
it would stop successfully due to the interrupt. However, the UI would show an
ERROR because the InterruptException was not handled.
RESOLUTION: After discussing it with @markap14, the general desire is to let
the task exit as quickly as possible; the whole thread-maintenance logic was
there initially because there was no way to tell the Kafka consumer to return
immediately if there are no events. This patch now uses Kafka’s
‘consumer.timeout.ms’ property and sets its value to 1 millisecond (the
default is -1, which blocks indefinitely). This ensures that tasks that attempt
to read an empty topic exit immediately, only to be rescheduled by NiFi based
on the user’s configuration.
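The effect of ‘consumer.timeout.ms’ can be illustrated without a Kafka broker.
In this sketch ‘StubConsumerIterator’ and ‘StubConsumerTimeoutException’ are
hypothetical stand-ins that mimic the behaviour described above (hasNext()
throws once the timeout elapses instead of blocking); they are not Kafka’s
classes, and ‘EmptyTopicTask’ is likewise illustrative.

```java
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for Kafka's ConsumerTimeoutException (not the real class).
class StubConsumerTimeoutException extends RuntimeException {
}

// Hypothetical stand-in for Kafka's ConsumerIterator: hasNext() waits at most
// timeoutMs for a message and throws if none arrives; a negative timeout waits
// forever, like the consumer.timeout.ms default of -1.
class StubConsumerIterator {
    private final BlockingQueue<String> channel;
    private final long timeoutMs;
    private String next;

    StubConsumerIterator(BlockingQueue<String> channel, long timeoutMs) {
        this.channel = channel;
        this.timeoutMs = timeoutMs;
    }

    boolean hasNext() {
        if (next != null) {
            return true;
        }
        try {
            next = timeoutMs < 0
                    ? channel.take()
                    : channel.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        if (next == null) {
            throw new StubConsumerTimeoutException();
        }
        return true;
    }

    String next() {
        String message = next;
        next = null;
        return message;
    }
}

class EmptyTopicTask {
    // The setting described above: wait at most 1 ms instead of blocking forever.
    static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("consumer.timeout.ms", "1");
        return props;
    }

    /** Returns true if the task found nothing to read and should simply exit. */
    static boolean exitsBecauseTopicIsEmpty(StubConsumerIterator it) {
        try {
            it.hasNext();
            return false;
        } catch (StubConsumerTimeoutException e) {
            return true; // nothing to do now; NiFi reschedules the task later
        }
    }
}
```

In the processor the same effect comes from passing the property to Kafka’s
consumer configuration; the stub only reproduces the resulting hasNext()
behaviour so the control flow can be seen in isolation.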
ISSUE: GetKafka would not release a FlowFile with events if it didn’t have
enough of them to complete the batch, since it would block waiting for more
messages (the blocking issue described above).
RESOLUTION: The invocation of hasNext() now results in Kafka’s
ConsumerTimeoutException, which is handled in a catch block where the FlowFile
with the partial batch is released to success. I’m not sure whether we need to
log a WARN message here; in my opinion we should not, as it may create
unnecessary confusion.
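The partial-batch handling can be sketched the same way.
‘SimulatedConsumerIterator’ and ‘SimulatedTimeoutException’ are hypothetical
stand-ins for Kafka’s ConsumerIterator and ConsumerTimeoutException that time
out as soon as no message is buffered, and returning a List stands in for
writing the batch to a FlowFile and transferring it to success.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for Kafka's ConsumerTimeoutException.
class SimulatedTimeoutException extends RuntimeException {
}

// Hypothetical stand-in for ConsumerIterator: hasNext() throws the timeout
// exception immediately when no message is buffered, instead of blocking.
class SimulatedConsumerIterator {
    private final Deque<String> buffered;

    SimulatedConsumerIterator(List<String> messages) {
        this.buffered = new ArrayDeque<>(messages);
    }

    boolean hasNext() {
        if (buffered.isEmpty()) {
            throw new SimulatedTimeoutException();
        }
        return true;
    }

    String next() {
        return buffered.poll();
    }
}

class BatchReader {
    /** Collects up to batchSize messages; on timeout returns whatever was read. */
    static List<String> readBatch(SimulatedConsumerIterator it, int batchSize) {
        List<String> batch = new ArrayList<>();
        try {
            while (batch.size() < batchSize && it.hasNext()) {
                batch.add(it.next());
            }
        } catch (SimulatedTimeoutException e) {
            // No more messages within the timeout: keep the partial batch
            // instead of waiting for it to fill up (no WARN is logged).
        }
        return batch;
    }
}
```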
ISSUE: When configuring a consumer for a topic and specifying multiple
concurrent consumers in ‘topicCountMap’ based on
‘context.getMaxConcurrentTasks()’, each consumer would bind to a topic
partition. If there are fewer partitions than the value returned by
‘context.getMaxConcurrentTasks()’, you would essentially allocate Kafka
resources that would never get a chance to receive a single message (see more
here https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example).
RESOLUTION: Logic was added to determine the number of partitions for a
topic. When the ‘context.getMaxConcurrentTasks()’ value is greater than the
number of partitions, the partition count is used when creating
‘topicCountMap’ and a WARNING message is displayed (see code).
Unfortunately we can’t do anything about the extra tasks themselves, but given
the current state of the code they will exit immediately, only to be
rescheduled, and the process will repeat. NOTE: This is not ideal, as it keeps
rescheduling tasks that will never have a chance to do anything, but at least
the user can fix it after reading the warning message.
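A minimal sketch of that clamping, assuming the partition count for the topic
has already been determined (the lookup itself is not shown).
‘TopicCountMapBuilder’ and ‘build’ are hypothetical names; the resulting map
has the shape used for ‘topicCountMap’ in the Consumer Group Example linked
above (topic name to number of streams), and java.util.logging stands in for
NiFi’s component logger.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical helper illustrating the clamping logic; not the actual GetKafka code.
class TopicCountMapBuilder {
    private static final Logger LOG = Logger.getLogger(TopicCountMapBuilder.class.getName());

    /**
     * Returns a topic -> stream-count map. partitionCount is assumed to have
     * been looked up beforehand.
     */
    static Map<String, Integer> build(String topic, int maxConcurrentTasks, int partitionCount) {
        int streamCount = maxConcurrentTasks;
        if (maxConcurrentTasks > partitionCount) {
            // Streams beyond the partition count would never receive a message,
            // so cap the count and tell the user why.
            LOG.warning("Concurrent tasks (" + maxConcurrentTasks + ") exceed the "
                    + partitionCount + " partitions of topic '" + topic + "'; only "
                    + partitionCount + " consumer streams will be created. "
                    + "Consider lowering the number of concurrent tasks.");
            streamCount = partitionCount;
        }
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, streamCount);
        return topicCountMap;
    }
}
```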
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/olegz/nifi NIFI-1192B
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/nifi/pull/131.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #131
----
commit f9d1b2811a08c372baf660a8a5e8d0f73e1a23a2
Author: Oleg Zhurakousky
Date: 2015-11-23T20:15:03Z
NIFI-1192 added support for dynamic properties to GetKafka
----
> Allow Get/PutKafka to honor dynamic properties
> ----------------------------------------------
>
> Key: NIFI-1192
> URL: https://issues.apache.org/jira/browse/NIFI-1192
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Affects Versions: 0.3.0
> Reporter: Oleg Zhurakousky
> Assignee: Oleg Zhurakousky
> Priority: Critical
> Fix For: 0.4.0
>
>
> Currently the Kafka processors do not honor dynamic properties, which means
> that aside from the 8 exposed properties, no others can be set
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)