"scheduled.rebalance.max.delay.ms" -- "...the actual delay used by the
leader to hold off redistribution of connectors and tasks and maintain
imbalance may be less or equal to this value."   How is the actual delay
determined and specified in the AssignmentFlatBuffer? I see this defaults
to a max of 5 minutes -- how will a 5 minute delay for rebalancing affect
current users who don't explicitly set this config?
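
To make the "less or equal" question concrete, here is a minimal sketch of one possible reading. It assumes (this is not confirmed by the KIP text quoted above) that the leader starts counting when it first notices a departure and only hands out the time still remaining in later rounds, which is what the d' / d'' notation in the KIP examples seems to suggest. The function and parameter names are hypothetical.

```python
# The 5 minute default mentioned above, in milliseconds.
DEFAULT_MAX_DELAY_MS = 5 * 60 * 1000


def remaining_delay_ms(now_ms, departure_seen_ms, max_delay_ms=DEFAULT_MAX_DELAY_MS):
    """Hypothetical helper: the delay a leader would put in an assignment.

    departure_seen_ms is the time the leader first noticed a missing worker,
    or None when no departure is pending.
    """
    if departure_seen_ms is None:
        return 0  # nothing to hold off for
    elapsed = now_ms - departure_seen_ms
    # Never exceed the configured maximum and never go below zero.
    return max(0, min(max_delay_ms, max_delay_ms - elapsed))


print(remaining_delay_ms(now_ms=60_000, departure_seen_ms=0))
```

Under this reading the configured value is only an upper bound: a rebalance that happens one minute into the delay would carry the remaining four minutes, not a fresh five.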

If both this KIP and the "Static Membership" KIP are merged, is there a case
where any user would use static membership over incremental rebalancing
(which has less configuration/user-input)? If yes, can you elaborate why a
user would use both side-by-side, or whether there is a situation where one
is an obvious "best" choice? I raise this with usability/config-sprawl in
mind: if this KIP deprecates a config property, we should note that
explicitly.

On Tue, Jan 22, 2019 at 6:26 PM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Hi Randall, thanks for your comments! Replying inline below:
>
>
> On Sat, Jan 19, 2019 at 4:26 PM Randall Hauch <rha...@gmail.com> wrote:
>
> > Thanks for all this work, Konstantine.
> >
> > I have a question about when a member leaves. Here's the partial
> > scenario, repeated from the KIP:
> >
> >
> > Initial group and assignment: W1([AC0, AT1]), W2([AT2, BC0]), W3([BT1])
> > Config topic contains: AC0, AT1, AT2, BC0, BT1
> > W2 leaves
> > Rebalance is triggered
> > W1 joins with assignment: [AC0, AT1]
> > W3 joins with assignment: [BT1]
> > W1 becomes leader
> > W1 computes and sends assignments:
> > W1(delay: d, assigned: [AC0, AT1], revoked: [])
> > W3(delay: d, assigned: [BT1], revoked: [])
> > After delay d:
> > W1 joins with assignment: [AC0, AT1]
> > W3 joins with assignment: [BT1]
> > Rebalance is triggered
> > ...
> >
> > How does the leader/group know that all of the still-alive members have
> > joined and that it's safe to proceed by triggering the rebalance? Is it
> > because this time is capped by the heartbeat interval, or is it because
> > the leader sees a join from all of the workers to which it sent
> > assignments?
> >
> >
> A rebalance is triggered by the broker coordinator after a new worker joins
> or a current worker leaves the group. The other members are notified via
> heartbeats.
> What the KIP suggests is that after the initial rebalance is triggered, the
> leader will choose an assignment and possibly a delay. After that delay,
> the workers will attempt another rebalance to resolve any potential
> imbalances left pending during the previous round.
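
The two rounds described above can be sketched as follows. This is only an illustration under stated assumptions: `leader_assign` and its parameters are hypothetical names, and the "give lost work to the least-loaded worker" policy is an assumption, not something the KIP text quoted in this thread specifies.

```python
def leader_assign(configured, members, delay_ms, delay_expired):
    """Hypothetical leader logic for one rebalance round.

    configured: connectors/tasks listed in the config topic
    members: worker name -> connectors/tasks it reported when joining
    """
    assignment = {w: list(tasks) for w, tasks in members.items()}
    held = {t for tasks in assignment.values() for t in tasks}
    lost = [t for t in configured if t not in held]

    if lost and not delay_expired:
        # Hold off: everyone keeps what they have and rejoins after the delay.
        delay = delay_ms
    else:
        # The delay is over (or nothing is missing): place lost work on the
        # least-loaded worker. This placement policy is an assumption.
        for t in lost:
            target = min(sorted(assignment), key=lambda w: len(assignment[w]))
            assignment[target].append(t)
        delay = 0
    return {w: {"delay": delay, "assigned": a, "revoked": []}
            for w, a in assignment.items()}


configured = ["AC0", "AT1", "AT2", "BC0", "BT1"]
members = {"W1": ["AC0", "AT1"], "W3": ["BT1"]}  # W2 has left the group
first = leader_assign(configured, members, delay_ms=300000, delay_expired=False)
second = leader_assign(configured, members, delay_ms=300000, delay_expired=True)
print(first)
print(second)
```

In the first round nothing moves and both workers receive the delay; only the second round, after the delay, resolves the imbalance left by W2.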
>
> > Also, would it be helpful for the description of the
> > "scheduled.rebalance.max.delay.ms" property to denote how well this might
> > tolerate workers that leave and rejoin? Without that context it might be
> > difficult for users to know what the value really means.
> >
>
> That is a good point. The description would benefit from further explanation.
> Here's a suggestion I'm adding to the KIP.
>
> "This is a delay that the leader may set to tolerate departures of workers
> from the group by allowing a transient imbalance in connector and task
> assignments. During this delay a worker has the opportunity to return to
> the group and get reassigned the same or similar amount of work as before.
> This property corresponds to the maximum delay that the leader may set in a
> single assignment. The actual delay used by the leader to hold off
> redistribution of connectors and tasks and maintain imbalance may be less
> or equal to this value."
>
>
>
> > The KIP does a nice job showing how this change will better handle
> > evolution of the embedded protocol for Connect, especially with the
> > flatbuffers. How might the values of the "connect.protocol" property
> > evolve with those potential changes? Do the current literals lock us in
> > more than we'd like?
> >
> >
> With respect to naming here, I'm definitely open to suggestions. I chose
> more descriptive, enum-like names for the options of the 'connect.protocol'
> property that correspond to the current and proposed status. I anticipate
> in the future, when we introduce policies that will require a different
> value here, that we should be able to find appropriate names to map to the
> additional values.
>
> > Finally, it's good to see the "Compatibility, Deprecation, and Migration
> > Plan" section discuss a plan for deprecating the current (v0) embedded
> > protocol in:
> >
> > "Connect protocol version 0 will be marked deprecated in the next major
> > release of Apache Kafka (currently 3.0.0). After adding a deprecation
> > notice on this release, support of version 0 of the Connect protocol will
> > be removed on the subsequent major release of Apache Kafka (currently
> > 4.0.0)."
> >
> >
> > One concern I have with this wording is that it leaves no room for
> > proving and validating the new cooperative protocol in real world use. Do
> > we need to instead suggest that the v0 protocol will be deprecated in a
> > future release after the cooperative protocol has been proven at least as
> > good as the v0 protocol, and removed in the first major release after
> > that?
> >
> >
> My impression is that accuracy in deprecation plans is appreciated in KIPs.
> However, I definitely don't consider this suggestion set in stone, and I'm
> happy to adjust or keep the plan open if we think this is preferable. Just
> to note here, that my initial suggestion is for deprecation to be triggered
> in the next major release, and to discontinue support of the old format in
> the major release after the next one. Again, I don't have strong objections
> about extending the plan even further.
>
> Thanks for the comments!
> Best,
> Konstantine
>
> > Once again, very nice work, Konstantine.
> >
> > Best regards,
> >
> > Randall
> >
> >
> > On Fri, Jan 18, 2019 at 2:00 PM Boyang Chen <bche...@outlook.com> wrote:
> >
> > > Thanks a lot for the detailed explanation here Konstantine! I strongly
> > > agree that a rolling restart of the Kafka brokers is not the optimal
> > > solution when we have the alternative of just upgrading the client. Also,
> > > I fully understand your explanation of why task shuffling has minimal
> > > impact in the workers scenario: local storage usage is very limited.
> > >
> > > Focusing on the current KIP, a few more suggestions are:
> > >
> > > 1. I copy-pasted a partial scenario from the "Leader bounces" section:
> > > d' is the remaining delay
> > > W1, which is the leader, leaves
> > > Rebalance is triggered
> > > W2 joins with assignment: []
> > > W3 joins with assignment: [BT1]
> > > W3 becomes leader.
> > > There's an active delay in progress.
> > > W3 computes and sends assignments:
> > > W2(delay: d'', assigned: [], revoked: [])
> > > W3(delay: d'', assigned: [BT1, AT1], revoked: [])
> > > after we start the d' round of delayed rebalance. Why does W3 send the
> > > assignment [BT1, AT1] instead of just [BT1] here? I guess we won't do the
> > > actual rebalance until the original scheduled delay d is reached, right?
> > >
> > > 2. We are basically relying on the leader subscription to persist the
> > > group assignment across generations, and on the leader rejoining to
> > > trigger the necessary rebalance. This assumption could potentially be
> > > broken by future broker upgrades, as we are discussing in
> > > https://issues.apache.org/jira/browse/KAFKA-7728. This JIRA will be
> > > converted to a ready KIP by Mayuresh pretty soon, and our goal here is
> > > to avoid unnecessary rebalances due to leader bounces by specifying a
> > > field called JoinReason for the broker to interpret. With that change in
> > > mind, I think it's worth mentioning this potential dependency within
> > > KIP-415 so that we don't forget to make the corresponding change to adapt
> > > to the 7728 broker upgrade in case the JoinReason change lands before
> > > KIP-415. Is my explanation of the situation clear?
> > >
> > > 3. cooperative cmeans -> means that only Incremental Cooperative Connect
> > > protocol is enabled (version 1 or higher).
> > >
> > > 4. For the compatibility change, I'm wondering whether we could just use
> > > 2 connect protocols instead of 3. Because the user knows when all the
> > > workers are upgraded to version 1, we could just use `compatible` for the
> > > first rolling bounce and `cooperative` for the second bounce. Could you
> > > explain a bit why we need to start from the `eager` stage?
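
The interplay of the three `connect.protocol` values during rolling bounces can be pictured with a small sketch. The thread only confirms that `cooperative` means version 1 or higher; the meanings given here to `eager` (version 0 only) and `compatible` (versions 0 and 1) are assumptions for illustration, and `negotiated_version` is a hypothetical helper, not Connect code.

```python
# Protocol versions each setting is assumed to accept (see note above).
SUPPORTED = {
    "eager": {0},
    "compatible": {0, 1},
    "cooperative": {1},
}


def negotiated_version(worker_settings):
    """Return the highest version every worker accepts, or None if none."""
    common = set.intersection(*(SUPPORTED[s] for s in worker_settings))
    return max(common) if common else None


# A group in the middle of the first rolling bounce, then after it.
print(negotiated_version(["eager", "compatible", "compatible"]))
print(negotiated_version(["compatible", "compatible", "compatible"]))
```

Under these assumptions a group mixing `eager` and `cooperative` workers has no version in common, which is one way to see why an intermediate `compatible` step is useful.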
> > >
> > > cc Mayuresh on this thread.
> > >
> > > Thanks,
> > > Boyang
> > >
> > > ________________________________
> > > From: Konstantine Karantasis <konstant...@confluent.io>
> > > Sent: Friday, January 18, 2019 8:32 AM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-415: Incremental Cooperative Rebalancing in
> > > Kafka Connect
> > >
> > > Hi Stanislav and Boyang. Thanks for your comments.
> > >
> > > You are both asking how KIP-345 affects this KIP, so, first, I'll
> > > address this point.
> > >
> > > KIP-345 has recently introduced an option that will allow Kafka consumer
> > > applications to avoid rebalances due to the departure and subsequent
> > > return of a member in the group. That way KIP-345 offers a remedy for the
> > > cases of rolling bounces and replacement of nodes due to failures that
> > > can happen quickly.
> > >
> > > Without ruling out that policies of Incremental Cooperative Rebalancing
> > > may eventually use static membership in order to better address such use
> > > cases, next I'll mention a few reasons why I believe KIP-415, which is
> > > the proposal of Incremental Cooperative Rebalancing in Connect, should
> > > proceed independently at first:
> > >
> > >   * KIP-345 requires an upgrade to both Kafka brokers and Connect
> > > workers. This requirement is very strict for a big group of Connect
> > > users that anticipate a solution to the stop-the-world effect in Connect
> > > in order to grow their Connect clusters, but cannot afford to also
> > > upgrade their Kafka brokers in order to enjoy the suggested improvements.
> > >   * Connect clusters are traditionally locally stateless and lightweight,
> > > in the sense that they don't store state outside Kafka and that this
> > > state is easy to process during startup. Based on their overall
> > > configuration and deployment requirements, Connect Workers are very
> > > suitable to run in containers. With respect to the resources that Connect
> > > Workers are rebalancing, connectors and tasks are (and honestly should
> > > be) easy to spin up and redistribute in a Connect cluster. This is true
> > > because connectors depend on Kafka and the systems they connect to in
> > > order to save their progress. They don't use the Workers' local
> > > instances. Given this reality, the configuration of a unique id, per
> > > KIP-345's requirement, doesn't seem necessary for Connect to introduce
> > > yet. The upgrade path is made even easier without having to define
> > > unique ids, and in practice the heuristics of Incremental Cooperative
> > > Rebalancing have the potential to cover (immediately or eventually) most
> > > of the scenarios that make rebalancing and stop-the-world problematic in
> > > Connect today.
> > >   * Static membership has not been merged yet. Given that KIP-415 also
> > > addresses scale-up and scale-down scenarios and the important side-effect
> > > that the submission of a new connector has on other connectors in the
> > > worker's group, it seems to me that introducing an interdependency
> > > between the two proposals is not necessary. Again, this doesn't prevent
> > > reconsidering integration in the future.
> > >   * Finally, it's not immediately obvious to me that integration between
> > > the two proposals also means a significantly simpler implementation in
> > > Connect. That's because Connect Workers will have to handle a delay one
> > > way or the other. Plus, the group management and resource assignment code
> > > is mostly separate between Connect and the Consumer.
> > >
> > > With respect to your other comments, Stanislav, glad you found the
> > > examples easy to read. I'll change the KIP to show who's leader at the
> > > beginning as well.
> > > Boyang, I'll add a paragraph to highlight why local state is not the
> > > most pressing issue in Connect.
> > >
> > > Thank you both for your initial comments.
> > > Konstantine
> > >
> > >
> > > On Mon, Jan 14, 2019 at 9:24 AM Boyang Chen <bche...@outlook.com> wrote:
> > >
> > > > Hey Konstantine,
> > > >
> > > > great work for making this happen! Incremental rebalancing is super
> > > > important for avoiding unnecessary resource shuffle and improving the
> > > > overall Connect framework stability.
> > > >
> > > > After the first pass, two questions that came to my mind are:
> > > >
> > > >   1.  To my understanding, the general rebalancing case could be
> > > > covered by configuring the workers as static members, so that we don't
> > > > need to worry about the case of a worker temporarily leaving the group.
> > > > Basically, KIP-345 could help with avoiding unexpected rebalances during
> > > > a cluster rolling bounce, so I feel the same way as Stanislav that
> > > > parts of the 415 logic could be simplified. It would be great if we
> > > > could look at these two initiatives holistically to help reduce the
> > > > common workload.
> > > >   2.  Since I have never used Connect before, I do hope you could
> > > > enlighten me on the potential effort involved in task transfer between
> > > > workers. The reason I ask is to estimate how much burden we will
> > > > introduce by starting a task on a brand new worker. Is there any local
> > > > state to be replayed? It would be good to also provide this background
> > > > in the KIP motivation so that people can better understand the symptom
> > > > and give constructive feedback.
> > > >
> > > > Thanks a lot!
> > > >
> > > > Boyang
> > > > ________________________________
> > > > From: Stanislav Kozlovski <stanis...@confluent.io>
> > > > Sent: Monday, January 14, 2019 3:15 PM
> > > > To: dev@kafka.apache.org
> > > > Subject: Re: [DISCUSS] KIP-415: Incremental Cooperative Rebalancing in
> > > > Kafka Connect
> > > >
> > > > Hey Konstantine,
> > > >
> > > > This is a very exciting KIP that brings a fundamental improvement,
> > > > thanks a lot for working on it!
> > > >
> > > > Have you seen KIP-345
> > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-345>? I was
> > > > wondering whether Connect would support static group membership,
> > > > potentially limiting the need to handle "node bounce" cases through a
> > > > rebalance (even though there wouldn't be downtime). I find it is
> > > > somewhat related to the `scheduled.rebalance.max.delay.ms` config
> > > > described in KIP-415. The main difference, I think, is that the
> > > > rebalance delay in KIP-345 is configurable through `session.timeout.ms`,
> > > > which is tied to the liveness heartbeat, whereas here we have a separate
> > > > config.
> > > >
> > > > The original design document suggested
> > > > >  Assignment response includes usual assignment information. Start
> > > > > processing any new partitions. (Since we expect sticky assignment, we
> > > > > could also optimize this and omit the assignment when it is just
> > > > > repeating a previous assignment)
> > > > Have we decided on whether we would make use of the optimization to not
> > > > send the assignment that the worker already knows about?
> > > >
> > > > I enjoyed reading the rebalancing examples. As a small readability
> > > > improvement, could I suggest we clarify which Worker (W1, W2, W3) is
> > > > the leader in the "Initial group and assignment" part? For example, in
> > > > `Leader bounces` I was left wondering whether the leaving W2 was the
> > > > initial leader or not.
> > > >
> > > > Thanks,
> > > > Stanislav
> > > >
> > > > On Sat, Jan 12, 2019 at 1:44 AM Konstantine Karantasis <
> > > > konstant...@confluent.io> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I just published KIP-415: Incremental Cooperative Rebalancing in
> > > > > Kafka Connect on the wiki here:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> > > > >
> > > > > This is the first KIP to suggest an implementation of incremental and
> > > > > cooperative rebalancing in the context of Kafka Connect. It aims to
> > > > > provide an adequate solution to the stop-the-world effect that occurs
> > > > > in a Connect cluster whenever a new connector configuration is
> > > > > submitted or a Connect Worker is added or removed from the cluster.
> > > > >
> > > > > Looking forward to your insightful feedback!
> > > > >
> > > > > Regards,
> > > > > Konstantine
> > > > >
> > > >
> > > >
> > > > --
> > > > Best,
> > > > Stanislav
> > > >
> > >
> >
>
