Hi, KIP-268 (rebalance metadata) is finished and included in the AK 2.0 release. Thus, I want to pick up this KIP again to get the RocksDB upgrade done for 2.1.
I updated the KIP accordingly and also have a "proof of concept" PR ready (for the "in place" upgrade only): https://github.com/apache/kafka/pull/5422/

There are still open questions, but I want to collect early feedback on the proposed interfaces we need for the store upgrade. Also note that the KIP now also aims to define a generic upgrade path from any store format A to any other store format B. Adding timestamps is just a special case.

I will continue to work on the PR and refine the KIP in the meantime, too.

Looking forward to your feedback.

-Matthias

On 3/14/18 11:14 PM, Matthias J. Sax wrote:
> After some more thoughts, I want to follow John's suggestion and split upgrading the rebalance metadata from the store upgrade.
> I extracted the metadata upgrade into its own KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade
> I'll update this KIP accordingly shortly. I also want to consider making the store format upgrade more flexible/generic. Atm, the KIP is too much tailored to the DSL IMHO and does not account for PAPI users, whom we should not force to upgrade their stores. I need to figure out the details and follow up later.
> Please give feedback for the new KIP-268 on the corresponding discussion thread.
> @James: unfortunately, for upgrading to 1.2 I couldn't figure out a way to do a single rolling bounce upgrade. But KIP-268 proposes a fix for future upgrades. Please share your thoughts.
> Thanks for all your feedback!
> -Matthias
>
> On 3/12/18 11:56 PM, Matthias J. Sax wrote:
>> @John: yes, we would throw if configs are missing (it's an implementation detail IMHO and thus I did not include it in the KIP).
>> @Guozhang:
>> 1) I understand now what you mean. We can certainly allow all values "0.10.0.x", "0.10.1.x", "0.10.2.x", ... "1.1.x" for the `upgrade.from` parameter. I had a similar thought once but decided to collapse them into one -- will update the KIP accordingly.
>> 2) The idea to avoid any config would be to always send both requests. If we add a config to eventually disable the old request, we don't gain anything with this approach. The question is really whether we are willing to pay this overhead from 1.2 on -- note, it would be limited to 2 versions and not grow further in future releases. More details in (3).
>> 3) Yes, this approach subsumes (2) for later releases and allows us to stay with the 2 "assignment strategies" we need to register, as the new assignment strategy will be able to "upgrade itself" via "version probing". Thus, (2) would only be a workaround to avoid a config if people upgrade from pre-1.2 releases.
>> Thus, I don't think we need to register new "assignment strategies" and send empty subscriptions for older versions.
>> 4) I agree that this is a tricky thing to get right with a single rebalance. I share the concern that an application might never catch up and thus the hot standby will never be ready.
>> Maybe it's better to go with 2 rebalances for store upgrades. If we do this, we also don't need to go with (2) and can get (3) in place for future upgrades. I also think that changes to the metadata are more likely, and thus allowing a single rolling bounce for that case is more important anyway. If we assume that store upgrades are rare, it might be ok to sacrifice two rolling bounces for this case. It was just an idea I wanted to share (even if I see the issues).
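
(For illustration of the "assignment strategies" mechanism discussed above: registering more than one rebalance protocol boils down to listing several assignors in the underlying consumer configuration, roughly as sketched below. The assignor class names are made up for this sketch, and Streams would set this internally rather than expose it to users; the coordinator then picks the newest protocol that every group member supports.)

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    // Two assignors registered at once; the one listed first is preferred
    // if all members support it. Class names are illustrative only.
    Properties consumerProps = new Properties();
    consumerProps.put(
        ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        "com.example.StreamsPartitionAssignorV3,com.example.StreamsPartitionAssignorV2");
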
>> >> >> -Matthias >> >> >> >> On 3/12/18 11:45 AM, Guozhang Wang wrote: >>> Hello Matthias, thanks for your replies. >>> >>> >>> 1) About the config names: actually I was trying to not expose >>> implementation details :) My main concern was that in your proposal the >>> values need to cover the span of all the versions that are actually using >>> the same version, i.e. "0.10.1.x-1.1.x". So if I (as a user) am upgrading >>> from any versions within this range I need to remember to use the value >>> "0.10.1.x-1.1.x" than just specifying my old version. In my suggestion I >>> was trying to argue the benefit of just letting users to specify the actual >>> Kafka version she's trying to upgrade from, than specifying a range of >>> versions. I was not suggesting to use "v1, v2, v3" etc as the values, but >>> still using Kafka versions like broker's `internal.version` config. But if >>> you were suggesting the same thing, i.e. by "0.10.1.x-1.1.x" you meant to >>> say users can just specify "0.10.1" or "0.10.2" or "0.11.0" or "1.1" which >>> are all recognizable config values then I think we are actually on the same >>> page. >>> >>> 2) About the "multi-assignment" idea: yes it would increase the network >>> footprint, but not the message size, IF I'm not mis-understanding your idea >>> of registering multiple assignment. More details: >>> >>> In the JoinGroupRequest, in the protocols field we can encode multiple >>> protocols each with their different metadata. The coordinator will pick the >>> common one that everyone supports (if there are no common one, it will send >>> an error back; if there are multiple ones, it will pick the one with most >>> votes, i.e. the one which was earlier in the encoded list). Since our >>> current Streams rebalance protocol is still based on the consumer >>> coordinator, it means our protocol_type would be "consumer", but instead >>> the protocol type we can have multiple protocols like "streams", >>> "streams_v2", "streams_v3" etc. The downside is that we need to implement a >>> different assignor class for each version and register all of them in >>> consumer's PARTITION_ASSIGNMENT_STRATEGY_CONFIG. In the future if we >>> re-factor our implementation to have our own client coordinator layer like >>> Connect did, we can simplify this part of the implementation. But even for >>> now with the above approach this is still doable. >>> >>> On the broker side, the group coordinator will only persist a group with >>> the selected protocol and its subscription metadata, e.g. if coordinator >>> decides to pick "streams_v2" it will only sends that protocol's metadata >>> from everyone to the leader to assign, AND when completing the rebalance it >>> will also only write the group metadata with that protocol and the >>> assignment only. In a word, although the network traffic maybe increased a >>> bit, it would not be a bummer in our trade-off. One corner situation we >>> need to consider is how to stop registering very old assignors to avoid the >>> network traffic from increasing indefinitely, e.g. if you are rolling >>> bounce from v2 to v3, then you'd not need to register v1 assignor anymore, >>> but that would unfortunately still require some configs. >>> >>> 3) About the "version probing" idea, I think that's a promising approach >>> as well, but if we are going to do the multi-assignment its value seems >>> subsumed? 
But I'm thinking maybe it can be added on top of multi-assignment >>> to save us from still requiring the config to avoid registering all the >>> metadata for all version. More details: >>> >>> In the JoinGroupRequest, we still register all the assignor but for all old >>> assignors we do not encode any metadata, i.e. the encoded data would be: >>> >>> "streams_vN" : "encoded metadata" >>> "streams_vN-1":empty >>> "streams_vN-2":empty >>> .. >>> "streams_0":empty >>> >>> So the coordinator can still safely choose the latest common version; and >>> then when leaders receive the subscription (note it should always recognize >>> that version), let's say it is streams_vN-2, if one of the subscriptions >>> are empty bytes, it will send the empty assignment with that version number >>> encoded in the metadata. So in the second auto-triggered all members would >>> send the metadata with that version: >>> >>> "streams_vN" : empty >>> "streams_vN-1" : empty >>> "streams_vN-2" : "encoded metadata" >>> .. >>> "streams_0":empty >>> >>> >>> By doing this we would not require any configs for users. >>> >>> >>> 4) About the "in_place" upgrade on rocksDB, I'm not clear about the details >>> so probably we'd need to fill that out before making a call. For example, >>> you mentioned "If we detect this situation, the Streams application closes >>> corresponding active tasks as well as "hot standby" tasks, and re-creates >>> the new active tasks using the new store." How could we guarantee that the >>> gap between these two stores will keep decreasing than increasing so we'll >>> eventually achieve the flip point? And also the longer we are before the >>> flip point, the larger we are doubling the storage space, etc. >>> >>> >>> >>> Guozhang >>> >>> >>> >>> On Sun, Mar 11, 2018 at 4:06 PM, Matthias J. Sax <matth...@confluent.io> >>> wrote: >>> >>>> @John, Guozhang, >>>> >>>> thanks a lot for your comments. Very long reply... >>>> >>>> >>>> About upgrading the rebalance metadata: >>>> >>>> Another possibility to do this, would be to register multiple assignment >>>> strategies for the 1.2 applications. For this case, new instances would >>>> be configured to support both and the broker would pick the version that >>>> all instances understand. The disadvantage would be, that we send much >>>> more data (ie, two subscriptions) in each rebalance as long as no second >>>> rebalance is done disabling the old protocol. Thus, using this approach >>>> would allow to avoid a second rebalance trading-off an increased >>>> rebalance network footprint (I also assume that this would increase the >>>> message size that is written into __consumer_offsets topic?). Overall, I >>>> am not sure if this would be a good tradeoff, but it could avoid a >>>> second rebalance (I have some more thoughts about stores below that are >>>> relevant for single rebalance upgrade). >>>> >>>> For future upgrades we might be able to fix this though. I was thinking >>>> about the following: >>>> >>>> In the current implementation, the leader fails if it gets a >>>> subscription it does not understand (ie, newer version). We could change >>>> this behavior and let the leader send an empty assignment plus error >>>> code (including supported version) back to the instance sending the >>>> "bad" subscription. 
This would allow the following logic for an >>>> application instance: >>>> >>>> - on startup, always send the latest subscription format >>>> - if leader understands it, we get an assignment back an start processing >>>> - if leader does not understand it, we get an empty assignment and >>>> supported version back >>>> - the application unsubscribe()/subscribe()/poll() again and sends a >>>> subscription using the leader's supported version >>>> >>>> This protocol would allow to do a single rolling bounce, and implements >>>> a "version probing" step, that might result in two executed rebalances. >>>> The advantage would be, that the user does not need to set any configs >>>> or do multiple rolling bounces, as Streams takes care of this >>>> automatically. >>>> >>>> One disadvantage would be, that two rebalances happen and that for an >>>> error case during rebalance, we loose the information about the >>>> supported leader version and the "probing step" would happen a second time. >>>> >>>> If the leader is eventually updated, it will include it's own supported >>>> version in all assignments, to allow a "down graded" application to >>>> upgrade its version later. Also, if a application fails, the first >>>> probing would always be successful and only a single rebalance happens. >>>> If we use this protocol, I think we don't need any configuration >>>> parameter for future upgrades. >>>> >>>> >>>> About "upgrade.from" vs "internal.protocol.version": >>>> >>>> Users would set "upgrade.from" to the release version the current/old >>>> application is using. I think this is simpler, as users know this >>>> version. If we use "internal.protocol.version" instead, we expose >>>> implementation details and users need to know the protocol version (ie, >>>> they need to map from the release version to the protocol version; ie, >>>> "I am run 0.11.0 that runs with metadata protocol version 2"). >>>> >>>> Also the KIP states that for the second rolling bounce, the >>>> "upgrade.mode" config should be set back to `null` -- and thus, >>>> "upgrade.from" would not have any effect and is ignored (I will update >>>> the KIP to point out this dependency). >>>> >>>> >>>> >>>> About your second point: I'll update the KIP accordingly to describe >>>> future updates as well. Both will be different. >>>> >>>> >>>> >>>> One more point about upgrading the store format. I was thinking about >>>> avoiding the second rolling bounce all together in the future: (1) the >>>> goal is to achieve an upgrade with zero downtime (2) this required to >>>> prepare the stores as "hot standbys" before we do the switch and delete >>>> the old stores. (3) the current proposal does the switch "globally" -- >>>> this is simpler and due to the required second rebalance no disadvantage. >>>> However, a global consistent switch over might actually not be required. >>>> For "in_place" upgrade, following the protocol from above, we could >>>> decouple the store switch and each instance could switch its store >>>> independently from all other instances. After the rolling bounce, it >>>> seems to be ok to switch from the old store to the new store "under the >>>> hood" whenever the new store is ready (this could even be done, before >>>> we switch to the new metadata version). Each time we update the "hot >>>> standby" we check if it reached the "endOffset" (or maybe X% that could >>>> either be hardcoded or configurable). 
If we detect this situation, the >>>> Streams application closes corresponding active tasks as well as "hot >>>> standby" tasks, and re-creates the new active tasks using the new store. >>>> (I need to go through the details once again, but it seems to be >>>> feasible.). >>>> >>>> Combining this strategy with the "multiple assignment" idea, might even >>>> enable us to do an single rolling bounce upgrade from 1.1 -> 1.2. >>>> Applications would just use the old store, as long as the new store is >>>> not ready, even if the new metadata version is used already. >>>> >>>> For future upgrades, a single rebalance would be sufficient, too, even >>>> if the stores are upgraded. We would not need any config parameters as >>>> the "probe" step allows us to detect the supported rebalance metadata >>>> version (and we would also not need multiple "assigmnent strategies" as >>>> out own protocol encoded everything we need). >>>> >>>> >>>> Let me know what you think. >>>> >>>> >>>> -Matthias >>>> >>>> >>>> >>>> On 3/9/18 10:33 PM, Guozhang Wang wrote: >>>>> @John: >>>>> >>>>> For the protocol version upgrade, it is only for the encoded metadata >>>> bytes >>>>> protocol, which are just bytes-in bytes-out from Consumer's pov, so I >>>> think >>>>> this change should be in the Streams layer as well. >>>>> >>>>> @Matthias: >>>>> >>>>> for 2), I agree that adding a "newest supported version" besides the >>>>> "currently used version for encoding" is a good idea to allow either >>>> case; >>>>> the key is that in Streams we would likely end up with a mapping from the >>>>> protocol version to the other persistent data format versions such as >>>>> rocksDB, changelog. So with such a map we can actually achieve both >>>>> scenarios, i.e. 1) one rolling bounce if the upgraded protocol version's >>>>> corresponding data format does not change, e.g. 0.10.0 -> 0.10.1 leaders >>>>> can choose to use the newer version in the first rolling bounce directly >>>>> and we can document to users that they would not need to set >>>>> "upgrade.mode", and 2) two rolling bounce if the upgraded protocol >>>> version >>>>> does indicate the data format changes, e.g. 1.1 -> 1.2, and then we can >>>>> document that "upgrade.mode" needs to be set in the first rolling bounce >>>>> and reset in the second. >>>>> >>>>> >>>>> Besides that, some additional comments: >>>>> >>>>> 1) I still think "upgrade.from" is less intuitive for users to set than >>>>> "internal.protocol.version" where for the latter users only need to set a >>>>> single version, while the Streams will map that version to the Streams >>>>> assignor's behavior as well as the data format. But maybe I did not get >>>>> your idea about how the "upgrade.from" config will be set, because in >>>>> your Compatibility section how the upgrade.from config will be set for >>>>> these two rolling bounces are not very clear: for example, should user >>>>> reset it to null in the second rolling bounce? >>>>> >>>>> 2) In the upgrade path description, rather than talking about specific >>>>> version 0.10.0 -> version 0.10.1 etc, can we just categorize all the >>>>> possible scenarios, even for future upgrade versions, what should be the >>>>> standard operations? The categorized we can summarize to would be >>>> (assuming >>>>> user upgrade from version X to version Y, where X and Y are Kafka >>>> versions, >>>>> with the corresponding supported protocol version x and y): >>>>> >>>>> >>>>> a. x == y, i.e. 
metadata protocol does not change, and hence no persistent data formats have changed.
>>>>> b. x != y, but all persistent data formats remain the same.
>>>>> c. x != y, AND some persistent data format, like the RocksDB format or the changelog format, has been changed.
>>>>> d. special case: we may need some special handling logic when "current version" or "newest supported version" are not available in the protocol, i.e. for X as old as 0.10.0 and before 1.2.
>>>>> Under the above scenarios, how many rolling bounces do users need to execute? How should they set the configs in each rolling bounce? And how will the Streams library execute in these cases?
>>>>> Guozhang
>>>>> On Fri, Mar 9, 2018 at 4:01 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>> Ted,
>>>>>> I still consider changing the KIP to include it right away -- if not, I'll create a JIRA. Need to think it through in more detail first.
>>>>>> (Same for other open questions like interface names -- I collect feedback and update the KIP after we reach consensus :))
>>>>>> -Matthias
>>>>>> On 3/9/18 3:35 PM, Ted Yu wrote:
>>>>>>> Thanks for the details, Matthias.
>>>>>>> bq. change the metadata protocol only if a future release, encoding both used and supported version might be an advantage
>>>>>>> Looks like encoding both versions wouldn't be implemented in this KIP.
>>>>>>> Please consider logging a JIRA with the encoding proposal.
>>>>>>> Cheers
>>>>>>> On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>> @Bill: I think a filter predicate should be part of user code. And even if we want to add something like this, I would prefer to do it in a separate KIP.
>>>>>>>> @James: I would love to avoid a second rolling bounce. But from my understanding it would not be possible.
>>>>>>>> The purpose of the second rolling bounce is indeed to switch from version 2 to 3. It also has a second purpose, to switch from the old store to the new store (this happens after the last instance bounces a second time).
>>>>>>>> The problem with one round of rolling bounces is that it's unclear when to switch from version 2 to version 3. The StreamsPartitionAssignor is stateless by design, and thus the information about which version it should use must be passed in externally -- and we want to use the StreamsConfig to pass in this information.
>>>>>>>> During the upgrade, new instances have no information about the progress of the upgrade (ie, how many other instances got upgraded already). Therefore, it's not safe for them to send a version 3 subscription. The leader also has this limited view of the world and can only send version 2 assignments back.
>>>>>>>> Thus, for the 1.2 upgrade, I don't think we can simplify the upgrade.
>>>>>>>> We did consider changing the metadata to make later upgrades (ie, from 1.2 to 1.x) simpler though (for the case we change the metadata or storage format again -- as long as we don't change it, a single rolling bounce is sufficient), by encoding "used version" and "supported version".
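
(To make that idea concrete: the subscription metadata would then carry two version fields instead of one, roughly as sketched below. The field names are illustrative only, not the final wire format from the KIP.)

    // Sketch only: subscription metadata carrying both version fields.
    public class SubscriptionInfo {
        final int usedVersion;             // version this subscription is actually encoded with
        final int latestSupportedVersion;  // newest metadata version this instance understands
        // ... plus the existing fields (process id, previously assigned tasks,
        //     standby tasks, user endpoint) ...

        public SubscriptionInfo(final int usedVersion, final int latestSupportedVersion) {
            this.usedVersion = usedVersion;
            this.latestSupportedVersion = latestSupportedVersion;
        }
    }
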
This would allow the leader to switch to the new version >>>>>>>> earlier and without a second rebalance: leader would receive "used >>>>>>>> version == old" and "supported version = old/new" -- as long as at >>>> least >>>>>>>> one instance sends a "supported version = old" leader sends old >>>> version >>>>>>>> assignment back. However, encoding both version would allow that the >>>>>>>> leader can send a new version assignment back, right after the first >>>>>>>> round or rebalance finished (all instances send "supported version = >>>>>>>> new"). However, there are still two issues with this: >>>>>>>> >>>>>>>> 1) if we switch to the new format right after the last instance >>>> bounced, >>>>>>>> the new stores might not be ready to be used -- this could lead to >>>>>>>> "downtime" as store must be restored before processing can resume. >>>>>>>> >>>>>>>> 2) Assume an instance fails and is restarted again. At this point, the >>>>>>>> instance will still have "upgrade mode" enabled and thus sends the old >>>>>>>> protocol data. However, it would be desirable to never fall back to >>>> the >>>>>>>> old protocol after the switch to the new protocol. >>>>>>>> >>>>>>>> The second issue is minor and I guess if users set-up the instance >>>>>>>> properly it could be avoided. However, the first issue would prevent >>>>>>>> "zero downtime" upgrades. Having said this, if we consider that we >>>> might >>>>>>>> change the metadata protocol only if a future release, encoding both >>>>>>>> used and supported version might be an advantage in the future and we >>>>>>>> could consider to add this information in 1.2 release to prepare for >>>>>> this. >>>>>>>> >>>>>>>> Btw: monitoring the log, is also only required to give the instances >>>>>>>> enough time to prepare the stores in new format. If you would do the >>>>>>>> second rolling bounce before this, it would still work -- however, you >>>>>>>> might see app "downtime" as the new store must be fully restored >>>> before >>>>>>>> processing can resume. >>>>>>>> >>>>>>>> >>>>>>>> Does this make sense? >>>>>>>> >>>>>>>> >>>>>>>> -Matthias >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On 3/9/18 11:36 AM, James Cheng wrote: >>>>>>>>> Matthias, >>>>>>>>> >>>>>>>>> For all the upgrade paths, is it possible to get rid of the 2nd >>>> rolling >>>>>>>> bounce? >>>>>>>>> >>>>>>>>> For the in-place upgrade, it seems like primary difference between >>>> the >>>>>>>> 1st rolling bounce and the 2nd rolling bounce is to decide whether to >>>>>> send >>>>>>>> Subscription Version 2 or Subscription Version 3. (Actually, there is >>>>>>>> another difference mentioned in that the KIP says that the 2nd rolling >>>>>>>> bounce should happen after all new state stores are created by the >>>>>>>> background thread. However, within the 2nd rolling bounce, we say that >>>>>>>> there is still a background thread, so it seems like is no actual >>>>>>>> requirement to wait for the new state stores to be created.) >>>>>>>>> >>>>>>>>> The 2nd rolling bounce already knows how to deal with mixed-mode >>>>>> (having >>>>>>>> both Version 2 and Version 3 in the same consumer group). It seems >>>> like >>>>>> we >>>>>>>> could get rid of the 2nd bounce if we added logic (somehow/somewhere) >>>>>> such >>>>>>>> that: >>>>>>>>> * Instances send Subscription Version 2 until all instances are >>>> running >>>>>>>> the new code. >>>>>>>>> * Once all the instances are running the new code, then one at a >>>> time, >>>>>>>> the instances start sending Subscription V3. 
Leader still hands out >>>>>>>> Assignment Version 2, until all new state stores are ready. >>>>>>>>> * Once all instances report that new stores are ready, Leader sends >>>> out >>>>>>>> Assignment Version 3. >>>>>>>>> * Once an instance receives an Assignment Version 3, it can delete >>>> the >>>>>>>> old state store. >>>>>>>>> >>>>>>>>> Doing it that way seems like it would reduce a lot of >>>>>>>> operator/deployment overhead. No need to do 2 rolling restarts. No >>>> need >>>>>> to >>>>>>>> monitor logs for state store rebuild. You just deploy it, and the >>>>>> instances >>>>>>>> update themselves. >>>>>>>>> >>>>>>>>> What do you think? >>>>>>>>> >>>>>>>>> The thing that made me think of this is that the "2 rolling bounces" >>>> is >>>>>>>> similar to what Kafka brokers have to do changes in >>>>>>>> inter.broker.protocol.version and log.message.format.version. And in >>>> the >>>>>>>> broker case, it seems like it would be possible (with some work of >>>>>> course) >>>>>>>> to modify kafka to allow us to do similar auto-detection of broker >>>>>>>> capabilities and automatically do a switchover from old/new versions. >>>>>>>>> >>>>>>>>> -James >>>>>>>>> >>>>>>>>> >>>>>>>>>> On Mar 9, 2018, at 10:38 AM, Bill Bejeck <bbej...@gmail.com> wrote: >>>>>>>>>> >>>>>>>>>> Matthias, >>>>>>>>>> >>>>>>>>>> Thanks for the KIP, it's a +1 from me. >>>>>>>>>> >>>>>>>>>> I do have one question regarding the retrieval methods on the new >>>>>>>>>> interfaces. >>>>>>>>>> >>>>>>>>>> Would want to consider adding one method with a Predicate that would >>>>>>>> allow >>>>>>>>>> for filtering records by the timestamp stored with the record? Or >>>> is >>>>>>>> this >>>>>>>>>> better left for users to implement themselves once the data has been >>>>>>>>>> retrieved? >>>>>>>>>> >>>>>>>>>> Thanks, >>>>>>>>>> Bill >>>>>>>>>> >>>>>>>>>> On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu <yuzhih...@gmail.com> wrote: >>>>>>>>>> >>>>>>>>>>> Matthias: >>>>>>>>>>> For my point #1, I don't have preference as to which separator is >>>>>>>> chosen. >>>>>>>>>>> Given the background you mentioned, current choice is good. >>>>>>>>>>> >>>>>>>>>>> For #2, I think my proposal is better since it is closer to English >>>>>>>>>>> grammar. >>>>>>>>>>> >>>>>>>>>>> Would be good to listen to what other people think. >>>>>>>>>>> >>>>>>>>>>> On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax < >>>>>> matth...@confluent.io >>>>>>>>> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Thanks for the comments! >>>>>>>>>>>> >>>>>>>>>>>> @Guozhang: >>>>>>>>>>>> >>>>>>>>>>>> So far, there is one PR for the rebalance metadata upgrade fix >>>>>>>>>>>> (addressing the mentioned >>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6054) It give a first >>>>>>>>>>>> impression how the metadata upgrade works including a system test: >>>>>>>>>>>> https://github.com/apache/kafka/pull/4636 >>>>>>>>>>>> >>>>>>>>>>>> I can share other PRs as soon as they are ready. I agree that the >>>>>> KIP >>>>>>>> is >>>>>>>>>>>> complex am I ok with putting out more code to give better >>>> discussion >>>>>>>>>>>> context. >>>>>>>>>>>> >>>>>>>>>>>> @Ted: >>>>>>>>>>>> >>>>>>>>>>>> I picked `_` instead of `-` to align with the >>>> `processing.guarantee` >>>>>>>>>>>> parameter that accepts `at_least_one` and `exactly_once` as >>>> values. >>>>>>>>>>>> Personally, I don't care about underscore vs dash but I prefer >>>>>>>>>>>> consistency. If you feel strong about it, we can also change it to >>>>>>>> `-`. 
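
(For reference, with the config names currently proposed in the KIP the two rolling bounces would be configured roughly as sketched below; the names and accepted values may still change.)

    import java.util.Properties;

    // First rolling bounce: run the new code, but keep the old metadata/store format
    // active while the new stores are prepared as "hot standbys" in the background.
    Properties firstBounce = new Properties();
    firstBounce.put("upgrade.from", "1.1");       // the release the application is upgraded from
    firstBounce.put("upgrade.mode", "in_place");  // proposed value under discussion

    // Second rolling bounce: leave both configs unset (their default is null), so the
    // instances switch to the new metadata version and the new store format.
    Properties secondBounce = new Properties();
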
>>>>>>>>>>>> >>>>>>>>>>>> About the interface name: I am fine either way -- I stripped the >>>>>>>> `With` >>>>>>>>>>>> to keep the name a little shorter. Would be good to get feedback >>>>>> from >>>>>>>>>>>> others and pick the name the majority prefers. >>>>>>>>>>>> >>>>>>>>>>>> @John: >>>>>>>>>>>> >>>>>>>>>>>> We can certainly change it. I agree that it would not make a >>>>>>>> difference. >>>>>>>>>>>> I'll dig into the code to see if any of the two version might >>>>>>>> introduce >>>>>>>>>>>> undesired complexity and update the KIP if I don't hit an issue >>>> with >>>>>>>>>>>> putting the `-v2` to the store directory instead of `rocksdb-v2` >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> -Matthias >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On 3/8/18 2:44 PM, John Roesler wrote: >>>>>>>>>>>>> Hey Matthias, >>>>>>>>>>>>> >>>>>>>>>>>>> The KIP looks good to me. I had several questions queued up, but >>>>>> they >>>>>>>>>>>> were >>>>>>>>>>>>> all in the "rejected alternatives" section... oh, well. >>>>>>>>>>>>> >>>>>>>>>>>>> One very minor thought re changing the state directory from >>>>>>>>>>>> "/<state.dir>/< >>>>>>>>>>>>> application.id>/<task.id>/rocksdb/storeName/" to "/<state.dir>/< >>>>>>>>>>>>> application.id>/<task.id>/rocksdb-v2/storeName/": if you put the >>>>>>>> "v2" >>>>>>>>>>>>> marker on the storeName part of the path (i.e., "/<state.dir>/< >>>>>>>>>>>>> application.id>/<task.id>/rocksdb/storeName-v2/"), then you get >>>>>> the >>>>>>>>>>> same >>>>>>>>>>>>> benefits without altering the high-level directory structure. >>>>>>>>>>>>> >>>>>>>>>>>>> It may not matter, but I could imagine people running scripts to >>>>>>>>>>> monitor >>>>>>>>>>>>> rocksdb disk usage for each task, or other such use cases. >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks, >>>>>>>>>>>>> -John >>>>>>>>>>>>> >>>>>>>>>>>>> On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu <yuzhih...@gmail.com> >>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Matthias: >>>>>>>>>>>>>> Nicely written KIP. >>>>>>>>>>>>>> >>>>>>>>>>>>>> "in_place" : can this be "in-place" ? Underscore may sometimes >>>> be >>>>>>>> miss >>>>>>>>>>>>>> typed (as '-'). I think using '-' is more friendly to user. >>>>>>>>>>>>>> >>>>>>>>>>>>>> public interface ReadOnlyKeyValueTimestampStore<K, V> { >>>>>>>>>>>>>> >>>>>>>>>>>>>> Is ReadOnlyKeyValueStoreWithTimestamp better name for the >>>> class ? >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang < >>>> wangg...@gmail.com >>>>>>> >>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hello Matthias, thanks for the KIP. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I've read through the upgrade patch section and it looks good >>>> to >>>>>>>> me, >>>>>>>>>>> if >>>>>>>>>>>>>> you >>>>>>>>>>>>>>> already have a WIP PR for it could you also share it here so >>>> that >>>>>>>>>>>> people >>>>>>>>>>>>>>> can take a look? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I'm +1 on the KIP itself. But large KIPs like this there are >>>>>> always >>>>>>>>>>>> some >>>>>>>>>>>>>>> devil hidden in the details, so I think it is better to have >>>> the >>>>>>>>>>>>>>> implementation in parallel along with the design discussion :) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Guozhang >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. 
Sax < >>>>>>>>>>> matth...@confluent.io >>>>>>>>>>>>> >>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I want to propose KIP-258 for the Streams API to allow storing >>>>>>>>>>>>>>>> timestamps in RocksDB. This feature is the basis to resolve >>>>>>>> multiple >>>>>>>>>>>>>>>> tickets (issues and feature requests). >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Looking forward to your comments about this! >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- >>>>>>>>>>>>>>>> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> -- >>>>>>>>>>>>>>> -- Guozhang >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>>> >>>>> >>>>> >>>> >>>> >>> >>> >> >
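
For reference, the read-only interface under discussion has roughly the following shape. The final name is still open (ReadOnlyKeyValueTimestampStore vs. ReadOnlyKeyValueStoreWithTimestamp), and the method and the ValueAndTimestamp holder shown here are illustrative only, not the final KIP signatures.

    // Name still under discussion; method below is illustrative only.
    public interface ReadOnlyKeyValueTimestampStore<K, V> {

        // Latest value for the key plus the timestamp stored with it, or null if absent.
        ValueAndTimestamp<V> get(K key);
    }

    // A simple holder for a value and its timestamp that such a store could return
    // (illustrative; would live in its own file):
    public class ValueAndTimestamp<V> {
        private final V value;
        private final long timestamp;

        public ValueAndTimestamp(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        public V value() { return value; }
        public long timestamp() { return timestamp; }
    }
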