[ https://issues.apache.org/jira/browse/STORM-2542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hugo Louro reassigned STORM-2542:
---------------------------------

    Assignee: Hugo Louro  (was: Stig Rohde Døssing)

> Deprecate storm-kafka-client KafkaConsumer.subscribe API subscriptions on 1.x
> and remove them as options in 2.x
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: STORM-2542
>                 URL: https://issues.apache.org/jira/browse/STORM-2542
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0, 1.1.0
>            Reporter: Stig Rohde Døssing
>            Assignee: Hugo Louro
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> Most of this is copied from the mailing list post:
>
> We've recently seen some issues raised by users of the default
> subscription API in the new KafkaSpout
> (https://issues.apache.org/jira/browse/STORM-2514,
> https://issues.apache.org/jira/browse/STORM-2538).
>
> A while ago an alternative subscription implementation was added
> (https://github.com/apache/storm/pull/1835), which uses the
> KafkaConsumer.assign API instead.
>
> The {{subscribe}} API, used by default, causes Kafka to assign partitions to
> available consumers automatically. It allows a consumer group to keep
> processing even when consumers crash, because partitions are reassigned
> when a consumer becomes unavailable.
>
> The {{assign}} API used in the alternative subscription implementation leaves
> it up to the consuming code to work out a reasonable partition distribution
> among a consumer group. The {{assign}} API is essentially equivalent to how
> the old storm-kafka spout distributes partitions across spout instances, and
> as far as I know it has worked well there.
>
> Storm already ensures that all spout instances are running, and restarts them
> if they crash, so we're not really gaining much by using the subscribe API.
>
> The disadvantages of using the subscribe API are:
> * Whenever an executor crashes, the Kafka cluster reassigns all partitions.
>   This causes all KafkaSpout instances in that consumer group to pause until
>   reassignment is complete.
> * The partition assignment is random, so it is difficult for users to predict
>   which partitions are assigned to which spout task.
> * The subscribe API is extremely likely to cause hangs and other weird
>   behavior if the KafkaSpout is configured to run multiple tasks in an
>   executor. When KafkaConsumer.poll is called during partition reassignment,
>   it blocks until the reassignment is complete. If there are multiple
>   consumers in a thread, the first consumer to be polled will block, and the
>   other consumer will be ejected from the list of active consumers after a
>   timeout, because it didn't manage to call poll during the rebalance. See the
>   example code in https://issues.apache.org/jira/browse/STORM-2514, which runs
>   two KafkaConsumers in one thread. The result is that they flip-flop between
>   being active, and most polls take ~30 seconds (the Kafka session timeout).
> * The random assignment of partitions causes more message duplication than
>   necessary. When an executor crashes, all the other executors have their
>   partitions reassigned. This makes it likely that some of them will lose a
>   partition they had in-flight tuples on, which they will then be unable to
>   commit to Kafka. The message is then re-emitted by whichever KafkaSpout
>   instance was assigned the partition. See
>   https://issues.apache.org/jira/browse/STORM-2538
>
> I'd like to drop support for the subscribe API, and switch to using the
> assign API by default.
>
> The KafkaConsumer Javadoc even mentions applications like Storm as a case
> where the {{subscribe}} API doesn't really add value:
> {quote}
> If the process itself is highly available and will be restarted if it fails
> (perhaps using a cluster management framework like YARN, Mesos, or AWS
> facilities, or as part of a stream processing framework).
> In this case there is no need for Kafka to detect the failure and reassign
> the partition since the consuming process will be restarted on another
> machine.
> {quote}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
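The issue notes that the {{assign}} API leaves partition distribution up to the consuming code. The following is a minimal sketch of one such distribution: a deterministic round-robin split of a single topic's partitions across spout tasks. It makes three assumptions:

* The partitions are numbered 0..N-1.
* Each task knows its own 0-based index.
* Each task knows the total number of tasks.

The names `PartitionAssignment` and `partitionsForTask` are hypothetical and are not part of the storm-kafka-client API. Round-robin is only one reasonable scheme, not necessarily the one Storm uses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical helper (not the storm-kafka-client implementation): a
 * deterministic round-robin split of one topic's partitions across spout
 * tasks, the kind of decision the assign API leaves to the consuming code.
 */
public final class PartitionAssignment {

    private PartitionAssignment() {}

    /**
     * Returns the partitions that task {@code taskIndex} (0-based) out of
     * {@code totalTasks} should own, given partitions 0..numPartitions-1.
     * Partition p goes to task p % totalTasks, so every task computes the
     * same global layout without coordinating through Kafka.
     */
    public static List<Integer> partitionsForTask(int numPartitions, int taskIndex, int totalTasks) {
        if (totalTasks <= 0 || taskIndex < 0 || taskIndex >= totalTasks || numPartitions < 0) {
            throw new IllegalArgumentException("invalid task index/count or partition count");
        }
        List<Integer> owned = new ArrayList<>();
        // Start at this task's index and step by the task count.
        for (int p = taskIndex; p < numPartitions; p += totalTasks) {
            owned.add(p);
        }
        return Collections.unmodifiableList(owned);
    }

    public static void main(String[] args) {
        int numPartitions = 10;
        int totalTasks = 3;
        Set<Integer> seen = new HashSet<>();
        for (int task = 0; task < totalTasks; task++) {
            List<Integer> owned = partitionsForTask(numPartitions, task, totalTasks);
            // A real spout would wrap these numbers in TopicPartition objects
            // and pass them to KafkaConsumer.assign(...).
            System.out.println("task " + task + " -> " + owned);
            seen.addAll(owned);
        }
        // Every partition is owned by exactly one task.
        System.out.println("all partitions covered: " + (seen.size() == numPartitions));
    }
}
```

Running `main` prints `task 0 -> [0, 3, 6, 9]`, `task 1 -> [1, 4, 7]` and `task 2 -> [2, 5, 8]`. In a spout, the partition count could come from `KafkaConsumer.partitionsFor(topic)`. The resulting numbers would be wrapped in `TopicPartition` objects and passed to `KafkaConsumer.assign(...)`. Because each task derives its share from the same inputs, the layout is predictable. It also stays the same when another executor is restarted, unlike the group-wide rebalance the subscribe API triggers.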