Hi All, I have added another improvement to this, which is to limit the parallel leader movements. I think I'll soon (maybe late this week or early next) start a vote on this too if there are no additional feedback.
Thanks, Viktor On Mon, Apr 29, 2019 at 1:26 PM Viktor Somogyi-Vass <viktorsomo...@gmail.com> wrote: > Hi Folks, > > I've updated the KIP with the batching which would work on both replica > and partition level. To explain it briefly: for instance if the replica > level is set to 2 and partition level is set to 3, then 2x3=6 replica > reassignment would be in progress at the same time. In case of reassignment > for a single partition from (0, 1, 2, 3, 4) to (5, 6, 7, 8, 9) we would > form the batches (0, 1) → (5, 6); (2, 3) → (7, 8) and 4 → 9 and would > execute the reassignment in this order. > > Let me know what you think. > > Best, > Viktor > > On Mon, Apr 15, 2019 at 7:01 PM Viktor Somogyi-Vass < > viktorsomo...@gmail.com> wrote: > >> A follow up on the batching topic to clarify my points above. >> >> Generally I think that batching should be a core feature as Colin said >> the controller should possess all information that are related. >> Also Cruise Control (or really any 3rd party admin system) might build >> upon this to give more holistic approach to balance brokers. We may cater >> them with APIs that act like building blocks to make their life easier like >> incrementalization, batching, cancellation and rollback but I think the >> more advanced we go we'll need more advanced control surface and Kafka's >> basic tooling might not be suitable for that. >> >> Best, >> Viktor >> >> >> On Mon, 15 Apr 2019, 18:22 Viktor Somogyi-Vass, <viktorsomo...@gmail.com> >> wrote: >> >>> Hey Guys, >>> >>> I'll reply to you all in this email: >>> >>> @Jun: >>> 1. yes, it'd be a good idea to add this feature, I'll write this into >>> the KIP. I was actually thinking about introducing a dynamic config called >>> reassignment.parallel.partition.count and >>> reassignment.parallel.replica.count. The first property would control how >>> many partition reassignment can we do concurrently. The second would go one >>> level in granularity and would control how many replicas do we want to move >>> for a given partition. Also one more thing that'd be useful to fix is that >>> a given list of partition -> replica list would be executed in the same >>> order (from first to last) so it's overall predictable and the user would >>> have some control over the order of reassignments should be specified as >>> the JSON is still assembled by the user. >>> 2. the /kafka/brokers/topics/{topic} znode to be specific. I'll update >>> the KIP to contain this. >>> >>> @Jason: >>> I think building this functionality into Kafka would definitely benefit >>> all the users and that CC as well as it'd simplify their software as you >>> said. As I understand the main advantage of CC and other similar softwares >>> are to give high level features for automatic load balancing. Reliability, >>> stability and predictability of the reassignment should be a core feature >>> of Kafka. I think the incrementalization feature would make it more stable. >>> I would consider cancellation too as a core feature and we can leave the >>> gate open for external tools to feed in their reassignment json as they >>> want. I was also thinking about what are the set of features we can provide >>> for Kafka but I think the more advanced we go the more need there is for an >>> administrative UI component. >>> Regarding KIP-352: Thanks for pointing this out, I didn't see this >>> although lately I was also thinking about the throttling aspect of it. >>> Would be a nice add-on to Kafka since though the above configs provide some >>> level of control, it'd be nice to put an upper cap on the bandwidth and >>> make it monitorable. >>> >>> Viktor >>> >>> On Wed, Apr 10, 2019 at 2:57 AM Jason Gustafson <ja...@confluent.io> >>> wrote: >>> >>>> Hi Colin, >>>> >>>> On a related note, what do you think about the idea of storing the >>>> > reassigning replicas in >>>> > /brokers/topics/[topic]/partitions/[partitionId]/state, rather than >>>> in the >>>> > reassignment znode? I don't think this requires a major change to the >>>> > proposal-- when the controller becomes aware that it should do a >>>> > reassignment, the controller could make the changes. This also helps >>>> keep >>>> > the reassignment znode from getting larger, which has been a problem. >>>> >>>> >>>> Yeah, I think it's a good idea to store the reassignment state at a >>>> finer >>>> level. I'm not sure the LeaderAndIsr znode is the right one though. >>>> Another >>>> option is /brokers/topics/{topic}. That is where we currently store the >>>> replica assignment. I think we basically want to represent both the >>>> current >>>> state and the desired state. This would also open the door to a cleaner >>>> way >>>> to update a reassignment while it is still in progress. >>>> >>>> -Jason >>>> >>>> >>>> >>>> >>>> On Mon, Apr 8, 2019 at 11:14 PM George Li <sql_consult...@yahoo.com >>>> .invalid> >>>> wrote: >>>> >>>> > Hi Colin / Jason, >>>> > >>>> > Reassignment should really be doing a batches. I am not too worried >>>> about >>>> > reassignment znode getting larger. In a real production >>>> environment, too >>>> > many concurrent reassignment and too frequent submission of >>>> reassignments >>>> > seemed to cause latency spikes of kafka cluster. So >>>> > batching/staggering/throttling of submitting reassignments is >>>> recommended. >>>> > >>>> > In KIP-236, The "originalReplicas" are only kept for the current >>>> > reassigning partitions (small #), and kept in memory of the controller >>>> > context partitionsBeingReassigned as well as in the znode >>>> > /admin/reassign_partitions, I think below "setting in the RPC like >>>> null = >>>> > no replicas are reassigning" is a good idea. >>>> > >>>> > There seems to be some issues with the Mail archive server of this >>>> mailing >>>> > list? I didn't receive email after April 7th, and the archive for >>>> April >>>> > 2019 has only 50 messages ( >>>> > http://mail-archives.apache.org/mod_mbox/kafka-dev/201904.mbox/thread) >>>> ? >>>> > >>>> > Thanks, >>>> > George >>>> > >>>> > on, 08 Apr 2019 17:54:48 GMT Colin McCabe wrote: >>>> > >>>> > Yeah, I think adding this information to LeaderAndIsr makes sense. >>>> It >>>> > would be better to track >>>> > "reassigningReplicas" than "originalReplicas", I think. Tracking >>>> > "originalReplicas" is going >>>> > to involve sending a lot more data, since most replicas in the system >>>> are >>>> > not reassigning >>>> > at any given point. Or we would need a hack in the RPC like null = no >>>> > replicas are reassigning. >>>> > >>>> > On a related note, what do you think about the idea of storing the >>>> > reassigning replicas in >>>> > /brokers/topics/[topic]/partitions/[partitionId]/state, rather than >>>> in >>>> > the reassignment znode? >>>> > I don't think this requires a major change to the proposal-- when the >>>> > controller becomes >>>> > aware that it should do a reassignment, the controller could make the >>>> > changes. This also >>>> > helps keep the reassignment znode from getting larger, which has been >>>> a >>>> > problem. >>>> > >>>> > best, >>>> > Colin >>>> > >>>> > >>>> > On Mon, Apr 8, 2019, at 09:29, Jason Gustafson wrote: >>>> > > Hey George, >>>> > > >>>> > > For the URP during a reassignment, if the "original_replicas" is >>>> kept >>>> > for >>>> > > > the current pending reassignment. I think it will be very easy to >>>> > compare >>>> > > > that with the topic/partition's ISR. If all "original_replicas" >>>> are in >>>> > > > ISR, then URP should be 0 for that topic/partition. >>>> > > >>>> > > >>>> > > Yeah, that makes sense. But I guess we would need >>>> "original_replicas" to >>>> > be >>>> > > propagated to partition leaders in the LeaderAndIsr request since >>>> leaders >>>> > > are the ones that are computing URPs. That is basically what >>>> KIP-352 had >>>> > > proposed, but we also need the changes to the reassignment path. >>>> Perhaps >>>> > it >>>> > > makes more sense to address this problem in KIP-236 since that is >>>> where >>>> > you >>>> > > have already introduced "original_replicas"? I'm also happy to do >>>> KIP-352 >>>> > > as a follow-up to KIP-236. >>>> > > >>>> > > Best, >>>> > > Jason >>>> > > >>>> > > >>>> > > On Sun, Apr 7, 2019 at 5:09 PM Ismael Juma <isma...@gmail.com> >>>> wrote: >>>> > > >>>> > > > Good discussion about where we should do batching. I think if >>>> there is >>>> > a >>>> > > > clear great way to batch, then it makes a lot of sense to just do >>>> it >>>> > once. >>>> > > > However, if we think there is scope for experimenting with >>>> different >>>> > > > approaches, then an API that tools can use makes a lot of sense. >>>> They >>>> > can >>>> > > > experiment and innovate. Eventually, we can integrate something >>>> into >>>> > Kafka >>>> > > > if it makes sense. >>>> > > > >>>> > > > Ismael >>>> > > > >>>> > > > On Sun, Apr 7, 2019, 11:03 PM Colin McCabe <cmcc...@apache.org> >>>> wrote: >>>> > > > >>>> > > > > Hi George, >>>> > > > > >>>> > > > > As Jason was saying, it seems like there are two directions we >>>> could >>>> > go >>>> > > > > here: an external system handling batching, and the controller >>>> > handling >>>> > > > > batching. I think the controller handling batching would be >>>> better, >>>> > > > since >>>> > > > > the controller has more information about the state of the >>>> system. >>>> > If >>>> > > > the >>>> > > > > controller handles batching, then the controller could also >>>> handle >>>> > things >>>> > > > > like setting up replication quotas for individual partitions. >>>> The >>>> > > > > controller could do things like throttle replication down if the >>>> > cluster >>>> > > > > was having problems. >>>> > > > > >>>> > > > > We kind of need to figure out which way we're going to go on >>>> this one >>>> > > > > before we set up big new APIs, I think. If we want an external >>>> > system to >>>> > > > > handle batching, then we can keep the idea that there is only >>>> one >>>> > > > > reassignment in progress at once. If we want the controller to >>>> > handle >>>> > > > > batching, we will need to get away from that idea. Instead, we >>>> > should >>>> > > > just >>>> > > > > have a bunch of "ideal assignments" that we tell the controller >>>> > about, >>>> > > > and >>>> > > > > let it decide how to do the batching. These ideal assignments >>>> could >>>> > > > change >>>> > > > > continuously over time, so from the admin's point of view, there >>>> > would be >>>> > > > > no start/stop/cancel, but just individual partition >>>> reassignments >>>> > that we >>>> > > > > submit, perhaps over a long period of time. And then >>>> cancellation >>>> > might >>>> > > > > just mean cancelling just that individual partition >>>> reassignment, >>>> > not all >>>> > > > > partition reassignments. >>>> > > > > >>>> > > > > best, >>>> > > > > Colin >>>> > > > > >>>> > > > > On Fri, Apr 5, 2019, at 19:34, George Li wrote: >>>> > > > > > Hi Jason / Viktor, >>>> > > > > > >>>> > > > > > For the URP during a reassignment, if the >>>> "original_replicas" is >>>> > kept >>>> > > > > > for the current pending reassignment. I think it will be very >>>> easy >>>> > to >>>> > > > > > compare that with the topic/partition's ISR. If all >>>> > > > > > "original_replicas" are in ISR, then URP should be 0 for that >>>> > > > > > topic/partition. >>>> > > > > > >>>> > > > > > It would be also nice to separate the metrics MaxLag/TotalLag >>>> for >>>> > > > > > Reassignments. I think that will also require >>>> "original_replicas" >>>> > (the >>>> > > > > > topic/partition's replicas just before reassignment when the >>>> AR >>>> > > > > > (Assigned Replicas) is set to Set(original_replicas) + >>>> > > > > > Set(new_replicas_in_reassign_partitions) ). >>>> > > > > > >>>> > > > > > Thanks, >>>> > > > > > George >>>> > > > > > >>>> > > > > > On Friday, April 5, 2019, 6:29:55 PM PDT, Jason Gustafson >>>> > > > > > <ja...@confluent.io> wrote: >>>> > > > > > >>>> > > > > > Hi Viktor, >>>> > > > > > >>>> > > > > > Thanks for writing this up. As far as questions about overlap >>>> with >>>> > > > > KIP-236, >>>> > > > > > I agree it seems mostly orthogonal. I think KIP-236 may have >>>> had a >>>> > > > larger >>>> > > > > > initial scope, but now it focuses on cancellation and >>>> batching is >>>> > left >>>> > > > > for >>>> > > > > > future work. >>>> > > > > > >>>> > > > > > With that said, I think we may not actually need a KIP for the >>>> > current >>>> > > > > > proposal since it doesn't change any APIs. To make it more >>>> > generally >>>> > > > > > useful, however, it would be nice to handle batching at the >>>> > partition >>>> > > > > level >>>> > > > > > as well as Jun suggests. The basic question is at what level >>>> > should the >>>> > > > > > batching be determined. You could rely on external processes >>>> (e.g. >>>> > > > cruise >>>> > > > > > control) or it could be built into the controller. There are >>>> > tradeoffs >>>> > > > > > either way, but I think it simplifies such tools if it is >>>> handled >>>> > > > > > internally. Then it would be much safer to submit a larger >>>> > reassignment >>>> > > > > > even just using the simple tools that come with Kafka. >>>> > > > > > >>>> > > > > > By the way, since you are looking into some of the >>>> reassignment >>>> > logic, >>>> > > > > > another problem that we might want to address is the >>>> misleading >>>> > way we >>>> > > > > > report URPs during a reassignment. I had a naive proposal for >>>> this >>>> > > > > > previously, but it didn't really work >>>> > > > > > >>>> > > > > >>>> > > > >>>> > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-352%3A+Distinguish+URPs+caused+by+reassignment >>>> > > > > . >>>> > > > > > Potentially fixing that could fall under this work as well if >>>> you >>>> > think >>>> > > > > > it >>>> > > > > > makes sense. >>>> > > > > > >>>> > > > > > Best, >>>> > > > > > Jason >>>> > > > > > >>>> > > > > > On Thu, Apr 4, 2019 at 4:49 PM Jun Rao <j...@confluent.io> >>>> wrote: >>>> > > > > > >>>> > > > > > > Hi, Viktor, >>>> > > > > > > >>>> > > > > > > Thanks for the KIP. A couple of comments below. >>>> > > > > > > >>>> > > > > > > 1. Another potential thing to do reassignment incrementally >>>> is to >>>> > > > move >>>> > > > > a >>>> > > > > > > batch of partitions at a time, instead of all partitions. >>>> This >>>> > may >>>> > > > > lead to >>>> > > > > > > less data replication since by the time the first batch of >>>> > partitions >>>> > > > > have >>>> > > > > > > been completely moved, some data of the next batch may have >>>> been >>>> > > > > deleted >>>> > > > > > > due to retention and doesn't need to be replicated. >>>> > > > > > > >>>> > > > > > > 2. "Update CR in Zookeeper with TR for the given partition". >>>> > Which >>>> > ZK >>>> > > > > path >>>> > > > > > > is this for? >>>> > > > > > > >>>> > > > > > > Jun >>>> > > > > > > >>>> > > > > > > On Sat, Feb 23, 2019 at 2:12 AM Viktor Somogyi-Vass < >>>> > > > > > > viktorsomo...@gmail.com> >>>> > > > > > > wrote: >>>> > > > > > > >>>> > > > > > > > Hi Harsha, >>>> > > > > > > > >>>> > > > > > > > As far as I understand KIP-236 it's about enabling >>>> reassignment >>>> > > > > > > > cancellation and as a future plan providing a queue of >>>> replica >>>> > > > > > > reassignment >>>> > > > > > > > steps to allow manual reassignment chains. While I agree >>>> that >>>> > the >>>> > > > > > > > reassignment chain has a specific use case that allows >>>> fine >>>> > grain >>>> > > > > control >>>> > > > > > > > over reassignment process, My proposal on the other hand >>>> > doesn't >>>> > > > talk >>>> > > > > > > about >>>> > > > > > > > cancellation but it only provides an automatic way to >>>> > > > incrementalize >>>> > > > > an >>>> > > > > > > > arbitrary reassignment which I think fits the general use >>>> case >>>> > > > where >>>> > > > > > > users >>>> > > > > > > > don't want that level of control but still would like a >>>> > balanced >>>> > > > way >>>> > > > > of >>>> > > > > > > > reassignments. Therefore I think it's still relevant as an >>>> > > > > improvement of >>>> > > > > > > > the current algorithm. >>>> > > > > > > > Nevertheless I'm happy to add my ideas to KIP-236 as I >>>> think >>>> > it >>>> > > > > would be >>>> > > > > > > a >>>> > > > > > > > great improvement to Kafka. >>>> > > > > > > > >>>> > > > > > > > Cheers, >>>> > > > > > > > Viktor >>>> > > > > > > > >>>> > > > > > > > On Fri, Feb 22, 2019 at 5:05 PM Harsha <ka...@harsha.io> >>>> > wrote: >>>> > > > > > > > >>>> > > > > > > > > Hi Viktor, >>>> > > > > > > > > There is already KIP-236 for the same feature >>>> > and >>>> > > > George >>>> > > > > > > made >>>> > > > > > > > > a PR for this as well. >>>> > > > > > > > > Lets consolidate these two discussions. If you have any >>>> > cases >>>> > > > that >>>> > > > > are >>>> > > > > > > > not >>>> > > > > > > > > being solved by KIP-236 can you please mention them in >>>> > that >>>> > > > > thread. We >>>> > > > > > > > can >>>> > > > > > > > > address as part of KIP-236. >>>> > > > > > > > > >>>> > > > > > > > > Thanks, >>>> > > > > > > > > Harsha >>>> > > > > > > > > >>>> > > > > > > > > On Fri, Feb 22, 2019, at 5:44 AM, Viktor Somogyi-Vass >>>> wrote: >>>> > > > > > > > > > Hi Folks, >>>> > > > > > > > > > >>>> > > > > > > > > > I've created a KIP about an improvement of the >>>> reassignment >>>> > > > > algorithm >>>> > > > > > > > we >>>> > > > > > > > > > have. It aims to enable partition-wise incremental >>>> > > > reassignment. >>>> > > > > The >>>> > > > > > > > > > motivation for this is to avoid excess load that the >>>> > current >>>> > > > > > > > replication >>>> > > > > > > > > > algorithm implicitly carries as in that case there >>>> > are points >>>> > > > in >>>> > > > > the >>>> > > > > > > > > > algorithm where both the new and old replica set could >>>> > be >>>> > > > online >>>> > > > > and >>>> > > > > > > > > > replicating which puts double (or almost double) >>>> pressure >>>> > on >>>> > > > the >>>> > > > > > > > brokers >>>> > > > > > > > > > which could cause problems. >>>> > > > > > > > > > Instead my proposal would slice this up into several >>>> > steps >>>> > > > where >>>> > > > > each >>>> > > > > > > > > step >>>> > > > > > > > > > is calculated based on the final target replicas and >>>> > the >>>> > > > current >>>> > > > > > > > replica >>>> > > > > > > > > > assignment taking into account scenarios where brokers >>>> > could be >>>> > > > > > > offline >>>> > > > > > > > > and >>>> > > > > > > > > > when there are not enough replicas to fulfil the >>>> > > > > min.insync.replica >>>> > > > > > > > > > requirement. >>>> > > > > > > > > > >>>> > > > > > > > > > The link to the KIP: >>>> > > > > > > > > > >>>> > > > > > > > > >>>> > > > > > > > >>>> > > > > > > >>>> > > > > >>>> > > > >>>> > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-435%3A+Incremental+Partition+Reassignment >>>> > > > > > > > > > >>>> > > > > > > > > > I'd be happy to receive any feedback. >>>> > > > > > > > > > >>>> > > > > > > > > > An important note is that this KIP and another one, >>>> > KIP-236 >>>> > > > that >>>> > > > > is >>>> > > > > > > > > > about >>>> > > > > > > > > > interruptible reassignment ( >>>> > > > > > > > > > >>>> > > > > > > > > >>>> > > > > > > > >>>> > > > > > > >>>> > > > > >>>> > > > >>>> > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-236%3A+Interruptible+Partition+Reassignment >>>> > > > > > > > > ) >>>> > > > > > > > > > should be compatible. >>>> > > > > > > > > > >>>> > > > > > > > > > Thanks, >>>> > > > > > > > > > Viktor >>>> > > > > > > > > > >>>> > > > > > > > > >>>> > > > > > > > >>>> > > > > > > >>>> > > > > > >>>> > > > > >>>> > > > >>>> > > >>>> > >>>> >>>