Thanks Guozhang!

The simplified upgrade path is great!


Just a clarification question about the "Rebalance Callback Error
Handling" -- does this change affect the `ConsumerCoordinator` only if
incremental rebalancing is use? Or does the behavior also change for the
eager rebalancing case?


-Matthias


On 5/9/19 3:37 AM, Guozhang Wang wrote:
> Hello all,
> 
> Thanks for everyone who've shared their feedbacks for this KIP! If there's
> no further comments I'll start the voting thread by end of tomorrow.
> 
> 
> Guozhang.
> 
> On Wed, May 8, 2019 at 6:36 PM Guozhang Wang <wangg...@gmail.com> wrote:
> 
>> Hello Boyang,
>>
>> On Wed, May 1, 2019 at 4:51 PM Boyang Chen <bche...@outlook.com> wrote:
>>
>>> Hey Guozhang,
>>>
>>> thank you for the great write up. Overall the motivation and changes
>>> LGTM, just some minor comments:
>>>
>>>
>>>   1.  In "Consumer Coordinator Algorithm", we could reorder alphabet
>>> points for 3d~3f from ["ready-to-migrate-partitions",
>>> "unknown-but-owned-partitions",  "maybe-revoking-partitions"] to
>>> ["maybe-revoking-partitions", "ready-to-migrate-partitions",
>>> "unknown-but-owned-partitions"] in order to be consistent with 3c1~3.
>>>
>>
>> Ack. Updated.
>>
>>
>>>   2.  In "Consumer Coordinator Algorithm", 1c suggests to revoke all
>>> partition upon heartbeat/commit fail. What's the gain here? Do we want to
>>> keep all partitions running at this moment, to be optimistic for the case
>>> when no partitions get reassigned?
>>>
>>
>> That's a good catch. When REBALANCE_IN_PROGRESS is received, we can just
>> re-join the group with all the currently owned partitions encoded. Updated.
>>
>>
>>>   3.  In "Recommended Upgrade Procedure", remove extra 'those': " The
>>> 'sticky' assignor works even those there are "
>>>
>>
>> Ack, should be `even when`.
>>
>>
>>>   4.  Put two "looking into the future" into a separate category from
>>> migration session. It seems inconsistent for readers to see this before we
>>> finished discussion for everything.
>>>
>>
>> Ack.
>>
>>
>>>   5.  Have we discussed the concern on the serialization? Could the new
>>> metadata we are adding grow larger than the message size cap?
>>>
>>
>> We're completing https://issues.apache.org/jira/browse/KAFKA-7149 which
>> should largely reduce the message size (will update the wiki accordingly as
>> well).
>>
>>
>>>
>>> Boyang
>>>
>>> ________________________________
>>> From: Guozhang Wang <wangg...@gmail.com>
>>> Sent: Monday, April 15, 2019 9:20 AM
>>> To: dev
>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams
>>>
>>> Hello Jason,
>>>
>>> I agree with you that for range / round-robin it makes less sense to be
>>> compatible with cooperative rebalance protocol.
>>>
>>> As for StickyAssignor, however, I think it would still be possible to make
>>> the current implementation to be compatible with cooperative rebalance. So
>>> after pondering on different options at hand I'm now proposing this
>>> approach as listed in the upgrade section:
>>>
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP-429:KafkaConsumerIncrementalRebalanceProtocol-CompatibilityandUpgradePath
>>>
>>> The idea is to let assignors specify which protocols it would work with,
>>> associating with a different name; then the upgrade path would involve a
>>> "compatible" protocol which actually still use eager behavior while
>>> encoding two assignors if possible. In "Rejected Section" (just to clarify
>>> I'm not finalizing it as rejected, just putting it there for now, and if
>>> we
>>> like this one instead we can always switch them) I listed the other
>>> approach we once discussed about, and arguing its cons of duplicated class
>>> seems overwhelm the pros of saving the  "rebalance.protocol" config.
>>>
>>> Let me know WDYT.
>>>
>>> Guozhang
>>>
>>> On Fri, Apr 12, 2019 at 6:08 PM Jason Gustafson <ja...@confluent.io>
>>> wrote:
>>>
>>>> Hi Guozhang,
>>>>
>>>> Responses below:
>>>>
>>>> 2. The interface's default implementation will just be
>>>>> `onPartitionRevoked`, so for user's instantiation if they do not make
>>> any
>>>>> code changes they should be able to recompile the code and continue.
>>>>
>>>>
>>>> Ack, makes sense.
>>>>
>>>> 4. Hmm.. not sure if it will work. The main issue is that the
>>>>> consumer-coordinator behavior (whether to revoke all or none at
>>>>> onRebalancePrepare) is independent of the selected protocol's assignor
>>>>> (eager or cooperative), so even if the assignor is selected to be the
>>>>> old-versioned one, we will still not revoke at the
>>> consumer-coordinator
>>>>> layer and hence has the same risk of migrating still-owned partitions,
>>>>> right?
>>>>
>>>>
>>>> Yeah, basically we would have to push the eager/cooperative logic into
>>> the
>>>> PartitionAssignor itself and make the consumer aware of the rebalance
>>>> protocol it is compatible with. As long as an eager protocol _could_ be
>>>> selected, the consumer would have to be pessimistic and do eager
>>>> revocation. But if all the assignors configured in the consumer support
>>>> cooperative reassignment, then either 1) a cooperative protocol will be
>>>> selected and cooperative revocation can be safely used, or 2) if the
>>> rest
>>>> of the group does not support it, then the consumer will simply fail.
>>>>
>>>> Another point which you raised offline and I will repeat here is that
>>> this
>>>> proposal's benefit is mostly limited to sticky assignment logic.
>>> Arguably
>>>> the range assignor may have some incidental stickiness, particularly if
>>> the
>>>> group is rebalancing for a newly created or deleted topic. For other
>>> cases,
>>>> the proposal is mostly additional overhead since it takes an additional
>>>> rebalance and many of the partitions will move. Perhaps it doesn't make
>>> as
>>>> much sense to use the cooperative protocol for strategies like range and
>>>> round-robin. That kind of argues in favor of pushing some of the control
>>>> into the assignor itself. Maybe we would not bother creating
>>>> CooperativeRange as I suggested above, but it would make sense to
>>> create a
>>>> cooperative version of the sticky assignment strategy. I thought we
>>> might
>>>> have to create a new sticky assignor anyway because I can't see how we
>>>> would get compatible behavior mixing with the old version anyway.
>>>>
>>>> Thanks,
>>>> Jason
>>>>
>>>>
>>>> On Thu, Apr 11, 2019 at 5:53 PM Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>>>
>>>>> Hello Matthias:
>>>>>
>>>>> Thanks for your review.
>>>>>
>>>>> The background section uses streams assignor as well as the consumer's
>>>> own
>>>>> stick assignor as examples illustrating the situation, but this KIP is
>>>> for
>>>>> consumer coordinator itself, and the rest of the paragraph did not
>>> talk
>>>>> about Streams any more. If you feel it's a bit distracted I can remove
>>>>> those examples.
>>>>>
>>>>> 10). While working on the PR I realized that the revoked partitions on
>>>>> assignment is not needed (this is being discussed on the PR itself:
>>>>> https://github.com/apache/kafka/pull/6528#issuecomment-480009890
>>>>>
>>>>> 20). 1.a. Good question, I've updated the wiki to let the consumer's
>>>>> cleanup assignment and re-join, and not letting assignor making any
>>>>> proactive changes. The idea is to keep logic simpler and not doing any
>>>>> "split brain" stuff.
>>>>>
>>>>> 20). 2.b. No we do not need, since the owned-partitions will be part
>>> of
>>>> the
>>>>> Subscription passed in to assign() already.
>>>>>
>>>>> 30). As Boyang mentioned, there are some drawbacks that can not be
>>>>> addressed by rebalance delay still, hence still voted KIP-345 (some
>>> more
>>>>> details can be found on the discussion thread of KIP-345 itself). One
>>>>> example is that as the instance resumes, its member id will be empty
>>> so
>>>> we
>>>>> are still relying on assignor to give it the assignment from the old
>>>>> member-id while keeping all other member's assignment unchanged.
>>>>>
>>>>> 40). Incomplete sentence, I've updated it.
>>>>>
>>>>> 50). Here's my idea: suppose we augment the join group schema with
>>>>> `protocol version` in 2.3, and then with both brokers and clients
>>> being
>>>> in
>>>>> version 2.3+, on the first rolling bounce where subscription and
>>>> assignment
>>>>> schema and / or user metadata has changed, this protocol version will
>>> be
>>>>> bumped. On the broker side, when receiving all member's join-group
>>>> request,
>>>>> it will choose the one that has the highest protocol version (also it
>>>>> assumes higher versioned protocol is always backward compatible, i.e.
>>> the
>>>>> coordinator can recognize lower versioned protocol as well) and
>>> select it
>>>>> as the leader. Then the leader can decide, based on its received and
>>>>> deserialized subscription information, how to assign partitions and
>>> how
>>>> to
>>>>> encode the assignment accordingly so that everyone can understand it.
>>>> With
>>>>> this, in Streams for example, no version probing would be needed
>>> since we
>>>>> are guaranteed the leader knows everyone's version -- again it is
>>>> assuming
>>>>> that higher versioned protocol is always backward compatible -- and
>>> hence
>>>>> can successfully do the assignment at that round.
>>>>>
>>>>> 60). My bad, this section was not updated while the design was
>>> evolved,
>>>>> I've updated it.
>>>>>
>>>>>
>>>>> On Tue, Apr 9, 2019 at 7:22 PM Boyang Chen <bche...@outlook.com>
>>> wrote:
>>>>>
>>>>>>
>>>>>> Thanks for the review Matthias! My 2-cent on the rebalance delay is
>>>> that
>>>>>> it is a rather fixed trade-off between
>>>>>>
>>>>>> task availability and resource shuffling. If we eventually trigger
>>>>>> rebalance after rolling bounce, certain consumer
>>>>>>
>>>>>> setup is still faced with global shuffles, for example member.id
>>>> ranking
>>>>>> based round robin strategy, as rejoining dynamic
>>>>>>
>>>>>> members will be assigned with new member.id which reorders the
>>>>>> assignment. So I think the primary goal of incremental
>>>>>>
>>>>>> rebalancing is still improving the cluster availability during
>>>> rebalance,
>>>>>> because it didn't revoke any partition during this
>>>>>>
>>>>>> process. Also, the perk is minimum configuration requirement :)
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Boyang
>>>>>>
>>>>>> ________________________________
>>>>>> From: Matthias J. Sax <matth...@confluent.io>
>>>>>> Sent: Tuesday, April 9, 2019 7:47 AM
>>>>>> To: dev
>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka
>>> Streams
>>>>>>
>>>>>> Thank for the KIP, Boyang and Guozhang!
>>>>>>
>>>>>>
>>>>>> I made an initial pass and have some questions/comments. One high
>>> level
>>>>>> comment: it seems that the KIP "mixes" plain consumer and Kafka
>>> Streams
>>>>>> use case a little bit (at least in the presentation). It might be
>>>>>> helpful to separate both cases clearly, or maybe limit the scope to
>>>>>> plain consumer only.
>>>>>>
>>>>>>
>>>>>>
>>>>>> 10) For `PartitionAssignor.Assignment`: It seems we need a new
>>> method
>>>>>> `List<TopicPartitions> revokedPartitions()` ?
>>>>>>
>>>>>>
>>>>>>
>>>>>> 20) In Section "Consumer Coordinator Algorithm"
>>>>>>
>>>>>>     Bullet point "1a)": If the subscription changes and a topic is
>>>>>> removed from the subscription, why do we not revoke the partitions?
>>>>>>
>>>>>>     Bullet point "1a)": What happens is a topic is deleted (or a
>>>>>> partition is removed/deleted from a topic)? Should we call the new
>>>>>> `onPartitionsEmigrated()` callback for this case?
>>>>>>
>>>>>>     Bullet point "2b)" Should we update the `PartitionAssignor`
>>>>>> interface to pass in the "old assignment" as third parameter into
>>>>>> `assign()`?
>>>>>>
>>>>>>
>>>>>>
>>>>>> 30) Rebalance delay (as used in KIP-415): Could a rebalance delay
>>>>>> subsume KIP-345? Configuring static members is rather complicated,
>>> and
>>>> I
>>>>>> am wondering if a rebalance delay would be sufficient?
>>>>>>
>>>>>>
>>>>>>
>>>>>> 40) Quote: "otherwise the we would fall into the case 3.b) forever."
>>>>>>
>>>>>> What is "case 3.b" ?
>>>>>>
>>>>>>
>>>>>>
>>>>>> 50) Section "Looking into the Future"
>>>>>>
>>>>>> Nit: the new "ProtocolVersion" field is missing in the first line
>>>>>> describing "JoinGroupRequest"
>>>>>>
>>>>>>> This can also help saving "version probing" cost on Streams as
>>> well.
>>>>>>
>>>>>> How does this relate to Kafka Streams "version probing"
>>> implementation?
>>>>>> How can we exploit the new `ProtocolVersion` in Streams to improve
>>>>>> "version probing" ? I have a rough idea, but would like to hear more
>>>>>> details.
>>>>>>
>>>>>>
>>>>>>
>>>>>> 60) Section "Recommended Upgrade Procedure"
>>>>>>
>>>>>>> Set the `stream.rebalancing.mode` to `upgrading`, which will force
>>>> the
>>>>>> stream application to stay with protocol type "consumer".
>>>>>>
>>>>>> This config is not discussed in the KIP and appears in this section
>>>>>> without context. Can you elaborate about it?
>>>>>>
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 3/29/19 6:20 PM, Guozhang Wang wrote:
>>>>>>> Bump up on this discussion thread. I've added a few new drawings
>>> for
>>>>>> better
>>>>>>> illustration, would really appreciate your feedbacks.
>>>>>>>
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>> On Wed, Mar 20, 2019 at 6:17 PM Guozhang Wang <wangg...@gmail.com
>>>>
>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello Boyang,
>>>>>>>>
>>>>>>>> I've made another thorough pass over this KIP and I'd like to
>>> spilt
>>>> it
>>>>>>>> into two parts: the first part, covered in KIP-429 would be
>>> touching
>>>>> on
>>>>>>>> Consumer Coordinator only to have incremental rebalance protocol
>>> in
>>>>>> place.
>>>>>>>> The second part (for now I've reserved KIP number 444 for it)
>>> would
>>>>>> contain
>>>>>>>> all the changes on StreamsPartitionAssginor to allow warming up
>>> new
>>>>>>>> members.
>>>>>>>>
>>>>>>>> I think the first part, a.k.a. the current updated KIP-429 is
>>> ready
>>>>> for
>>>>>>>> review and discussions again. Would love to hear people's
>>> feedbacks
>>>>> and
>>>>>>>> ideas.
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Mar 4, 2019 at 10:09 AM Boyang Chen <bche...@outlook.com
>>>>
>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks Guozhang for the great questions. Answers are inlined:
>>>>>>>>>
>>>>>>>>> 1. I'm still not sure if it's worthwhile to add a new type of
>>>>> "learner
>>>>>>>>> task" in addition to "standby task": if the only difference is
>>> that
>>>>> for
>>>>>>>>> the
>>>>>>>>> latter, we would consider workload balance while for the former
>>> we
>>>>>> would
>>>>>>>>> not, I think we can just adjust the logic of StickyTaskAssignor
>>> a
>>>> bit
>>>>>> to
>>>>>>>>> break that difference. Adding a new type of task would be
>>> adding a
>>>>> lot
>>>>>> of
>>>>>>>>> code complexity, so if we can still piggy-back the logic on a
>>>>>> standby-task
>>>>>>>>> I would prefer to do so.
>>>>>>>>> In the proposal we stated that we are not adding a new type of
>>> task
>>>>>>>>> implementation. The
>>>>>>>>> learner task shall share the same implementation with normal
>>>> standby
>>>>>>>>> task, only that we
>>>>>>>>> shall tag the standby task with learner and prioritize the
>>> learner
>>>>>> tasks
>>>>>>>>> replay effort.
>>>>>>>>> 2. One thing that's still not clear from the KIP wiki itself is
>>>> which
>>>>>>>>> layer
>>>>>>>>> would the logic be implemented at. Although for most KIPs we
>>> would
>>>>> not
>>>>>>>>> require internal implementation details but only public facing
>>> API
>>>>>>>>> updates,
>>>>>>>>> for a KIP like this I think it still requires to flesh out
>>> details
>>>> on
>>>>>> the
>>>>>>>>> implementation design. More specifically: today Streams embed a
>>>> full
>>>>>>>>> fledged Consumer client, which hard-code a ConsumerCoordinator
>>>>> inside,
>>>>>>>>> Streams then injects a StreamsPartitionAssignor to its pluggable
>>>>>>>>> PartitionAssignor interface and inside the
>>> StreamsPartitionAssignor
>>>>> we
>>>>>>>>> also
>>>>>>>>> have a TaskAssignor interface whose default implementation is
>>>>>>>>> StickyPartitionAssignor. Streams partition assignor logic today
>>>> sites
>>>>>> in
>>>>>>>>> the latter two classes. Hence the hierarchy today is:
>>>>>>>>>
>>>>>>>>> KafkaConsumer -> ConsumerCoordinator ->
>>> StreamsPartitionAssignor ->
>>>>>>>>> StickyTaskAssignor.
>>>>>>>>>
>>>>>>>>> We need to think about where the proposed implementation would
>>> take
>>>>>> place
>>>>>>>>> at, and personally I think it is not the best option to inject
>>> all
>>>> of
>>>>>> them
>>>>>>>>> into the StreamsPartitionAssignor / StickyTaskAssignor since the
>>>>> logic
>>>>>> of
>>>>>>>>> "triggering another rebalance" etc would require some
>>> coordinator
>>>>> logic
>>>>>>>>> which is hard to mimic at PartitionAssignor level. On the other
>>>> hand,
>>>>>>>>> since
>>>>>>>>> we are embedding a KafkaConsumer client as a whole we cannot
>>> just
>>>>>> replace
>>>>>>>>> ConsumerCoordinator with a specialized StreamsCoordinator like
>>>>> Connect
>>>>>>>>> does
>>>>>>>>> in KIP-415. So I'd like to maybe split the current proposal in
>>> both
>>>>>>>>> consumer layer and streams-assignor layer like we did in
>>>>>> KIP-98/KIP-129.
>>>>>>>>> And then the key thing to consider is how to cut off the
>>> boundary
>>>> so
>>>>>> that
>>>>>>>>> the modifications we push to ConsumerCoordinator would be
>>>> beneficial
>>>>>>>>> universally for any consumers, while keep the Streams-specific
>>>> logic
>>>>> at
>>>>>>>>> the
>>>>>>>>> assignor level.
>>>>>>>>> Yes, that's also my ideal plan. The details for the
>>> implementation
>>>>> are
>>>>>>>>> depicted
>>>>>>>>> in this doc<
>>>>>>>>>
>>>>>>
>>>>>
>>>>
>>> https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae
>>>>>>> ,
>>>>>> [
>>>>>>
>>>>>
>>>>
>>> https://lh5.googleusercontent.com/DXWMyKNE9rFFIv7TNX56Q41QwqYp8ynivwWSJHHORqSRkoQxtraW2bqiB-NRUGAMYKkt8A=w1200-h630-p
>>>>>> ]<
>>>>>>
>>>>>
>>>>
>>> https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae
>>>>>>>
>>>>>>
>>>>>> [External] KStream Smooth Auto-scaling Implementation Plan<
>>>>>>
>>>>>
>>>>
>>> https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae
>>>>>>>
>>>>>> docs.google.com
>>>>>> KStream Incremental Rebalancing Implementation Plan Authors: Boyang
>>>> Chen,
>>>>>> Guozhang Wang KIP link Stage: [Draft | Review | Approved]
>>> Background We
>>>>>> initiated KIP-429 for the promotion of incremental rebalancing work
>>> for
>>>>>> KStream. Behind the scene, there is non-trivial amount of effort
>>> that
>>>>> needs
>>>>>> to...
>>>>>>
>>>>>>
>>>>>>
>>>>>>>>> and I have explained the reasoning on why we want to push a
>>>>>>>>> global change of replacing ConsumerCoordinator with
>>>>> StreamCoordinator.
>>>>>>>>> The motivation
>>>>>>>>> is that KIP space is usually used for public & algorithm level
>>>>> change,
>>>>>>>>> not for internal
>>>>>>>>> implementation details.
>>>>>>>>>
>>>>>>>>> 3. Depending on which design direction we choose, our migration
>>>> plan
>>>>>> would
>>>>>>>>> also be quite different. For example, if we stay with
>>>>>> ConsumerCoordinator
>>>>>>>>> whose protocol type is "consumer" still, and we can manage to
>>> make
>>>>> all
>>>>>>>>> changes agnostic to brokers as well as to old versioned
>>> consumers,
>>>>> then
>>>>>>>>> our
>>>>>>>>> migration plan could be much easier.
>>>>>>>>> Yes, the upgrade plan was designed to take the new
>>>> StreamCoordinator
>>>>>>>>> approach
>>>>>>>>> which means we shall define a new protocol type. For existing
>>>>>> application
>>>>>>>>> we could only
>>>>>>>>> maintain the same `consumer` protocol type is because current
>>>> broker
>>>>>> only
>>>>>>>>> allows
>>>>>>>>> change of protocol type when the consumer group is empty. It is
>>> of
>>>>>> course
>>>>>>>>> user-unfriendly to force
>>>>>>>>> a wipe-out for the entire application, and I don't think
>>>> maintaining
>>>>>> old
>>>>>>>>> protocol type would greatly
>>>>>>>>> impact ongoing services using new stream coordinator. WDYT?
>>>>>>>>>
>>>>>>>>> 4. I think one major issue related to this KIP is that today, in
>>>> the
>>>>>>>>> StickyPartitionAssignor, we always try to honor stickiness over
>>>>>> workload
>>>>>>>>> balance, and hence "learner task" is needed to break this
>>> priority,
>>>>> but
>>>>>>>>> I'm
>>>>>>>>> wondering if we can have a better solution within sticky task
>>>>> assignor
>>>>>>>>> that
>>>>>>>>> accommodate this?
>>>>>>>>> Great question! That's what I explained in the proposal, which
>>> is
>>>>> that
>>>>>> we
>>>>>>>>> should breakdown our
>>>>>>>>> delivery into different stages. At very beginning, our goal is
>>> to
>>>>>> trigger
>>>>>>>>> learner task assignment only on
>>>>>>>>> `new` hosts, where we shall leverage leader's knowledge of
>>> previous
>>>>>> round
>>>>>>>>> of rebalance to figure out. After
>>>>>>>>> stage one, our goal is to have a smooth scaling up experience,
>>> but
>>>>> the
>>>>>>>>> task balance problem is kind of orthogonal.
>>>>>>>>> The load balance problem is a much broader topic than auto
>>> scaling,
>>>>>> which
>>>>>>>>> I figure worth discussing within
>>>>>>>>> this KIP's context since it's a naturally next-step, but
>>> wouldn't
>>>> be
>>>>>> the
>>>>>>>>> main topic.
>>>>>>>>> Learner task or auto scaling support should be treated as `a
>>>> helpful
>>>>>>>>> mechanism to reach load balance`, but not `an algorithm defining
>>>> load
>>>>>>>>> balance`. It would be great if you could share some insights of
>>> the
>>>>>> stream
>>>>>>>>> task balance, which eventually helps us to break out of the
>>>> KIP-429's
>>>>>> scope
>>>>>>>>> and even define a separate KIP to focus on task weight &
>>> assignment
>>>>>> logic
>>>>>>>>> improvement.
>>>>>>>>>
>>>>>>>>> Also thank you for making improvement on the KIP context and
>>>>>> organization!
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Boyang
>>>>>>>>> ________________________________
>>>>>>>>> From: Guozhang Wang <wangg...@gmail.com>
>>>>>>>>> Sent: Saturday, March 2, 2019 6:00 AM
>>>>>>>>> To: dev
>>>>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka
>>>>> Streams
>>>>>>>>>
>>>>>>>>> Hello Boyang,
>>>>>>>>>
>>>>>>>>> I've just made a quick pass on the KIP and here are some
>>> thoughts.
>>>>>>>>>
>>>>>>>>> Meta:
>>>>>>>>>
>>>>>>>>> 1. I'm still not sure if it's worthwhile to add a new type of
>>>>> "learner
>>>>>>>>> task" in addition to "standby task": if the only difference is
>>> that
>>>>> for
>>>>>>>>> the
>>>>>>>>> latter, we would consider workload balance while for the former
>>> we
>>>>>> would
>>>>>>>>> not, I think we can just adjust the logic of StickyTaskAssignor
>>> a
>>>> bit
>>>>>> to
>>>>>>>>> break that difference. Adding a new type of task would be
>>> adding a
>>>>> lot
>>>>>> of
>>>>>>>>> code complexity, so if we can still piggy-back the logic on a
>>>>>> standby-task
>>>>>>>>> I would prefer to do so.
>>>>>>>>>
>>>>>>>>> 2. One thing that's still not clear from the KIP wiki itself is
>>>> which
>>>>>>>>> layer
>>>>>>>>> would the logic be implemented at. Although for most KIPs we
>>> would
>>>>> not
>>>>>>>>> require internal implementation details but only public facing
>>> API
>>>>>>>>> updates,
>>>>>>>>> for a KIP like this I think it still requires to flesh out
>>> details
>>>> on
>>>>>> the
>>>>>>>>> implementation design. More specifically: today Streams embed a
>>>> full
>>>>>>>>> fledged Consumer client, which hard-code a ConsumerCoordinator
>>>>> inside,
>>>>>>>>> Streams then injects a StreamsPartitionAssignor to its plugable
>>>>>>>>> PartitionAssignor interface and inside the
>>> StreamsPartitionAssignor
>>>>> we
>>>>>>>>> also
>>>>>>>>> have a TaskAssignor interface whose default implementation is
>>>>>>>>> StickyPartitionAssignor. Streams partition assignor logic today
>>>> sites
>>>>>> in
>>>>>>>>> the latter two classes. Hence the hierarchy today is:
>>>>>>>>>
>>>>>>>>> KafkaConsumer -> ConsumerCoordinator ->
>>> StreamsPartitionAssignor ->
>>>>>>>>> StickyTaskAssignor.
>>>>>>>>>
>>>>>>>>> We need to think about where the proposed implementation would
>>> take
>>>>>> place
>>>>>>>>> at, and personally I think it is not the best option to inject
>>> all
>>>> of
>>>>>> them
>>>>>>>>> into the StreamsPartitionAssignor / StickyTaskAssignor since the
>>>>> logic
>>>>>> of
>>>>>>>>> "triggering another rebalance" etc would require some
>>> coordinator
>>>>> logic
>>>>>>>>> which is hard to mimic at PartitionAssignor level. On the other
>>>> hand,
>>>>>>>>> since
>>>>>>>>> we are embedding a KafkaConsumer client as a whole we cannot
>>> just
>>>>>> replace
>>>>>>>>> ConsumerCoordinator with a specialized StreamsCoordinator like
>>>>> Connect
>>>>>>>>> does
>>>>>>>>> in KIP-415. So I'd like to maybe split the current proposal in
>>> both
>>>>>>>>> consumer layer and streams-assignor layer like we did in
>>>>>> KIP-98/KIP-129.
>>>>>>>>> And then the key thing to consider is how to cut off the
>>> boundary
>>>> so
>>>>>> that
>>>>>>>>> the modifications we push to ConsumerCoordinator would be
>>>> beneficial
>>>>>>>>> universally for any consumers, while keep the Streams-specific
>>>> logic
>>>>> at
>>>>>>>>> the
>>>>>>>>> assignor level.
>>>>>>>>>
>>>>>>>>> 3. Depending on which design direction we choose, our migration
>>>> plan
>>>>>> would
>>>>>>>>> also be quite different. For example, if we stay with
>>>>>> ConsumerCoordinator
>>>>>>>>> whose protocol type is "consumer" still, and we can manage to
>>> make
>>>>> all
>>>>>>>>> changes agnostic to brokers as well as to old versioned
>>> consumers,
>>>>> then
>>>>>>>>> our
>>>>>>>>> migration plan could be much easier.
>>>>>>>>>
>>>>>>>>> 4. I think one major issue related to this KIP is that today, in
>>>> the
>>>>>>>>> StickyPartitionAssignor, we always try to honor stickiness over
>>>>>> workload
>>>>>>>>> balance, and hence "learner task" is needed to break this
>>> priority,
>>>>> but
>>>>>>>>> I'm
>>>>>>>>> wondering if we can have a better solution within sticky task
>>>>> assignor
>>>>>>>>> that
>>>>>>>>> accommodate this?
>>>>>>>>>
>>>>>>>>> Minor:
>>>>>>>>>
>>>>>>>>> 1. The idea of two rebalances have also been discussed in
>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6145. So we should
>>> add
>>>>> the
>>>>>>>>> reference on the wiki page as well.
>>>>>>>>> 2. Could you also add a section describing how the subscription
>>> /
>>>>>>>>> assignment metadata will be re-formatted? Without this
>>> information
>>>> it
>>>>>> is
>>>>>>>>> hard to get to the bottom of your idea. For example in the
>>> "Leader
>>>>>>>>> Transfer
>>>>>>>>> Before Scaling" section, I'm not sure why "S2 doesn't know S4 is
>>>> new
>>>>>>>>> member"
>>>>>>>>> and hence would blindly obey stickiness over workload balance
>>>>>> requirement.
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Feb 28, 2019 at 11:05 AM Boyang Chen <
>>> bche...@outlook.com>
>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hey community friends,
>>>>>>>>>>
>>>>>>>>>> I'm gladly inviting you to have a look at the proposal to add
>>>>>>>>> incremental
>>>>>>>>>> rebalancing to Kafka Streams, A.K.A auto-scaling support.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>
>>>>>
>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Smooth+Auto-Scaling+for+Kafka+Streams
>>>>>>>>>>
>>>>>>>>>> Special thanks to Guozhang for giving great guidances and
>>>> important
>>>>>>>>>> feedbacks while making this KIP!
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Boyang
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>>>
>>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
>>
>> --
>> -- Guozhang
>>
> 
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to