Sure. Glad you like this KIP. (KIP-258 will be more difficult... It seems it was a good decision to split it into two.)
-Matthias

On 3/22/18 11:35 AM, James Cheng wrote:
>
>> On Mar 21, 2018, at 11:45 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>
>> Yes, it only affects the metadata. KIP-268 targets metadata upgrade without store upgrade.
>>
>> We can discuss store upgrade further in KIP-258: I think in general, the upgrade/downgrade behavior might be an issue for upgrading stores. However, this upgrade/downgrade can only happen when upgrading from 1.2 to a future version. Thus, it won't affect an upgrade to 1.2.
>>
>> For an upgrade to 1.2, we introduce the "upgrade.from" parameter (because we don't have "version probing" for 1.1 yet) and this ensures that upgrading cannot happen "too early", and no downgrade can happen either for this case.
>>
>> Let me know what you think.
>>
>
> I think yes, we can discuss upgrade/downgrade issues (to versions after 1.2) in the other KIP (KIP-258).
>
> However, this KIP-268 looks fine. It gives us the mechanism to properly detect and automatically upgrade/downgrade the topology and allows the new/old code to co-exist within a topology, which is something we didn't have before.
>
> KIP-268 looks good to me.
>
> Thanks for all the answers to my questions.
>
> -James
>
>>
>> -Matthias
>>
>> On 3/21/18 11:16 PM, James Cheng wrote:
>>>
>>>> On Mar 21, 2018, at 11:18 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>
>>>> Thanks for following up, James.
>>>>
>>>>> Is this the procedure that happens during every rebalance? The reason I ask is that this step:
>>>>>> As long as the leader (before or after upgrade) receives at least one old version X Subscription it always sends a version X Assignment back (the encoded supported version is X before the leader is upgraded and Y after the leader is upgraded).
>>>>
>>>> Yes, that would be the consequence.
>>>>
>>>>> This implies that the leader receives all Subscriptions before sending back any responses.
>>>>> Is that what actually happens? Is it possible that it would receive, say, 4 out of 5 Subscriptions of Y, send back a response Y, and then later receive a Subscription X? What happens in that case? Would that Subscription X then trigger another rebalance, and the whole thing starts again?
>>>>
>>>> That sounds correct. A 'delayed' Subscription could always happen -- even before KIP-268 -- and would trigger a new rebalance. In this regard, the behavior does not change. The difference is that we would automatically downgrade the Assignment from Y to X again -- but the application would not fail (as it would before the KIP).
>>>>
>>>> Do you see an issue with this behavior? The idea of the design is to make Kafka Streams robust against those scenarios. Thus, if 4 apps are upgraded but no. 5 is not yet and no. 5 is late, Kafka Streams would first upgrade from X to Y and downgrade from Y to X in the second rebalance when no. 5 joins the group. If no. 5 gets upgraded, a third rebalance would upgrade to Y again.
>>>
>>> Sounds good.
>>>
>>>> Thus, as long as not all instances are on the newest version, upgrades/downgrades of the exchanged rebalance metadata could happen multiple times. However, this should not be an issue from my understanding.
>>>
>>> About "this should not be an issue": this upgrade/downgrade is just about the rebalance metadata, right? Are there other associated things that will also have to upgrade/downgrade in sync with the rebalance metadata? For example, the idea for this KIP originally came up during the discussion about adding timestamps to RocksDB state stores, which required updating the on-disk schema. In the case of an updated RocksDB state store but with a downgraded rebalance metadata... that should work, right?
>>> Because we still have updated code (which understands the on-disk format) but it simply gets its partition assignments via the downgraded rebalance metadata?
>>>
>>> Thanks,
>>> -James
>>>
>>> Sent from my iPhone
>>>
>>>> Let us know what you think about it.
>>>>
>>>> -Matthias
>>>>
>>>>> On 3/20/18 11:10 PM, James Cheng wrote:
>>>>> Sorry, I see that the VOTE started already, but I have a late question on this KIP.
>>>>>
>>>>> In the "version probing" protocol:
>>>>>> Detailed upgrade protocol from metadata version X to Y (with X >= 1.2):
>>>>>> On startup/rolling-bounce, an instance does not know what version the leader understands and (optimistically) sends a Subscription with the latest version Y.
>>>>>> The (old, i.e., not yet upgraded) leader sends an empty Assignment back to the corresponding instance that sent the newer Subscription it does not understand. The Assignment metadata only encodes both version numbers (used-version == supported-version) as the leader's supported-version X.
>>>>>> For all other instances the leader sends a regular Assignment in version X back.
>>>>>> If an upgraded follower sends a new version Y Subscription and receives a version X Assignment with "supported-version = X", it can downgrade to X (in-memory flag) and resend a new Subscription with old version X to retry joining the group. To force an immediate second rebalance, the follower does an "unsubscribe()/subscribe()/poll()" sequence.
>>>>>> As long as the leader (before or after upgrade) receives at least one old version X Subscription it always sends a version X Assignment back (the encoded supported version is X before the leader is upgraded and Y after the leader is upgraded).
>>>>>> If an upgraded instance receives an Assignment it always checks the leader's supported-version and updates its downgraded "used-version" if possible.
>>>>>
>>>>> Is this the procedure that happens during every rebalance? The reason I ask is that this step:
>>>>>>> As long as the leader (before or after upgrade) receives at least one old version X Subscription it always sends a version X Assignment back (the encoded supported version is X before the leader is upgraded and Y after the leader is upgraded).
>>>>>
>>>>> This implies that the leader receives all Subscriptions before sending back any responses. Is that what actually happens? Is it possible that it would receive, say, 4 out of 5 Subscriptions of Y, send back a response Y, and then later receive a Subscription X? What happens in that case? Would that Subscription X then trigger another rebalance, and the whole thing starts again?
>>>>>
>>>>> Thanks,
>>>>> -James
>>>>>
>>>>>> On Mar 19, 2018, at 5:04 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>
>>>>>> Guozhang,
>>>>>>
>>>>>> thanks for your comments.
>>>>>>
>>>>>> 2: I think my main concern is that 1.2 would be a "special" release that everybody needs to use to upgrade. As an alternative, we could say that we add the config in 1.2 and keep it for 2 additional releases (1.3 and 1.4) but remove it in 1.5. This gives users more flexibility and does not force users to upgrade to a specific version, but also allows us to not carry the tech debt forever. WDYT about this? If users upgrade on a regular basis, this approach could avoid a forced update with high probability, as they will upgrade to either 1.2/1.3/1.4 anyway at some point. Thus, only if users don't upgrade for a very long time are they forced to do 2 upgrades with an intermediate version.
>>>>>>
>>>>>> 4.
>>>>>> Updated the KIP to remove the ".x" suffix.
>>>>>>
>>>>>> 5. Updated the KIP accordingly.
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>> On 3/19/18 10:33 AM, Guozhang Wang wrote:
>>>>>>> Yup :)
>>>>>>>
>>>>>>>> On Mon, Mar 19, 2018 at 10:01 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> bq. some snippet like ProduceRequest / ProduceRequest
>>>>>>>>
>>>>>>>> Did you mean ProduceRequest / Response?
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>>
>>>>>>>>> On Mon, Mar 19, 2018 at 9:51 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Matthias,
>>>>>>>>>
>>>>>>>>> About 2: yeah I guess this is a subjective preference. My main concern about keeping the config / handling code beyond the 1.2 release is that it will become a non-cleanable tech debt forever, as fewer and fewer users would need to upgrade from 0.10.x and 1.1.x, and eventually we will need to maintain this for nearly no one. On the other hand, I agree that this tech debt is not too large. So if more people feel this is a good tradeoff to pay for not forcing users on older versions to upgrade twice, I'm happy to change my opinion.
>>>>>>>>>
>>>>>>>>> A few more minor comments:
>>>>>>>>>
>>>>>>>>> 4. For the values of "upgrade.from", could we simplify to only major.minor? I.e., "0.10.0" rather than "0.10.0.x"? Since we never changed compatibility behavior in bug fix releases, we would never need a bug-fix version to distinguish.
>>>>>>>>>
>>>>>>>>> 5. Could you also present the encoding format in subscription / assignment metadata bytes in version 2, and in future versions (i.e., which first bytes would be kept moving forward), for readers to better understand the proposal?
>>>>>>>>> some snippet like ProduceRequest / ProduceRequest in
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
>>>>>>>>> would be very helpful.
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>> On Fri, Mar 16, 2018 at 2:58 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for your comments.
>>>>>>>>>>
>>>>>>>>>> 1. Because the old leader cannot decode the new Subscription it can only send an empty assignment back. The idea to send empty assignments to all members is interesting. I will try this out in a PR to see how it behaves.
>>>>>>>>>>
>>>>>>>>>> 2. I don't see an issue with keeping the config `upgrade.from` for future releases. Personally, I would prefer to not force users to do two upgrades if they want to go from a pre-1.2 to a post-1.2 version. Is there a technical argument why you want to get rid of the config? What disadvantages do you see in keeping `upgrade.from` beyond the 1.2 release?
>>>>>>>>>>
>>>>>>>>>> Keeping the config is just a few lines of code in `StreamsConfig` as well as a single `if` statement in `StreamsPartitionAssignor` to force a downgrade (cf. https://github.com/apache/kafka/pull/4636/files#diff-392371c29384e33bb09ed342e7696c68R201)
>>>>>>>>>>
>>>>>>>>>> 3. I updated the KIP accordingly.
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>>> On 3/15/18 3:19 PM, Guozhang Wang wrote:
>>>>>>>>>>> Hello Matthias, thanks for the KIP. Here are some comments:
>>>>>>>>>>>
>>>>>>>>>>> 1. "For all other instances the leader sends a regular Assignment in version X back."
>>>>>>>>>>> Does that mean the leader will exclude any member of the group whose protocol version it does not understand? For example, if we have A, B, C with A the leader, and B bounced with the newer version: in the first rebalance, A will only consider {A, C} for assignment while sending an empty assignment to B. And then later when B downgrades, will it re-assign the tasks to it again? I felt this is unnecessarily increasing the num. rebalances and the total latency. Could the leader just send an empty assignment to everyone? Since upon receiving the empty assignment each thread will not create / restore any tasks and will not clean up its local state (so that the prevCachedTasks are not lost in future rebalances) and re-joins immediately, if users choose to bounce an instance once it is in RUNNING state, the total time of rolling upgrades will be reduced.
>>>>>>>>>>>
>>>>>>>>>>> 2. If we want to allow upgrading from 1.1- versions to any of the future versions beyond 1.2, then we'd always need to keep the special handling logic for this two rolling-bounce mechanism, plus a config that we would never be able to deprecate; on the other hand, if the version probing procedure is fast, I think the extra operational cost of going from upgrading 1.1- directly to a future version, to upgrading 1.1- to 1.2 and then doing another upgrade from 1.2 to a future version, could be small.
>>>>>>>>>>> So depending on the experimental result of the upgrade latency, I'd suggest considering the trade-off against the extra code/config that needs maintaining for the special handling.
>>>>>>>>>>>
>>>>>>>>>>> 3. Testing plan: could you elaborate a bit more on the actual upgrade paths we should test? For example, I'm thinking the following:
>>>>>>>>>>>
>>>>>>>>>>> a. 0.10.0 -> 1.2
>>>>>>>>>>> b. 1.1 -> 1.2
>>>>>>>>>>> c. 1.2 -> 1.3 (simulated v4)
>>>>>>>>>>> d. 0.10.0 -> 1.3 (simulated v4)
>>>>>>>>>>> e. 1.1 -> 1.3 (simulated v4)
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 14, 2018 at 11:17 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I want to propose KIP-268 to allow rebalance metadata version upgrades in Kafka Streams:
>>>>>>>>>>>>
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade
>>>>>>>>>>>>
>>>>>>>>>>>> Looking forward to your feedback.
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -- Guozhang
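
The version-probing handshake debated in this thread can be sketched roughly as follows. This is a simplified, hypothetical illustration in Python, not the actual StreamsPartitionAssignor code; the Subscription/Assignment shapes and function names are assumptions made for illustration only.

```python
# Hypothetical sketch of KIP-268 "version probing" -- NOT real Kafka code.
from dataclasses import dataclass

@dataclass
class Subscription:
    used_version: int       # metadata version this member encoded with
    supported_version: int  # newest version this member understands

@dataclass
class Assignment:
    used_version: int       # version the leader encoded the assignment in
    supported_version: int  # newest version the leader understands
    empty: bool = False     # empty assignment => sender must downgrade/rejoin

def leader_assign(subs, leader_supported):
    # Encode at the lowest version any member used, capped by what the
    # leader itself understands ("as long as at least one old version X
    # Subscription arrives, send a version X Assignment back").
    version = min(min(s.used_version for s in subs), leader_supported)
    return [
        # A Subscription newer than the leader understands gets an empty
        # Assignment that only advertises the leader's supported version.
        Assignment(version, leader_supported,
                   empty=(s.used_version > leader_supported))
        for s in subs
    ]

def next_used_version(my_supported, assignment):
    # Follower side: downgrade (in-memory flag) if the leader's supported
    # version is lower, or upgrade back once the leader advertises a newer
    # one. If this differs from the previously used version, the follower
    # forces a second rebalance via unsubscribe()/subscribe()/poll().
    return min(my_supported, assignment.supported_version)
```

For example, a follower probing with version Y = 3 against an old leader (supported X = 2) receives an empty Assignment advertising 2 and downgrades; after the leader is bounced and advertises 3, the same rule upgrades the follower back to 3.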