Hi Matthias,

Thanks - I figured that it was probably a case of just too much to do and not enough time. I know how that can go. I am asking about this one in relation to https://issues.apache.org/jira/browse/KAFKA-4212, adding a TTL to RocksDB. I have outlined a bit about my use-case within 4212, but for brevity here it is:

My case:
1) I have a RocksDB-with-TTL implementation working where records are aged out using the TTL that comes with RocksDB (very simple).
2) We prevent records from loading from the changelog if recordTime + TTL < referenceTimeStamp (default = System.currentTimeMillis()). This assumes that the records are stored with the same time reference (say UTC) as the consumer materializing the RocksDB store. (A sketch of this guard follows below.)

My questions about KIP-258 are as follows:
1) How does "we want to be able to store record timestamps in KTables" differ from inserting records into RocksDB with TTL at consumption time? I understand that there could be a difference of some seconds, minutes, hours, days, etc. between when the record was published and now, but given the nature of how RocksDB TTL works (eventual, based on compaction) I don't see how a precise TTL can be achieved, such as the one available with windowed stores.
2) Are you looking to change how records are inserted into a TTL RocksDB, such that the TTL would take effect from the record's published time? If not, what would be the ideal workflow here for a single record with TTL RocksDB? ie:
Record timestamp: 100
TTL: 50
Record inserted into RocksDB: 110
Record to expire at 150?
3) I'm not sure I fully understand the importance of the upgrade path. I have read the link to https://issues.apache.org/jira/browse/KAFKA-3522 in the KIP, and I can understand that a state store on disk may not represent what the application is expecting. I don't think I have the full picture though, because that issue seems easy to fix with a simple versioned header or accompanying file, forcing the app to rebuild the state if the version is incompatible. Can you elaborate or add a scenario to the KIP that illustrates the need for the upgrade path?

Thanks,
Adam
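For concreteness, the changelog-restore guard from point 2 above could look roughly like the following sketch. The class and method names are made up for illustration; this is not part of KIP-258 or of the RocksDB TTL API.

    // Sketch of the restore-time TTL guard described above: skip loading a
    // changelog record whose timestamp plus TTL is already in the past.
    // Assumes producer and consumer clocks share the same reference (e.g. UTC).
    public final class TtlRestoreFilter {

        private final long ttlMs;

        public TtlRestoreFilter(final long ttlMs) {
            this.ttlMs = ttlMs;
        }

        // Default reference timestamp is the current wall-clock time.
        public boolean shouldRestore(final long recordTimestampMs) {
            return shouldRestore(recordTimestampMs, System.currentTimeMillis());
        }

        // A record is restored only if recordTime + TTL >= referenceTime.
        public boolean shouldRestore(final long recordTimestampMs, final long referenceTimeMs) {
            return recordTimestampMs + ttlMs >= referenceTimeMs;
        }
    }

Note that this only makes restoration consistent with the (eventual, compaction-based) expiry that RocksDB TTL applies to the live store.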
On Sun, Nov 11, 2018 at 1:43 PM Matthias J. Sax <matth...@confluent.io> wrote:

Adam,

I am still working on it. Was pulled into a lot of other tasks lately, so this was delayed. Also had some discussions about simplifying the upgrade path with some colleagues and I am prototyping this atm. Hope to update the KIP accordingly soon.

-Matthias

On 11/10/18 7:41 AM, Adam Bellemare wrote:

Hello Matthias,

I am curious as to the status of this KIP. TTL and expiry of records will be extremely useful for several of our business use-cases, as well as for another KIP I had been working on.

Thanks

On Mon, Aug 13, 2018 at 10:29 AM Eno Thereska <eno.there...@gmail.com> wrote:

Hi Matthias,

Good stuff. Could you comment a bit on how future-proof this change is? For example, if we want to store both event timestamp "and" processing time in RocksDB, will we then need another interface (e.g. called KeyValueWithTwoTimestampsStore)?

Thanks,
Eno

On Thu, Aug 9, 2018 at 2:30 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Thanks for your input Guozhang and John.

I see your point that the upgrade API is not simple. If you don't think it's valuable to make generic store upgrades possible (atm), we can make the API internal, too. The impact is that we only support a predefined set of upgrades (ie, KV to KVwithTs, Windowed to WindowedWithTS, etc.) for which we implement the internal interfaces.

We can keep the design generic, so if we decide to make it public later, we don't need to re-invent it. This will also have the advantage that we can add upgrade patterns for other stores later, too.

I also agree that the `StoreUpgradeBuilder` is a little ugly, but it was the only way I could find to design a generic upgrade interface. If we decide to hide all the upgrade stuff, `StoreUpgradeBuilder` would become an internal interface, I guess (don't think we can remove it).

I will wait for more feedback about this, and if nobody wants to keep it as public API I will update the KIP accordingly. Will add some more clarifications for different upgrade patterns in the meantime and fix the typos/minor issues.

About adding a new state UPGRADING: maybe we could do that. However, I find it particularly difficult to estimate when we should switch to RUNNING; thus, I am a little hesitant. Using store callbacks or just logging the progress, including some indication of the "lag", might actually be sufficient. Not sure what others think?

About "value before timestamp": no real reason, and I think it does not make any difference. Do you want to change it?

About upgrade robustness: yes, we cannot control whether an instance fails. That is what I meant by "we need to write tests accordingly". The upgrade should be able to continue even if an instance goes down (and we must make sure that we don't end up in an invalid state that forces us to wipe out the whole store). Thus, we need to write system tests that fail instances during upgrade.

For an `in_place_offline` upgrade: I don't think we need this mode, because people can do this via a single rolling bounce:

- prepare the code and switch the KV-Store to a KVwithTs-Store
- do a single rolling bounce (don't set any upgrade config)

For this case, the `StoreUpgradeBuilder` (or the `KVwithTs-Store` if we remove the `StoreUpgradeBuilder`) will detect that there is only an old local KV store w/o TS, will start to restore the new KVwithTs store, wipe out the old store and replace it with the new store after the restore is finished, and start processing only afterwards. (I guess we need to document this case -- will also add it to the KIP.)

-Matthias

On 8/9/18 1:10 PM, John Roesler wrote:

Hi Matthias,

I think this KIP is looking really good.

I have a few thoughts to add to the others:

1. You mentioned at one point users needing to configure `upgrade.mode="null"`. I think this was a typo and you meant to say they should remove the config. If they really have to set it to a string "null", or even set it to a null value but not remove it, it would be unfortunate.

2. In response to Bill's comment #1, you said that "the idea is that the upgrade should be robust and not fail; we need to write tests accordingly". I may have misunderstood the conversation, but I don't think it's within our power to say that an instance won't fail. What if one of my computers catches on fire? What if I'm deployed in the cloud and one instance disappears and is replaced by a new one? Or what if one instance goes AWOL for a long time and then suddenly returns? How will the upgrade process behave in light of such failures?

3. Your thought about making in-place an offline mode is interesting, but it might be a bummer for on-prem users who wish to upgrade online but cannot just add new machines to the pool. It could be a new upgrade mode "offline-in-place", though...

4. I was surprised to see that a user would need to modify the topology to do an upgrade (using StoreUpgradeBuilder). Maybe some of Guozhang's suggestions would remove this necessity.

Thanks for taking on this very complex but necessary work.

-John

On Thu, Aug 9, 2018 at 12:22 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Matthias,

Thanks for the updated KIP. Some more comments:

1. The current set of proposed APIs is a bit too complicated, which makes the upgrade flow from the user's perspective also a bit complex. I'd like to look at the different APIs and discuss their needs separately:

1.a. StoreProxy: needed for the in-place upgrade only, between the first and second rolling bounce, so that the old-versioned stores can handle new-versioned store APIs. I think such upgrade paths (i.e. from one store type to another) would not be very common: users may want to upgrade from a certain store engine to another, but the interface would likely stay the same. Hence, personally I'd suggest we keep it internal and only consider exposing it in the future if it does become a common pattern.

1.b. ConverterStore / RecordConverter: needed for both in-place and roll-over upgrades, between the first and second rolling bounces, for the new-versioned store to be able to read old-versioned changelog topics. Firstly, I think we should not expose the key in the public APIs but only the values, since allowing key format changes would break log compaction and hence would not be compatible anyway. As for value format changes, personally I think we can also keep the upgrade logic internal, as it may not be worth generalizing to user-customizable logic.

1.c. If you agree with 1.a/b above, then we can also remove "keyValueToKeyValueWithTimestampUpgradeStoreBuilder" from the public APIs.

1.d. Personally I think "ReadOnlyKeyValueWithTimestampStore" is not needed either, given that we are exposing "ValueAndTimestamp" anyway. I.e. it is just syntactic sugar, and for IQ users can always just set a "QueryableStoreType<ReadOnlyKeyValue<K, ValueAndTimestamp<V>>>", as the new interface does not provide any additional functions.

2. Could we further categorize the upgrade flow for the different use cases? E.g. 1) DSL users, where KeyValueWithTimestampStore will be used automatically for non-windowed aggregates; 2) PAPI users who do not need to use KeyValueWithTimestampStore; 3) PAPI users who do want to switch to KeyValueWithTimestampStore. Just to give my understanding for 3), the upgrade flow for users may be simplified as the following (for both in-place and roll-over):

* Update the jar to the new version, make code changes from KeyValueStore to KeyValueWithTimestampStore, and set the upgrade config.
* First rolling bounce: the library code can internally use the proxy / converter based on the specified config to handle new APIs with old stores, while letting new stores read from old changelog data.
* Reset the upgrade config.
* Second rolling bounce: the library code automatically turns off the proxy / converter logic.

3. Some more detailed proposals are needed for when to recommend that users trigger the second rolling bounce. I have one idea to share here: we add a new state to KafkaStreams, say UPGRADING, which is set when 1) the upgrade config is set, and 2) the new stores are still ramping up (for the second part, we can start with some internal hard-coded heuristics to decide when it is close to being ramped up). If either one is no longer true, it should transition to RUNNING. Users can then watch this state and decide to only trigger the second rebalance once the state has transitioned away from UPGRADING. They can also choose to cut over while the instance is still UPGRADING; the downside is that afterwards the application may have a long restoration phase, which is, from the user's pov, an unavailability period.

Below are just some minor things on the wiki:

4. "proxy story" => "proxy store".

5. "use the a builder" => "use a builder"

6. "we add the record timestamp as an 8-byte (long) prefix to the value": what's the rationale for putting the timestamp before the value, rather than after the value?

Guozhang
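Point 6 concerns the KIP's new value layout: an 8-byte (long) timestamp prepended to the serialized value. A minimal sketch of such an encoding (illustrative only, not the actual Streams serialization code):

    // Sketch of a timestamp-prefixed value format: an 8-byte (long)
    // timestamp followed by the original value bytes.
    import java.nio.ByteBuffer;

    public final class TimestampedValueFormat {

        public static byte[] encode(final long timestampMs, final byte[] value) {
            return ByteBuffer.allocate(8 + value.length)
                .putLong(timestampMs)  // 8-byte timestamp prefix
                .put(value)            // original serialized value
                .array();
        }

        public static long decodeTimestamp(final byte[] rawValue) {
            return ByteBuffer.wrap(rawValue).getLong();
        }

        public static byte[] decodeValue(final byte[] rawValue) {
            final byte[] value = new byte[rawValue.length - 8];
            System.arraycopy(rawValue, 8, value, 0, value.length);
            return value;
        }
    }

Whether the timestamp goes before or after the value, as debated above, only changes the offsets in this layout.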
On Tue, Aug 7, 2018 at 5:13 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Thanks for the feedback Bill. I just updated the KIP with some of your points.

> Regarding step 3C of the in-place upgrade (users needing to watch the restore process), I'm wondering if we want to provide a type of StateRestoreListener that could signal when the new stores have reached parity with the existing old stores and that could be the signal to start the second rolling rebalance?

I think we can reuse the existing listeners; thus, I did not include anything in the KIP. About a signal to rebalance: this might be tricky. If we prepare the store "online", the active task will update the state continuously, and thus the state preparation is never finished. It will be the user's responsibility to do the second rebalance (note that the second rebalance will first finish the last delta of the upgrade before actual processing resumes). I clarified the KIP in this regard a little bit.

> 1. Out of N instances, one fails midway through the process, would we allow the other instances to complete or just fail the entire upgrade?

The idea is that the upgrade should be robust and not fail. We need to write tests accordingly.

> 2. During the second rolling bounce, maybe we could rename the current active directories vs. deleting them right away, and when all the prepare task directories are successfully migrated then delete the previous active ones.

Ack. Updated the KIP.

> 3. For the first rolling bounce we pause processing of any new records and just allow the prepare tasks to restore, then once all prepare tasks have restored, it's a signal for the second round of rolling bounces, and then as each task successfully renames its prepare directories and deletes the old active task directories, normal processing of records resumes.

The basic idea is to do an online upgrade to avoid downtime. We can discuss offering both options... For the offline upgrade option, we could simplify user interaction and trigger the second rebalance automatically, with the requirement that a user needs to update a config.

It might actually be worth including this option: we know from experience with state restore that regular processing slows down the restore. For a roll_over upgrade it would be a different story, and the upgrade would not be slowed down by regular processing. Thus, we could even make in_place an offline upgrade and force people to use roll_over if they need an online upgrade. Might be a fair tradeoff that simplifies the upgrade for the user and reduces code complexity.

Let's see what others think.

-Matthias

On 7/27/18 12:53 PM, Bill Bejeck wrote:

Hi Matthias,

Thanks for the update and the working prototype, it helps with understanding the KIP.

I took an initial pass over this PR, and overall I find the interfaces and approach to be reasonable.

Regarding step 3C of the in-place upgrade (users needing to watch the restore process), I'm wondering if we want to provide a type of StateRestoreListener that could signal when the new stores have reached parity with the existing old stores and that could be the signal to start the second rolling rebalance?

Although you solicited feedback on the interfaces involved, I wanted to put down some thoughts that have come to mind reviewing this KIP again:

1. Out of N instances, one fails midway through the process, would we allow the other instances to complete or just fail the entire upgrade?
2. During the second rolling bounce, maybe we could rename the current active directories vs. deleting them right away, and when all the prepare task directories are successfully migrated then delete the previous active ones.
3. For the first rolling bounce we pause processing of any new records and just allow the prepare tasks to restore, then once all prepare tasks have restored, it's a signal for the second round of rolling bounces, and then as each task successfully renames its prepare directories and deletes the old active task directories, normal processing of records resumes.

Thanks,
Bill

On Wed, Jul 25, 2018 at 9:42 PM Matthias J. Sax <matth...@confluent.io> wrote:

Hi,

KIP-268 (rebalance metadata) is finished and included in the AK 2.0 release. Thus, I want to pick up this KIP again to get the RocksDB upgrade done for 2.1.

I updated the KIP accordingly and also have a "proof of concept" PR ready (for the "in place" upgrade only): https://github.com/apache/kafka/pull/5422/

There are still open questions, but I want to collect early feedback on the proposed interfaces we need for the store upgrade. Also note that the KIP now also aims to define a generic upgrade path from any store format A to any other store format B. Adding timestamps is just a special case.

I will continue to work on the PR and refine the KIP in the meantime, too.

Looking forward to your feedback.

-Matthias

On 3/14/18 11:14 PM, Matthias J. Sax wrote:

After some more thoughts, I want to follow John's suggestion and split upgrading the rebalance metadata from the store upgrade.

I extracted the metadata upgrade into its own KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade

I'll update this KIP accordingly shortly. I also want to consider making the store format upgrade more flexible/generic. Atm, the KIP is too tailored to the DSL IMHO and does not account for PAPI users, whom we should not force to upgrade their stores. I need to figure out the details and follow up later.

Please give feedback for the new KIP-268 on the corresponding discussion thread.

@James: unfortunately, for upgrading to 1.2 I couldn't figure out a way for a single-rolling-bounce upgrade. But KIP-268 proposes a fix for future upgrades. Please share your thoughts.

Thanks for all your feedback!

-Matthias

On 3/12/18 11:56 PM, Matthias J. Sax wrote:

@John: yes, we would throw if configs are missing (it's an implementation detail IMHO, and thus I did not include it in the KIP)

@Guozhang:

1) I understand now what you mean. We can certainly allow all values "0.10.0.x", "0.10.1.x", "0.10.2.x", ..., "1.1.x" for the `upgrade.from` parameter. I had a similar thought once but decided to collapse them into one -- will update the KIP accordingly.

2) The idea to avoid any config would be to always send both requests. If we add a config to eventually disable the old request, we don't gain anything with this approach. The question is really whether we are willing to pay this overhead from 1.2 on -- note, it would be limited to 2 versions and not grow further in future releases. More details in (3).

3) Yes, this approach subsumes (2) for later releases and allows us to stay with the 2 "assignment strategies" we need to register, as the new assignment strategy will allow to "upgrade itself" via "version probing". Thus, (2) would only be a workaround to avoid a config if people upgrade from pre-1.2 releases.

Thus, I don't think we need to register new "assignment strategies" and send empty subscriptions for older versions.

4) I agree that this is a tricky thing to get right with a single rebalance. I share the concern that an application might never catch up, and thus the hot standby will never be ready.

Maybe it's better to go with 2 rebalances for store upgrades. If we do this, we also don't need to go with (2) and can get (3) in place for future upgrades. I also think that changes to the metadata are more likely, and thus allowing a single rolling bounce for that case is more important anyway. If we assume that store upgrades are rare, it might be ok to sacrifice two rolling bounces for this case. It was just an idea I wanted to share (even if I see the issues).

-Matthias

On 3/12/18 11:45 AM, Guozhang Wang wrote:

Hello Matthias, thanks for your replies.

1) About the config names: actually I was trying to not expose implementation details :) My main concern was that in your proposal the values need to cover the span of all the versions that actually use the same metadata version, i.e. "0.10.1.x-1.1.x". So if I (as a user) am upgrading from any version within this range, I need to remember to use the value "0.10.1.x-1.1.x" rather than just specifying my old version. In my suggestion I was trying to argue the benefit of letting users specify the actual Kafka version they are upgrading from, rather than a range of versions. I was not suggesting to use "v1, v2, v3" etc. as the values, but still Kafka versions, like the broker's `internal.version` config. But if you were suggesting the same thing, i.e. by "0.10.1.x-1.1.x" you meant that users can just specify "0.10.1" or "0.10.2" or "0.11.0" or "1.1", which are all recognizable config values, then I think we are actually on the same page.

2) About the "multi-assignment" idea: yes, it would increase the network footprint, but not the message size, IF I'm not misunderstanding your idea of registering multiple assignors. More details:

In the JoinGroupRequest, in the protocols field we can encode multiple protocols, each with their own metadata. The coordinator will pick the common one that everyone supports (if there is no common one, it will send an error back; if there are multiple, it will pick the one with the most votes, i.e. the one which was earlier in the encoded list). Since our current Streams rebalance protocol is still based on the consumer coordinator, our protocol_type would be "consumer", but we can have multiple protocols like "streams", "streams_v2", "streams_v3", etc. The downside is that we need to implement a different assignor class for each version and register all of them in the consumer's PARTITION_ASSIGNMENT_STRATEGY_CONFIG. In the future, if we re-factor our implementation to have our own client coordinator layer like Connect did, we can simplify this part of the implementation. But even now, with the above approach, this is still doable.

On the broker side, the group coordinator will only persist a group with the selected protocol and its subscription metadata. E.g., if the coordinator decides to pick "streams_v2", it will only send that protocol's metadata from everyone to the leader to assign, AND when completing the rebalance it will also only write the group metadata with that protocol and assignment. In a word, although the network traffic may increase a bit, it would not be a bummer in our trade-off. One corner situation we need to consider is how to stop registering very old assignors, to avoid the network traffic from increasing indefinitely: e.g., if you are rolling-bouncing from v2 to v3, you no longer need to register the v1 assignor, but that would unfortunately still require some configs.

3) About the "version probing" idea: I think that's a promising approach as well, but if we are going to do the multi-assignment, its value seems subsumed? But I'm thinking maybe it can be added on top of multi-assignment, to save us from still requiring the config to avoid registering all the metadata for all versions. More details:

In the JoinGroupRequest, we still register all the assignors, but for all old assignors we do not encode any metadata, i.e. the encoded data would be:

"streams_vN" : "encoded metadata"
"streams_vN-1" : empty
"streams_vN-2" : empty
..
"streams_0" : empty

So the coordinator can still safely choose the latest common version; and then when the leader receives the subscription (note it should always recognize that version), let's say streams_vN-2, if one of the subscriptions is empty bytes, it will send an empty assignment with that version number encoded in the metadata. So in the second, auto-triggered rebalance, all members would send the metadata with that version:

"streams_vN" : empty
"streams_vN-1" : empty
"streams_vN-2" : "encoded metadata"
..
"streams_0" : empty

By doing this we would not require any configs for users.

4) About the "in_place" upgrade on RocksDB: I'm not clear about the details, so probably we'd need to fill that out before making a call. For example, you mentioned "If we detect this situation, the Streams application closes corresponding active tasks as well as 'hot standby' tasks, and re-creates the new active tasks using the new store." How could we guarantee that the gap between these two stores keeps decreasing rather than increasing, so that we eventually reach the flip point? And also, the longer we stay before the flip point, the longer we double the storage space, etc.

Guozhang
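Whichever spelling wins in point 1 above -- "upgrade.from" release strings or an "internal.protocol.version" -- Streams ends up maintaining a mapping from the user-supplied version to the internal metadata protocol version, roughly as in this sketch (class and method names are illustrative, not actual Streams code; the version values follow the ones discussed in this thread):

    // Sketch of mapping an `upgrade.from` release string to the internal
    // rebalance metadata version, per the discussion above. Illustrative only.
    import java.util.HashMap;
    import java.util.Map;

    public final class UpgradeFromVersions {

        private static final Map<String, Integer> METADATA_VERSION = new HashMap<>();
        static {
            METADATA_VERSION.put("0.10.0", 1); // oldest metadata format
            METADATA_VERSION.put("0.10.1", 2); // 0.10.1.x - 1.1.x share version 2
            METADATA_VERSION.put("0.10.2", 2);
            METADATA_VERSION.put("0.11.0", 2);
            METADATA_VERSION.put("1.0", 2);
            METADATA_VERSION.put("1.1", 2);
        }

        // Returns the metadata version to use while `upgrade.from` is set;
        // no config means "use the latest version".
        public static int subscriptionVersion(final String upgradeFrom, final int latestVersion) {
            if (upgradeFrom == null) {
                return latestVersion;
            }
            final Integer version = METADATA_VERSION.get(upgradeFrom);
            if (version == null) {
                throw new IllegalArgumentException("Unknown upgrade.from value: " + upgradeFrom);
            }
            return version;
        }
    }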
On Sun, Mar 11, 2018 at 4:06 PM, Matthias J. Sax <matth...@confluent.io> wrote:

@John, Guozhang,

thanks a lot for your comments. Very long reply...

About upgrading the rebalance metadata:

Another possibility to do this would be to register multiple assignment strategies for the 1.2 applications. For this case, new instances would be configured to support both, and the broker would pick the version that all instances understand. The disadvantage would be that we send much more data (ie, two subscriptions) in each rebalance, as long as no second rebalance is done disabling the old protocol. Thus, using this approach would allow us to avoid a second rebalance, trading off an increased rebalance network footprint (I also assume that this would increase the message size that is written into the __consumer_offsets topic?). Overall, I am not sure if this would be a good tradeoff, but it could avoid a second rebalance (I have some more thoughts about stores below that are relevant for a single-rebalance upgrade).
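In configuration terms, registering multiple strategies is a one-line change on the consumer; a sketch (the two Streams assignor class names are hypothetical placeholders, not real Kafka classes):

    // Sketch of registering both an old and a new assignor so the group
    // coordinator can pick the newest protocol that every member supports.
    // The assignor class names are hypothetical placeholders.
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public final class MultiAssignorConfig {

        public static Properties consumerProps() {
            final Properties props = new Properties();
            // Preferred strategy listed first; the coordinator falls back to
            // the older one while the group still contains old members.
            props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                "com.example.streams.StreamsV3Assignor,com.example.streams.StreamsV2Assignor");
            return props;
        }
    }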
For future upgrades we might be able to fix this, though. I was thinking about the following:

In the current implementation, the leader fails if it gets a subscription it does not understand (ie, a newer version). We could change this behavior and let the leader send an empty assignment plus an error code (including its supported version) back to the instance sending the "bad" subscription. This would allow the following logic for an application instance:

- on startup, always send the latest subscription format
- if the leader understands it, we get an assignment back and start processing
- if the leader does not understand it, we get an empty assignment and the leader's supported version back
- the application calls unsubscribe()/subscribe()/poll() again and sends a subscription using the leader's supported version

This protocol would allow a single rolling bounce, and implements a "version probing" step that might result in two executed rebalances. The advantage would be that the user does not need to set any configs or do multiple rolling bounces, as Streams takes care of this automatically.

One disadvantage would be that two rebalances happen, and that in an error case during rebalance, we lose the information about the supported leader version, so the "probing step" would happen a second time.

If the leader is eventually updated, it will include its own supported version in all assignments, to allow a "downgraded" application to upgrade its version later. Also, if an application fails, the first probing would always be successful and only a single rebalance happens. If we use this protocol, I think we don't need any configuration parameter for future upgrades.
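The instance-side half of this "version probing" could be sketched as the following loop; the types are hypothetical stand-ins for the real rebalance internals inside the Streams assignor:

    // Sketch of the client-side "version probing" logic described above.
    // Group, Assignment, and the version fields are illustrative placeholders.
    public final class VersionProbing {

        static final int LATEST_VERSION = 3;

        interface Assignment {
            boolean isEmpty();            // empty => leader rejected our version
            int leaderSupportedVersion(); // newest version the leader understands
        }

        interface Group {
            Assignment subscribe(int subscriptionVersion); // triggers a rebalance
        }

        static Assignment join(final Group group) {
            // Always probe with the latest subscription format first.
            Assignment assignment = group.subscribe(LATEST_VERSION);
            if (assignment.isEmpty()) {
                // Leader is older: re-subscribe with the version it supports,
                // which triggers the second rebalance.
                assignment = group.subscribe(assignment.leaderSupportedVersion());
            }
            return assignment;
        }
    }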
About "upgrade.from" vs "internal.protocol.version":

Users would set "upgrade.from" to the release version the current/old application is using. I think this is simpler, as users know this version. If we use "internal.protocol.version" instead, we expose implementation details, and users need to know the protocol version (ie, they need to map from the release version to the protocol version: "I run 0.11.0, which uses metadata protocol version 2").

Also, the KIP states that for the second rolling bounce, the "upgrade.mode" config should be set back to `null` -- and thus "upgrade.from" would not have any effect and is ignored (I will update the KIP to point out this dependency).

About your second point: I'll update the KIP accordingly to describe future upgrades as well. Both will be different.

One more point about upgrading the store format. I was thinking about avoiding the second rolling bounce altogether in the future: (1) the goal is to achieve an upgrade with zero downtime; (2) this requires preparing the stores as "hot standbys" before we do the switch and delete the old stores; (3) the current proposal does the switch "globally" -- this is simpler and, due to the required second rebalance, not a disadvantage. However, a globally consistent switchover might actually not be required. For the "in_place" upgrade, following the protocol from above, we could decouple the store switch, and each instance could switch its store independently of all other instances. After the rolling bounce, it seems to be ok to switch from the old store to the new store "under the hood" whenever the new store is ready (this could even be done before we switch to the new metadata version). Each time we update the "hot standby" we check whether it has reached the "endOffset" (or maybe X%, which could either be hardcoded or configurable). If we detect this situation, the Streams application closes the corresponding active tasks as well as the "hot standby" tasks, and re-creates the new active tasks using the new store. (I need to go through the details once again, but it seems to be feasible.)
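The "reached the endOffset (or maybe X%)" check mentioned here reduces to comparing restore progress against the changelog end offset. A minimal sketch, with an assumed configurable threshold (names are illustrative):

    // Sketch of the "hot standby ready" check described above: switch stores
    // once the prepare task has restored up to a fraction of the changelog
    // end offset. Threshold and class name are illustrative only.
    public final class StoreSwitchCheck {

        private final double readyThreshold; // e.g. 0.95 for "95% caught up"

        public StoreSwitchCheck(final double readyThreshold) {
            this.readyThreshold = readyThreshold;
        }

        public boolean readyToSwitch(final long restoredOffset, final long changelogEndOffset) {
            if (changelogEndOffset <= 0) {
                return true; // empty changelog: nothing to restore
            }
            return (double) restoredOffset / changelogEndOffset >= readyThreshold;
        }
    }

Note that with an active task continuously appending to the changelog, restoredOffset may never exactly reach changelogEndOffset, which is why a threshold below 100% (plus finishing the last delta during the switch) is assumed here.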
Combining this strategy with the "multiple assignment" idea might even enable us to do a single-rolling-bounce upgrade from 1.1 -> 1.2. Applications would just use the old store as long as the new store is not ready, even if the new metadata version is used already.

For future upgrades, a single rebalance would be sufficient, too, even if the stores are upgraded. We would not need any config parameters, as the "probe" step allows us to detect the supported rebalance metadata version (and we would also not need multiple "assignment strategies", as our own protocol encodes everything we need).

Let me know what you think.

-Matthias

On 3/9/18 10:33 PM, Guozhang Wang wrote:

@John:

For the protocol version upgrade, it is only for the encoded metadata bytes protocol, which is just bytes-in bytes-out from the Consumer's pov, so I think this change should be in the Streams layer as well.

@Matthias:

for 2), I agree that adding a "newest supported version" besides the "currently used version for encoding" is a good idea to allow either case; the key is that in Streams we would likely end up with a mapping from the protocol version to the other persistent data format versions such as RocksDB and changelog. So with such a map we can actually achieve both scenarios, i.e. 1) one rolling bounce if the upgraded protocol version's corresponding data format does not change, e.g. 0.10.0 -> 0.10.1: leaders can choose to use the newer version in the first rolling bounce directly, and we can document to users that they would not need to set "upgrade.mode"; and 2) two rolling bounces if the upgraded protocol version does indicate data format changes, e.g. 1.1 -> 1.2, and then we can document that "upgrade.mode" needs to be set in the first rolling bounce and reset in the second.

Besides that, some additional comments:

1) I still think "upgrade.from" is less intuitive for users to set than "internal.protocol.version", where for the latter users only need to set a single version, while Streams will map that version to the assignor's behavior as well as the data format. But maybe I did not get your idea about how the "upgrade.from" config will be set, because your Compatibility section is not very clear on how the upgrade.from config will be set for these two rolling bounces: for example, should users reset it to null in the second rolling bounce?

2) In the upgrade path description, rather than talking about specific versions (0.10.0 -> 0.10.1, etc.), can we just categorize all the possible scenarios, even for future upgrade versions, and state what the standard operations should be? The categories we can summarize would be (assuming a user upgrades from version X to version Y, where X and Y are Kafka versions, with the corresponding supported protocol versions x and y):

a. x == y, i.e. the metadata protocol does not change, and hence no persistent data formats have changed.

b. x != y, but all persistent data formats remain the same.

c. x != y, AND some persistent data format (like the RocksDB format or changelog format) has been changed.

d. special case: we may need some special handling logic when "current version" or "newest supported version" are not available in the protocol, i.e. for X as old as 0.10.0 and before 1.2.

Under the above scenarios, how many rolling bounces do users need to execute? How should they set the configs in each rolling bounce? And how will the Streams library behave in these cases?

Guozhang

On Fri, Mar 9, 2018 at 4:01 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Ted,

I still consider changing the KIP to include it right away -- if not, I'll create a JIRA. Need to think it through in more detail first.

(Same for other open questions like interface names -- I collect feedback and update the KIP after we reach consensus :))

-Matthias

On 3/9/18 3:35 PM, Ted Yu wrote:

Thanks for the details, Matthias.

bq. change the metadata protocol only in a future release, encoding both used and supported version might be an advantage

Looks like encoding both versions wouldn't be implemented in this KIP.

Please consider logging a JIRA with the encoding proposal.

Cheers

On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax <matth...@confluent.io> wrote:

@Bill: I think a filter predicate should be part of user code. And even if we want to add something like this, I would prefer to do it in a separate KIP.

@James: I would love to avoid a second rolling bounce. But from my understanding it would not be possible.

The purpose of the second rolling bounce is indeed to switch from version 2 to 3. It also has a second purpose, to switch from the old store to the new store (this happens after the last instance bounces a second time).

The problem with one round of rolling bounces is that it's unclear when to switch from version 2 to version 3. The StreamsPartitionsAssignor is stateless by design, and thus the information about which version it should use must be passed in from outside -- and we want to use the StreamsConfig to pass in this information.

During the upgrade, new instances have no information about the progress of the upgrade (ie, how many other instances got upgraded already). Therefore, it's not safe for them to send a version 3 subscription. The leader also has this limited view of the world and can only send version 2 assignments back.

Thus, for the 1.2 upgrade, I don't think we can simplify the upgrade.

We did consider changing the metadata to make later upgrades (ie, from 1.2 to 1.x) simpler though (for the case we change the metadata or storage format again -- as long as we don't change it, a single rolling bounce is sufficient), by encoding "used version" and "supported version". This would allow the leader to switch to the new version earlier and without a second rebalance: the leader would receive "used version = old" and "supported version = old/new" -- as long as at least one instance sends a "supported version = old", the leader sends an old-version assignment back. Encoding both versions would allow the leader to send a new-version assignment back right after the first round of rebalance finished (all instances send "supported version = new"). However, there are still two issues with this:

1) if we switch to the new format right after the last instance bounced, the new stores might not be ready to be used -- this could lead to "downtime", as the stores must be restored before processing can resume.

2) Assume an instance fails and is restarted again. At this point, the instance will still have "upgrade mode" enabled and thus sends the old protocol data. However, it would be desirable to never fall back to the old protocol after the switch to the new protocol.

The second issue is minor, and I guess if users set up the instance properly it could be avoided. However, the first issue would prevent "zero downtime" upgrades.
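As a sketch, carrying both versions in the subscription userdata could be as simple as two leading ints (the layout is illustrative, not the actual Streams wire format):

    // Sketch of subscription metadata carrying both the "used version" and
    // the "supported version", as discussed above. Layout is illustrative.
    import java.nio.ByteBuffer;

    public final class SubscriptionInfo {

        public static ByteBuffer encode(final int usedVersion,
                                        final int supportedVersion,
                                        final byte[] payload) {
            final ByteBuffer buf = ByteBuffer.allocate(8 + payload.length);
            buf.putInt(usedVersion);      // version used to encode `payload`
            buf.putInt(supportedVersion); // newest version this instance understands
            buf.put(payload);
            buf.flip();
            return buf;
        }
    }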
Having said this, if we consider that we might change the metadata protocol only in a future release, encoding both used and supported version might be an advantage in the future, and we could consider adding this information in the 1.2 release to prepare for this.

Btw: monitoring the logs is also only required to give the instances enough time to prepare the stores in the new format. If you do the second rolling bounce before this, it would still work -- however, you might see app "downtime", as the new store must be fully restored before processing can resume.

Does this make sense?

-Matthias

On 3/9/18 11:36 AM, James Cheng wrote:

Matthias,

For all the upgrade paths, is it possible to get rid of the 2nd rolling bounce?

For the in-place upgrade, it seems like the primary difference between the 1st rolling bounce and the 2nd rolling bounce is to decide whether to send Subscription Version 2 or Subscription Version 3. (Actually, there is another difference mentioned, in that the KIP says that the 2nd rolling bounce should happen after all new state stores are created by the background thread. However, within the 2nd rolling bounce, we say that there is still a background thread, so it seems like there is no actual requirement to wait for the new state stores to be created.)

The 2nd rolling bounce already knows how to deal with mixed mode (having both Version 2 and Version 3 in the same consumer group). It seems like we could get rid of the 2nd bounce if we added logic (somehow/somewhere) such that (see the sketch after this list):

* Instances send Subscription Version 2 until all instances are running the new code.
* Once all the instances are running the new code, then one at a time, the instances start sending Subscription V3. The Leader still hands out Assignment Version 2 until all new state stores are ready.
* Once all instances report that new stores are ready, the Leader sends out Assignment Version 3.
* Once an instance receives an Assignment Version 3, it can delete the old state store.
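A sketch of the leader-side rule implied by these steps (all types are illustrative placeholders, not actual Streams code):

    // Sketch of the leader-side decision in the proposal above: stay on
    // Assignment Version 2 until every member runs the new code and reports
    // its new stores ready.
    import java.util.Collection;

    public final class AssignmentVersionChooser {

        public static final class MemberState {
            final boolean sendsSubscriptionV3;
            final boolean newStoresReady;

            public MemberState(final boolean sendsSubscriptionV3, final boolean newStoresReady) {
                this.sendsSubscriptionV3 = sendsSubscriptionV3;
                this.newStoresReady = newStoresReady;
            }
        }

        public static int chooseAssignmentVersion(final Collection<MemberState> members) {
            for (final MemberState member : members) {
                if (!member.sendsSubscriptionV3 || !member.newStoresReady) {
                    return 2; // at least one member not ready: stay on the old version
                }
            }
            return 3; // everyone upgraded and restored: switch; old stores deletable
        }
    }

One open question, per the robustness discussion earlier in the thread, is how this rule behaves if a member drops out and rejoins while still reporting "not ready".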
Doing it that way seems like it would reduce a lot of operator/deployment overhead. No need to do 2 rolling restarts. No need to monitor logs for the state store rebuild. You just deploy it, and the instances update themselves.

What do you think?

The thing that made me think of this is that the "2 rolling bounces" approach is similar to what Kafka brokers have to do for changes in inter.broker.protocol.version and log.message.format.version. And in the broker case, it seems like it would be possible (with some work, of course) to modify Kafka to allow us to do similar auto-detection of broker capabilities and automatically do a switchover between old/new versions.

-James

On Mar 9, 2018, at 10:38 AM, Bill Bejeck <bbej...@gmail.com> wrote:

Matthias,

Thanks for the KIP, it's a +1 from me.

I do have one question regarding the retrieval methods on the new interfaces.

Would we want to consider adding one method with a Predicate that would allow for filtering records by the timestamp stored with the record? Or is this better left for users to implement themselves once the data has been retrieved?

Thanks,
Bill

On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu <yuzhih...@gmail.com> wrote:

Matthias:

For my point #1, I don't have a preference as to which separator is chosen. Given the background you mentioned, the current choice is good.

For #2, I think my proposal is better since it is closer to English grammar.

Would be good to listen to what other people think.

On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Thanks for the comments!

@Guozhang:

So far, there is one PR for the rebalance metadata upgrade fix (addressing the mentioned https://issues.apache.org/jira/browse/KAFKA-6054). It gives a first impression of how the metadata upgrade works, including a system test: https://github.com/apache/kafka/pull/4636

I can share other PRs as soon as they are ready. I agree that the KIP is complex and I am ok with putting out more code to give better discussion context.

@Ted:

I picked `_` instead of `-` to align with the `processing.guarantee` parameter that accepts `at_least_once` and `exactly_once` as values. Personally, I don't care about underscore vs dash, but I prefer consistency. If you feel strongly about it, we can also change it to `-`.

About the interface name: I am fine either way -- I stripped the `With` to keep the name a little shorter. Would be good to get feedback from others and pick the name the majority prefers.

@John:

We can certainly change it. I agree that it would not make a difference. I'll dig into the code to see if either of the two versions might introduce undesired complexity, and update the KIP if I don't hit an issue with putting the `-v2` on the store directory instead of `rocksdb-v2`.

-Matthias

On 3/8/18 2:44 PM, John Roesler wrote:

Hey Matthias,

The KIP looks good to me. I had several questions queued up, but they were all in the "rejected alternatives" section... oh, well.

One very minor thought re changing the state directory from "/<state.dir>/<application.id>/<task.id>/rocksdb/storeName/" to "/<state.dir>/<application.id>/<task.id>/rocksdb-v2/storeName/": if you put the "v2" marker on the storeName part of the path (i.e., "/<state.dir>/<application.id>/<task.id>/rocksdb/storeName-v2/"), then you get the same benefits without altering the high-level directory structure.
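The two layouts John compares, written out as path construction (purely illustrative):

    // Sketch contrasting the two directory layouts compared above.
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public final class StoreDirLayouts {

        // KIP draft: version marker on the directory holding all stores.
        static Path kipLayout(final String stateDir, final String appId,
                              final String taskId, final String storeName) {
            return Paths.get(stateDir, appId, taskId, "rocksdb-v2", storeName);
        }

        // John's alternative: version marker on the store name itself, so
        // the high-level directory structure stays unchanged.
        static Path storeNameLayout(final String stateDir, final String appId,
                                    final String taskId, final String storeName) {
            return Paths.get(stateDir, appId, taskId, "rocksdb", storeName + "-v2");
        }
    }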
It may not matter, but I could imagine people running scripts to monitor rocksdb disk usage for each task, or other such use cases.

Thanks,
-John

On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu <yuzhih...@gmail.com> wrote:

Matthias:

Nicely written KIP.

"in_place": can this be "in-place"? Underscore may sometimes be mistyped (as '-'). I think using '-' is more friendly to users.

public interface ReadOnlyKeyValueTimestampStore<K, V> {

Is ReadOnlyKeyValueStoreWithTimestamp a better name for the class?

Thanks

On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Matthias, thanks for the KIP.

I've read through the upgrade path section and it looks good to me. If you already have a WIP PR for it, could you also share it here so that people can take a look?

I'm +1 on the KIP itself. But with large KIPs like this there are always some devils hidden in the details, so I think it is better to have the implementation in parallel along with the design discussion :)

Guozhang

On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Hi,

I want to propose KIP-258 for the Streams API to allow storing timestamps in RocksDB. This feature is the basis to resolve multiple tickets (issues and feature requests).

Looking forward to your comments about this!

https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB

-Matthias