Thanks for the information, Matthias.

I will await your completion of this ticket then since it underpins the
essential parts of a RocksDB TTL aligned with the changelog topic. I am
eager to work on that ticket myself, so if I can help on this one in any
way please let me know.

Thanks
Adam



On Tue, Nov 20, 2018 at 5:26 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> It's an interesting idea to use a second store to maintain the
> timestamps. However, each RocksDB instance implies some overhead. In
> fact, we are looking into ColumnFamilies atm to see if we can use those
> and merge multiple RocksDBs into a single one to reduce this overhead.
>
> -Matthias
>
> On 11/20/18 5:15 AM, Patrik Kleindl wrote:
> > Hi Adam
> >
> > Sounds great, I was already planning to ask around if anyone had tackled
> > this.
> > We have a use case very similar to what you described in KAFKA-4212, only
> > with Global State Stores.
> > I have tried a few things with the normal DSL but was not really
> > successful.
> > Schedule/Punctuate is not possible, supplying a windowed store is also
> > not allowed, and the process method has no knowledge of the timestamp of
> > the record.
> > And anything loaded on startup is not filtered anyway.
> >
> > Regarding 4212, wouldn't it be easier (although a little less
> > space-efficient) to track the timestamps in a separate store with
> > <K, Long>?
> > This would leave the original store intact and allow a migration of the
> > timestamps without touching the other data.
> >
> > So I am very interested in your PR :-)
> >
> > best regards
> >
> > Patrik
> >
> > On Tue, 20 Nov 2018 at 04:46, Adam Bellemare <adam.bellem...@gmail.com>
> > wrote:
> >
> >> Hi Matthias
> >>
> >> Thanks - I figured that it was probably a case of just too much to do
> >> and not enough time. I know how that can go. I am asking about this one
> >> in relation to https://issues.apache.org/jira/browse/KAFKA-4212, adding
> >> a TTL to RocksDB. I have outlined a bit about my use-case within 4212,
> >> but for brevity here it is:
> >>
> >> My case:
> >> 1) I have a RocksDB with TTL implementation working where records are
> >> aged out using the TTL that comes with RocksDB (very simple).
> >> 2) We prevent records from loading from the changelog if recordTime +
> >> TTL < referenceTimeStamp (default = System.currentTimeMillis()).
> >>
> >> This assumes that the records are stored with the same time reference
> >> (say UTC) as the consumer materializing the RocksDB store.
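
A minimal sketch of the changelog filter described in point 2 above, assuming millisecond timestamps. The function name `should_restore` and its parameters are hypothetical and not part of Kafka Streams or RocksDB; the sketch only restates the condition recordTime + TTL < referenceTimeStamp.

```python
import time


def should_restore(record_ts_ms, ttl_ms, reference_ts_ms=None):
    """Return True if a changelog record is still within its TTL.

    Hypothetical helper: a record is skipped during restore when
    record_ts_ms + ttl_ms < reference_ts_ms.
    """
    if reference_ts_ms is None:
        # Default reference is "now" in epoch milliseconds, mirroring the
        # System.currentTimeMillis() default mentioned in the message.
        reference_ts_ms = int(time.time() * 1000)
    expired = record_ts_ms + ttl_ms < reference_ts_ms
    return not expired
```

Passing the reference timestamp explicitly keeps the check deterministic, which matters if producer and consumer clocks are compared as the message assumes.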
> >>
> >> My questions about KIP-258 are as follows:
> >> 1) How does "we want to be able to store record timestamps in KTables"
> >> differ from inserting records into RocksDB with TTL at consumption
> >> time? I understand that it could be a difference of some seconds,
> >> minutes, hours, days etc. between when the record was published and
> >> now, but given the nature of how RocksDB TTL works (eventual - based on
> >> compaction) I don't see how a precise TTL can be achieved, such as that
> >> which one can get with windowed stores.
> >>
> >> 2) Are you looking to change how records are inserted into a TTL
> >> RocksDB, such that the TTL would take effect from the record's
> >> published time? If not, what would be the ideal workflow here for a
> >> single record with TTL RocksDB?
> >> ie: Record Timestamp: 100
> >> TTL: 50
> >> Record inserted into rocksDB: 110
> >> Record to expire at 150?
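
The numbers in the example above can be worked through under both readings of the question. This is only an illustrative sketch; the function names are hypothetical, and which semantics the KIP intends is exactly what is being asked.

```python
def expiry_from_record_time(record_ts, ttl):
    # TTL counted from the record's own (published) timestamp.
    return record_ts + ttl


def expiry_from_insert_time(insert_ts, ttl):
    # TTL counted from the moment the record is written into the store.
    return insert_ts + ttl


def remaining_ttl_at_insert(record_ts, ttl, insert_ts):
    # TTL that would have to be applied at insertion time so that the record
    # still expires relative to its published timestamp (never negative).
    return max(0, record_ts + ttl - insert_ts)


record_ts, ttl, insert_ts = 100, 50, 110
print(expiry_from_record_time(record_ts, ttl))             # 150
print(expiry_from_insert_time(insert_ts, ttl))             # 160
print(remaining_ttl_at_insert(record_ts, ttl, insert_ts))  # 40
```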
> >>
> >> 3) I'm not sure I fully understand the importance of the upgrade path. I
> >> have read the link to (https://issues.apache.org/jira/browse/KAFKA-3522)
> >> in the KIP, and I can understand that a state-store on disk may not
> >> represent what the application is expecting. I don't think I have the
> >> full picture though, because that issue seems to be easy to fix with a
> >> simple versioned header or accompanying file, forcing the app to rebuild
> >> the state if the version is incompatible. Can you elaborate or add a
> >> scenario to the KIP that illustrates the need for the upgrade path?
> >>
> >> Thanks,
> >>
> >> Adam
> >>
> >>
> >>
> >>
> >> On Sun, Nov 11, 2018 at 1:43 PM Matthias J. Sax <matth...@confluent.io>
> >> wrote:
> >>
> >>> Adam,
> >>>
> >>> I am still working on it. Was pulled into a lot of other tasks lately
> >>> so this was delayed. Also had some discussions about simplifying the
> >>> upgrade path with some colleagues and I am prototyping this atm. Hope
> >>> to update the KIP accordingly soon.
> >>>
> >>> -Matthias
> >>>
> >>> On 11/10/18 7:41 AM, Adam Bellemare wrote:
> >>>> Hello Matthias
> >>>>
> >>>> I am curious as to the status of this KIP. TTL and expiry of records
> >>>> will be extremely useful for several of our business use-cases, as
> >>>> well as another KIP I had been working on.
> >>>>
> >>>> Thanks
> >>>>
> >>>>
> >>>>
> >>>> On Mon, Aug 13, 2018 at 10:29 AM Eno Thereska <eno.there...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hi Matthias,
> >>>>>
> >>>>> Good stuff. Could you comment a bit on how future-proof this change
> >>>>> is? For example, if we want to store both event timestamp "and"
> >>>>> processing time in RocksDB, will we then need another interface (e.g.
> >>>>> called KeyValueWithTwoTimestampsStore)?
> >>>>>
> >>>>> Thanks
> >>>>> Eno
> >>>>>
> >>>>> On Thu, Aug 9, 2018 at 2:30 PM, Matthias J. Sax <matth...@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> Thanks for your input Guozhang and John.
> >>>>>>
> >>>>>> I see your point that the upgrade API is not simple. If you don't
> >>>>>> think it's valuable to make generic store upgrades possible (atm),
> >>>>>> we can make the API internal, too. The impact is that we only
> >>>>>> support a predefined set of upgrades (i.e., KV to KVwithTs, Windowed
> >>>>>> to WindowedWithTS, etc.) for which we implement the internal
> >>>>>> interfaces.
> >>>>>>
> >>>>>> We can keep the design generic, so if we decide to make it public,
> >>>>>> we don't need to re-invent it. This will also have the advantage
> >>>>>> that we can add upgrade patterns for other stores later, too.
> >>>>>>
> >>>>>> I also agree that the `StoreUpgradeBuilder` is a little ugly, but
> >>>>>> it was the only way I could find to design a generic upgrade
> >>>>>> interface. If we decide to hide all the upgrade stuff,
> >>>>>> `StoreUpgradeBuilder` would become an internal interface I guess
> >>>>>> (don't think we can remove it).
> >>>>>>
> >>>>>> I will wait for more feedback about this and if nobody wants to keep
> >>>>>> it as public API I will update the KIP accordingly. Will add some
> >>>>>> more clarifications for different upgrade patterns in the meantime
> >>>>>> and fix the typos/minor issues.
> >>>>>>
> >>>>>> About adding a new state UPGRADING: maybe we could do that. However,
> >>>>>> I find it particularly difficult to estimate when we should switch
> >>>>>> to RUNNING, thus, I am a little hesitant. Using store callbacks or
> >>>>>> just logging the progress including some indication about the "lag"
> >>>>>> might actually be sufficient. Not sure what others think?
> >>>>>>
> >>>>>> About "value before timestamp": no real reason and I think it does
> >>>>>> not make any difference. Do you want to change it?
> >>>>>>
> >>>>>> About upgrade robustness: yes, we cannot control if an instance
> >>>>>> fails. That is what I meant by "we need to write tests". The upgrade
> >>>>>> should be able to continue even if an instance goes down (and we
> >>>>>> must make sure that we don't end up in an invalid state that forces
> >>>>>> us to wipe out the whole store). Thus, we need to write system tests
> >>>>>> that fail instances during upgrade.
> >>>>>>
> >>>>>> For `in_place_offline` upgrade: I don't think we need this mode,
> >>>>>> because people can do this via a single rolling bounce.
> >>>>>>
> >>>>>>  - prepare code and switch KV-Store to KVwithTs-Store
> >>>>>>  - do a single rolling bounce (don't set any upgrade config)
> >>>>>>
> >>>>>> For this case, the `StoreUpgradeBuilder` (or `KVwithTs-Store` if we
> >>>>>> remove the `StoreUpgradeBuilder`) will detect that there is only an
> >>>>>> old local KV store w/o TS, will start to restore the new KVwithTs
> >>>>>> store, wipe out the old store and replace it with the new store
> >>>>>> after restore is finished, and start processing only afterwards. (I
> >>>>>> guess we need to document this case -- will also add it to the KIP.)
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On 8/9/18 1:10 PM, John Roesler wrote:
> >>>>>>> Hi Matthias,
> >>>>>>>
> >>>>>>> I think this KIP is looking really good.
> >>>>>>>
> >>>>>>> I have a few thoughts to add to the others:
> >>>>>>>
> >>>>>>> 1. You mentioned at one point users needing to configure
> >>>>>>> `upgrade.mode="null"`. I think this was a typo and you meant to say
> >>>>>>> they should remove the config. If they really have to set it to a
> >>>>>>> string "null" or even set it to a null value but not remove it, it
> >>>>>>> would be unfortunate.
> >>>>>>>
> >>>>>>> 2. In response to Bill's comment #1, you said that "The idea is
> >>>>>>> that the upgrade should be robust and not fail. We need to write
> >>>>>>> according tests". I may have misunderstood the conversation, but I
> >>>>>>> don't think it's within our power to say that an instance won't
> >>>>>>> fail. What if one of my computers catches on fire? What if I'm
> >>>>>>> deployed in the cloud and one instance disappears and is replaced
> >>>>>>> by a new one? Or what if one instance goes AWOL for a long time and
> >>>>>>> then suddenly returns? How will the upgrade process behave in light
> >>>>>>> of such failures?
> >>>>>>>
> >>>>>>> 3. Your thought about making in-place an offline mode is
> >>>>>>> interesting, but it might be a bummer for on-prem users who wish to
> >>>>>>> upgrade online, but cannot just add new machines to the pool. It
> >>>>>>> could be a new upgrade mode "offline-in-place", though...
> >>>>>>>
> >>>>>>> 4. I was surprised to see that a user would need to modify the
> >>>>>>> topology to do an upgrade (using StoreUpgradeBuilder). Maybe some
> >>>>>>> of Guozhang's suggestions would remove this necessity.
> >>>>>>>
> >>>>>>> Thanks for taking on this very complex but necessary work.
> >>>>>>>
> >>>>>>> -John
> >>>>>>>
> >>>>>>> On Thu, Aug 9, 2018 at 12:22 PM Guozhang Wang <wangg...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hello Matthias,
> >>>>>>>>
> >>>>>>>> Thanks for the updated KIP. Some more comments:
> >>>>>>>>
> >>>>>>>> 1. The current set of proposed APIs is a bit too complicated, which
> >>>>>>>> makes the upgrade flow from the user's perspective also a bit
> >>>>>>>> complex. I'd like to check different APIs and discuss their needs
> >>>>>>>> separately:
> >>>>>>>>
> >>>>>>>>     1.a. StoreProxy: needed for in-place upgrade only, between the
> >>>>>>>> first and second rolling bounce, where the old-versioned stores can
> >>>>>>>> handle new-versioned store APIs. I think such upgrade paths (i.e.
> >>>>>>>> from one store type to another) would not be very common: users may
> >>>>>>>> want to upgrade from a certain store engine to another, but the
> >>>>>>>> interface would likely stay the same. Hence personally I'd suggest
> >>>>>>>> we keep it internal and only consider exposing it in the future if
> >>>>>>>> it does become a common pattern.
> >>>>>>>>
> >>>>>>>>     1.b. ConverterStore / RecordConverter: needed for both in-place
> >>>>>>>> and roll-over upgrade, between the first and second rolling
> >>>>>>>> bounces, for the new versioned store to be able to read
> >>>>>>>> old-versioned changelog topics. Firstly I think we should not
> >>>>>>>> expose key in the public APIs but only the values, since allowing
> >>>>>>>> key format changes would break log compaction, and hence would not
> >>>>>>>> be compatible anyways. As for value format changes, personally I
> >>>>>>>> think we can also keep its upgrade logic internal as it may not be
> >>>>>>>> worth generalizing to user customizable logic.
> >>>>>>>>
> >>>>>>>>     1.c. If you agree with 1.a/b above, then we can also remove
> >>>>>>>> "keyValueToKeyValueWithTimestampUpgradeStoreBuilder" from the
> >>>>>>>> public APIs.
> >>>>>>>>
> >>>>>>>>     1.d. Personally I think "ReadOnlyKeyValueWithTimestampStore" is
> >>>>>>>> not needed either given that we are exposing "ValueAndTimestamp"
> >>>>>>>> anyways. I.e. it is just syntactic sugar and for IQ, users can
> >>>>>>>> always just set a
> >>>>>>>> "QueryableStoreType<ReadOnlyKeyValue<K, ValueAndTimestamp<V>>>" as
> >>>>>>>> the new interface does not provide any additional functions.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 2. Could we further categorize the upgrade flow for different use
> >>>>>>>> cases, e.g. 1) DSL users where KeyValueWithTimestampStore will be
> >>>>>>>> used automatically for non-windowed aggregate; 2) PAPI users who do
> >>>>>>>> not need to use KeyValueWithTimestampStore; 3) PAPI users who do
> >>>>>>>> want to switch to KeyValueWithTimestampStore. Just to give my
> >>>>>>>> understanding for 3), the upgrade flow for users may be simplified
> >>>>>>>> as the following (for both in-place and roll-over):
> >>>>>>>>
> >>>>>>>>     * Update the jar to new version, make code changes from
> >>>>>>>> KeyValueStore to KeyValueWithTimestampStore, set upgrade config.
> >>>>>>>>
> >>>>>>>>     * First rolling bounce, and library code can internally use
> >>>>>>>> proxy / converter based on the specified config to handle new APIs
> >>>>>>>> with old stores, while letting new stores read from old changelog
> >>>>>>>> data.
> >>>>>>>>
> >>>>>>>>     * Reset upgrade config.
> >>>>>>>>
> >>>>>>>>     * Second rolling bounce, and the library code automatically
> >>>>>>>> turns off logic for proxy / converter.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 3. Some more detailed proposals are needed for when to recommend
> >>>>>>>> users to trigger the second rolling bounce. I have one idea to
> >>>>>>>> share here: we add a new state to KafkaStreams, say UPGRADING,
> >>>>>>>> which is set when 1) upgrade config is set, and 2) the new stores
> >>>>>>>> are still ramping up (for the second part, we can start with some
> >>>>>>>> internal hard-coded heuristics to decide when it is close to being
> >>>>>>>> ramped up). If either one of them is no longer true, it should
> >>>>>>>> transition to RUNNING. Users can then watch this state, and decide
> >>>>>>>> to only trigger the second rebalance when the state has
> >>>>>>>> transitioned from UPGRADING. They can also choose to cut over while
> >>>>>>>> the instance is still UPGRADING; the downside is that after that
> >>>>>>>> the application may have a long restoration phase which is, from
> >>>>>>>> the user's pov, a period of unavailability.
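
A rough sketch of the proposed UPGRADING/RUNNING decision, under stated assumptions: the lag-threshold heuristic, the function names, and the default value are all hypothetical stand-ins for the "internal hard-coded heuristics" mentioned above, and none of this is an existing KafkaStreams API.

```python
def is_ramping_up(restore_lag_records, lag_threshold=10_000):
    # Hypothetical heuristic: the new store counts as "still ramping up"
    # while its lag behind the changelog exceeds a fixed threshold.
    return restore_lag_records > lag_threshold


def streams_state(upgrade_config_set, restore_lag_records, lag_threshold=10_000):
    # UPGRADING only while BOTH conditions hold; if either one is no longer
    # true, the instance reports RUNNING.
    if upgrade_config_set and is_ramping_up(restore_lag_records, lag_threshold):
        return "UPGRADING"
    return "RUNNING"
```

A user following the proposal would poll this state and start the second rolling bounce once it leaves UPGRADING.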
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Below are just some minor things on the wiki:
> >>>>>>>>
> >>>>>>>> 4. "proxy story" => "proxy store".
> >>>>>>>>
> >>>>>>>> 5. "use the a builder " => "use a builder"
> >>>>>>>>
> >>>>>>>> 6. "we add the record timestamp as a 8-byte (long) prefix to the
> >>>>>>>> value": what's the rationale for putting the timestamp before the
> >>>>>>>> value, rather than after the value?
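
To make the layout in item 6 concrete, here is a small sketch of a value carrying an 8-byte timestamp prefix. The big-endian signed encoding and the function names are assumptions for illustration; the thread does not state the byte order the KIP uses.

```python
import struct

# ">q" = big-endian signed 64-bit integer (assumed encoding, 8 bytes).
_TS = struct.Struct(">q")


def encode_value_with_timestamp(timestamp_ms, value_bytes):
    # Timestamp first, then the raw serialized value.
    return _TS.pack(timestamp_ms) + value_bytes


def decode_value_with_timestamp(raw):
    # The timestamp sits at a fixed offset, so it can be read without
    # knowing or parsing the length of the value that follows.
    (timestamp_ms,) = _TS.unpack_from(raw, 0)
    return timestamp_ms, raw[_TS.size:]
```

With a suffix instead, the timestamp would be read from the last 8 bytes; either layout round-trips, which matches the reply elsewhere in the thread that the order makes no real difference.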
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, Aug 7, 2018 at 5:13 PM, Matthias J. Sax <matth...@confluent.io>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Thanks for the feedback Bill. I just updated the KIP with some of
> >>>>>>>>> your points.
> >>>>>>>>>
> >>>>>>>>>>> Regarding step 3C of the in-place upgrade (users needing to
> >>>>>>>>>>> watch the restore process), I'm wondering if we want to provide
> >>>>>>>>>>> a type of StateRestoreListener that could signal when the new
> >>>>>>>>>>> stores have reached parity with the existing old stores and that
> >>>>>>>>>>> could be the signal to start second rolling rebalance?
> >>>>>>>>>
> >>>>>>>>> I think we can reuse the existing listeners, thus, I did not
> >>>>>>>>> include anything in the KIP. About a signal to rebalance: this
> >>>>>>>>> might be tricky. If we prepare the store "online", the active task
> >>>>>>>>> will update the state continuously, and thus, state prepare is
> >>>>>>>>> never finished. It will be the user's responsibility to do the
> >>>>>>>>> second rebalance (note that the second rebalance will first
> >>>>>>>>> finish the last delta of the upgrade before actual processing
> >>>>>>>>> resumes). I clarified the KIP in this regard a little bit.
> >>>>>>>>>
> >>>>>>>>>>> 1. Out of N instances, one fails midway through the process,
> >>>>>>>>>>> would we allow the other instances to complete or just fail the
> >>>>>>>>>>> entire upgrade?
> >>>>>>>>>
> >>>>>>>>> The idea is that the upgrade should be robust and not fail. We
> >>>>>>>>> need to write according tests.
> >>>>>>>>>
> >>>>>>>>>>> 2. During the second rolling bounce, maybe we could rename the
> >>>>>>>>>>> current active directories vs. deleting them right away, and
> >>>>>>>>>>> when all the prepare task directories are successfully migrated
> >>>>>>>>>>> then delete the previous active ones.
> >>>>>>>>>
> >>>>>>>>> Ack. Updated the KIP.
> >>>>>>>>>
> >>>>>>>>>>> 3. For the first rolling bounce we pause processing of any new
> >>>>>>>>>>> records and just allow the prepare tasks to restore, then once
> >>>>>>>>>>> all prepare tasks have restored, it's a signal for the second
> >>>>>>>>>>> round of rolling bounces and then as each task successfully
> >>>>>>>>>>> renames its prepare directories and deletes the old active task
> >>>>>>>>>>> directories, normal processing of records resumes.
> >>>>>>>>>
> >>>>>>>>> The basic idea is to do an online upgrade to avoid downtime. We
> >>>>>>>>> can discuss offering both options... For the offline upgrade
> >>>>>>>>> option, we could simplify user interaction and trigger the second
> >>>>>>>>> rebalance automatically with the requirement that a user needs to
> >>>>>>>>> update any config.
> >>>>>>>>>
> >>>>>>>>> It might actually be worth including this option: we know from
> >>>>>>>>> experience with state restore that regular processing slows down
> >>>>>>>>> the restore. For roll_over upgrade, it would be a different story
> >>>>>>>>> and upgrade should not be slowed down by regular processing. Thus,
> >>>>>>>>> we should even make in_place an offline upgrade and force people
> >>>>>>>>> to use roll_over if they need online upgrade. Might be a fair
> >>>>>>>>> tradeoff that may simplify the upgrade for the user and reduce
> >>>>>>>>> the code complexity.
> >>>>>>>>>
> >>>>>>>>> Let's see what others think.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 7/27/18 12:53 PM, Bill Bejeck wrote:
> >>>>>>>>>> Hi Matthias,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the update and the working prototype, it helps with
> >>>>>>>>>> understanding the KIP.
> >>>>>>>>>>
> >>>>>>>>>> I took an initial pass over this PR, and overall I find the
> >>>>>>>>>> interfaces and approach to be reasonable.
> >>>>>>>>>>
> >>>>>>>>>> Regarding step 3C of the in-place upgrade (users needing to
> >>>>>>>>>> watch the restore process), I'm wondering if we want to provide a
> >>>>>>>>>> type of StateRestoreListener that could signal when the new
> >>>>>>>>>> stores have reached parity with the existing old stores and that
> >>>>>>>>>> could be the signal to start second rolling rebalance?
> >>>>>>>>>>
> >>>>>>>>>> Although you solicited feedback on the interfaces involved, I
> >>>>>>>>>> wanted to put down some thoughts that have come to mind reviewing
> >>>>>>>>>> this KIP again.
> >>>>>>>>>>
> >>>>>>>>>> 1. Out of N instances, one fails midway through the process,
> >>>>>>>>>> would we allow the other instances to complete or just fail the
> >>>>>>>>>> entire upgrade?
> >>>>>>>>>> 2. During the second rolling bounce, maybe we could rename the
> >>>>>>>>>> current active directories vs. deleting them right away, and when
> >>>>>>>>>> all the prepare task directories are successfully migrated then
> >>>>>>>>>> delete the previous active ones.
> >>>>>>>>>> 3. For the first rolling bounce we pause processing of any new
> >>>>>>>>>> records and just allow the prepare tasks to restore, then once
> >>>>>>>>>> all prepare tasks have restored, it's a signal for the second
> >>>>>>>>>> round of rolling bounces and then as each task successfully
> >>>>>>>>>> renames its prepare directories and deletes the old active task
> >>>>>>>>>> directories, normal processing of records resumes.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Bill
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Jul 25, 2018 at 9:42 PM Matthias J. Sax <matth...@confluent.io>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>> KIP-268 (rebalance metadata) is finished and included in AK 2.0
> >>>>>>>>>>> release. Thus, I want to pick up this KIP again to get the
> >>>>>>>>>>> RocksDB upgrade done for 2.1.
> >>>>>>>>>>>
> >>>>>>>>>>> I updated the KIP accordingly and also have a "proof of concept"
> >>>>>>>>>>> PR ready (for "in place" upgrade only):
> >>>>>>>>>>> https://github.com/apache/kafka/pull/5422/
> >>>>>>>>>>>
> >>>>>>>>>>> There are still open questions, but I want to collect early
> >>>>>>>>>>> feedback on the proposed interfaces we need for the store
> >>>>>>>>>>> upgrade. Also note that the KIP now also aims to define a
> >>>>>>>>>>> generic upgrade path from any store format A to any other store
> >>>>>>>>>>> format B. Adding timestamps is just a special case.
> >>>>>>>>>>>
> >>>>>>>>>>> I will continue to work on the PR and refine the KIP in the
> >>>>>>>>>>> meantime, too.
> >>>>>>>>>>>
> >>>>>>>>>>> Looking forward to your feedback.
> >>>>>>>>>>>
> >>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 3/14/18 11:14 PM, Matthias J. Sax wrote:
> >>>>>>>>>>>> After some more thoughts, I want to follow John's suggestion
> >>>>>>>>>>>> and split upgrading the rebalance metadata from the store
> >>>>>>>>>>>> upgrade.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I extracted the metadata upgrade into its own KIP:
> >>>>>>>>>>>>
> >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'll update this KIP accordingly shortly. I also want to
> >>>>>>>>>>>> consider making the store format upgrade more flexible/generic.
> >>>>>>>>>>>> Atm, the KIP is too much tailored to the DSL IMHO and does not
> >>>>>>>>>>>> account for PAPI users that we should not force to upgrade the
> >>>>>>>>>>>> stores. I need to figure out the details and follow up later.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Please give feedback for the new KIP-268 on the corresponding
> >>>>>>>>>>>> discussion thread.
> >>>>>>>>>>>>
> >>>>>>>>>>>> @James: unfortunately, for upgrading to 1.2 I couldn't figure
> >>>>>>>>>>>> out a way for a single rolling bounce upgrade. But KIP-268
> >>>>>>>>>>>> proposes a fix for future upgrades. Please share your thoughts.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for all your feedback!
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 3/12/18 11:56 PM, Matthias J. Sax wrote:
> >>>>>>>>>>>>> @John: yes, we would throw if configs are missing (it's an
> >>>>>>>>>>>>> implementation detail IMHO and thus I did not include it in
> >>>>>>>>>>>>> the KIP)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> @Guozhang:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1) I understand now what you mean. We can certainly allow all
> >>>>>>>>>>>>> values "0.10.0.x", "0.10.1.x", "0.10.2.x", ... "1.1.x" for
> >>>>>>>>>>>>> the `upgrade.from` parameter. I had a similar thought once but
> >>>>>>>>>>>>> decided to collapse them into one -- will update the KIP
> >>>>>>>>>>>>> accordingly.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2) The idea to avoid any config would be to always send both
> >>>>>>>>>>>>> requests. If we add a config to eventually disable the old
> >>>>>>>>>>>>> request, we don't gain anything with this approach. The
> >>>>>>>>>>>>> question is really whether we are willing to pay this overhead
> >>>>>>>>>>>>> from 1.2 on -- note, it would be limited to 2 versions and not
> >>>>>>>>>>>>> grow further in future releases. More details in (3)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 3) Yes, this approach subsumes (2) for later releases and
> >>>>>>>>>>>>> allows us to stay with 2 "assignment strategies" we need to
> >>>>>>>>>>>>> register, as the new assignment strategy will allow to
> >>>>>>>>>>>>> "upgrade itself" via "version probing". Thus, (2) would only
> >>>>>>>>>>>>> be a workaround to avoid a config if people upgrade from
> >>>>>>>>>>>>> pre-1.2 releases.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thus, I don't think we need to register new "assignment
> >>>>>>>>>>>>> strategies" and send empty subscriptions for older versions.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 4) I agree that this is a tricky thing to get right with a
> >>>>>>>>>>>>> single rebalance. I share the concern that an application
> >>>>>>>>>>>>> might never catch up and thus the hot standby will never be
> >>>>>>>>>>>>> ready.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Maybe it's better to go with 2 rebalances for store upgrades.
> >>>>>>>>>>>>> If we do this, we also don't need to go with (2) and can get
> >>>>>>>>>>>>> (3) in place for future upgrades. I also think that changes to
> >>>>>>>>>>>>> the metadata are more likely and thus allowing for single
> >>>>>>>>>>>>> rolling bounce for this case is more important anyway. If we
> >>>>>>>>>>>>> assume that store upgrades are rare, it might be ok to
> >>>>>>>>>>>>> sacrifice two rolling bounces for this case. It was just an
> >>>>>>>>>>>>> idea I wanted to share (even if I see the issues).
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 3/12/18 11:45 AM, Guozhang Wang wrote:
> >>>>>>>>>>>>>> Hello Matthias, thanks for your replies.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1) About the config names: actually I was trying to not
> >>>>>>>>>>>>>> expose implementation details :) My main concern was that in
> >>>>>>>>>>>>>> your proposal the values need to cover the span of all the
> >>>>>>>>>>>>>> versions that are actually using the same version, i.e.
> >>>>>>>>>>>>>> "0.10.1.x-1.1.x". So if I (as a user) am upgrading from any
> >>>>>>>>>>>>>> version within this range I need to remember to use the value
> >>>>>>>>>>>>>> "0.10.1.x-1.1.x" rather than just specifying my old version.
> >>>>>>>>>>>>>> In my suggestion I was trying to argue the benefit of just
> >>>>>>>>>>>>>> letting users specify the actual Kafka version she's trying
> >>>>>>>>>>>>>> to upgrade from, rather than specifying a range of versions.
> >>>>>>>>>>>>>> I was not suggesting to use "v1, v2, v3" etc. as the values,
> >>>>>>>>>>>>>> but still using Kafka versions like broker's
> >>>>>>>>>>>>>> `internal.version` config. But if you were suggesting the
> >>>>>>>>>>>>>> same thing, i.e. by "0.10.1.x-1.1.x" you meant to say users
> >>>>>>>>>>>>>> can just specify "0.10.1" or "0.10.2" or "0.11.0" or "1.1",
> >>>>>>>>>>>>>> which are all recognizable config values, then I think we are
> >>>>>>>>>>>>>> actually on the same page.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2) About the "multi-assignment" idea: yes it would increase
> >>>>>>>>>>>>>> the network footprint, but not the message size, IF I'm not
> >>>>>>>>>>>>>> mis-understanding your idea of registering multiple
> >>>>>>>>>>>>>> assignments. More details:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> In the JoinGroupRequest, in the protocols field we can encode
> >>>>>>>>>>>>>> multiple protocols each with their different metadata. The
> >>>>>>>>>>>>>> coordinator will pick the common one that everyone supports
> >>>>>>>>>>>>>> (if there is no common one, it will send an error back; if
> >>>>>>>>>>>>>> there are multiple ones, it will pick the one with most
> >>>>>>>>>>>>>> votes, i.e. the one which was earlier in the encoded list).
> >>>>>>>>>>>>>> Since our current Streams rebalance protocol is still based
> >>>>>>>>>>>>>> on the consumer coordinator, it means our protocol_type would
> >>>>>>>>>>>>>> be "consumer", but within that protocol type we can have
> >>>>>>>>>>>>>> multiple protocols like "streams", "streams_v2", "streams_v3"
> >>>>>>>>>>>>>> etc. The downside is that we need to implement a different
> >>>>>>>>>>>>>> assignor class for each version and register all of them in
> >>>>>>>>>>>>>> consumer's PARTITION_ASSIGNMENT_STRATEGY_CONFIG. In the
> >>>>>>>>>>>>>> future if we re-factor our implementation to have our own
> >>>>>>>>>>>>>> client coordinator layer like Connect did, we can simplify
> >>>>>>>>>>>>>> this part of the implementation. But even for now with the
> >>>>>>>>>>>>>> above approach this is still doable.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On the broker side, the group coordinator will only persist a
> >>>>>>>>>>>>>> group with the selected protocol and its subscription
> >>>>>>>>>>>>>> metadata, e.g. if coordinator decides to pick "streams_v2" it
> >>>>>>>>>>>>>> will only send that protocol's metadata from everyone to the
> >>>>>>>>>>>>>> leader to assign, AND when completing the rebalance it will
> >>>>>>>>>>>>>> also only write the group metadata with that protocol and the
> >>>>>>>>>>>>>> assignment only. In a word, although the network traffic may
> >>>>>>>>>>>>>> be increased a bit, it would not be a bummer in our
> >>>>>>>>>>>>>> trade-off. One corner situation we need to consider is how to
> >>>>>>>>>>>>>> stop registering very old assignors to avoid the network
> >>>>>>>>>>>>>> traffic from increasing indefinitely, e.g. if you are doing a
> >>>>>>>>>>>>>> rolling bounce from v2 to v3, then you'd not need to register
> >>>>>>>>>>>>>> v1 assignor anymore, but that would unfortunately still
> >>>>>>>>>>>>>> require some configs.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 3) About the  "version probing" idea, I think that's a
> >>> promising
> >>>>>>>>>>> approach
> >>>>>>>>>>>>>> as well, but if we are going to do the multi-assignment its
> >>>>> value
> >>>>>>>>> seems
> >>>>>>>>>>>>>> subsumed? But I'm thinking maybe it can be added on top of
> >>>>>>>>>>> multi-assignment
> >>>>>>>>>>>>>> to save us from still requiring the config to avoid
> >> registering
> >>>>>> all
> >>>>>>>>> the
> >>>>>>>>>>>>>> metadata for all versions. More details:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> In the JoinGroupRequest, we still register all the assignors
> >> but
> >>>>>> for
> >>>>>>>>>>> all old
> >>>>>>>>>>>>>> assignors we do not encode any metadata, i.e. the encoded
> >> data
> >>>>>>>> would
> >>>>>>>>>>> be:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> "streams_vN" : "encoded metadata"
> >>>>>>>>>>>>>> "streams_vN-1":empty
> >>>>>>>>>>>>>> "streams_vN-2":empty
> >>>>>>>>>>>>>> ..
> >>>>>>>>>>>>>> "streams_0":empty
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> So the coordinator can still safely choose the latest common
> >>>>>>>> version;
> >>>>>>>>>>> and
> >>>>>>>>>>>>>> then when leaders receive the subscription (note it should
> >>>>> always
> >>>>>>>>>>> recognize
> >>>>>>>>>>>>>> that version), let's say it is streams_vN-2, if one of the
> >>>>>>>>>>> subscriptions
> >>>>>>>>>>>>>> is empty bytes, it will send the empty assignment with that
> >>>>>>>> version
> >>>>>>>>>>> number
> >>>>>>>>>>>>>> encoded in the metadata. So in the second auto-triggered rebalance all
> >>>>>>>> members
> >>>>>>>>>>> would
> >>>>>>>>>>>>>> send the metadata with that version:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> "streams_vN" : empty
> >>>>>>>>>>>>>> "streams_vN-1" : empty
> >>>>>>>>>>>>>> "streams_vN-2" : "encoded metadata"
> >>>>>>>>>>>>>> ..
> >>>>>>>>>>>>>> "streams_0":empty
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> By doing this we would not require any configs for users.
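
The two-round exchange described above can be sketched roughly as follows. This is only an illustration under stated assumptions: `build_protocols`, `select_protocol` and `needs_second_round` are hypothetical helper names, the "pick a protocol every member registered, by preference vote" rule is a simplified stand-in for whatever the group coordinator actually does, and the version list stops at v1 for brevity.

```python
from collections import Counter


def build_protocols(newest, encode_for=None):
    """Protocols a member would register, newest first.

    Only one version carries encoded metadata; all others are empty bytes.
    """
    encode_for = newest if encode_for is None else encode_for
    return [(f"streams_v{v}", b"metadata" if v == encode_for else b"")
            for v in range(newest, 0, -1)]


def select_protocol(members):
    """members: member id -> protocol names in preference order (assumed rule)."""
    # Only protocols registered by every member are candidates.
    common = set.intersection(*(set(prefs) for prefs in members.values()))
    if not common:
        raise ValueError("no protocol is supported by all members")
    # Each member votes for its most preferred candidate.
    votes = Counter(next(p for p in prefs if p in common)
                    for prefs in members.values())
    # Most votes wins; sorting only makes tie-breaking deterministic here.
    return max(sorted(common), key=lambda p: votes[p])


def needs_second_round(subscriptions):
    """Leader side: any empty subscription means members must re-join."""
    return any(len(data) == 0 for data in subscriptions.values())
```

With one upgraded member (v3) and one old member (v2), the first round would select "streams_v2", the leader would see empty bytes from the upgraded member, and the second round would carry real metadata under "streams_v2" from everyone.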
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 4) About the "in_place" upgrade on rocksDB, I'm not clear
> >> about
> >>>>>> the
> >>>>>>>>>>> details
> >>>>>>>>>>>>>> so probably we'd need to fill that out before making a call.
> >>> For
> >>>>>>>>>>> example,
> >>>>>>>>>>>>>> you mentioned "If we detect this situation, the Streams
> >>>>>> application
> >>>>>>>>>>> closes
> >>>>>>>>>>>>>> corresponding active tasks as well as "hot standby" tasks,
> >> and
> >>>>>>>>>>> re-creates
> >>>>>>>>>>>>>> the new active tasks using the new store." How could we
> >>>>> guarantee
> >>>>>>>>> that
> >>>>>>>>>>> the
> >>>>>>>>>>>>>> gap between these two stores will keep decreasing rather than
> >>>>> increasing
> >>>>>>>> so
> >>>>>>>>>>> we'll
> >>>>>>>>>>>>>> eventually achieve the flip point? And also the longer we
> are
> >>>>>>>> before
> >>>>>>>>>>> the
> >>>>>>>>>>>>>> flip point, the larger we are doubling the storage space,
> >> etc.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Sun, Mar 11, 2018 at 4:06 PM, Matthias J. Sax <
> >>>>>>>>>>> matth...@confluent.io>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> @John, Guozhang,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> thanks a lot for your comments. Very long reply...
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> About upgrading the rebalance metadata:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Another possibility to do this, would be to register
> >> multiple
> >>>>>>>>>>> assignment
> >>>>>>>>>>>>>>> strategies for the 1.2 applications. For this case, new
> >>>>> instances
> >>>>>>>>>>> would
> >>>>>>>>>>>>>>> be configured to support both and the broker would pick the
> >>>>>>>> version
> >>>>>>>>>>> that
> >>>>>>>>>>>>>>> all instances understand. The disadvantage would be, that
> we
> >>>>> send
> >>>>>>>>> much
> >>>>>>>>>>>>>>> more data (ie, two subscriptions) in each rebalance as long
> >> as
> >>>>> no
> >>>>>>>>>>> second
> >>>>>>>>>>>>>>> rebalance is done disabling the old protocol. Thus, using
> >> this
> >>>>>>>>>>> approach
> >>>>>>>>>>>>>>> would allow to avoid a second rebalance trading-off an
> >>>>> increased
> >>>>>>>>>>>>>>> rebalance network footprint (I also assume that this would
> >>>>>>>> increase
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>> message size that is written into __consumer_offsets
> >> topic?).
> >>>>>>>>>>> Overall, I
> >>>>>>>>>>>>>>> am not sure if this would be a good tradeoff, but it could
> >>>>> avoid
> >>>>>> a
> >>>>>>>>>>>>>>> second rebalance (I have some more thoughts about stores
> >> below
> >>>>>>>> that
> >>>>>>>>>>> are
> >>>>>>>>>>>>>>> relevant for single rebalance upgrade).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> For future upgrades we might be able to fix this though. I
> >> was
> >>>>>>>>>>> thinking
> >>>>>>>>>>>>>>> about the following:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> In the current implementation, the leader fails if it gets
> a
> >>>>>>>>>>>>>>> subscription it does not understand (ie, newer version). We
> >>>>> could
> >>>>>>>>>>> change
> >>>>>>>>>>>>>>> this behavior and let the leader send an empty assignment
> >> plus
> >>>>>>>> error
> >>>>>>>>>>>>>>> code (including supported version) back to the instance
> >>> sending
> >>>>>>>> the
> >>>>>>>>>>>>>>> "bad" subscription. This would allow the following logic
> for
> >>> an
> >>>>>>>>>>>>>>> application instance:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>  - on startup, always send the latest subscription format
> >>>>>>>>>>>>>>>  - if leader understands it, we get an assignment back and
> >>> start
> >>>>>>>>>>> processing
> >>>>>>>>>>>>>>>  - if leader does not understand it, we get an empty
> >>> assignment
> >>>>>>>> and
> >>>>>>>>>>>>>>> supported version back
> >>>>>>>>>>>>>>>  - the application unsubscribe()/subscribe()/poll() again
> >> and
> >>>>>>>>> sends a
> >>>>>>>>>>>>>>> subscription using the leader's supported version
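
A rough simulation of the probing steps listed above, assuming a single member and a leader that answers an unknown (newer) subscription version with an empty assignment plus its own supported version. The function names and the `task-for-...` assignment are hypothetical and exist only for this sketch.

```python
def leader_assign(leader_version, member, subscription_version):
    """Return (assignment, supported_version_hint) for one member."""
    if subscription_version > leader_version:
        # Leader does not understand the subscription: empty assignment
        # plus the version it does support.
        return [], leader_version
    return [f"task-for-{member}"], None


def join(member, latest_version, leader_version):
    """Member side: always start with the latest format, downgrade if told to.

    Returns (assignment, version_used, number_of_rebalances).
    """
    version = latest_version
    rebalances = 0
    while True:
        rebalances += 1
        assignment, supported = leader_assign(leader_version, member, version)
        if supported is None:
            return assignment, version, rebalances
        # Corresponds to unsubscribe()/subscribe()/poll() with the
        # leader's supported version.
        version = supported
```

Against an old leader this takes two rebalances and ends on the leader's version; against an upgraded leader the first probe succeeds and only one rebalance happens.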
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> This protocol would allow to do a single rolling bounce,
> and
> >>>>>>>>>>> implements
> >>>>>>>>>>>>>>> a "version probing" step, that might result in two executed
> >>>>>>>>>>> rebalances.
> >>>>>>>>>>>>>>> The advantage would be, that the user does not need to set
> >> any
> >>>>>>>>> configs
> >>>>>>>>>>>>>>> or do multiple rolling bounces, as Streams takes care of
> >> this
> >>>>>>>>>>>>>>> automatically.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> One disadvantage would be, that two rebalances happen and
> >> that
> >>>>>> for
> >>>>>>>>> an
> >>>>>>>>>>>>>>> error case during rebalance, we lose the information about
> >>> the
> >>>>>>>>>>>>>>> supported leader version and the "probing step" would
> >> happen a
> >>>>>>>>> second
> >>>>>>>>>>> time.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> If the leader is eventually updated, it will include its
> >> own
> >>>>>>>>>>> supported
> >>>>>>>>>>>>>>> version in all assignments, to allow a "down graded"
> >>>>> application
> >>>>>>>> to
> >>>>>>>>>>>>>>> upgrade its version later. Also, if an application fails,
> the
> >>>>>> first
> >>>>>>>>>>>>>>> probing would always be successful and only a single
> >> rebalance
> >>>>>>>>>>> happens.
> >>>>>>>>>>>>>>> If we use this protocol, I think we don't need any
> >>>>> configuration
> >>>>>>>>>>>>>>> parameter for future upgrades.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> About "upgrade.from" vs "internal.protocol.version":
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Users would set "upgrade.from" to the release version the
> >>>>>>>>> current/old
> >>>>>>>>>>>>>>> application is using. I think this is simpler, as users
> know
> >>>>> this
> >>>>>>>>>>>>>>> version. If we use "internal.protocol.version" instead, we
> >>>>> expose
> >>>>>>>>>>>>>>> implementation details and users need to know the protocol
> >>>>>> version
> >>>>>>>>>>> (ie,
> >>>>>>>>>>>>>>> they need to map from the release version to the protocol
> >>>>>> version;
> >>>>>>>>> ie,
> >>>>>>>>>>>>>>> "I am running 0.11.0, which uses metadata protocol version
> >> 2").
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Also the KIP states that for the second rolling bounce, the
> >>>>>>>>>>>>>>> "upgrade.mode" config should be set back to `null` -- and
> >>> thus,
> >>>>>>>>>>>>>>> "upgrade.from" would not have any effect and is ignored (I
> >>> will
> >>>>>>>>> update
> >>>>>>>>>>>>>>> the KIP to point out this dependency).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> About your second point: I'll update the KIP accordingly to
> >>>>>>>> describe
> >>>>>>>>>>>>>>> future updates as well. Both will be different.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> One more point about upgrading the store format. I was
> >>> thinking
> >>>>>>>>> about
> >>>>>>>>>>>>>>> avoiding the second rolling bounce altogether in the
> >> future:
> >>>>>> (1)
> >>>>>>>>> the
> >>>>>>>>>>>>>>> goal is to achieve an upgrade with zero downtime (2) this
> >>>>>> requires
> >>>>>>>>> us to
> >>>>>>>>>>>>>>> prepare the stores as "hot standbys" before we do the
> switch
> >>>>> and
> >>>>>>>>>>> delete
> >>>>>>>>>>>>>>> the old stores. (3) the current proposal does the switch
> >>>>>>>> "globally"
> >>>>>>>>> --
> >>>>>>>>>>>>>>> this is simpler and due to the required second rebalance no
> >>>>>>>>>>> disadvantage.
> >>>>>>>>>>>>>>> However, a global consistent switch over might actually not
> >> be
> >>>>>>>>>>> required.
> >>>>>>>>>>>>>>> For "in_place" upgrade, following the protocol from above,
> >> we
> >>>>>>>> could
> >>>>>>>>>>>>>>> decouple the store switch and each instance could switch
> its
> >>>>>> store
> >>>>>>>>>>>>>>> independently from all other instances. After the rolling
> >>>>> bounce,
> >>>>>>>> it
> >>>>>>>>>>>>>>> seems to be ok to switch from the old store to the new
> store
> >>>>>>>> "under
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>> hood" whenever the new store is ready (this could even be
> >>> done,
> >>>>>>>>> before
> >>>>>>>>>>>>>>> we switch to the new metadata version). Each time we update
> >>> the
> >>>>>>>> "hot
> >>>>>>>>>>>>>>> standby" we check if it reached the "endOffset"  (or maybe
> >> X%
> >>>>>> that
> >>>>>>>>>>> could
> >>>>>>>>>>>>>>> either be hardcoded or configurable). If we detect this
> >>>>>> situation,
> >>>>>>>>> the
> >>>>>>>>>>>>>>> Streams application closes corresponding active tasks as
> >> well
> >>>>> as
> >>>>>>>>> "hot
> >>>>>>>>>>>>>>> standby" tasks, and re-creates the new active tasks using
> >> the
> >>>>> new
> >>>>>>>>>>> store.
> >>>>>>>>>>>>>>> (I need to go through the details once again, but it seems
> >> to
> >>>>> be
> >>>>>>>>>>>>>>> feasible.).
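
The "reached the endOffset (or maybe X%)" check mentioned above could look something like this minimal sketch. The function name, the `threshold` parameter and its default are assumptions for illustration, not part of the proposal.

```python
def ready_to_switch(standby_offset, end_offset, threshold=1.0):
    """True once the "hot standby" store has caught up far enough.

    threshold=1.0 means "fully caught up"; 0.9 would mean "within 10%".
    """
    if end_offset <= 0:
        # Nothing in the changelog, so there is nothing to restore.
        return True
    return standby_offset >= threshold * end_offset
```

Because the changelog keeps growing while the standby restores, a check like this would have to be re-evaluated each time the standby is updated.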
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Combining this strategy with the "multiple assignment"
> idea,
> >>>>>> might
> >>>>>>>>>>> even
> >>>>>>>>>>>>>>> enable us to do a single rolling bounce upgrade from 1.1
> ->
> >>>>> 1.2.
> >>>>>>>>>>>>>>> Applications would just use the old store, as long as the
> >> new
> >>>>>>>> store
> >>>>>>>>> is
> >>>>>>>>>>>>>>> not ready, even if the new metadata version is used
> already.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> For future upgrades, a single rebalance would be
> sufficient,
> >>>>> too,
> >>>>>>>>> even
> >>>>>>>>>>>>>>> if the stores are upgraded. We would not need any config
> >>>>>>>> parameters
> >>>>>>>>> as
> >>>>>>>>>>>>>>> the "probe" step allows us to detect the supported
> rebalance
> >>>>>>>>> metadata
> >>>>>>>>>>>>>>> version (and we would also not need multiple "assignment
> >>>>>>>> strategies"
> >>>>>>>>>>> as
> >>>>>>>>>>>>>>> our own protocol encodes everything we need).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Let me know what you think.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 3/9/18 10:33 PM, Guozhang Wang wrote:
> >>>>>>>>>>>>>>>> @John:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> For the protocol version upgrade, it is only for the
> >> encoded
> >>>>>>>>> metadata
> >>>>>>>>>>>>>>> bytes
> >>>>>>>>>>>>>>>> protocol, which are just bytes-in bytes-out from
> Consumer's
> >>>>> pov,
> >>>>>>>>> so I
> >>>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>>> this change should be in the Streams layer as well.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> @Matthias:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> for 2), I agree that adding a "newest supported version"
> >>>>> besides
> >>>>>>>>> the
> >>>>>>>>>>>>>>>> "currently used version for encoding" is a good idea to
> >> allow
> >>>>>>>>> either
> >>>>>>>>>>>>>>> case;
> >>>>>>>>>>>>>>>> the key is that in Streams we would likely end up with a
> >>>>> mapping
> >>>>>>>>>>> from the
> >>>>>>>>>>>>>>>> protocol version to the other persistent data format
> >> versions
> >>>>>>>> such
> >>>>>>>>> as
> >>>>>>>>>>>>>>>> rocksDB, changelog. So with such a map we can actually
> >>> achieve
> >>>>>>>> both
> >>>>>>>>>>>>>>>> scenarios, i.e. 1) one rolling bounce if the upgraded
> >>> protocol
> >>>>>>>>>>> version's
> >>>>>>>>>>>>>>>> corresponding data format does not change, e.g. 0.10.0 ->
> >>>>> 0.10.1
> >>>>>>>>>>> leaders
> >>>>>>>>>>>>>>>> can choose to use the newer version in the first rolling
> >>>>> bounce
> >>>>>>>>>>> directly
> >>>>>>>>>>>>>>>> and we can document to users that they would not need to
> >> set
> >>>>>>>>>>>>>>>> "upgrade.mode", and 2) two rolling bounces if the upgraded
> >>>>>>>> protocol
> >>>>>>>>>>>>>>> version
> >>>>>>>>>>>>>>>> does indicate the data format changes, e.g. 1.1 -> 1.2,
> and
> >>>>> then
> >>>>>>>> we
> >>>>>>>>>>> can
> >>>>>>>>>>>>>>>> document that "upgrade.mode" needs to be set in the first
> >>>>>> rolling
> >>>>>>>>>>> bounce
> >>>>>>>>>>>>>>>> and reset in the second.
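
As a small illustration of the mapping idea above: if Streams kept a table from metadata protocol version to persistent data format version, the number of rolling bounces could be derived from it. The table contents below are made up for the example and do not reflect the real version history.

```python
# Hypothetical table: metadata protocol version -> store format version.
STORE_FORMAT = {1: 1, 2: 1, 3: 2}


def rolling_bounces(old_protocol, new_protocol):
    """One bounce if the data format is unchanged, two if it changes."""
    if STORE_FORMAT[old_protocol] == STORE_FORMAT[new_protocol]:
        return 1  # "upgrade.mode" would not need to be set
    return 2      # set "upgrade.mode" in the first bounce, reset in the second
```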
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Besides that, some additional comments:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 1) I still think "upgrade.from" is less intuitive for
> users
> >>> to
> >>>>>>>> set
> >>>>>>>>>>> than
> >>>>>>>>>>>>>>>> "internal.protocol.version" where for the latter users
> only
> >>>>> need
> >>>>>>>> to
> >>>>>>>>>>> set a
> >>>>>>>>>>>>>>>> single version, while the Streams will map that version to
> >>> the
> >>>>>>>>>>> Streams
> >>>>>>>>>>>>>>>> assignor's behavior as well as the data format. But maybe
> I
> >>>>> did
> >>>>>>>> not
> >>>>>>>>>>> get
> >>>>>>>>>>>>>>>> your idea about how the  "upgrade.from" config will be
> set,
> >>>>>>>> because
> >>>>>>>>>>> in
> >>>>>>>>>>>>>>>> your Compatibility section how the upgrade.from config
> will
> >>> be
> >>>>>>>> set
> >>>>>>>>>>> for
> >>>>>>>>>>>>>>>> these two rolling bounces are not very clear: for example,
> >>>>>> should
> >>>>>>>>>>> user
> >>>>>>>>>>>>>>>> reset it to null in the second rolling bounce?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 2) In the upgrade path description, rather than talking
> >> about
> >>>>>>>>>>> specific
> >>>>>>>>>>>>>>>> version 0.10.0 -> version 0.10.1 etc, can we just
> >> categorize
> >>>>> all
> >>>>>>>>> the
> >>>>>>>>>>>>>>>> possible scenarios, even for future upgrade versions, what
> >>>>>> should
> >>>>>>>>> be
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>> standard operations? The categories we can summarize to
> >>> would
> >>>>>> be
> >>>>>>>>>>>>>>> (assuming
> >>>>>>>>>>>>>>>> user upgrade from version X to version Y, where X and Y
> are
> >>>>>> Kafka
> >>>>>>>>>>>>>>> versions,
> >>>>>>>>>>>>>>>> with the corresponding supported protocol version x and
> y):
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> a. x == y, i.e. metadata protocol does not change, and
> >> hence
> >>>>> no
> >>>>>>>>>>>>>>> persistent
> >>>>>>>>>>>>>>>> data formats have changed.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> b. x != y, but all persistent data formats remain the
> same.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> c. x != y, AND some persistent data format like RocksDB
> >>> format,
> >>>>>>>>>>> changelog
> >>>>>>>>>>>>>>>> format, has been changed.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> d. special case: we may need some special handling logic
> >> when
> >>>>>>>>>>> "current
> >>>>>>>>>>>>>>>> version" or "newest supported version" are not available
> in
> >>>>> the
> >>>>>>>>>>> protocol,
> >>>>>>>>>>>>>>>> i.e. for X as old as 0.10.0 and before 1.2.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> under the above scenarios, how many rolling bounces users
> >>> need
> >>>>>> to
> >>>>>>>>>>>>>>> execute?
> >>>>>>>>>>>>>>>> how they should set the configs in each rolling bounce?
> and
> >>>>> how
> >>>>>>>>>>> Streams
> >>>>>>>>>>>>>>>> library will execute in these cases?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 4:01 PM, Matthias J. Sax <
> >>>>>>>>>>> matth...@confluent.io>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Ted,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I still consider changing the KIP to include it right
> away
> >>> --
> >>>>>> if
> >>>>>>>>>>> not,
> >>>>>>>>>>>>>>>>> I'll create a JIRA. Need to think it through in more
> >> detail
> >>>>>>>> first.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> (Same for other open questions like interface names -- I
> >>>>>> collect
> >>>>>>>>>>>>>>>>> feedback and update the KIP after we reach consensus :))
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On 3/9/18 3:35 PM, Ted Yu wrote:
> >>>>>>>>>>>>>>>>>> Thanks for the details, Matthias.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> bq. change the metadata protocol only in a future
> >> release,
> >>>>>>>>> encoding
> >>>>>>>>>>>>>>> both
> >>>>>>>>>>>>>>>>> used
> >>>>>>>>>>>>>>>>>> and supported version might be an advantage
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Looks like encoding both versions wouldn't be
> implemented
> >>> in
> >>>>>>>> this
> >>>>>>>>>>> KIP.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Please consider logging a JIRA with the encoding
> >> proposal.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Cheers
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax <
> >>>>>>>>>>> matth...@confluent.io
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> @Bill: I think a filter predicate should be part of
> user
> >>>>>> code.
> >>>>>>>>> And
> >>>>>>>>>>>>>>> even
> >>>>>>>>>>>>>>>>>>> if we want to add something like this, I would prefer
> to
> >>> do
> >>>>>> it
> >>>>>>>>> in
> >>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>> separate KIP.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> @James: I would love to avoid a second rolling bounce.
> >> But
> >>>>>>>> from
> >>>>>>>>> my
> >>>>>>>>>>>>>>>>>>> understanding it would not be possible.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> The purpose of the second rolling bounce is indeed to
> >>>>> switch
> >>>>>>>>> from
> >>>>>>>>>>>>>>>>>>> version 2 to 3. It also has a second purpose, to switch
> >>>>> from
> >>>>>>>> the
> >>>>>>>>>>> old
> >>>>>>>>>>>>>>>>>>> store to the new store (this happens after the last
> >>>>> instance
> >>>>>>>>>>> bounces a
> >>>>>>>>>>>>>>>>>>> second time).
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> The problem with one round of rolling bounces is, that
> >>> it's
> >>>>>>>>>>> unclear
> >>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>> to switch from version 2 to version 3. The
> >>>>>>>>>>> StreamsPartitionAssignor is
> >>>>>>>>>>>>>>>>>>> stateless by design, and thus, the information which
> >>>>> version
> >>>>>>>> it
> >>>>>>>>>>> should
> >>>>>>>>>>>>>>>>>>> use must be passed in from externally -- and we want to
> >>> use
> >>>>>>>> the
> >>>>>>>>>>>>>>>>>>> StreamsConfig to pass in this information.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> During upgrade, all new instances have no information
> >>> about
> >>>>>>>> the
> >>>>>>>>>>>>>>> progress
> >>>>>>>>>>>>>>>>>>> of the upgrade (ie, how many other instances got
> >> upgraded
> >>>>>>>>>>> already).
> >>>>>>>>>>>>>>>>>>> Therefore, it's not safe for them to send a version 3
> >>>>>>>>>>> subscription.
> >>>>>>>>>>>>>>> The
> >>>>>>>>>>>>>>>>>>> leader also has this limited view on the world and can
> >>> only
> >>>>>>>> send
> >>>>>>>>>>>>>>> version
> >>>>>>>>>>>>>>>>>>> 2 assignments back.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thus, for the 1.2 upgrade, I don't think we can
> simplify
> >>>>> the
> >>>>>>>>>>> upgrade.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> We did consider to change the metadata to make later
> >>>>> upgrades
> >>>>>>>>> (ie,
> >>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>> 1.2 to 1.x) simpler though (for the case we change the
> >>>>>>>> metadata
> >>>>>>>>> or
> >>>>>>>>>>>>>>>>>>> storage format again -- as long as we don't change it,
> a
> >>>>>>>> single
> >>>>>>>>>>>>>>> rolling
> >>>>>>>>>>>>>>>>>>> bounce is sufficient), by encoding "used version" and
> >>>>>>>> "supported
> >>>>>>>>>>>>>>>>>>> version". This would allow the leader to switch to the
> >> new
> >>>>>>>>> version
> >>>>>>>>>>>>>>>>>>> earlier and without a second rebalance: leader would
> >>>>> receive
> >>>>>>>>> "used
> >>>>>>>>>>>>>>>>>>> version == old" and "supported version = old/new" -- as
> >>>>> long
> >>>>>>>> as
> >>>>>>>>> at
> >>>>>>>>>>>>>>> least
> >>>>>>>>>>>>>>>>>>> one instance sends a "supported version = old" leader
> >>> sends
> >>>>>>>> old
> >>>>>>>>>>>>>>> version
> >>>>>>>>>>>>>>>>>>> assignment back. However, encoding both versions would
> >>> allow
> >>>>>>>> that
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> leader can send a new version assignment back, right
> >> after
> >>>>>> the
> >>>>>>>>>>> first
> >>>>>>>>>>>>>>>>>>> round of rebalance finished (all instances send
> >> "supported
> >>>>>>>>>>> version =
> >>>>>>>>>>>>>>>>>>> new"). However, there are still two issues with this:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 1) if we switch to the new format right after the last
> >>>>>>>> instance
> >>>>>>>>>>>>>>> bounced,
> >>>>>>>>>>>>>>>>>>> the new stores might not be ready to be used -- this
> >> could
> >>>>>>>> lead
> >>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> "downtime" as store must be restored before processing
> >> can
> >>>>>>>>> resume.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 2) Assume an instance fails and is restarted again. At
> >>> this
> >>>>>>>>>>> point, the
> >>>>>>>>>>>>>>>>>>> instance will still have "upgrade mode" enabled and
> thus
> >>>>>> sends
> >>>>>>>>>>> the old
> >>>>>>>>>>>>>>>>>>> protocol data. However, it would be desirable to never
> >>> fall
> >>>>>>>> back
> >>>>>>>>>>> to
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> old protocol after the switch to the new protocol.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> The second issue is minor and I guess if users set-up
> >> the
> >>>>>>>>> instance
> >>>>>>>>>>>>>>>>>>> properly it could be avoided. However, the first issue
> >>>>> would
> >>>>>>>>>>> prevent
> >>>>>>>>>>>>>>>>>>> "zero downtime" upgrades. Having said this, if we
> >> consider
> >>>>>>>> that
> >>>>>>>>> we
> >>>>>>>>>>>>>>> might
> >>>>>>>>>>>>>>>>>>> change the metadata protocol only in a future release,
> >>>>>>>> encoding
> >>>>>>>>>>> both
> >>>>>>>>>>>>>>>>>>> used and supported version might be an advantage in the
> >>>>>> future
> >>>>>>>>>>> and we
> >>>>>>>>>>>>>>>>>>> could consider to add this information in 1.2 release
> to
> >>>>>>>> prepare
> >>>>>>>>>>> for
> >>>>>>>>>>>>>>>>> this.
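
The leader-side rule for the "used version" / "supported version" encoding discussed above can be sketched as below. This is an assumption-laden illustration: subscriptions are modeled as plain `(used, supported)` tuples and the function name is hypothetical.

```python
def assignment_version(subscriptions):
    """Pick the version of the assignment the leader sends back.

    subscriptions: list of (used_version, supported_version) per instance.
    As long as one instance only supports the old version, the leader
    stays on it; once every instance supports the new one, it can switch.
    """
    return min(supported for _used, supported in subscriptions)
```

Note that this only covers the metadata switch; it says nothing about whether the new stores are ready, which is issue 1) above.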
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Btw: monitoring the log, is also only required to give
> >> the
> >>>>>>>>>>> instances
> >>>>>>>>>>>>>>>>>>> enough time to prepare the stores in new format. If you
> >>>>> would
> >>>>>>>> do
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> second rolling bounce before this, it would still work
> >> --
> >>>>>>>>>>> however, you
> >>>>>>>>>>>>>>>>>>> might see app "downtime" as the new store must be fully
> >>>>>>>> restored
> >>>>>>>>>>>>>>> before
> >>>>>>>>>>>>>>>>>>> processing can resume.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Does this make sense?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On 3/9/18 11:36 AM, James Cheng wrote:
> >>>>>>>>>>>>>>>>>>>> Matthias,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> For all the upgrade paths, is it possible to get rid
> of
> >>>>> the
> >>>>>>>> 2nd
> >>>>>>>>>>>>>>> rolling
> >>>>>>>>>>>>>>>>>>> bounce?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> For the in-place upgrade, it seems like primary
> >>> difference
> >>>>>>>>>>> between
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> 1st rolling bounce and the 2nd rolling bounce is to
> >> decide
> >>>>>>>>>>> whether to
> >>>>>>>>>>>>>>>>> send
> >>>>>>>>>>>>>>>>>>> Subscription Version 2 or Subscription Version 3.
> >>>>> (Actually,
> >>>>>>>>>>> there is
> >>>>>>>>>>>>>>>>>>> another difference mentioned in that the KIP says that
> >> the
> >>>>>> 2nd
> >>>>>>>>>>> rolling
> >>>>>>>>>>>>>>>>>>> bounce should happen after all new state stores are
> >>> created
> >>>>>> by
> >>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> background thread. However, within the 2nd rolling
> >> bounce,
> >>>>> we
> >>>>>>>>> say
> >>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>> there is still a background thread, so it seems like there is
> >> no
> >>>>>>>>> actual
> >>>>>>>>>>>>>>>>>>> requirement to wait for the new state stores to be
> >>>>> created.)
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The 2nd rolling bounce already knows how to deal with
> >>>>>>>>> mixed-mode
> >>>>>>>>>>>>>>>>> (having
> >>>>>>>>>>>>>>>>>>> both Version 2 and Version 3 in the same consumer
> >> group).
> >>>>> It
> >>>>>>>>> seems
> >>>>>>>>>>>>>>> like
> >>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>> could get rid of the 2nd bounce if we added logic
> >>>>>>>>>>> (somehow/somewhere)
> >>>>>>>>>>>>>>>>> such
> >>>>>>>>>>>>>>>>>>> that:
> >>>>>>>>>>>>>>>>>>>> * Instances send Subscription Version 2 until all
> >>>>> instances
> >>>>>>>> are
> >>>>>>>>>>>>>>> running
> >>>>>>>>>>>>>>>>>>> the new code.
> >>>>>>>>>>>>>>>>>>>> * Once all the instances are running the new code,
> then
> >>>>> one
> >>>>>>>> at
> >>>>>>>>> a
> >>>>>>>>>>>>>>> time,
> >>>>>>>>>>>>>>>>>>> the instances start sending Subscription V3. Leader
> >> still
> >>>>>>>> hands
> >>>>>>>>>>> out
> >>>>>>>>>>>>>>>>>>> Assignment Version 2, until all new state stores are
> >>> ready.
> >>>>>>>>>>>>>>>>>>>> * Once all instances report that new stores are ready,
> >>>>>> Leader
> >>>>>>>>>>> sends
> >>>>>>>>>>>>>>> out
> >>>>>>>>>>>>>>>>>>> Assignment Version 3.
> >>>>>>>>>>>>>>>>>>>> * Once an instance receives an Assignment Version 3,
> it
> >>>>> can
> >>>>>>>>>>> delete
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> old state store.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Doing it that way seems like it would reduce a lot of
> >>>>>>>>>>>>>>>>>>> operator/deployment overhead. No need to do 2 rolling
> >>>>>>>> restarts.
> >>>>>>>>> No
> >>>>>>>>>>>>>>> need
> >>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> monitor logs for state store rebuild. You just deploy
> >> it,
> >>>>> and
> >>>>>>>>> the
> >>>>>>>>>>>>>>>>> instances
> >>>>>>>>>>>>>>>>>>> update themselves.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The thing that made me think of this is that the "2
> >>>>> rolling
> >>>>>>>>>>> bounces"
> >>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>> similar to what Kafka brokers have to do for changes in
> >>>>>>>>>>>>>>>>>>> inter.broker.protocol.version and
> >>>>> log.message.format.version.
> >>>>>>>>> And
> >>>>>>>>>>> in
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> broker case, it seems like it would be possible (with
> >> some
> >>>>>>>> work
> >>>>>>>>> of
> >>>>>>>>>>>>>>>>> course)
> >>>>>>>>>>>>>>>>>>> to modify kafka to allow us to do similar
> auto-detection
> >>> of
> >>>>>>>>> broker
> >>>>>>>>>>>>>>>>>>> capabilities and automatically do a switchover from
> >>> old/new
> >>>>>>>>>>> versions.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> -James
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Mar 9, 2018, at 10:38 AM, Bill Bejeck <
> >>>>>> bbej...@gmail.com
> >>>>>>>>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Matthias,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks for the KIP, it's a +1 from me.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I do have one question regarding the retrieval
> methods
> >>> on
> >>>>>>>> the
> >>>>>>>>>>> new
> >>>>>>>>>>>>>>>>>>>>> interfaces.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Would you want to consider adding one method with a
> >>> Predicate
> >>>>>>>> that
> >>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>> allow
> >>>>>>>>>>>>>>>>>>>>> for filtering records by the timestamp stored with
> the
> >>>>>>>> record?
> >>>>>>>>>>> Or
> >>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>> better left for users to implement themselves once
> the
> >>>>> data
> >>>>>>>>> has
> >>>>>>>>>>> been
> >>>>>>>>>>>>>>>>>>>>> retrieved?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>> Bill
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu <
> >>>>>> yuzhih...@gmail.com
> >>>>>>>>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Matthias:
> >>>>>>>>>>>>>>>>>>>>>> For my point #1, I don't have preference as to which
> >>>>>>>>> separator
> >>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>> chosen.
> >>>>>>>>>>>>>>>>>>>>>> Given the background you mentioned, current choice
> is
> >>>>>> good.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> For #2, I think my proposal is better since it is
> >>> closer
> >>>>>> to
> >>>>>>>>>>> English
> >>>>>>>>>>>>>>>>>>>>>> grammar.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Would be good to listen to what other people think.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax <
> >>>>>>>>>>>>>>>>> matth...@confluent.io
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thanks for the comments!
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> @Guozhang:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> So far, there is one PR for the rebalance metadata
> >>>>>> upgrade
> >>>>>>>>> fix
> >>>>>>>>>>>>>>>>>>>>>>> (addressing the mentioned
> >>>>>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6054)
> >> It
> >>>>>>>> gives a
> >>>>>>>>>>> first
> >>>>>>>>>>>>>>>>>>>>>>> impression how the metadata upgrade works including
> >> a
> >>>>>>>> system
> >>>>>>>>>>> test:
> >>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/4636
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I can share other PRs as soon as they are ready. I
> >>>>> agree
> >>>>>>>>> that
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>> complex and I am ok with putting out more code to give
> >>>>> better
> >>>>>>>>>>>>>>> discussion
> >>>>>>>>>>>>>>>>>>>>>>> context.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> @Ted:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I picked `_` instead of `-` to align with the
> >>>>>>>>>>>>>>> `processing.guarantee`
> >>>>>>>>>>>>>>>>>>>>>>> parameter that accepts `at_least_once` and
> >>>>> `exactly_once`
> >>>>>>>> as
> >>>>>>>>>>>>>>> values.
> >>>>>>>>>>>>>>>>>>>>>>> Personally, I don't care about underscore vs dash
> >> but
> >>> I
> >>>>>>>>> prefer
> >>>>>>>>>>>>>>>>>>>>>>> consistency. If you feel strongly about it, we can
> >> also
> >>>>>>>> change
> >>>>>>>>>>> it to
> >>>>>>>>>>>>>>>>>>> `-`.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> About the interface name: I am fine either way -- I
> >>>>>>>> stripped
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> `With`
> >>>>>>>>>>>>>>>>>>>>>>> to keep the name a little shorter. Would be good to
> >>> get
> >>>>>>>>>>> feedback
> >>>>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>>>>> others and pick the name the majority prefers.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> @John:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> We can certainly change it. I agree that it would not make a
> >>>>>>>>>>>>>>>>>>>>>>> difference. I'll dig into the code to see if either of the two
> >>>>>>>>>>>>>>>>>>>>>>> versions might introduce undesired complexity, and update the KIP
> >>>>>>>>>>>>>>>>>>>>>>> if I don't hit an issue with putting the `-v2` on the store
> >>>>>>>>>>>>>>>>>>>>>>> directory instead of using `rocksdb-v2`.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On 3/8/18 2:44 PM, John Roesler wrote:
> >>>>>>>>>>>>>>>>>>>>>>>> Hey Matthias,
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> The KIP looks good to me. I had several questions queued up, but
> >>>>>>>>>>>>>>>>>>>>>>>> they were all in the "rejected alternatives" section... oh, well.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> One very minor thought re changing the state directory from
> >>>>>>>>>>>>>>>>>>>>>>>> "/<state.dir>/<application.id>/<task.id>/rocksdb/storeName/" to
> >>>>>>>>>>>>>>>>>>>>>>>> "/<state.dir>/<application.id>/<task.id>/rocksdb-v2/storeName/": if
> >>>>>>>>>>>>>>>>>>>>>>>> you put the "v2" marker on the storeName part of the path (i.e.,
> >>>>>>>>>>>>>>>>>>>>>>>> "/<state.dir>/<application.id>/<task.id>/rocksdb/storeName-v2/"),
> >>>>>>>>>>>>>>>>>>>>>>>> then you get the same benefits without altering the high-level
> >>>>>>>>>>>>>>>>>>>>>>>> directory structure.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> It may not matter, but I could imagine people running scripts to
> >>>>>>>>>>>>>>>>>>>>>>>> monitor rocksdb disk usage for each task, or other such use cases.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>>> -John
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Matthias:
> >>>>>>>>>>>>>>>>>>>>>>>>> Nicely written KIP.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> "in_place": can this be "in-place"? Underscore may sometimes be
> >>>>>>>>>>>>>>>>>>>>>>>>> mistyped (as '-'). I think using '-' is friendlier to users.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> public interface ReadOnlyKeyValueTimestampStore<K, V> {
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Is ReadOnlyKeyValueStoreWithTimestamp a better name for the class?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang <wangg...@gmail.com>
> >>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Hello Matthias, thanks for the KIP.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> I've read through the upgrade patch section and it looks good to
> >>>>>>>>>>>>>>>>>>>>>>>>>> me. If you already have a WIP PR for it, could you also share it
> >>>>>>>>>>>>>>>>>>>>>>>>>> here so that people can take a look?
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> I'm +1 on the KIP itself. But with large KIPs like this there is
> >>>>>>>>>>>>>>>>>>>>>>>>>> always some devil hidden in the details, so I think it is better
> >>>>>>>>>>>>>>>>>>>>>>>>>> to have the implementation in parallel with the design
> >>>>>>>>>>>>>>>>>>>>>>>>>> discussion :)
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax
> >>>>>>>>>>>>>>>>>>>>>>>>>> <matth...@confluent.io> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> I want to propose KIP-258 for the Streams API to allow storing
> >>>>>>>>>>>>>>>>>>>>>>>>>>> timestamps in RocksDB. This feature is the basis to resolve
> >>>>>>>>>>>>>>>>>>>>>>>>>>> multiple tickets (issues and feature requests).
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Looking forward to your comments about this!
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>
> >
>
>
