Sure.

Glad you like this KIP. (KIP-258 will be more difficult... It seems that
it was a good decision to split it into two.)

-Matthias


On 3/22/18 11:35 AM, James Cheng wrote:
> 
> 
>> On Mar 21, 2018, at 11:45 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>
>> Yes, it only affects the metadata. KIP-268 targets metadata upgrade
>> without store upgrade.
>>
>> We can discuss store upgrade further in KIP-258: I think in general, the
>> upgrade/downgrade behavior might be an issue for upgrading stores.
>> However, this upgrade/downgrade can only happen when upgrading from 1.2
>> to a future version. Thus, it won't affect an upgrade to 1.2.
>>
>> For an upgrade to 1.2, we introduce the "upgrade.from" parameter
>> (because we don't have "version probing" for 1.1 yet) and this ensures
>> that upgrading cannot happen "too early", and no downgrade can happen
>> either for this case.
>>
>> Let me know what you think.
>>
> 
> I think yes, we can discuss upgrade/downgrade issues (to versions after 1.2) 
> in the other KIP (KIP-258).
> 
> However, this KIP-268 looks fine. It gives us the mechanism to properly 
> detect and automatically upgrade/downgrade the topology and allows the 
> new/old code to co-exist within a topology, which is something we didn't have 
> before.
> 
> KIP-268 looks good to me.
> 
> Thanks for all the answers to my questions.
> 
> -James
> 
>>
>> -Matthias
>>
>> On 3/21/18 11:16 PM, James Cheng wrote:
>>>
>>>
>>>
>>>> On Mar 21, 2018, at 11:18 AM, Matthias J. Sax <matth...@confluent.io> 
>>>> wrote:
>>>>
>>>> Thanks for following up James.
>>>>
>>>>> Is this the procedure that happens during every rebalance? The reason I 
>>>>> ask is that this step:
>>>>>>>> As long as the leader (before or after upgrade) receives at least
>>>> one old version X Subscription it always sends version Assignment X back
>>>> (the encoded supported version is X before the leader is upgraded and Y
>>>> after the leader is upgraded).
>>>>
>>>> Yes, that would be the consequence.
>>>>
>>>>> This implies that the leader receives all Subscriptions before sending 
>>>>> back any responses. Is that what actually happens? Is it possible that it 
>>>>> would receive say 4 out of 5 Subscriptions of Y, send back a response Y, 
>>>>> and then later receive a Subscription X? What happens in that case? Would 
>>>>> that Subscription X then trigger another rebalance, and the whole thing 
>>>>> starts again?
>>>>
>>>> That sounds correct. A 'delayed' Subscription could always happen --
>>>> even before KIP-268 -- and would trigger a new rebalance. In this
>>>> regard, the behavior does not change. The difference is that we would
>>>> automatically downgrade the Assignment from Y to X again -- but the
>>>> application would not fail (as it would before the KIP).
>>>>
>>>> Do you see an issue with this behavior? The idea of the design is to
>>>> make Kafka Streams robust against those scenarios. Thus, if 4 apps are
>>>> upgraded but no.5 is not yet and no.5 is late, Kafka Streams would first
>>>> upgrade from X to Y and downgrade from Y to X in the second rebalance
>>>> when no.5 joins the group. If no.5 gets upgraded, a third rebalance
>>>> would upgrade to Y again.
>>>>
>>>
>>> Sounds good. 
>>>
>>>
>>>> Thus, as long as not all instances are on the newest version,
>>>> upgrades/downgrades of the exchanged rebalance metadata could happen
>>>> multiple times. However, this should not be an issue from my understanding.
>>>
>>> About “this should not be an issue”: this upgrade/downgrade is just about 
>>> the rebalance metadata, right? Are there other associated things that will 
>>> also have to upgrade/downgrade in sync with the rebalance metadata? For 
>>> example, the idea for this KIP originally came up during the discussion 
>>> about adding timestamps to RocksDB state stores, which required updating the
>>> on-disk schema. In the case of an updated RocksDB state store but with a 
>>> downgraded rebalance metadata... that should work, right? Because we still 
>>> have updated code (which understands the on-disk format) that simply
>>> gets its partition assignments via the downgraded rebalance metadata?
>>>
>>> Thanks,
>>> -James
>>>
>>>
>>>> Let us know what you think about it.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>>> On 3/20/18 11:10 PM, James Cheng wrote:
>>>>> Sorry, I see that the VOTE started already, but I have a late question on 
>>>>> this KIP.
>>>>>
>>>>> In the "version probing" protocol:
>>>>>> Detailed upgrade protocol from metadata version X to Y (with X >= 1.2):
>>>>>> On startup/rolling-bounce, an instance does not know what version the 
>>>>>> leader understands and (optimistically) sends a Subscription with the
>>>>>> latest version Y
>>>>>> (Old, ie, not yet upgraded) Leader sends empty Assignment back to the 
>>>>>> corresponding instance that sent the newer Subscription it does not 
>>>>>> understand. The Assignment metadata only encodes both version numbers 
>>>>>> (used-version == supported-version) as leader's supported-version X.
>>>>>> For all other instances the leader sends a regular Assignment in version 
>>>>>> X back.
>>>>>> If an upgraded follower sends new version number Y Subscription and
>>>>>> receives version X Assignment with "supported-version = X", it can
>>>>>> downgrade to X (in-memory flag) and resend a new Subscription with old
>>>>>> version X to retry joining the group. To force an immediate second 
>>>>>> rebalance, the follower does an "unsubscribe()/subscribe()/poll()" 
>>>>>> sequence.
>>>>>> As long as the leader (before or after upgrade) receives at least one 
>>>>>> old version X Subscription it always sends version Assignment X back 
>>>>>> (the encoded supported version is X before the leader is upgraded and Y
>>>>>> after the leader is upgraded).
>>>>>> If an upgraded instance receives an Assignment it always checks the
>>>>>> leader's supported-version and updates its downgraded "used-version" if
>>>>>> possible
>>>>>
>>>>> Is this the procedure that happens during every rebalance? The reason I 
>>>>> ask is that this step:
>>>>>>> As long as the leader (before or after upgrade) receives at least one 
>>>>>>> old version X Subscription it always sends version Assignment X back 
>>>>>>> (the encoded supported version is X before the leader is upgraded and Y
>>>>>>> after the leader is upgraded).
>>>>>
>>>>> This implies that the leader receives all Subscriptions before sending 
>>>>> back any responses. Is that what actually happens? Is it possible that it 
>>>>> would receive say 4 out of 5 Subscriptions of Y, send back a response Y, 
>>>>> and then later receive a Subscription X? What happens in that case? Would 
>>>>> that Subscription X then trigger another rebalance, and the whole thing 
>>>>> starts again?
>>>>>
>>>>> Thanks,
>>>>> -James
>>>>>
>>>>>> On Mar 19, 2018, at 5:04 PM, Matthias J. Sax <matth...@confluent.io> 
>>>>>> wrote:
>>>>>>
>>>>>> Guozhang,
>>>>>>
>>>>>> thanks for your comments.
>>>>>>
>>>>>> 2: I think my main concern is that 1.2 would be a "special" release
>>>>>> that everybody needs to use to upgrade. As an alternative, we could say
>>>>>> that we add the config in 1.2 and keep it for 2 additional releases (1.3
>>>>>> and 1.4) but remove it in 1.5. This gives users more flexibility and
>>>>>> does not force users to upgrade to a specific version but also allows us
>>>>>> to not carry the tech debt forever. WDYT about this? If users upgrade on
>>>>>> a regular basis, this approach could avoid a forced update with high
>>>>>> probability as they will upgrade to either 1.2/1.3/1.4 anyway at some
>>>>>> point. Thus, only if users don't upgrade for a very long time are they
>>>>>> forced to do 2 upgrades with an intermediate version.
>>>>>>
>>>>>> 4. Updated the KIP to remove the ".x" suffix
>>>>>>
>>>>>> 5. Updated the KIP accordingly.
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>> On 3/19/18 10:33 AM, Guozhang Wang wrote:
>>>>>>> Yup :)
>>>>>>>
>>>>>>>> On Mon, Mar 19, 2018 at 10:01 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> bq. some snippet like ProduceRequest / ProduceRequest
>>>>>>>>
>>>>>>>> Did you mean ProduceRequest / Response ?
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>>
>>>>>>>>> On Mon, Mar 19, 2018 at 9:51 AM, Guozhang Wang <wangg...@gmail.com> 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi Matthias,
>>>>>>>>>
>>>>>>>>> About 2: yeah I guess this is a subjective preference. My main concern
>>>>>>>>> about keeping the config / handling code beyond 1.2 release is that it
>>>>>>>>> will become a non-cleanable tech debt forever, as fewer and fewer users
>>>>>>>>> would need to upgrade from 0.10.x and 1.1.x, and eventually we will need
>>>>>>>>> to maintain this for nearly no one. On the other hand, I agree that this
>>>>>>>>> tech debt is not too large. So if more people feel this is a good
>>>>>>>>> tradeoff to pay for not forcing users from older versions to upgrade
>>>>>>>>> twice I'm happy to change my opinion.
>>>>>>>>>
>>>>>>>>> A few more minor comments:
>>>>>>>>>
>>>>>>>>> 4. For the values of "upgrade.from", could we simplify to only
>>>>>>>>> major.minor? I.e. "0.10.0" rather than "0.10.0.x"? Since we never
>>>>>>>>> changed compatibility behavior in bug fix releases we would never need
>>>>>>>>> to specify a bug-fix version to distinguish.
>>>>>>>>>
>>>>>>>>> 5. Could you also present the encoding format in subscription /
>>>>>>>>> assignment metadata bytes in version 2, and in future versions (i.e.
>>>>>>>>> which first bytes would be kept moving forward), for readers to better
>>>>>>>>> understand the proposal? some snippet like ProduceRequest /
>>>>>>>>> ProduceRequest in
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
>>>>>>>>> would be very helpful.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Mar 16, 2018 at 2:58 PM, Matthias J. Sax 
>>>>>>>>> <matth...@confluent.io>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for your comments.
>>>>>>>>>>
>>>>>>>>>> 1. Because the old leader cannot decode the new Subscription it can
>>>>>>>>>> only send an empty assignment back. The idea to send empty assignments
>>>>>>>>>> to all members is interesting. I will try this out in a PR to see how
>>>>>>>>>> it behaves.
>>>>>>>>>>
>>>>>>>>>> 2. I don't see an issue with keeping config `upgrade.from` for future
>>>>>>>>>> releases. Personally, I would prefer to not force users to do two
>>>>>>>>>> upgrades if they want to go from pre-1.2 to post-1.2 version. Is there
>>>>>>>>>> a technical argument why you want to get rid of the config? What
>>>>>>>>>> disadvantages do you see keeping `upgrade.from` beyond 1.2 release?
>>>>>>>>>>
>>>>>>>>>> Keeping the config is just a few lines of code in `StreamsConfig` as
>>>>>>>>>> well as a single `if` statement in `StreamsPartitionAssignor` to force
>>>>>>>>>> a downgrade (cf
>>>>>>>>>> https://github.com/apache/kafka/pull/4636/files#diff-392371c29384e33bb09ed342e7696c68R201)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 3. I updated the KIP accordingly.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>>> On 3/15/18 3:19 PM, Guozhang Wang wrote:
>>>>>>>>>>> Hello Matthias, thanks for the KIP. Here are some comments:
>>>>>>>>>>>
>>>>>>>>>>> 1. "For all other instances the leader sends a regular Assignment in
>>>>>>>>>>> version X back." Does that mean the leader will exclude any member of
>>>>>>>>>>> the group whose protocol version it does not understand? For example,
>>>>>>>>>>> say we have A, B, C with A the leader, and B bounced with the newer
>>>>>>>>>>> version. In the first rebalance, A will only consider {A, C} for
>>>>>>>>>>> assignment while sending empty assignment to B. And then later when B
>>>>>>>>>>> downgrades will it re-assign the tasks to it again? I felt this is
>>>>>>>>>>> unnecessarily increasing the num. rebalances and the total latency.
>>>>>>>>>>> Could the leader just send empty assignment to everyone? Upon
>>>>>>>>>>> receiving the empty assignment each thread will not create / restore
>>>>>>>>>>> any tasks and will not clean up its local state (so that the
>>>>>>>>>>> prevCachedTasks are not lost in future rebalances) and re-joins
>>>>>>>>>>> immediately, so if users choose to bounce an instance once it is in
>>>>>>>>>>> RUNNING state the total time of rolling upgrades will be reduced.
>>>>>>>>>>>
>>>>>>>>>>> 2. If we want to allow upgrading from 1.1- versions to any of the
>>>>>>>>>>> future versions beyond 1.2, then we'd always need to keep the special
>>>>>>>>>>> handling logic for this two-rolling-bounce mechanism plus a config
>>>>>>>>>>> that we would never be able to deprecate; on the other hand, if the
>>>>>>>>>>> version probing procedure is fast, I think the extra operational cost
>>>>>>>>>>> of going from upgrading from 1.1- to a future version, to upgrading
>>>>>>>>>>> from 1.1- to 1.2 and then doing another upgrade from 1.2 to a future
>>>>>>>>>>> version, could be small. So depending on the experimental result of
>>>>>>>>>>> the upgrade latency, I'd suggest considering the trade-off of the
>>>>>>>>>>> extra code/config that needs maintaining for the special handling.
>>>>>>>>>>>
>>>>>>>>>>> 3. Testing plan: could you elaborate a bit more on the actual
>>>>>>>>>>> upgrade-paths we should test? For example, I'm thinking the following:
>>>>>>>>>>>
>>>>>>>>>>> a. 0.10.0 -> 1.2
>>>>>>>>>>> b. 1.1 -> 1.2
>>>>>>>>>>> c. 1.2 -> 1.3 (simulated v4)
>>>>>>>>>>> d. 0.10.0 -> 1.3 (simulated v4)
>>>>>>>>>>> e. 1.1 -> 1.3 (simulated v4)
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 14, 2018 at 11:17 PM, Matthias J. Sax
>>>>>>>>>>> <matth...@confluent.io> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I want to propose KIP-268 to allow rebalance metadata version
>>>>>>>>>>>> upgrades in Kafka Streams:
>>>>>>>>>>>>
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade
>>>>>>>>>>>>
>>>>>>>>>>>> Looking forward to your feedback.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>
> 
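
For readers following the thread, the "version probing" steps quoted above
and the 4-of-5 scenario James asked about can be walked through with a small
simulation. This is only a sketch under stated assumptions, not the Kafka
Streams implementation: the names Assignment, leader_assign and
next_subscription_version are hypothetical, the version numbers X=3 and Y=4
are made up, and each Subscription is reduced to its version number.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Assignment:
    used_version: int        # version the Assignment is encoded in
    supported_version: int   # latest version the leader understands
    empty: bool = False      # True if it carries version numbers only


def leader_assign(leader_version: int, subscriptions: List[int]) -> List[Assignment]:
    """Return one Assignment per received Subscription version (hypothetical helper)."""
    decodable = [v for v in subscriptions if v <= leader_version]
    # As long as at least one old Subscription arrives, the leader answers in
    # that old version; otherwise it answers in the version it received.
    used = min(decodable) if decodable else leader_version
    result = []
    for v in subscriptions:
        if v > leader_version:
            # The leader cannot decode this Subscription: send an empty
            # Assignment with used-version == supported-version == its own.
            result.append(Assignment(leader_version, leader_version, empty=True))
        else:
            result.append(Assignment(used, leader_version))
    return result


def next_subscription_version(latest: int, assignment: Assignment) -> int:
    """Version a follower uses for its next Subscription (hypothetical helper)."""
    return min(latest, assignment.supported_version)


X, Y = 3, 4  # made-up metadata version numbers, X older than Y

# An old leader (X) receives a Y Subscription from a bounced instance.
probe = leader_assign(X, [X, Y, X])
# Upgraded leader; 4 of 5 instances have joined, all with Y.
first = leader_assign(Y, [Y, Y, Y, Y])
# Late instance no.5 joins with X: Assignments go back to X, supported stays Y.
second = leader_assign(Y, [Y, Y, Y, Y, X])
# no.5 gets upgraded: a third rebalance moves everyone to Y again.
third = leader_assign(Y, [Y] * 5)
```

In this model the bounced instance that receives the empty Assignment in
`probe` would resend its Subscription with version X, while the instances in
`second` keep sending Y because the leader reports supported-version Y.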
