Hi Matthias

Thanks - I figured that it was probably a case of just too much to do and
not enough time. I know how that can go. I am asking about this one in
relation to https://issues.apache.org/jira/browse/KAFKA-4212, adding a TTL
to RocksDB. I have outlined a bit about my use-case within 4212, but for
brevity here it is:

My case:
1) I have a RocksDB with TTL implementation working where records are aged
out using the TTL that comes with RocksDB (very simple).
2) We prevent records from loading from the changelog if recordTime + TTL <
referenceTimeStamp (default = System.currentTimeInMillis() ).

This assumes that the records are stored with the same time reference (say
UTC) as the consumer materializing the RocksDB store.

My questions about KIP-258 are as follows:
1) How does "we want to be able to store record timestamps in KTables"
differ from inserting records into RocksDB with TTL at consumption time? I
understand that it could be a difference of some seconds, minutes, hours,
days etc between when the record was published and now, but given the
nature of how RocksDB TTL works (eventual - based on compaction) I don't
see how a precise TTL can be achieved, such as that which one can get with
windowed stores.

2) Are you looking to change how records are inserted into a TTL RocksDB,
such that the TTL would take effect from the record's published time? If
not, what would be the ideal workflow here for a single record with TTL
RocksDB?
ie: Record Timestamp: 100
TTL: 50
Record inserted into rocksDB: 110
Record to expire at 150?

3) I'm not sure I fully understand the importance of the upgrade path. I
have read the link to (https://issues.apache.org/jira/browse/KAFKA-3522) in
the KIP, and I can understand that a state-store on disk may not represent
what the application is expecting. I don't think I have the full picture
though, because that issue seems to be easy to fix with a simple versioned
header or accompanying file, forcing the app to rebuild the state if the
version is incompatible. Can you elaborate or add a scenario to the KIP
that illustrates the need for the upgrade path?

Thanks,

Adam




On Sun, Nov 11, 2018 at 1:43 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> Adam,
>
> I am still working on it. Was pulled into a lot of other tasks lately so
> this was delayed. Also had some discussions about simplifying the
> upgrade path with some colleagues and I am prototyping this atm. Hope to
> update the KIP accordingly soon.
>
> -Matthias
>
> On 11/10/18 7:41 AM, Adam Bellemare wrote:
> > Hello Matthias
> >
> > I am curious as to the status of this KIP. TTL and expiry of records will
> > be extremely useful for several of our business use-cases, as well as
> > another KIP I had been working on.
> >
> > Thanks
> >
> >
> >
> > On Mon, Aug 13, 2018 at 10:29 AM Eno Thereska <eno.there...@gmail.com>
> > wrote:
> >
> >> Hi Matthias,
> >>
> >> Good stuff. Could you comment a bit on how future-proof is this change?
> For
> >> example, if we want to store both event timestamp "and" processing time
> in
> >> RocksDB will we then need another interface (e.g. called
> >> KeyValueWithTwoTimestampsStore)?
> >>
> >> Thanks
> >> Eno
> >>
> >> On Thu, Aug 9, 2018 at 2:30 PM, Matthias J. Sax <matth...@confluent.io>
> >> wrote:
> >>
> >>> Thanks for your input Guozhang and John.
> >>>
> >>> I see your point, that the upgrade API is not simple. If you don't
> >>> thinks it's valuable to make generic store upgrades possible (atm), we
> >>> can make the API internal, too. The impact is, that we only support a
> >>> predefined set up upgrades (ie, KV to KVwithTs, Windowed to
> >>> WindowedWithTS etc) for which we implement the internal interfaces.
> >>>
> >>> We can keep the design generic, so if we decide to make it public, we
> >>> don't need to re-invent it. This will also have the advantage, that we
> >>> can add upgrade pattern for other stores later, too.
> >>>
> >>> I also agree, that the `StoreUpgradeBuilder` is a little ugly, but it
> >>> was the only way I could find to design a generic upgrade interface. If
> >>> we decide the hide all the upgrade stuff, `StoreUpgradeBuilder` would
> >>> become an internal interface I guess (don't think we can remove it).
> >>>
> >>> I will wait for more feedback about this and if nobody wants to keep it
> >>> as public API I will update the KIP accordingly. Will add some more
> >>> clarifications for different upgrade patterns in the mean time and fix
> >>> the typos/minor issues.
> >>>
> >>> About adding a new state UPGRADING: maybe we could do that. However, I
> >>> find it particularly difficult to make the estimation when we should
> >>> switch to RUNNING, thus, I am a little hesitant. Using store callbacks
> >>> or just logging the progress including some indication about the "lag"
> >>> might actually be sufficient. Not sure what others think?
> >>>
> >>> About "value before timestamp": no real reason and I think it does not
> >>> make any difference. Do you want to change it?
> >>>
> >>> About upgrade robustness: yes, we cannot control if an instance fails.
> >>> That is what I meant by "we need to write test". The upgrade should be
> >>> able to continuous even is an instance goes down (and we must make sure
> >>> that we don't end up in an invalid state that forces us to wipe out the
> >>> whole store). Thus, we need to write system tests that fail instances
> >>> during upgrade.
> >>>
> >>> For `in_place_offline` upgrade: I don't think we need this mode,
> because
> >>> people can do this via a single rolling bounce.
> >>>
> >>>  - prepare code and switch KV-Store to KVwithTs-Store
> >>>  - do a single rolling bounce (don't set any upgrade config)
> >>>
> >>> For this case, the `StoreUpgradeBuilder` (or `KVwithTs-Store` if we
> >>> remove the `StoreUpgradeBuilder`) will detect that there is only an old
> >>> local KV store w/o TS, will start to restore the new KVwithTs store,
> >>> wipe out the old store and replace with the new store after restore is
> >>> finished, and start processing only afterwards. (I guess we need to
> >>> document this case -- will also add it to the KIP.)
> >>>
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>>
> >>> On 8/9/18 1:10 PM, John Roesler wrote:
> >>>> Hi Matthias,
> >>>>
> >>>> I think this KIP is looking really good.
> >>>>
> >>>> I have a few thoughts to add to the others:
> >>>>
> >>>> 1. You mentioned at one point users needing to configure
> >>>> `upgrade.mode="null"`. I think this was a typo and you meant to say
> >> they
> >>>> should remove the config. If they really have to set it to a string
> >>> "null"
> >>>> or even set it to a null value but not remove it, it would be
> >>> unfortunate.
> >>>>
> >>>> 2. In response to Bill's comment #1 , you said that "The idea is that
> >> the
> >>>> upgrade should be robust and not fail. We need to write according
> >> tests".
> >>>> I may have misunderstood the conversation, but I don't think it's
> >> within
> >>>> our power to say that an instance won't fail. What if one of my
> >> computers
> >>>> catches on fire? What if I'm deployed in the cloud and one instance
> >>>> disappears and is replaced by a new one? Or what if one instance goes
> >>> AWOL
> >>>> for a long time and then suddenly returns? How will the upgrade
> process
> >>>> behave in light of such failures?
> >>>>
> >>>> 3. your thought about making in-place an offline mode is interesting,
> >> but
> >>>> it might be a bummer for on-prem users who wish to upgrade online, but
> >>>> cannot just add new machines to the pool. It could be a new upgrade
> >> mode
> >>>> "offline-in-place", though...
> >>>>
> >>>> 4. I was surprised to see that a user would need to modify the
> topology
> >>> to
> >>>> do an upgrade (using StoreUpgradeBuilder). Maybe some of Guozhang's
> >>>> suggestions would remove this necessity.
> >>>>
> >>>> Thanks for taking on this very complex but necessary work.
> >>>>
> >>>> -John
> >>>>
> >>>> On Thu, Aug 9, 2018 at 12:22 PM Guozhang Wang <wangg...@gmail.com>
> >>> wrote:
> >>>>
> >>>>> Hello Matthias,
> >>>>>
> >>>>> Thanks for the updated KIP. Some more comments:
> >>>>>
> >>>>> 1. The current set of proposed API is a bit too complicated, which
> >> makes
> >>>>> the upgrade flow from user's perspective also a bit complex. I'd like
> >> to
> >>>>> check different APIs and discuss about their needs separately:
> >>>>>
> >>>>>     1.a. StoreProxy: needed for in-place upgrade only, between the
> >> first
> >>>>> and second rolling bounce, where the old-versioned stores can handle
> >>>>> new-versioned store APIs. I think such upgrade paths (i.e. from one
> >>> store
> >>>>> type to another) would not be very common: users may want to upgrade
> >>> from a
> >>>>> certain store engine to another, but the interface would likely be
> >>> staying
> >>>>> the same. Hence personally I'd suggest we keep it internally and only
> >>>>> consider exposing it in the future if it does become a common
> pattern.
> >>>>>
> >>>>>     1.b. ConverterStore / RecordConverter: needed for both in-place
> >> and
> >>>>> roll-over upgrade, between the first and second rolling bounces, for
> >> the
> >>>>> new versioned store to be able to read old-versioned changelog
> topics.
> >>>>> Firstly I think we should not expose key in the public APIs but only
> >> the
> >>>>> values, since allowing key format changes would break log compaction,
> >>> and
> >>>>> hence would not be compatible anyways. As for value format changes,
> >>>>> personally I think we can also keep its upgrade logic internally as
> it
> >>> may
> >>>>> not worth generalizing to user customizable logic.
> >>>>>
> >>>>>     1.c. If you agrees with 2.a/b above, then we can also remove "
> >>>>> keyValueToKeyValueWithTimestampUpgradeStoreBuilder" from the public
> >>> APIs.
> >>>>>
> >>>>>     1.d. Personally I think "ReadOnlyKeyValueWithTimestampStore" is
> >> not
> >>>>> needed either given that we are exposing "ValueAndTimestamp" anyways.
> >>> I.e.
> >>>>> it is just a syntax sugar and for IQ, users can always just set a "
> >>>>> QueryableStoreType<ReadOnlyKeyValue<K, ValueAndTimestamp<V>>>" as the
> >>> new
> >>>>> interface does not provide any additional functions.
> >>>>>
> >>>>>
> >>>>> 2. Could we further categorize the upgrade flow for different use
> >> cases,
> >>>>> e.g. 1) DSL users where KeyValueWithTimestampStore will be used
> >>>>> automatically for non-windowed aggregate; 2) PAPI users who do not
> >> need
> >>> to
> >>>>> use KeyValueWithTimestampStore; 3) PAPI users who do want to switch
> to
> >>>>> KeyValueWithTimestampStore. Just to give my understanding for 3), the
> >>>>> upgrade flow for users may be simplified as the following (for both
> >>>>> in-place and roll-over):
> >>>>>
> >>>>>     * Update the jar to new version, make code changes from
> >>> KeyValueStore
> >>>>> to KeyValueWithTimestampStore, set upgrade config.
> >>>>>
> >>>>>     * First rolling bounce, and library code can internally use proxy
> >> /
> >>>>> converter based on the specified config to handle new APIs with old
> >>> stores,
> >>>>> while let new stores read from old changelog data.
> >>>>>
> >>>>>     * Reset upgrade config.
> >>>>>
> >>>>>     * Second rolling bounce, and the library code automatically turn
> >> off
> >>>>> logic for proxy / converter.
> >>>>>
> >>>>>
> >>>>> 3. Some more detailed proposals are needed for when to recommend
> users
> >>> to
> >>>>> trigger the second rolling bounce. I have one idea to share here: we
> >>> add a
> >>>>> new state to KafkaStreams, say UPGRADING, which is set when 1)
> upgrade
> >>>>> config is set, and 2) the new stores are still ramping up (for the
> >>> second
> >>>>> part, we can start with some internal hard-coded heuristics to decide
> >>> when
> >>>>> it is close to be ramped up). If either one of it is not true any
> >> more,
> >>> it
> >>>>> should transit to RUNNING. Users can then watch on this state, and
> >>> decide
> >>>>> to only trigger the second rebalance when the state has transited
> from
> >>>>> UPGRADING. They can also choose to cut over while the instance is
> >> still
> >>>>> UPGRADING, the downside is that after that the application may have
> >> long
> >>>>> restoration phase which is, to user's pov, unavailability periods.
> >>>>>
> >>>>>
> >>>>> Below are just some minor things on the wiki:
> >>>>>
> >>>>> 4. "proxy story" => "proxy store".
> >>>>>
> >>>>> 5. "use the a builder " => "use a builder"
> >>>>>
> >>>>> 6: "we add the record timestamp as a 8-byte (long) prefix to the
> >> value":
> >>>>> what's the rationale of putting the timestamp before the value, than
> >>> after
> >>>>> the value?
> >>>>>
> >>>>>
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>>
> >>>>> On Tue, Aug 7, 2018 at 5:13 PM, Matthias J. Sax <
> >> matth...@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> Thanks for the feedback Bill. I just update the KIP with some of
> your
> >>>>>> points.
> >>>>>>
> >>>>>>>> Regarding step 3C of the in-place upgrade (users needing to watch
> >> the
> >>>>>>>> restore process), I'm wondering if we want to provide a type of
> >>>>>>>> StateRestoreListener that could signal when the new stores have
> >>>>> reached
> >>>>>>>> parity with the existing old stores and that could be the signal
> to
> >>>>>> start
> >>>>>>>> second rolling rebalance?
> >>>>>>
> >>>>>> I think we can reuse the existing listeners, thus, I did not include
> >>>>>> anything in the KIP. About a signal to rebalance: this might be
> >> tricky.
> >>>>>> If we prepare the store "online", the active task will update the
> >> state
> >>>>>> continuously, and thus, state prepare is never finished. It will be
> >> the
> >>>>>> users responsibility to do the second rebalance (note, that the
> >> second
> >>>>>> rebalance will first finish the last delta of the upgrade to finish
> >> the
> >>>>>> upgrade before actual processing resumes). I clarified the KIP with
> >>> this
> >>>>>> regard a little bit.
> >>>>>>
> >>>>>>>> 1. Out of N instances, one fails midway through the process, would
> >> we
> >>>>>> allow
> >>>>>>>> the other instances to complete or just fail the entire upgrade?
> >>>>>>
> >>>>>> The idea is that the upgrade should be robust and not fail. We need
> >> to
> >>>>>> write according tests.
> >>>>>>
> >>>>>>>> 2. During the second rolling bounce, maybe we could rename the
> >>> current
> >>>>>>>> active directories vs. deleting them right away,  and when all the
> >>>>>> prepare
> >>>>>>>> task directories are successfully migrated then delete the
> previous
> >>>>>> active
> >>>>>>>> ones.
> >>>>>>
> >>>>>> Ack. Updated the KIP.
> >>>>>>
> >>>>>>>> 3. For the first rolling bounce we pause any processing any new
> >>>>> records
> >>>>>> and
> >>>>>>>> just allow the prepare tasks to restore, then once all prepare
> >> tasks
> >>>>>> have
> >>>>>>>> restored, it's a signal for the second round of rolling bounces
> and
> >>>>>> then as
> >>>>>>>> each task successfully renames its prepare directories and deletes
> >>> the
> >>>>>> old
> >>>>>>>> active task directories, normal processing of records resumes.
> >>>>>>
> >>>>>> The basic idea is to do an online upgrade to avoid downtime. We can
> >>>>>> discuss to offer both options... For the offline upgrade option, we
> >>>>>> could simplify user interaction and trigger the second rebalance
> >>>>>> automatically with the requirement that a user needs to update any
> >>>>> config.
> >>>>>>
> >>>>>> If might actually be worth to include this option: we know from
> >>>>>> experience with state restore, that regular processing slows down
> the
> >>>>>> restore. For roll_over upgrade, it would be a different story and
> >>>>>> upgrade should not be slowed down by regular processing. Thus, we
> >>> should
> >>>>>> even make in_place an offline upgrade and force people to use
> >> roll_over
> >>>>>> if they need onlint upgrade. Might be a fair tradeoff that may
> >> simplify
> >>>>>> the upgrade for the user and for the code complexity.
> >>>>>>
> >>>>>> Let's see what other think.
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 7/27/18 12:53 PM, Bill Bejeck wrote:
> >>>>>>> Hi Matthias,
> >>>>>>>
> >>>>>>> Thanks for the update and the working prototype, it helps with
> >>>>>>> understanding the KIP.
> >>>>>>>
> >>>>>>> I took an initial pass over this PR, and overall I find the
> >> interfaces
> >>>>>> and
> >>>>>>> approach to be reasonable.
> >>>>>>>
> >>>>>>> Regarding step 3C of the in-place upgrade (users needing to watch
> >> the
> >>>>>>> restore process), I'm wondering if we want to provide a type of
> >>>>>>> StateRestoreListener that could signal when the new stores have
> >>> reached
> >>>>>>> parity with the existing old stores and that could be the signal to
> >>>>> start
> >>>>>>> second rolling rebalance?
> >>>>>>>
> >>>>>>> Although you solicited feedback on the interfaces involved, I
> wanted
> >>> to
> >>>>>> put
> >>>>>>> down some thoughts that have come to mind reviewing this KIP again
> >>>>>>>
> >>>>>>> 1. Out of N instances, one fails midway through the process, would
> >> we
> >>>>>> allow
> >>>>>>> the other instances to complete or just fail the entire upgrade?
> >>>>>>> 2. During the second rolling bounce, maybe we could rename the
> >> current
> >>>>>>> active directories vs. deleting them right away,  and when all the
> >>>>>> prepare
> >>>>>>> task directories are successfully migrated then delete the previous
> >>>>>> active
> >>>>>>> ones.
> >>>>>>> 3. For the first rolling bounce we pause any processing any new
> >>> records
> >>>>>> and
> >>>>>>> just allow the prepare tasks to restore, then once all prepare
> tasks
> >>>>> have
> >>>>>>> restored, it's a signal for the second round of rolling bounces and
> >>>>> then
> >>>>>> as
> >>>>>>> each task successfully renames its prepare directories and deletes
> >> the
> >>>>>> old
> >>>>>>> active task directories, normal processing of records resumes.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Bill
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Wed, Jul 25, 2018 at 9:42 PM Matthias J. Sax <
> >>> matth...@confluent.io
> >>>>>>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> KIP-268 (rebalance meatadata) is finished and included in AK 2.0
> >>>>>>>> release. Thus, I want to pick up this KIP again to get the RocksDB
> >>>>>>>> upgrade done for 2.1.
> >>>>>>>>
> >>>>>>>> I updated the KIP accordingly and also have a "prove of concept"
> PR
> >>>>>>>> ready (for "in place" upgrade only):
> >>>>>>>> https://github.com/apache/kafka/pull/5422/
> >>>>>>>>
> >>>>>>>> There a still open questions, but I want to collect early feedback
> >> on
> >>>>>>>> the proposed interfaces we need for the store upgrade. Also note,
> >>> that
> >>>>>>>> the KIP now also aim to define a generic upgrade path from any
> >> store
> >>>>>>>> format A to any other store format B. Adding timestamps is just a
> >>>>>>>> special case.
> >>>>>>>>
> >>>>>>>> I will continue to work on the PR and refine the KIP in the
> >> meantime,
> >>>>>> too.
> >>>>>>>>
> >>>>>>>> Looking forward to your feedback.
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 3/14/18 11:14 PM, Matthias J. Sax wrote:
> >>>>>>>>> After some more thoughts, I want to follow John's suggestion and
> >>>>> split
> >>>>>>>>> upgrading the rebalance metadata from the store upgrade.
> >>>>>>>>>
> >>>>>>>>> I extracted the metadata upgrade into it's own KIP:
> >>>>>>>>>
> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%
> >>>>>> 3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade
> >>>>>>>>>
> >>>>>>>>> I'll update this KIP accordingly shortly. I also want to consider
> >> to
> >>>>>>>>> make the store format upgrade more flexible/generic. Atm, the KIP
> >> is
> >>>>>> too
> >>>>>>>>> much tailored to the DSL IMHO and does not encounter PAPI users
> >> that
> >>>>> we
> >>>>>>>>> should not force to upgrade the stores. I need to figure out the
> >>>>>> details
> >>>>>>>>> and follow up later.
> >>>>>>>>>
> >>>>>>>>> Please give feedback for the new KIP-268 on the corresponding
> >>>>>> discussion
> >>>>>>>>> thread.
> >>>>>>>>>
> >>>>>>>>> @James: unfortunately, for upgrading to 1.2 I couldn't figure out
> >> a
> >>>>> way
> >>>>>>>>> for a single rolling bounce upgrade. But KIP-268 proposes a fix
> >> for
> >>>>>>>>> future upgrades. Please share your thoughts.
> >>>>>>>>>
> >>>>>>>>> Thanks for all your feedback!
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>> On 3/12/18 11:56 PM, Matthias J. Sax wrote:
> >>>>>>>>>> @John: yes, we would throw if configs are missing (it's an
> >>>>>>>>>> implementation details IMHO and thus I did not include it in the
> >>>>> KIP)
> >>>>>>>>>>
> >>>>>>>>>> @Guozhang:
> >>>>>>>>>>
> >>>>>>>>>> 1) I understand know what you mean. We can certainly, allow all
> >>>>> values
> >>>>>>>>>> "0.10.0.x", "0.10.1.x", "0.10.2.x", ... "1.1.x" for
> >> `upgrade.from`
> >>>>>>>>>> parameter. I had a similar though once but decided to collapse
> >> them
> >>>>>> into
> >>>>>>>>>> one -- will update the KIP accordingly.
> >>>>>>>>>>
> >>>>>>>>>> 2) The idea to avoid any config would be, to always send both
> >>>>> request.
> >>>>>>>>>> If we add a config to eventually disable the old request, we
> >> don't
> >>>>>> gain
> >>>>>>>>>> anything with this approach. The question is really, if we are
> >>>>> willing
> >>>>>>>>>> to pay this overhead from 1.2 on -- note, it would be limited to
> >> 2
> >>>>>>>>>> versions and not grow further in future releases. More details
> in
> >>>>> (3)
> >>>>>>>>>>
> >>>>>>>>>> 3) Yes, this approach subsumes (2) for later releases and allows
> >> us
> >>>>> to
> >>>>>>>>>> stay with 2 "assignment strategies" we need to register, as the
> >> new
> >>>>>>>>>> assignment strategy will allow to "upgrade itself" via "version
> >>>>>>>>>> probing". Thus, (2) would only be a workaround to avoid a config
> >> if
> >>>>>>>>>> people upgrade from pre-1.2 releases.
> >>>>>>>>>>
> >>>>>>>>>> Thus, I don't think we need to register new "assignment
> >> strategies"
> >>>>>> and
> >>>>>>>>>> send empty subscriptions for older version.
> >>>>>>>>>>
> >>>>>>>>>> 4) I agree that this is a tricky thing to get right with a
> single
> >>>>>>>>>> rebalance. I share the concern that an application might never
> >>> catch
> >>>>>> up
> >>>>>>>>>> and thus the hot standby will never be ready.
> >>>>>>>>>>
> >>>>>>>>>> Maybe it's better to go with 2 rebalances for store upgrades. If
> >> we
> >>>>> do
> >>>>>>>>>> this, we also don't need to go with (2) and can get (3) in place
> >>> for
> >>>>>>>>>> future upgrades. I also think that changes to the metadata are
> >> more
> >>>>>>>>>> likely and thus allowing for single rolling bounce for this case
> >> is
> >>>>>> more
> >>>>>>>>>> important anyway. If we assume that store upgrade a rare, it
> >> might
> >>>>> be
> >>>>>> ok
> >>>>>>>>>> to sacrifice two rolling bounced for this case. It was just an
> >> idea
> >>>>> I
> >>>>>>>>>> wanted to share (even if I see the issues).
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 3/12/18 11:45 AM, Guozhang Wang wrote:
> >>>>>>>>>>> Hello Matthias, thanks for your replies.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 1) About the config names: actually I was trying to not expose
> >>>>>>>>>>> implementation details :) My main concern was that in your
> >>> proposal
> >>>>>> the
> >>>>>>>>>>> values need to cover the span of all the versions that are
> >>> actually
> >>>>>>>> using
> >>>>>>>>>>> the same version, i.e. "0.10.1.x-1.1.x". So if I (as a user) am
> >>>>>>>> upgrading
> >>>>>>>>>>> from any versions within this range I need to remember to use
> >> the
> >>>>>> value
> >>>>>>>>>>> "0.10.1.x-1.1.x" than just specifying my old version. In my
> >>>>>> suggestion
> >>>>>>>> I
> >>>>>>>>>>> was trying to argue the benefit of just letting users to
> specify
> >>>>> the
> >>>>>>>> actual
> >>>>>>>>>>> Kafka version she's trying to upgrade from, than specifying a
> >>> range
> >>>>>> of
> >>>>>>>>>>> versions. I was not suggesting to use "v1, v2, v3" etc as the
> >>>>> values,
> >>>>>>>> but
> >>>>>>>>>>> still using Kafka versions like broker's `internal.version`
> >>> config.
> >>>>>>>> But if
> >>>>>>>>>>> you were suggesting the same thing, i.e. by "0.10.1.x-1.1.x"
> you
> >>>>>> meant
> >>>>>>>> to
> >>>>>>>>>>> say users can just specify "0.10.1" or "0.10.2" or "0.11.0" or
> >>>>> "1.1"
> >>>>>>>> which
> >>>>>>>>>>> are all recognizable config values then I think we are actually
> >> on
> >>>>>> the
> >>>>>>>> same
> >>>>>>>>>>> page.
> >>>>>>>>>>>
> >>>>>>>>>>> 2) About the "multi-assignment" idea: yes it would increase the
> >>>>>> network
> >>>>>>>>>>> footprint, but not the message size, IF I'm not
> >> mis-understanding
> >>>>>> your
> >>>>>>>> idea
> >>>>>>>>>>> of registering multiple assignment. More details:
> >>>>>>>>>>>
> >>>>>>>>>>> In the JoinGroupRequest, in the protocols field we can encode
> >>>>>> multiple
> >>>>>>>>>>> protocols each with their different metadata. The coordinator
> >> will
> >>>>>>>> pick the
> >>>>>>>>>>> common one that everyone supports (if there are no common one,
> >> it
> >>>>>> will
> >>>>>>>> send
> >>>>>>>>>>> an error back; if there are multiple ones, it will pick the one
> >>>>> with
> >>>>>>>> most
> >>>>>>>>>>> votes, i.e. the one which was earlier in the encoded list).
> >> Since
> >>>>> our
> >>>>>>>>>>> current Streams rebalance protocol is still based on the
> >> consumer
> >>>>>>>>>>> coordinator, it means our protocol_type would be "consumer",
> but
> >>>>>>>> instead
> >>>>>>>>>>> the protocol type we can have multiple protocols like
> "streams",
> >>>>>>>>>>> "streams_v2", "streams_v3" etc. The downside is that we need to
> >>>>>>>> implement a
> >>>>>>>>>>> different assignor class for each version and register all of
> >> them
> >>>>> in
> >>>>>>>>>>> consumer's PARTITION_ASSIGNMENT_STRATEGY_CONFIG. In the future
> >> if
> >>>>> we
> >>>>>>>>>>> re-factor our implementation to have our own client coordinator
> >>>>> layer
> >>>>>>>> like
> >>>>>>>>>>> Connect did, we can simplify this part of the implementation.
> >> But
> >>>>>> even
> >>>>>>>> for
> >>>>>>>>>>> now with the above approach this is still doable.
> >>>>>>>>>>>
> >>>>>>>>>>> On the broker side, the group coordinator will only persist a
> >>> group
> >>>>>>>> with
> >>>>>>>>>>> the selected protocol and its subscription metadata, e.g. if
> >>>>>>>> coordinator
> >>>>>>>>>>> decides to pick "streams_v2" it will only sends that protocol's
> >>>>>>>> metadata
> >>>>>>>>>>> from everyone to the leader to assign, AND when completing the
> >>>>>>>> rebalance it
> >>>>>>>>>>> will also only write the group metadata with that protocol and
> >> the
> >>>>>>>>>>> assignment only. In a word, although the network traffic maybe
> >>>>>>>> increased a
> >>>>>>>>>>> bit, it would not be a bummer in our trade-off. One corner
> >>>>> situation
> >>>>>> we
> >>>>>>>>>>> need to consider is how to stop registering very old assignors
> >> to
> >>>>>>>> avoid the
> >>>>>>>>>>> network traffic from increasing indefinitely, e.g. if you are
> >>>>> rolling
> >>>>>>>>>>> bounce from v2 to v3, then you'd not need to register v1
> >> assignor
> >>>>>>>> anymore,
> >>>>>>>>>>> but that would unfortunately still require some configs.
> >>>>>>>>>>>
> >>>>>>>>>>> 3) About the  "version probing" idea, I think that's a
> promising
> >>>>>>>> approach
> >>>>>>>>>>> as well, but if we are going to do the multi-assignment its
> >> value
> >>>>>> seems
> >>>>>>>>>>> subsumed? But I'm thinking maybe it can be added on top of
> >>>>>>>> multi-assignment
> >>>>>>>>>>> to save us from still requiring the config to avoid registering
> >>> all
> >>>>>> the
> >>>>>>>>>>> metadata for all version. More details:
> >>>>>>>>>>>
> >>>>>>>>>>> In the JoinGroupRequest, we still register all the assignor but
> >>> for
> >>>>>>>> all old
> >>>>>>>>>>> assignors we do not encode any metadata, i.e. the encoded data
> >>>>> would
> >>>>>>>> be:
> >>>>>>>>>>>
> >>>>>>>>>>> "streams_vN" : "encoded metadata"
> >>>>>>>>>>> "streams_vN-1":empty
> >>>>>>>>>>> "streams_vN-2":empty
> >>>>>>>>>>> ..
> >>>>>>>>>>> "streams_0":empty
> >>>>>>>>>>>
> >>>>>>>>>>> So the coordinator can still safely choose the latest common
> >>>>> version;
> >>>>>>>> and
> >>>>>>>>>>> then when leaders receive the subscription (note it should
> >> always
> >>>>>>>> recognize
> >>>>>>>>>>> that version), let's say it is streams_vN-2, if one of the
> >>>>>>>> subscriptions
> >>>>>>>>>>> are empty bytes, it will send the empty assignment with that
> >>>>> version
> >>>>>>>> number
> >>>>>>>>>>> encoded in the metadata. So in the second auto-triggered all
> >>>>> members
> >>>>>>>> would
> >>>>>>>>>>> send the metadata with that version:
> >>>>>>>>>>>
> >>>>>>>>>>> "streams_vN" : empty
> >>>>>>>>>>> "streams_vN-1" : empty
> >>>>>>>>>>> "streams_vN-2" : "encoded metadata"
> >>>>>>>>>>> ..
> >>>>>>>>>>> "streams_0":empty
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> By doing this we would not require any configs for users.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 4) About the "in_place" upgrade on rocksDB, I'm not clear about
> >>> the
> >>>>>>>> details
> >>>>>>>>>>> so probably we'd need to fill that out before making a call.
> For
> >>>>>>>> example,
> >>>>>>>>>>> you mentioned "If we detect this situation, the Streams
> >>> application
> >>>>>>>> closes
> >>>>>>>>>>> corresponding active tasks as well as "hot standby" tasks, and
> >>>>>>>> re-creates
> >>>>>>>>>>> the new active tasks using the new store." How could we
> >> guarantee
> >>>>>> that
> >>>>>>>> the
> >>>>>>>>>>> gap between these two stores will keep decreasing than
> >> increasing
> >>>>> so
> >>>>>>>> we'll
> >>>>>>>>>>> eventually achieve the flip point? And also the longer we are
> >>>>> before
> >>>>>>>> the
> >>>>>>>>>>> flip point, the larger we are doubling the storage space, etc.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Guozhang
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Sun, Mar 11, 2018 at 4:06 PM, Matthias J. Sax <
> >>>>>>>> matth...@confluent.io>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> @John, Guozhang,
> >>>>>>>>>>>>
> >>>>>>>>>>>> thanks a lot for your comments. Very long reply...
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> About upgrading the rebalance metadata:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Another possibility to do this, would be to register multiple
> >>>>>>>> assignment
> >>>>>>>>>>>> strategies for the 1.2 applications. For this case, new
> >> instances
> >>>>>>>> would
> >>>>>>>>>>>> be configured to support both and the broker would pick the
> >>>>> version
> >>>>>>>> that
> >>>>>>>>>>>> all instances understand. The disadvantage would be, that we
> >> send
> >>>>>> much
> >>>>>>>>>>>> more data (ie, two subscriptions) in each rebalance as long as
> >> no
> >>>>>>>> second
> >>>>>>>>>>>> rebalance is done disabling the old protocol. Thus, using this
> >>>>>>>> approach
> >>>>>>>>>>>> would allow to avoid a second rebalance trading-off an
> >> increased
> >>>>>>>>>>>> rebalance network footprint (I also assume that this would
> >>>>> increase
> >>>>>>>> the
> >>>>>>>>>>>> message size that is written into __consumer_offsets topic?).
> >>>>>>>> Overall, I
> >>>>>>>>>>>> am not sure if this would be a good tradeoff, but it could
> >> avoid
> >>> a
> >>>>>>>>>>>> second rebalance (I have some more thoughts about stores below
> >>>>> that
> >>>>>>>> are
> >>>>>>>>>>>> relevant for single rebalance upgrade).
> >>>>>>>>>>>>
> >>>>>>>>>>>> For future upgrades we might be able to fix this though. I was
> >>>>>>>> thinking
> >>>>>>>>>>>> about the following:
> >>>>>>>>>>>>
> >>>>>>>>>>>> In the current implementation, the leader fails if it gets a
> >>>>>>>>>>>> subscription it does not understand (ie, newer version). We
> >> could
> >>>>>>>> change
> >>>>>>>>>>>> this behavior and let the leader send an empty assignment plus
> >>>>> error
> >>>>>>>>>>>> code (including supported version) back to the instance
> sending
> >>>>> the
> >>>>>>>>>>>> "bad" subscription. This would allow the following logic for
> an
> >>>>>>>>>>>> application instance:
> >>>>>>>>>>>>
> >>>>>>>>>>>>  - on startup, always send the latest subscription format
> >>>>>>>>>>>>  - if leader understands it, we get an assignment back an
> start
> >>>>>>>> processing
> >>>>>>>>>>>>  - if leader does not understand it, we get an empty
> assignment
> >>>>> and
> >>>>>>>>>>>> supported version back
> >>>>>>>>>>>>  - the application unsubscribe()/subscribe()/poll() again and
> >>>>>> sends a
> >>>>>>>>>>>> subscription using the leader's supported version
> >>>>>>>>>>>>
> >>>>>>>>>>>> This protocol would allow to do a single rolling bounce, and
> >>>>>>>> implements
> >>>>>>>>>>>> a "version probing" step, that might result in two executed
> >>>>>>>> rebalances.
> >>>>>>>>>>>> The advantage would be, that the user does not need to set any
> >>>>>> configs
> >>>>>>>>>>>> or do multiple rolling bounces, as Streams takes care of this
> >>>>>>>>>>>> automatically.
> >>>>>>>>>>>>
> >>>>>>>>>>>> One disadvantage would be, that two rebalances happen and that
> >>> for
> >>>>>> an
> >>>>>>>>>>>> error case during rebalance, we loose the information about
> the
> >>>>>>>>>>>> supported leader version and the "probing step" would happen a
> >>>>>> second
> >>>>>>>> time.
> >>>>>>>>>>>>
> >>>>>>>>>>>> If the leader is eventually updated, it will include it's own
> >>>>>>>> supported
> >>>>>>>>>>>> version in all assignments, to allow a "down graded"
> >> application
> >>>>> to
> >>>>>>>>>>>> upgrade its version later. Also, if a application fails, the
> >>> first
> >>>>>>>>>>>> probing would always be successful and only a single rebalance
> >>>>>>>> happens.
> >>>>>>>>>>>> If we use this protocol, I think we don't need any
> >> configuration
> >>>>>>>>>>>> parameter for future upgrades.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> About "upgrade.from" vs "internal.protocol.version":
> >>>>>>>>>>>>
> >>>>>>>>>>>> Users would set "upgrade.from" to the release version the
> >>>>>> current/old
> >>>>>>>>>>>> application is using. I think this is simpler, as users know
> >> this
> >>>>>>>>>>>> version. If we use "internal.protocol.version" instead, we
> >> expose
> >>>>>>>>>>>> implementation details and users need to know the protocol
> >>> version
> >>>>>>>> (ie,
> >>>>>>>>>>>> they need to map from the release version to the protocol
> >>> version;
> >>>>>> ie,
> >>>>>>>>>>>> "I am run 0.11.0 that runs with metadata protocol version 2").
> >>>>>>>>>>>>
> >>>>>>>>>>>> Also the KIP states that for the second rolling bounce, the
> >>>>>>>>>>>> "upgrade.mode" config should be set back to `null` -- and
> thus,
> >>>>>>>>>>>> "upgrade.from" would not have any effect and is ignored (I
> will
> >>>>>> update
> >>>>>>>>>>>> the KIP to point out this dependency).
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> About your second point: I'll update the KIP accordingly to
> >>>>> describe
> >>>>>>>>>>>> future updates as well. Both will be different.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> One more point about upgrading the store format. I was
> thinking
> >>>>>> about
> >>>>>>>>>>>> avoiding the second rolling bounce all together in the future:
> >>> (1)
> >>>>>> the
> >>>>>>>>>>>> goal is to achieve an upgrade with zero downtime (2) this
> >>> required
> >>>>>> to
> >>>>>>>>>>>> prepare the stores as "hot standbys" before we do the switch
> >> and
> >>>>>>>> delete
> >>>>>>>>>>>> the old stores. (3) the current proposal does the switch
> >>>>> "globally"
> >>>>>> --
> >>>>>>>>>>>> this is simpler and due to the required second rebalance no
> >>>>>>>> disadvantage.
> >>>>>>>>>>>> However, a global consistent switch over might actually not be
> >>>>>>>> required.
> >>>>>>>>>>>> For "in_place" upgrade, following the protocol from above, we
> >>>>> could
> >>>>>>>>>>>> decouple the store switch and each instance could switch its
> >>> store
> >>>>>>>>>>>> independently from all other instances. After the rolling
> >> bounce,
> >>>>> it
> >>>>>>>>>>>> seems to be ok to switch from the old store to the new store
> >>>>> "under
> >>>>>>>> the
> >>>>>>>>>>>> hood" whenever the new store is ready (this could even be
> done,
> >>>>>> before
> >>>>>>>>>>>> we switch to the new metadata version). Each time we update
> the
> >>>>> "hot
> >>>>>>>>>>>> standby" we check if it reached the "endOffset"  (or maybe X%
> >>> that
> >>>>>>>> could
> >>>>>>>>>>>> either be hardcoded or configurable). If we detect this
> >>> situation,
> >>>>>> the
> >>>>>>>>>>>> Streams application closes corresponding active tasks as well
> >> as
> >>>>>> "hot
> >>>>>>>>>>>> standby" tasks, and re-creates the new active tasks using the
> >> new
> >>>>>>>> store.
> >>>>>>>>>>>> (I need to go through the details once again, but it seems to
> >> be
> >>>>>>>>>>>> feasible.).
> >>>>>>>>>>>>
> >>>>>>>>>>>> Combining this strategy with the "multiple assignment" idea,
> >>> might
> >>>>>>>> even
> >>>>>>>>>>>> enable us to do an single rolling bounce upgrade from 1.1 ->
> >> 1.2.
> >>>>>>>>>>>> Applications would just use the old store, as long as the new
> >>>>> store
> >>>>>> is
> >>>>>>>>>>>> not ready, even if the new metadata version is used already.
> >>>>>>>>>>>>
> >>>>>>>>>>>> For future upgrades, a single rebalance would be sufficient,
> >> too,
> >>>>>> even
> >>>>>>>>>>>> if the stores are upgraded. We would not need any config
> >>>>> parameters
> >>>>>> as
> >>>>>>>>>>>> the "probe" step allows us to detect the supported rebalance
> >>>>>> metadata
> >>>>>>>>>>>> version (and we would also not need multiple "assigmnent
> >>>>> strategies"
> >>>>>>>> as
> >>>>>>>>>>>> out own protocol encoded everything we need).
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Let me know what you think.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 3/9/18 10:33 PM, Guozhang Wang wrote:
> >>>>>>>>>>>>> @John:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> For the protocol version upgrade, it is only for the encoded
> >>>>>> metadata
> >>>>>>>>>>>> bytes
> >>>>>>>>>>>>> protocol, which are just bytes-in bytes-out from Consumer's
> >> pov,
> >>>>>> so I
> >>>>>>>>>>>> think
> >>>>>>>>>>>>> this change should be in the Streams layer as well.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> @Matthias:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> for 2), I agree that adding a "newest supported version"
> >> besides
> >>>>>> the
> >>>>>>>>>>>>> "currently used version for encoding" is a good idea to allow
> >>>>>> either
> >>>>>>>>>>>> case;
> >>>>>>>>>>>>> the key is that in Streams we would likely end up with a
> >> mapping
> >>>>>>>> from the
> >>>>>>>>>>>>> protocol version to the other persistent data format versions
> >>>>> such
> >>>>>> as
> >>>>>>>>>>>>> rocksDB, changelog. So with such a map we can actually
> achieve
> >>>>> both
> >>>>>>>>>>>>> scenarios, i.e. 1) one rolling bounce if the upgraded
> protocol
> >>>>>>>> version's
> >>>>>>>>>>>>> corresponding data format does not change, e.g. 0.10.0 ->
> >> 0.10.1
> >>>>>>>> leaders
> >>>>>>>>>>>>> can choose to use the newer version in the first rolling
> >> bounce
> >>>>>>>> directly
> >>>>>>>>>>>>> and we can document to users that they would not need to set
> >>>>>>>>>>>>> "upgrade.mode", and 2) two rolling bounce if the upgraded
> >>>>> protocol
> >>>>>>>>>>>> version
> >>>>>>>>>>>>> does indicate the data format changes, e.g. 1.1 -> 1.2, and
> >> then
> >>>>> we
> >>>>>>>> can
> >>>>>>>>>>>>> document that "upgrade.mode" needs to be set in the first
> >>> rolling
> >>>>>>>> bounce
> >>>>>>>>>>>>> and reset in the second.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Besides that, some additional comments:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1) I still think "upgrade.from" is less intuitive for users
> to
> >>>>> set
> >>>>>>>> than
> >>>>>>>>>>>>> "internal.protocol.version" where for the latter users only
> >> need
> >>>>> to
> >>>>>>>> set a
> >>>>>>>>>>>>> single version, while the Streams will map that version to
> the
> >>>>>>>> Streams
> >>>>>>>>>>>>> assignor's behavior as well as the data format. But maybe I
> >> did
> >>>>> not
> >>>>>>>> get
> >>>>>>>>>>>>> your idea about how the  "upgrade.from" config will be set,
> >>>>> because
> >>>>>>>> in
> >>>>>>>>>>>>> your Compatibility section how the upgrade.from config will
> be
> >>>>> set
> >>>>>>>> for
> >>>>>>>>>>>>> these two rolling bounces are not very clear: for example,
> >>> should
> >>>>>>>> user
> >>>>>>>>>>>>> reset it to null in the second rolling bounce?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2) In the upgrade path description, rather than talking about
> >>>>>>>> specific
> >>>>>>>>>>>>> version 0.10.0 -> version 0.10.1 etc, can we just categorize
> >> all
> >>>>>> the
> >>>>>>>>>>>>> possible scenarios, even for future upgrade versions, what
> >>> should
> >>>>>> be
> >>>>>>>> the
> >>>>>>>>>>>>> standard operations? The categorized we can summarize to
> would
> >>> be
> >>>>>>>>>>>> (assuming
> >>>>>>>>>>>>> user upgrade from version X to version Y, where X and Y are
> >>> Kafka
> >>>>>>>>>>>> versions,
> >>>>>>>>>>>>> with the corresponding supported protocol version x and y):
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> a. x == y, i.e. metadata protocol does not change, and hence
> >> no
> >>>>>>>>>>>> persistent
> >>>>>>>>>>>>> data formats have changed.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> b. x != y, but all persistent data format remains the same.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> b. x !=y, AND some persistene data format like RocksDB
> format,
> >>>>>>>> changelog
> >>>>>>>>>>>>> format, has been changed.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> c. special case: we may need some special handling logic when
> >>>>>>>> "current
> >>>>>>>>>>>>> version" or "newest supported version" are not available in
> >> the
> >>>>>>>> protocol,
> >>>>>>>>>>>>> i.e. for X as old as 0.10.0 and before 1.2.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> under the above scenarios, how many rolling bounces users
> need
> >>> to
> >>>>>>>>>>>> execute?
> >>>>>>>>>>>>> how they should set the configs in each rolling bounce? and
> >> how
> >>>>>>>> Streams
> >>>>>>>>>>>>> library will execute in these cases?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Fri, Mar 9, 2018 at 4:01 PM, Matthias J. Sax <
> >>>>>>>> matth...@confluent.io>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Ted,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I still consider changing the KIP to include it right away
> --
> >>> if
> >>>>>>>> not,
> >>>>>>>>>>>>>> I'll create a JIRA. Need to think it through in more detail
> >>>>> first.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> (Same for other open questions like interface names -- I
> >>> collect
> >>>>>>>>>>>>>> feedback and update the KIP after we reach consensus :))
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 3/9/18 3:35 PM, Ted Yu wrote:
> >>>>>>>>>>>>>>> Thanks for the details, Matthias.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> bq. change the metadata protocol only if a future release,
> >>>>>> encoding
> >>>>>>>>>>>> both
> >>>>>>>>>>>>>> used
> >>>>>>>>>>>>>>> and supported version might be an advantage
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Looks like encoding both versions wouldn't be implemented
> in
> >>>>> this
> >>>>>>>> KIP.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Please consider logging a JIRA with the encoding proposal.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax <
> >>>>>>>> matth...@confluent.io
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> @Bill: I think a filter predicate should be part of user
> >>> code.
> >>>>>> And
> >>>>>>>>>>>> even
> >>>>>>>>>>>>>>>> if we want to add something like this, I would prefer to
> do
> >>> it
> >>>>>> in
> >>>>>>>> a
> >>>>>>>>>>>>>>>> separate KIP.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> @James: I would love to avoid a second rolling bounce. But
> >>>>> from
> >>>>>> my
> >>>>>>>>>>>>>>>> understanding it would not be possible.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> The purpose of the second rolling bounce is indeed to
> >> switch
> >>>>>> from
> >>>>>>>>>>>>>>>> version 2 to 3. It also has a second purpose, to switch
> >> from
> >>>>> the
> >>>>>>>> old
> >>>>>>>>>>>>>>>> store to the new store (this happens after the last
> >> instance
> >>>>>>>> bounces a
> >>>>>>>>>>>>>>>> second time).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> The problem with one round of rolling bounces is, that
> it's
> >>>>>>>> unclear
> >>>>>>>>>>>> when
> >>>>>>>>>>>>>>>> to which from version 2 to version 3. The
> >>>>>>>> StreamsPartitionsAssignor is
> >>>>>>>>>>>>>>>> stateless by design, and thus, the information which
> >> version
> >>>>> it
> >>>>>>>> should
> >>>>>>>>>>>>>>>> use must be passed in from externally -- and we want to
> use
> >>>>> the
> >>>>>>>>>>>>>>>> StreamsConfig to pass in this information.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> During upgrade, all new instanced have no information
> about
> >>>>> the
> >>>>>>>>>>>> progress
> >>>>>>>>>>>>>>>> of the upgrade (ie, how many other instanced got upgrades
> >>>>>>>> already).
> >>>>>>>>>>>>>>>> Therefore, it's not safe for them to send a version 3
> >>>>>>>> subscription.
> >>>>>>>>>>>> The
> >>>>>>>>>>>>>>>> leader also has this limited view on the world and can
> only
> >>>>> send
> >>>>>>>>>>>> version
> >>>>>>>>>>>>>>>> 2 assignments back.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thus, for the 1.2 upgrade, I don't think we can simplify
> >> the
> >>>>>>>> upgrade.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> We did consider to change the metadata to make later
> >> upgrades
> >>>>>> (ie,
> >>>>>>>>>>>> from
> >>>>>>>>>>>>>>>> 1.2 to 1.x) simpler though (for the case we change the
> >>>>> metadata
> >>>>>> or
> >>>>>>>>>>>>>>>> storage format again -- as long as we don't change it, a
> >>>>> single
> >>>>>>>>>>>> rolling
> >>>>>>>>>>>>>>>> bounce is sufficient), by encoding "used version" and
> >>>>> "supported
> >>>>>>>>>>>>>>>> version". This would allow the leader to switch to the new
> >>>>>> version
> >>>>>>>>>>>>>>>> earlier and without a second rebalance: leader would
> >> receive
> >>>>>> "used
> >>>>>>>>>>>>>>>> version == old" and "supported version = old/new" -- as
> >> long
> >>>>> as
> >>>>>> at
> >>>>>>>>>>>> least
> >>>>>>>>>>>>>>>> one instance sends a "supported version = old" leader
> sends
> >>>>> old
> >>>>>>>>>>>> version
> >>>>>>>>>>>>>>>> assignment back. However, encoding both version would
> allow
> >>>>> that
> >>>>>>>> the
> >>>>>>>>>>>>>>>> leader can send a new version assignment back, right after
> >>> the
> >>>>>>>> first
> >>>>>>>>>>>>>>>> round or rebalance finished (all instances send "supported
> >>>>>>>> version =
> >>>>>>>>>>>>>>>> new"). However, there are still two issues with this:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 1) if we switch to the new format right after the last
> >>>>> instance
> >>>>>>>>>>>> bounced,
> >>>>>>>>>>>>>>>> the new stores might not be ready to be used -- this could
> >>>>> lead
> >>>>>> to
> >>>>>>>>>>>>>>>> "downtime" as store must be restored before processing can
> >>>>>> resume.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 2) Assume an instance fails and is restarted again. At
> this
> >>>>>>>> point, the
> >>>>>>>>>>>>>>>> instance will still have "upgrade mode" enabled and thus
> >>> sends
> >>>>>>>> the old
> >>>>>>>>>>>>>>>> protocol data. However, it would be desirable to never
> fall
> >>>>> back
> >>>>>>>> to
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> old protocol after the switch to the new protocol.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> The second issue is minor and I guess if users set-up the
> >>>>>> instance
> >>>>>>>>>>>>>>>> properly it could be avoided. However, the first issue
> >> would
> >>>>>>>> prevent
> >>>>>>>>>>>>>>>> "zero downtime" upgrades. Having said this, if we consider
> >>>>> that
> >>>>>> we
> >>>>>>>>>>>> might
> >>>>>>>>>>>>>>>> change the metadata protocol only if a future release,
> >>>>> encoding
> >>>>>>>> both
> >>>>>>>>>>>>>>>> used and supported version might be an advantage in the
> >>> future
> >>>>>>>> and we
> >>>>>>>>>>>>>>>> could consider to add this information in 1.2 release to
> >>>>> prepare
> >>>>>>>> for
> >>>>>>>>>>>>>> this.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Btw: monitoring the log, is also only required to give the
> >>>>>>>> instances
> >>>>>>>>>>>>>>>> enough time to prepare the stores in new format. If you
> >> would
> >>>>> do
> >>>>>>>> the
> >>>>>>>>>>>>>>>> second rolling bounce before this, it would still work --
> >>>>>>>> however, you
> >>>>>>>>>>>>>>>> might see app "downtime" as the new store must be fully
> >>>>> restored
> >>>>>>>>>>>> before
> >>>>>>>>>>>>>>>> processing can resume.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Does this make sense?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On 3/9/18 11:36 AM, James Cheng wrote:
> >>>>>>>>>>>>>>>>> Matthias,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> For all the upgrade paths, is it possible to get rid of
> >> the
> >>>>> 2nd
> >>>>>>>>>>>> rolling
> >>>>>>>>>>>>>>>> bounce?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> For the in-place upgrade, it seems like primary
> difference
> >>>>>>>> between
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> 1st rolling bounce and the 2nd rolling bounce is to decide
> >>>>>>>> whether to
> >>>>>>>>>>>>>> send
> >>>>>>>>>>>>>>>> Subscription Version 2 or Subscription Version 3.
> >> (Actually,
> >>>>>>>> there is
> >>>>>>>>>>>>>>>> another difference mentioned in that the KIP says that the
> >>> 2nd
> >>>>>>>> rolling
> >>>>>>>>>>>>>>>> bounce should happen after all new state stores are
> created
> >>> by
> >>>>>> the
> >>>>>>>>>>>>>>>> background thread. However, within the 2nd rolling bounce,
> >> we
> >>>>>> say
> >>>>>>>> that
> >>>>>>>>>>>>>>>> there is still a background thread, so it seems like is no
> >>>>>> actual
> >>>>>>>>>>>>>>>> requirement to wait for the new state stores to be
> >> created.)
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> The 2nd rolling bounce already knows how to deal with
> >>>>>> mixed-mode
> >>>>>>>>>>>>>> (having
> >>>>>>>>>>>>>>>> both Version 2 and Version 3 in the same consumer group).
> >> It
> >>>>>> seems
> >>>>>>>>>>>> like
> >>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>> could get rid of the 2nd bounce if we added logic
> >>>>>>>> (somehow/somewhere)
> >>>>>>>>>>>>>> such
> >>>>>>>>>>>>>>>> that:
> >>>>>>>>>>>>>>>>> * Instances send Subscription Version 2 until all
> >> instances
> >>>>> are
> >>>>>>>>>>>> running
> >>>>>>>>>>>>>>>> the new code.
> >>>>>>>>>>>>>>>>> * Once all the instances are running the new code, then
> >> one
> >>>>> at
> >>>>>> a
> >>>>>>>>>>>> time,
> >>>>>>>>>>>>>>>> the instances start sending Subscription V3. Leader still
> >>>>> hands
> >>>>>>>> out
> >>>>>>>>>>>>>>>> Assignment Version 2, until all new state stores are
> ready.
> >>>>>>>>>>>>>>>>> * Once all instances report that new stores are ready,
> >>> Leader
> >>>>>>>> sends
> >>>>>>>>>>>> out
> >>>>>>>>>>>>>>>> Assignment Version 3.
> >>>>>>>>>>>>>>>>> * Once an instance receives an Assignment Version 3, it
> >> can
> >>>>>>>> delete
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> old state store.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Doing it that way seems like it would reduce a lot of
> >>>>>>>>>>>>>>>> operator/deployment overhead. No need to do 2 rolling
> >>>>> restarts.
> >>>>>> No
> >>>>>>>>>>>> need
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>> monitor logs for state store rebuild. You just deploy it,
> >> and
> >>>>>> the
> >>>>>>>>>>>>>> instances
> >>>>>>>>>>>>>>>> update themselves.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> The thing that made me think of this is that the "2
> >> rolling
> >>>>>>>> bounces"
> >>>>>>>>>>>> is
> >>>>>>>>>>>>>>>> similar to what Kafka brokers have to do changes in
> >>>>>>>>>>>>>>>> inter.broker.protocol.version and
> >> log.message.format.version.
> >>>>>> And
> >>>>>>>> in
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> broker case, it seems like it would be possible (with some
> >>>>> work
> >>>>>> of
> >>>>>>>>>>>>>> course)
> >>>>>>>>>>>>>>>> to modify kafka to allow us to do similar auto-detection
> of
> >>>>>> broker
> >>>>>>>>>>>>>>>> capabilities and automatically do a switchover from
> old/new
> >>>>>>>> versions.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> -James
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Mar 9, 2018, at 10:38 AM, Bill Bejeck <
> >>> bbej...@gmail.com
> >>>>>>
> >>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Matthias,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks for the KIP, it's a +1 from me.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I do have one question regarding the retrieval methods
> on
> >>>>> the
> >>>>>>>> new
> >>>>>>>>>>>>>>>>>> interfaces.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Would want to consider adding one method with a
> Predicate
> >>>>> that
> >>>>>>>> would
> >>>>>>>>>>>>>>>> allow
> >>>>>>>>>>>>>>>>>> for filtering records by the timestamp stored with the
> >>>>> record?
> >>>>>>>> Or
> >>>>>>>>>>>> is
> >>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>> better left for users to implement themselves once the
> >> data
> >>>>>> has
> >>>>>>>> been
> >>>>>>>>>>>>>>>>>> retrieved?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>> Bill
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu <
> >>> yuzhih...@gmail.com
> >>>>>>
> >>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Matthias:
> >>>>>>>>>>>>>>>>>>> For my point #1, I don't have preference as to which
> >>>>>> separator
> >>>>>>>> is
> >>>>>>>>>>>>>>>> chosen.
> >>>>>>>>>>>>>>>>>>> Given the background you mentioned, current choice is
> >>> good.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> For #2, I think my proposal is better since it is
> closer
> >>> to
> >>>>>>>> English
> >>>>>>>>>>>>>>>>>>> grammar.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Would be good to listen to what other people think.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax <
> >>>>>>>>>>>>>> matth...@confluent.io
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thanks for the comments!
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> @Guozhang:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> So far, there is one PR for the rebalance metadata
> >>> upgrade
> >>>>>> fix
> >>>>>>>>>>>>>>>>>>>> (addressing the mentioned
> >>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6054) It
> >>>>> give a
> >>>>>>>> first
> >>>>>>>>>>>>>>>>>>>> impression how the metadata upgrade works including a
> >>>>> system
> >>>>>>>> test:
> >>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/4636
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I can share other PRs as soon as they are ready. I
> >> agree
> >>>>>> that
> >>>>>>>> the
> >>>>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>> complex am I ok with putting out more code to give
> >> better
> >>>>>>>>>>>> discussion
> >>>>>>>>>>>>>>>>>>>> context.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> @Ted:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I picked `_` instead of `-` to align with the
> >>>>>>>>>>>> `processing.guarantee`
> >>>>>>>>>>>>>>>>>>>> parameter that accepts `at_least_one` and
> >> `exactly_once`
> >>>>> as
> >>>>>>>>>>>> values.
> >>>>>>>>>>>>>>>>>>>> Personally, I don't care about underscore vs dash but
> I
> >>>>>> prefer
> >>>>>>>>>>>>>>>>>>>> consistency. If you feel strong about it, we can also
> >>>>> change
> >>>>>>>> it to
> >>>>>>>>>>>>>>>> `-`.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> About the interface name: I am fine either way -- I
> >>>>> stripped
> >>>>>>>> the
> >>>>>>>>>>>>>>>> `With`
> >>>>>>>>>>>>>>>>>>>> to keep the name a little shorter. Would be good to
> get
> >>>>>>>> feedback
> >>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>> others and pick the name the majority prefers.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> @John:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> We can certainly change it. I agree that it would not
> >>>>> make a
> >>>>>>>>>>>>>>>> difference.
> >>>>>>>>>>>>>>>>>>>> I'll dig into the code to see if any of the two
> version
> >>>>>> might
> >>>>>>>>>>>>>>>> introduce
> >>>>>>>>>>>>>>>>>>>> undesired complexity and update the KIP if I don't hit
> >> an
> >>>>>>>> issue
> >>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>> putting the `-v2` to the store directory instead of
> >>>>>>>> `rocksdb-v2`
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On 3/8/18 2:44 PM, John Roesler wrote:
> >>>>>>>>>>>>>>>>>>>>> Hey Matthias,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> The KIP looks good to me. I had several questions
> >> queued
> >>>>>> up,
> >>>>>>>> but
> >>>>>>>>>>>>>> they
> >>>>>>>>>>>>>>>>>>>> were
> >>>>>>>>>>>>>>>>>>>>> all in the "rejected alternatives" section... oh,
> >> well.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> One very minor thought re changing the state
> directory
> >>>>> from
> >>>>>>>>>>>>>>>>>>>> "/<state.dir>/<
> >>>>>>>>>>>>>>>>>>>>> application.id>/<task.id>/rocksdb/storeName/" to
> >>>>>>>> "/<state.dir>/<
> >>>>>>>>>>>>>>>>>>>>> application.id>/<task.id>/rocksdb-v2/storeName/": if
> >>> you
> >>>>>>>> put the
> >>>>>>>>>>>>>>>> "v2"
> >>>>>>>>>>>>>>>>>>>>> marker on the storeName part of the path (i.e.,
> >>>>>>>> "/<state.dir>/<
> >>>>>>>>>>>>>>>>>>>>> application.id>/<task.id>/rocksdb/storeName-v2/"),
> >> then
> >>>>>> you
> >>>>>>>> get
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> same
> >>>>>>>>>>>>>>>>>>>>> benefits without altering the high-level directory
> >>>>>> structure.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> It may not matter, but I could imagine people running
> >>>>>>>> scripts to
> >>>>>>>>>>>>>>>>>>> monitor
> >>>>>>>>>>>>>>>>>>>>> rocksdb disk usage for each task, or other such use
> >>>>> cases.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>> -John
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu <
> >>>>>> yuzhih...@gmail.com>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Matthias:
> >>>>>>>>>>>>>>>>>>>>>> Nicely written KIP.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> "in_place" : can this be "in-place" ? Underscore may
> >>>>>>>> sometimes
> >>>>>>>>>>>> be
> >>>>>>>>>>>>>>>> miss
> >>>>>>>>>>>>>>>>>>>>>> typed (as '-'). I think using '-' is more friendly
> to
> >>>>>> user.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> public interface ReadOnlyKeyValueTimestampStore<K,
> V>
> >>> {
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Is ReadOnlyKeyValueStoreWithTimestamp better name
> for
> >>>>> the
> >>>>>>>>>>>> class ?
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang <
> >>>>>>>>>>>> wangg...@gmail.com
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hello Matthias, thanks for the KIP.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I've read through the upgrade patch section and it
> >>>>> looks
> >>>>>>>> good
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>> me,
> >>>>>>>>>>>>>>>>>>> if
> >>>>>>>>>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>>>>> already have a WIP PR for it could you also share
> it
> >>>>> here
> >>>>>>>> so
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>> people
> >>>>>>>>>>>>>>>>>>>>>>> can take a look?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I'm +1 on the KIP itself. But large KIPs like this
> >>>>> there
> >>>>>>>> are
> >>>>>>>>>>>>>> always
> >>>>>>>>>>>>>>>>>>>> some
> >>>>>>>>>>>>>>>>>>>>>>> devil hidden in the details, so I think it is
> better
> >>> to
> >>>>>>>> have
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> implementation in parallel along with the design
> >>>>>>>> discussion :)
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax <
> >>>>>>>>>>>>>>>>>>> matth...@confluent.io
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> I want to propose KIP-258 for the Streams API to
> >>> allow
> >>>>>>>> storing
> >>>>>>>>>>>>>>>>>>>>>>>> timestamps in RocksDB. This feature is the basis
> to
> >>>>>>>> resolve
> >>>>>>>>>>>>>>>> multiple
> >>>>>>>>>>>>>>>>>>>>>>>> tickets (issues and feature requests).
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Looking forward to your comments about this!
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>>>>>>>>>>>>>>>>>>>>> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>>
> >>>>
> >>>
> >>>
> >>
> >
>
>

Reply via email to