[ 
https://issues.apache.org/jira/browse/STORM-2542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hugo Louro reassigned STORM-2542:
---------------------------------

    Assignee: Stig Rohde Døssing  (was: Hugo Louro)

> Deprecate storm-kafka-client KafkaConsumer.subscribe API subscriptions on 1.x 
> and remove them as options in 2.x
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: STORM-2542
>                 URL: https://issues.apache.org/jira/browse/STORM-2542
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0, 1.1.0
>            Reporter: Stig Rohde Døssing
>            Assignee: Stig Rohde Døssing
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> Most of this is copied from the mailing list post:
> We've recently seen some issues reported by users of the default 
> subscription API in the new KafkaSpout 
> (https://issues.apache.org/jira/browse/STORM-2514, 
> https://issues.apache.org/jira/browse/STORM-2538).
> A while ago an alternative subscription implementation was added 
> (https://github.com/apache/storm/pull/1835), which uses the 
> KafkaConsumer.assign API instead.
> The {{subscribe}} API used by default causes Kafka to assign partitions to 
> available consumers automatically. It allows a consumer group to keep 
> processing even in the presence of crashes because partitions are reassigned 
> when a consumer becomes unavailable.
> The {{assign}} API used in the alternative subscription implementation leaves 
> it up to the consuming code to figure out a reasonable partition distribution 
> among a consumer group. The {{assign}} API is essentially equivalent to how 
> the old storm-kafka spout distributes partitions across spout instances, and 
> as far as I know it has worked well there.
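
For illustration, the kind of distribution the {{assign}} API requires can be sketched as a simple modulo split of partition ids across spout tasks. This is a hypothetical helper, not the actual storm-kafka-client code; the real subscription implementation may distribute partitions differently:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionDistributor {

    // Sketch: each spout task takes every totalTasks-th partition, starting
    // at its own task index -- the same idea the old storm-kafka spout used
    // to spread partitions across spout instances.
    public static List<Integer> partitionsForTask(int taskIndex, int totalTasks,
                                                  int totalPartitions) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = taskIndex; p < totalPartitions; p += totalTasks) {
            assigned.add(p);
        }
        return assigned;
    }

    public static void main(String[] args) {
        // 10 partitions across 3 spout tasks: every partition has exactly one
        // owner, and the mapping is predictable from the task index alone.
        for (int task = 0; task < 3; task++) {
            System.out.println("task " + task + " -> " + partitionsForTask(task, 3, 10));
        }
        // task 0 -> [0, 3, 6, 9]
        // task 1 -> [1, 4, 7]
        // task 2 -> [2, 5, 8]
    }
}
```

Because such an assignment is a pure function of the task index and task count, users can predict which partitions land on which task, unlike with the broker-side assignment the subscribe API performs.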
> Storm already ensures that all spout instances are running, and restarts them 
> if they crash, so we're not really gaining much by using the subscribe API.
> The disadvantages of using the subscribe API are:
> * Whenever an executor crashes, the Kafka cluster reassigns all partitions. 
> This causes all KafkaSpout instances in that consumer group to pause until 
> reassignment is complete.
> * The partition assignment is random, so it is difficult for users to predict 
> which partitions are assigned to which spout task.
> * The subscribe API is extremely likely to cause hangs and other weird 
> behavior if the KafkaSpout is configured to run multiple tasks in an 
> executor. When KafkaConsumer.poll is called during partition reassignment, it 
> will block until the reassignment is complete. If there are multiple 
> consumers in a thread, the first consumer to get called will block, and the 
> other consumer will get ejected from the list of active consumers after a 
> timeout, because it didn't manage to call poll during the rebalance. See the 
> example code in https://issues.apache.org/jira/browse/STORM-2514, which runs 
> two KafkaConsumers in one thread. The result is that they flip-flop between 
> being active, and most polls take ~30 seconds (the Kafka session timeout).
> * The random assignment of partitions causes more message duplication than is 
> necessary. When an executor crashes, all the other executors have their 
> partitions reassigned. This makes it likely that some of them will lose a 
> partition they had in-flight tuples on, which they will then be unable to 
> commit to Kafka. Those messages are then reemitted by whichever KafkaSpout 
> instance was assigned the partition. See 
> https://issues.apache.org/jira/browse/STORM-2538
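
To make the duplication point concrete, here is a small self-contained sketch of what a *surviving* task holds before and after a rebalance triggered by another task's crash. It is hypothetical: it assumes the modulo split described earlier and a range-style reassignment among survivors, both used only for illustration:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class RebalanceChurnDemo {

    // Deterministic modulo split: a pure function of the task index and the
    // (fixed) task count, independent of which tasks are currently alive.
    public static List<Integer> moduloSplit(int taskIndex, int totalTasks,
                                            int totalPartitions) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = taskIndex; p < totalPartitions; p += totalTasks) {
            assigned.add(p);
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Before the crash: 3 tasks, 10 partitions, modulo split.
        Set<Integer> task0Before = new HashSet<>(moduloSplit(0, 3, 10)); // {0, 3, 6, 9}

        // With subscribe(): task 1 crashes and Kafka rebalances all 10
        // partitions among the 2 survivors (range-style: survivor 0 gets
        // partitions 0-4, survivor 1 gets 5-9 -- an assumption about the
        // assignor, for illustration only).
        Set<Integer> task0AfterRebalance = new HashSet<>(List.of(0, 1, 2, 3, 4));

        // Partitions task 0 lost even though it never crashed. Any in-flight
        // tuples on these can no longer be committed by task 0, so their new
        // owner will reemit them.
        Set<Integer> lost = new TreeSet<>(task0Before);
        lost.removeAll(task0AfterRebalance);
        System.out.println("partitions task 0 lost in the rebalance: " + lost); // [6, 9]

        // With assign(): the split is deterministic, so task 0 keeps exactly
        // task0Before, and only the crashed task's own partitions pause until
        // Storm restarts it.
    }
}
```
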
> I'd like to drop support for the subscribe API, and switch to using the 
> assign API by default.
> The KafkaConsumer Javadoc even mentions applications like Storm as a case 
> where the {{subscribe}} API doesn't really add value.
> {quote}
> If the process itself is highly available and will be restarted if it fails 
> (perhaps using a cluster management framework like YARN, Mesos, or AWS 
> facilities, or as part of a stream processing framework). In this case there 
> is no need for Kafka to detect the failure and reassign the partition since 
> the consuming process will be restarted on another machine. 
> {quote}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
