[
https://issues.apache.org/jira/browse/STORM-2542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Stig Rohde Døssing resolved STORM-2542.
---------------------------------------
Resolution: Fixed
Fix Version/s: 2.0.0
> Deprecate storm-kafka-client KafkaConsumer.subscribe API subscriptions on 1.x
> and remove them as options in 2.x
> ---------------------------------------------------------------------------------------------------------------
>
> Key: STORM-2542
> URL: https://issues.apache.org/jira/browse/STORM-2542
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-kafka-client
> Affects Versions: 2.0.0, 1.1.0
> Reporter: Stig Rohde Døssing
> Assignee: Stig Rohde Døssing
> Fix For: 2.0.0
>
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> Most of this is copied off the mailing list post:
> We've recently seen some issues raised by users using the default
> subscription API in the new KafkaSpout
> (https://issues.apache.org/jira/browse/STORM-2514,
> https://issues.apache.org/jira/browse/STORM-2538).
> A while ago an alternative subscription implementation was added
> (https://github.com/apache/storm/pull/1835), which uses the
> KafkaConsumer.assign API instead.
> The {{subscribe}} API used by default causes Kafka to assign partitions to
> available consumers automatically. It allows a consumer group to keep
> processing even in the presence of crashes because partitions are reassigned
> when a consumer becomes unavailable.
> The {{assign}} API used in the alternative subscription implementation leaves
> it up to the consuming code to figure out a reasonable partition distribution
> among a consumer group. The {{assign}} API is essentially equivalent to how
> the old storm-kafka spout distributes partitions across spout instances, and
> as far as I know it has worked well there.
> Storm already ensures that all spout instances are running, and restarts them
> if they crash, so we're not really gaining much by using the subscribe API.
> The disadvantages to using the subscribe API are:
> * Whenever an executor crashes, the Kafka cluster reassigns all partitions.
> This causes all KafkaSpout instances in that consumer group to pause until
> reassignment is complete.
> * The partition assignment is random, so it is difficult for users to predict
> which partitions are assigned to which spout task.
> * The subscribe API is extremely likely to cause hangs and other weird
> behavior if the KafkaSpout is configured to run multiple tasks in an
> executor. When KafkaConsumer.poll is called during partition reassignment, it
> will block until the reassignment is complete. If there are multiple
> consumers in a thread, the first consumer to get called will block, and the
> other consumer will get ejected from the list of active consumers after a
> timeout, because it didn't manage to call poll during the rebalance. See the
> example code in https://issues.apache.org/jira/browse/STORM-2514, which runs
> two KafkaConsumers in one thread. The result is that they flip-flop between
> being active and inactive, and most polls take ~30 seconds (the Kafka
> session timeout).
> * The random assignment of partitions causes more message duplication than is
> necessary. When an executor crashes, all the other executors have their
> partitions reassigned. This makes it likely that some of them will lose a
> partition they had in-flight tuples on, which they will then be unable to
> commit to Kafka. Those messages are then re-emitted by whichever KafkaSpout
> instance was assigned the partition. See
> https://issues.apache.org/jira/browse/STORM-2538
> I'd like to drop support for the subscribe API, and switch to using the
> assign API by default.
> The KafkaConsumer Javadoc even mentions applications like Storm as a case
> where the {{subscribe}} API doesn't really add value.
> {quote}
> If the process itself is highly available and will be restarted if it fails
> (perhaps using a cluster management framework like YARN, Mesos, or AWS
> facilities, or as part of a stream processing framework). In this case there
> is no need for Kafka to detect the failure and reassign the partition since
> the consuming process will be restarted on another machine.
> {quote}
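
For illustration only, here is a minimal sketch of the kind of deterministic
partition distribution that the assign API leaves to the consuming code, as
described above. It is not taken from the storm-kafka-client source. The class
name RoundRobinAssignment and the method partitionsForTask are hypothetical,
and partitions are modelled as plain integers rather than Kafka
TopicPartition objects so that the example needs no Kafka dependency.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Storm or Kafka.
class RoundRobinAssignment {

    /**
     * Returns the partitions one spout task would consume when the sorted
     * partition list is dealt out round-robin across totalTasks tasks.
     * The result depends only on the partition list, taskIndex and totalTasks.
     */
    static List<Integer> partitionsForTask(List<Integer> allPartitions,
                                           int taskIndex, int totalTasks) {
        if (totalTasks <= 0 || taskIndex < 0 || taskIndex >= totalTasks) {
            throw new IllegalArgumentException(
                "invalid task index " + taskIndex + " of " + totalTasks);
        }
        // Sort a copy so every task iterates the partitions in the same order.
        List<Integer> sorted = new ArrayList<>(allPartitions);
        Collections.sort(sorted);

        // Take every totalTasks-th partition, starting at this task's index.
        List<Integer> mine = new ArrayList<>();
        for (int i = taskIndex; i < sorted.size(); i += totalTasks) {
            mine.add(sorted.get(i));
        }
        return mine;
    }

    public static void main(String[] args) {
        // Six partitions, deliberately unsorted, spread over three tasks.
        List<Integer> partitions = Arrays.asList(4, 1, 0, 5, 2, 3);
        for (int task = 0; task < 3; task++) {
            System.out.println("task " + task + " -> "
                + partitionsForTask(partitions, task, 3));
        }
        // prints:
        // task 0 -> [0, 3]
        // task 1 -> [1, 4]
        // task 2 -> [2, 5]
    }
}
```

In a real spout, the resulting partitions would be wrapped in TopicPartition
objects and passed to KafkaConsumer.assign. Because the result depends only on
the partition list, the task index and the task count, a restarted task
computes the same partitions it had before and the other tasks keep theirs.
That is the property the description relies on to avoid the pauses and extra
duplicates listed above.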