Hi Jason, Thanks for the thoughtful comments. Please see my response below.
BTW, I have been trying to update the KIP with some of the recent discussions on the mailing list.

Regards,
--Vahid

From: Jason Gustafson <ja...@confluent.io>
To: dev@kafka.apache.org
Date: 06/27/2016 12:53 PM
Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy

Hey Vahid,

Comments below:

> I'm not very clear on the first part of this paragraph. You could clarify
> it for me, but in general balancing out the partitions across consumers in
> a group as much as possible would normally mean balancing the load within
> the cluster, and that's something a user would want to have compared to
> cases where the assignments and therefore the load could be quite
> unbalanced depending on the subscriptions.

I'm just wondering what kind of use cases require differing subscriptions in a steady state. Usually we expect all consumers in the group to have the same subscription, in which case the balance provided by round robin should be even (in terms of the number of assigned partitions). The only case that comes to mind is a rolling upgrade scenario in which the consumers in the group are restarted one by one with an updated subscription. It would be ideal to provide better balance in this situation, but once the upgrade finishes, the assignment should be balanced again, so it's unclear to me how significant the gain is. On the other hand, if there are cases which require differing subscriptions in a long term state, it would make this feature more compelling.

I agree that if we care only about a balanced assignment with same subscriptions the round robin assignment is a good choice. But if we bring in stickiness to the mix it won't be guaranteed by the round robin assignor. An example (as Andrew mentioned in his earlier note) is elastic consumers that come and go automatically depending on the load and how much they lag behind.
If these consumers maintain state for the partitions they consume from, it would be reasonable to want them to stick to their assigned partitions, rather than having to repeat partition cleanup every time the number of consumers changes due to an increase or decrease in load. I'll also think about it and let you know if I come up with a use case with differing subscriptions. If differing subscriptions turns out not to be a common use case, the design and implementation of the sticky assignor could be modified to a far less complex setting so that fairness/stickiness can be guaranteed for same subscriptions. As I mentioned before, the current design / implementation is comprehensive and can be tweaked towards a less complex solution if further assumptions can be made.

> Since the new consumer is single threaded there is no such problem in its
> round robin strategy. It simply considers consumers one by one for each
> partition assignment, and when one consumer is assigned a partition, the
> next assignment starts with considering the next consumer in the list (and
> not the same consumer that was just assigned). This removes the
> possibility of the issue reported in KAFKA-2019 surfacing in the new
> consumer. In the sticky strategy we do not have this issue either, since
> every time an assignment is about to happen we start with the consumer
> with least number of assignments. So we will not have a scenario where a
> consumer is repeatedly assigned partitions as in KAFKA-2019 (unless that
> consumer is lagging behind other consumers on the number of partitions
> assigned).

Thanks for checking into this. I think the other factor is that the round robin assignor sorts the consumers using the id given them by the coordinator, which at the moment looks like this: "{clientId}-{uuid}". So if the group uses a common clientId, then it shouldn't usually be the case that two consumers on the same host get ordered together.
We could actually change the order of these fields in a compatible way if we didn't like the dependence on the clientId. It seems anyway that the sticky assignor is not needed to deal with this problem.

That's correct, and thanks for going into the issue in more detail.

> Even though consumer groups are usually stable, it might be the case that
> consumers do not initially join the group at the same time. The sticky
> strategy in that situation lets those who joined earlier stick to their
> partitions to some extent (assuming fairness takes precedence over
> stickiness). In terms of specific use cases, Andrew touched on examples of
> how Kafka can benefit from a sticky assignor. I could add those to the KIP
> if you also think they help build the case in favor of sticky assignor.
> I agree with you about the downside and I'll make sure I add that to the
> KIP as you suggested.

Yep, I agree that it helps in some situations, but I think the impact is amortized over the life of the group. It also takes a bit more work to explain this to users and may require them to change their usage pattern a little bit. I think we expect users to do something like the following in their rebalance listener:

  class MyRebalanceListener {
    void onPartitionsRevoked(Collection<TopicPartition> partitions) {
      for (TopicPartition partition : partitions) {
        commitOffsets(partition);
        cleanupState(partition);
      }
    }

    void onPartitionsAssigned(Collection<TopicPartition> partitions) {
      for (TopicPartition partition : partitions) {
        initializeState(partition);
        initializeOffset(partition);
      }
    }
  }

This is fairly intuitive, but if you use this pattern, then sticky assignment doesn't give you anything because you always clean up state prior to the rebalance.
Instead you need to do something like this:

  class MyRebalanceListener {
    Collection<TopicPartition> lastAssignment = Collections.emptyList();

    void onPartitionsRevoked(Collection<TopicPartition> partitions) {
      for (TopicPartition partition : partitions) {
        commitOffsets(partition);
      }
    }

    void onPartitionsAssigned(Collection<TopicPartition> assignment) {
      for (TopicPartition partition : difference(lastAssignment, assignment)) {
        cleanupState(partition);
      }

      for (TopicPartition partition : difference(assignment, lastAssignment)) {
        initializeState(partition);
      }

      for (TopicPartition partition : assignment) {
        initializeOffset(partition);
      }

      this.lastAssignment = assignment;
    }
  }

This seems harder to explain and probably is the reason why Andy was suggesting that it would be ideal if we could simply skip the call to onRevoked() if the partitions remain assigned to the consumer after the rebalance. Unfortunately, the need to commit offsets prior to rebalancing makes this tricky. The other option suggested by Andy would be to introduce a third method in the rebalance listener (e.g. doOffsetCommit(partitions)). Then the consumer would call doOffsetCommit() prior to every rebalance, but only invoke onPartitionsRevoked() when partitions have actually been assigned to another consumer following the rebalance. Either way, we're making the API more complex, which would be nice to avoid unless really necessary.

Thanks for the code snippets. They look good and understandable given the current callback listener design. I agree that with an additional callback as Andy suggested things would be easier to justify and explain. As you mentioned, it's a matter of whether we want the additional complexity that comes with it.

Overall, I think my feeling at the moment is that the sticky assignor is a nice improvement over the currently available assignors, but the gain seems a little marginal and maybe not worth the cost of the complexity mentioned above.
It's not a strong feeling though and it would be nice to hear what others think.

The other thing worth mentioning is that we've talked a few times in the past about the concept of "partial rebalancing," which would allow the group to reassign only a subset of the partitions it was consuming. This would let part of the group continue consuming while the group is rebalancing. We don't have any proposals ready to support this, but if we want to have this long term, then it might reduce some of the benefit provided by the sticky assignor.

Understood, and thanks for sharing your concerns and feedback. I hope we can get more feedback from the community on whether a sticky partition assignment strategy in any form is beneficial to Kafka.

Thanks,
Jason
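
The second listener in the thread relies on a `difference` helper that is never defined. Below is a minimal sketch, assuming it means plain set difference (the elements of the first collection that are absent from the second). The class name `StickyStateTracker`, the type parameter `T` (standing in for Kafka's `TopicPartition`), and the `log` list are hypothetical names introduced here so the example runs without the Kafka client library. It only records which partitions would have their state cleaned up or initialized.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-alone sketch; T stands in for Kafka's TopicPartition.
class StickyStateTracker<T> {
    // Partitions owned after the previous rebalance (empty before the first one).
    private Set<T> lastAssignment = Collections.emptySet();

    // Records the state actions a real listener would perform.
    final List<String> log = new ArrayList<>();

    // Assumed meaning of difference(a, b): elements of a that are not in b.
    static <T> Set<T> difference(Collection<T> a, Collection<T> b) {
        Set<T> result = new LinkedHashSet<>(a);
        result.removeAll(b);
        return result;
    }

    void onPartitionsAssigned(Collection<T> assignment) {
        // Partitions that moved away: their local state is no longer needed.
        for (T partition : difference(lastAssignment, assignment)) {
            log.add("cleanup " + partition);
        }
        // Newly acquired partitions: state must be built from scratch.
        for (T partition : difference(assignment, lastAssignment)) {
            log.add("init " + partition);
        }
        // Partitions present in both assignments keep their state untouched.
        lastAssignment = new LinkedHashSet<>(assignment);
    }
}
```

With this shape, a partition that stays with the same consumer across a rebalance triggers neither cleanup nor initialization, which is the only situation in which a sticky assignment saves work.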