Hi,

KIP-268 (rebalance metadata) is finished and included in the AK 2.0
release. Thus, I want to pick up this KIP again to get the RocksDB
upgrade done for 2.1.

I updated the KIP accordingly and also have a "proof of concept" PR
ready (for "in place" upgrade only):
https://github.com/apache/kafka/pull/5422/

There are still open questions, but I want to collect early feedback on
the proposed interfaces we need for the store upgrade. Also note that
the KIP now also aims to define a generic upgrade path from any store
format A to any other store format B. Adding timestamps is just a
special case.
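
To make the "timestamps are just a special case" point concrete, here is a
minimal sketch of a per-record conversion from an old value format to a new
one. All names below are hypothetical and not taken from the KIP or the PR.
The 8-byte timestamp prefix and the use of -1 for "timestamp unknown" are
assumptions made for illustration only.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

// Hypothetical sketch; class and method names are not part of the KIP.
final class StoreFormatUpgradeSketch {

    // Special case "add a timestamp": new value = 8-byte timestamp + old value bytes.
    static byte[] addTimestamp(long timestamp, byte[] oldValue) {
        return ByteBuffer.allocate(Long.BYTES + oldValue.length)
                .putLong(timestamp)
                .put(oldValue)
                .array();
    }

    // Reads the timestamp prefix back from a value in the new format.
    static long timestampOf(byte[] newValue) {
        return ByteBuffer.wrap(newValue).getLong();
    }

    // Strips the timestamp prefix and returns the plain (old format) value.
    static byte[] plainValueOf(byte[] newValue) {
        byte[] plain = new byte[newValue.length - Long.BYTES];
        ByteBuffer.wrap(newValue, Long.BYTES, plain.length).get(plain);
        return plain;
    }

    // Generic case: any function from format A bytes to format B bytes.
    // Assumption: -1 marks "timestamp unknown" for records written before the upgrade.
    static Function<byte[], byte[]> converterWithUnknownTimestamp() {
        return oldValue -> addTimestamp(-1L, oldValue);
    }
}
```

A different format change would only swap the converter function; the code
that copies records from the old store to the new store could stay the same.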

I will continue to work on the PR and refine the KIP in the meantime, too.

Looking forward to your feedback.

-Matthias


On 3/14/18 11:14 PM, Matthias J. Sax wrote:
> After some more thoughts, I want to follow John's suggestion and split
> upgrading the rebalance metadata from the store upgrade.
> 
> I extracted the metadata upgrade into its own KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade
> 
> I'll update this KIP accordingly shortly. I also want to consider
> making the store format upgrade more flexible/generic. Atm, the KIP is too
> much tailored to the DSL IMHO and does not account for PAPI users that we
> should not force to upgrade the stores. I need to figure out the details
> and follow up later.
> 
> Please give feedback for the new KIP-268 on the corresponding discussion
> thread.
> 
> @James: unfortunately, for upgrading to 1.2 I couldn't figure out a way
> for a single rolling bounce upgrade. But KIP-268 proposes a fix for
> future upgrades. Please share your thoughts.
> 
> Thanks for all your feedback!
> 
> -Matthias
> 
> On 3/12/18 11:56 PM, Matthias J. Sax wrote:
>> @John: yes, we would throw if configs are missing (it's an
>> implementation detail IMHO and thus I did not include it in the KIP)
>>
>> @Guozhang:
>>
>> 1) I understand now what you mean. We can certainly allow all values
>> "0.10.0.x", "0.10.1.x", "0.10.2.x", ... "1.1.x" for the `upgrade.from`
>> parameter. I had a similar thought once but decided to collapse them into
>> one -- will update the KIP accordingly.
>>
>> 2) The idea to avoid any config would be to always send both requests.
>> If we add a config to eventually disable the old request, we don't gain
>> anything with this approach. The question is really, if we are willing
>> to pay this overhead from 1.2 on -- note, it would be limited to 2
>> versions and not grow further in future releases. More details in (3)
>>
>> 3) Yes, this approach subsumes (2) for later releases and allows us to
>> stay with 2 "assignment strategies" we need to register, as the new
>> assignment strategy will allow to "upgrade itself" via "version
>> probing". Thus, (2) would only be a workaround to avoid a config if
>> people upgrade from pre-1.2 releases.
>>
>> Thus, I don't think we need to register new "assignment strategies" and
>> send empty subscriptions for older versions.
>>
>> 4) I agree that this is a tricky thing to get right with a single
>> rebalance. I share the concern that an application might never catch up
>> and thus the hot standby will never be ready.
>>
>> Maybe it's better to go with 2 rebalances for store upgrades. If we do
>> this, we also don't need to go with (2) and can get (3) in place for
>> future upgrades. I also think that changes to the metadata are more
>> likely and thus allowing for single rolling bounce for this case is more
>> important anyway. If we assume that store upgrades are rare, it might be ok
>> to sacrifice two rolling bounces for this case. It was just an idea I
>> wanted to share (even if I see the issues).
>>
>>
>> -Matthias
>>
>>
>>
>> On 3/12/18 11:45 AM, Guozhang Wang wrote:
>>> Hello Matthias, thanks for your replies.
>>>
>>>
>>> 1) About the config names: actually I was trying to not expose
>>> implementation details :) My main concern was that in your proposal the
>>> values need to cover the span of all the versions that are actually using
>>> the same version, i.e. "0.10.1.x-1.1.x". So if I (as a user) am upgrading
>>> from any version within this range I need to remember to use the value
>>> "0.10.1.x-1.1.x" rather than just specifying my old version. In my suggestion I
>>> was trying to argue the benefit of just letting users specify the actual
>>> Kafka version they are trying to upgrade from, rather than specifying a range of
>>> versions. I was not suggesting to use "v1, v2, v3" etc as the values, but
>>> still using Kafka versions like broker's `internal.version` config. But if
>>> you were suggesting the same thing, i.e. by "0.10.1.x-1.1.x" you meant to
>>> say users can just specify "0.10.1" or "0.10.2" or "0.11.0" or "1.1" which
>>> are all recognizable config values then I think we are actually on the same
>>> page.
>>>
>>> 2) About the "multi-assignment" idea: yes it would increase the network
>>> footprint, but not the message size, IF I'm not mis-understanding your idea
>>> of registering multiple assignment. More details:
>>>
>>> In the JoinGroupRequest, in the protocols field we can encode multiple
>>> protocols each with their different metadata. The coordinator will pick the
>>> common one that everyone supports (if there is no common one, it will send
>>> an error back; if there are multiple ones, it will pick the one with most
>>> votes, i.e. the one which was earlier in the encoded list). Since our
>>> current Streams rebalance protocol is still based on the consumer
>>> coordinator, it means our protocol_type would be "consumer", but under
>>> that protocol type we can have multiple protocols like "streams",
>>> "streams_v2", "streams_v3" etc. The downside is that we need to implement a
>>> different assignor class for each version and register all of them in
>>> consumer's PARTITION_ASSIGNMENT_STRATEGY_CONFIG. In the future if we
>>> re-factor our implementation to have our own client coordinator layer like
>>> Connect did, we can simplify this part of the implementation. But even for
>>> now with the above approach this is still doable.
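
The selection rule described above can be sketched as follows. This is a
simplified model of the behavior described in this mail, not the actual
group coordinator code; the class and method names are made up for
illustration.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Simplified, hypothetical model of protocol selection during a rebalance.
final class ProtocolSelectionSketch {

    // Each member lists its supported protocols in order of preference.
    static Optional<String> select(List<List<String>> memberProtocols) {
        if (memberProtocols.isEmpty()) {
            return Optional.empty();
        }
        // Candidates are the protocols that every member supports.
        Set<String> candidates = new LinkedHashSet<>(memberProtocols.get(0));
        for (List<String> protocols : memberProtocols) {
            candidates.retainAll(protocols);
        }
        if (candidates.isEmpty()) {
            return Optional.empty(); // the coordinator would send an error back
        }
        // Each member votes for the first candidate in its own preference list.
        Map<String, Integer> votes = new LinkedHashMap<>();
        for (List<String> protocols : memberProtocols) {
            for (String protocol : protocols) {
                if (candidates.contains(protocol)) {
                    votes.merge(protocol, 1, Integer::sum);
                    break;
                }
            }
        }
        // Pick the candidate with the most votes.
        String best = null;
        int bestVotes = 0;
        for (Map.Entry<String, Integer> entry : votes.entrySet()) {
            if (entry.getValue() > bestVotes) {
                best = entry.getKey();
                bestVotes = entry.getValue();
            }
        }
        return Optional.ofNullable(best);
    }
}
```

With one old member registering only "streams" and upgraded members
registering "streams_v2" and "streams", the sketch selects "streams"; once
every member registers "streams_v2" first, it selects "streams_v2".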
>>>
>>> On the broker side, the group coordinator will only persist a group with
>>> the selected protocol and its subscription metadata, e.g. if coordinator
>>> decides to pick "streams_v2" it will only send that protocol's metadata
>>> from everyone to the leader to assign, AND when completing the rebalance it
>>> will also only write the group metadata with that protocol and the
>>> assignment only. In a word, although the network traffic may be increased a
>>> bit, it would not be a bummer in our trade-off. One corner situation we
>>> need to consider is how to stop registering very old assignors to avoid the
>>> network traffic from increasing indefinitely, e.g. if you are rolling
>>> bounce from v2 to v3, then you'd not need to register v1 assignor anymore,
>>> but that would unfortunately still require some configs.
>>>
>>> 3) About the "version probing" idea, I think that's a promising approach
>>> as well, but if we are going to do the multi-assignment its value seems
>>> subsumed? But I'm thinking maybe it can be added on top of multi-assignment
>>> to save us from still requiring the config to avoid registering all the
>>> metadata for all versions. More details:
>>>
>>> In the JoinGroupRequest, we still register all the assignors but for all old
>>> assignors we do not encode any metadata, i.e. the encoded data would be:
>>>
>>> "streams_vN" : "encoded metadata"
>>> "streams_vN-1":empty
>>> "streams_vN-2":empty
>>> ..
>>> "streams_0":empty
>>>
>>> So the coordinator can still safely choose the latest common version; and
>>> then when the leader receives the subscriptions (note it should always recognize
>>> that version), let's say it is streams_vN-2, if one of the subscriptions
>>> is empty bytes, it will send the empty assignment with that version number
>>> encoded in the metadata. So in the second auto-triggered rebalance all members would
>>> send the metadata with that version:
>>>
>>> "streams_vN" : empty
>>> "streams_vN-1" : empty
>>> "streams_vN-2" : "encoded metadata"
>>> ..
>>> "streams_0":empty
>>>
>>>
>>> By doing this we would not require any configs for users.
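
A minimal sketch of this "register every assignor, encode metadata for only
one version" idea follows. The names are hypothetical, and the
"streams_v" + version naming is only an illustration of the scheme shown
above, not an agreed-upon protocol name.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the subscriptions a member would register.
final class EmptyMetadataProbingSketch {

    // Registers one entry per version from latestVersion down to 0.
    // Only encodedVersion carries real metadata, all others are empty.
    static Map<String, byte[]> subscriptions(int latestVersion, int encodedVersion, byte[] metadata) {
        Map<String, byte[]> result = new LinkedHashMap<>();
        for (int version = latestVersion; version >= 0; version--) {
            result.put("streams_v" + version,
                       version == encodedVersion ? metadata : new byte[0]);
        }
        return result;
    }

    // Leader side: if any member sent empty bytes for the selected protocol,
    // the leader answers with an empty assignment and a second rebalance follows.
    static boolean needsSecondRebalance(Collection<byte[]> subscriptionsForSelectedProtocol) {
        for (byte[] subscription : subscriptionsForSelectedProtocol) {
            if (subscription.length == 0) {
                return true;
            }
        }
        return false;
    }
}
```

In the first rebalance a new member would call subscriptions(N, N, metadata);
after receiving the empty assignment for version N-2 it would call
subscriptions(N, N - 2, metadata) in the second rebalance.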
>>>
>>>
>>> 4) About the "in_place" upgrade on rocksDB, I'm not clear about the details
>>> so probably we'd need to fill that out before making a call. For example,
>>> you mentioned "If we detect this situation, the Streams application closes
>>> corresponding active tasks as well as "hot standby" tasks, and re-creates
>>> the new active tasks using the new store." How could we guarantee that the
>>> gap between these two stores will keep decreasing rather than increasing so we'll
>>> eventually achieve the flip point? And also the longer we are before the
>>> flip point, the larger we are doubling the storage space, etc.
>>>
>>>
>>>
>>> Guozhang
>>>
>>>
>>>
>>> On Sun, Mar 11, 2018 at 4:06 PM, Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> @John, Guozhang,
>>>>
>>>> thanks a lot for your comments. Very long reply...
>>>>
>>>>
>>>> About upgrading the rebalance metadata:
>>>>
>>>> Another possibility to do this, would be to register multiple assignment
>>>> strategies for the 1.2 applications. For this case, new instances would
>>>> be configured to support both and the broker would pick the version that
>>>> all instances understand. The disadvantage would be, that we send much
>>>> more data (ie, two subscriptions) in each rebalance as long as no second
>>>> rebalance is done disabling the old protocol. Thus, using this approach
>>>> would allow to avoid a second rebalance trading-off an increased
>>>> rebalance network footprint (I also assume that this would increase the
>>>> message size that is written into __consumer_offsets topic?). Overall, I
>>>> am not sure if this would be a good tradeoff, but it could avoid a
>>>> second rebalance (I have some more thoughts about stores below that are
>>>> relevant for single rebalance upgrade).
>>>>
>>>> For future upgrades we might be able to fix this though. I was thinking
>>>> about the following:
>>>>
>>>> In the current implementation, the leader fails if it gets a
>>>> subscription it does not understand (ie, newer version). We could change
>>>> this behavior and let the leader send an empty assignment plus error
>>>> code (including supported version) back to the instance sending the
>>>> "bad" subscription. This would allow the following logic for an
>>>> application instance:
>>>>
>>>>  - on startup, always send the latest subscription format
>>>>  - if leader understands it, we get an assignment back and start processing
>>>>  - if leader does not understand it, we get an empty assignment and
>>>> supported version back
>>>>  - the application unsubscribe()/subscribe()/poll() again and sends a
>>>> subscription using the leader's supported version
>>>>
>>>> This protocol would allow to do a single rolling bounce, and implements
>>>> a "version probing" step, that might result in two executed rebalances.
>>>> The advantage would be, that the user does not need to set any configs
>>>> or do multiple rolling bounces, as Streams takes care of this
>>>> automatically.
>>>>
>>>> One disadvantage would be, that two rebalances happen and that for an
>>>> error case during rebalance, we lose the information about the
>>>> supported leader version and the "probing step" would happen a second time.
>>>>
>>>> If the leader is eventually updated, it will include its own supported
>>>> version in all assignments, to allow a "down graded" application to
>>>> upgrade its version later. Also, if an application fails, the first
>>>> probing would always be successful and only a single rebalance happens.
>>>> If we use this protocol, I think we don't need any configuration
>>>> parameter for future upgrades.
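
The "version probing" steps above can be sketched roughly like this. It is a
toy model of the idea under the assumption that versions are plain integers;
the class, the Reply type, and the method names are hypothetical and do not
mirror the real assignor code.

```java
// Hypothetical toy model of version probing between a member and the leader.
final class VersionProbingSketch {

    // What the leader sends back: either a real assignment, or an empty one
    // plus an error code carrying the leader's supported version.
    static final class Reply {
        final boolean assigned;
        final int leaderSupportedVersion;

        Reply(boolean assigned, int leaderSupportedVersion) {
            this.assigned = assigned;
            this.leaderSupportedVersion = leaderSupportedVersion;
        }
    }

    // Leader side: do not fail on a newer subscription, report the supported version instead.
    static Reply leaderHandle(int subscriptionVersion, int leaderSupportedVersion) {
        boolean understood = subscriptionVersion <= leaderSupportedVersion;
        return new Reply(understood, leaderSupportedVersion);
    }

    // Member side: start with the latest format, fall back to the leader's version if needed.
    // Returns the number of rebalances that were executed.
    static int joinGroup(int memberLatestVersion, int leaderSupportedVersion) {
        int version = memberLatestVersion;
        int rebalances = 0;
        while (true) {
            rebalances++;
            Reply reply = leaderHandle(version, leaderSupportedVersion);
            if (reply.assigned) {
                return rebalances;
            }
            // Corresponds to unsubscribe()/subscribe()/poll() with the leader's version.
            version = reply.leaderSupportedVersion;
        }
    }
}
```

In this model joinGroup(3, 3) needs one rebalance, while joinGroup(3, 2)
needs two, which matches the single rolling bounce with at most one extra
"probing" rebalance described above.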
>>>>
>>>>
>>>> About "upgrade.from" vs "internal.protocol.version":
>>>>
>>>> Users would set "upgrade.from" to the release version the current/old
>>>> application is using. I think this is simpler, as users know this
>>>> version. If we use "internal.protocol.version" instead, we expose
>>>> implementation details and users need to know the protocol version (ie,
>>>> they need to map from the release version to the protocol version; ie,
>>>> "I am running 0.11.0 that runs with metadata protocol version 2").
>>>>
>>>> Also the KIP states that for the second rolling bounce, the
>>>> "upgrade.mode" config should be set back to `null` -- and thus,
>>>> "upgrade.from" would not have any effect and is ignored (I will update
>>>> the KIP to point out this dependency).
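
As a sketch of how the two rolling bounces could be configured, assuming the
config names "upgrade.from" and "upgrade.mode" and the value "in_place" as
discussed in this thread (they are proposals here, not a released API; the
helper class is hypothetical and uses plain java.util.Properties):

```java
import java.util.Properties;

// Hypothetical helper; config names and values are the ones proposed in this thread.
final class UpgradeConfigSketch {

    // First rolling bounce: tell Streams which release the old application runs
    // and enable the upgrade mode.
    static Properties firstRollingBounce(String oldRelease) {
        Properties props = new Properties();
        props.setProperty("upgrade.from", oldRelease);
        props.setProperty("upgrade.mode", "in_place");
        return props;
    }

    // Second rolling bounce: "upgrade.mode" goes back to null (unset).
    // "upgrade.from" may stay, since it is ignored without "upgrade.mode".
    static Properties secondRollingBounce(Properties first) {
        Properties props = new Properties();
        props.putAll(first);
        props.remove("upgrade.mode");
        return props;
    }
}
```

For example, an application currently on 0.11.0 would use
firstRollingBounce("0.11.0") for the first bounce and pass the result to
secondRollingBounce(...) for the second.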
>>>>
>>>>
>>>>
>>>> About your second point: I'll update the KIP accordingly to describe
>>>> future updates as well. Both will be different.
>>>>
>>>>
>>>>
>>>> One more point about upgrading the store format. I was thinking about
>>>> avoiding the second rolling bounce altogether in the future: (1) the
>>>> goal is to achieve an upgrade with zero downtime (2) this requires us to
>>>> prepare the stores as "hot standbys" before we do the switch and delete
>>>> the old stores. (3) the current proposal does the switch "globally" --
>>>> this is simpler and due to the required second rebalance no disadvantage.
>>>> However, a global consistent switch over might actually not be required.
>>>> For "in_place" upgrade, following the protocol from above, we could
>>>> decouple the store switch and each instance could switch its store
>>>> independently from all other instances. After the rolling bounce, it
>>>> seems to be ok to switch from the old store to the new store "under the
>>>> hood" whenever the new store is ready (this could even be done, before
>>>> we switch to the new metadata version). Each time we update the "hot
>>>> standby" we check if it reached the "endOffset"  (or maybe X% that could
>>>> either be hardcoded or configurable). If we detect this situation, the
>>>> Streams application closes corresponding active tasks as well as "hot
>>>> standby" tasks, and re-creates the new active tasks using the new store.
>>>> (I need to go through the details once again, but it seems to be
>>>> feasible.).
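
The readiness check mentioned above ("endOffset" or X%) could look like the
following minimal sketch. The method name, the parameters, and the treatment
of an empty changelog are assumptions for illustration only.

```java
// Hypothetical sketch of the "is the new store ready to take over" check.
final class StoreSwitchSketch {

    // requiredFraction = 1.0 means the hot standby must have reached endOffset;
    // a smaller value models the "X%" variant (hardcoded or configurable).
    static boolean readyToSwitch(long standbyOffset, long endOffset, double requiredFraction) {
        if (endOffset <= 0) {
            return true; // assumption: an empty changelog needs no catch-up
        }
        return (double) standbyOffset / endOffset >= requiredFraction;
    }
}
```

Each time the "hot standby" is updated, an instance would evaluate this check
and, once it returns true, close the old active and standby tasks and
re-create the active tasks on the new store.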
>>>>
>>>> Combining this strategy with the "multiple assignment" idea, might even
>>>> enable us to do a single rolling bounce upgrade from 1.1 -> 1.2.
>>>> Applications would just use the old store, as long as the new store is
>>>> not ready, even if the new metadata version is used already.
>>>>
>>>> For future upgrades, a single rebalance would be sufficient, too, even
>>>> if the stores are upgraded. We would not need any config parameters as
>>>> the "probe" step allows us to detect the supported rebalance metadata
>>>> version (and we would also not need multiple "assignment strategies" as
>>>> our own protocol encodes everything we need).
>>>>
>>>>
>>>> Let me know what you think.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>>
>>>> On 3/9/18 10:33 PM, Guozhang Wang wrote:
>>>>> @John:
>>>>>
>>>>> For the protocol version upgrade, it is only for the encoded metadata
>>>> bytes
>>>>> protocol, which are just bytes-in bytes-out from Consumer's pov, so I
>>>> think
>>>>> this change should be in the Streams layer as well.
>>>>>
>>>>> @Matthias:
>>>>>
>>>>> for 2), I agree that adding a "newest supported version" besides the
>>>>> "currently used version for encoding" is a good idea to allow either
>>>> case;
>>>>> the key is that in Streams we would likely end up with a mapping from the
>>>>> protocol version to the other persistent data format versions such as
>>>>> rocksDB, changelog. So with such a map we can actually achieve both
>>>>> scenarios, i.e. 1) one rolling bounce if the upgraded protocol version's
>>>>> corresponding data format does not change, e.g. 0.10.0 -> 0.10.1 leaders
>>>>> can choose to use the newer version in the first rolling bounce directly
>>>>> and we can document to users that they would not need to set
>>>>> "upgrade.mode", and 2) two rolling bounce if the upgraded protocol
>>>> version
>>>>> does indicate the data format changes, e.g. 1.1 -> 1.2, and then we can
>>>>> document that "upgrade.mode" needs to be set in the first rolling bounce
>>>>> and reset in the second.
>>>>>
>>>>>
>>>>> Besides that, some additional comments:
>>>>>
>>>>> 1) I still think "upgrade.from" is less intuitive for users to set than
>>>>> "internal.protocol.version" where for the latter users only need to set a
>>>>> single version, while the Streams will map that version to the Streams
>>>>> assignor's behavior as well as the data format. But maybe I did not get
>>>>> your idea about how the "upgrade.from" config will be set, because in
>>>>> your Compatibility section how the upgrade.from config will be set for
>>>>> these two rolling bounces is not very clear: for example, should the user
>>>>> reset it to null in the second rolling bounce?
>>>>>
>>>>> 2) In the upgrade path description, rather than talking about specific
>>>>> version 0.10.0 -> version 0.10.1 etc, can we just categorize all the
>>>>> possible scenarios, even for future upgrade versions, what should be the
>>>>> standard operations? The categories we can summarize would be
>>>> (assuming
>>>>> user upgrade from version X to version Y, where X and Y are Kafka
>>>> versions,
>>>>> with the corresponding supported protocol version x and y):
>>>>>
>>>>>
>>>>> a. x == y, i.e. metadata protocol does not change, and hence no
>>>> persistent
>>>>> data formats have changed.
>>>>>
>>>>> b. x != y, but all persistent data format remains the same.
>>>>>
>>>>> c. x != y, AND some persistent data format like RocksDB format, changelog
>>>>> format, has been changed.
>>>>>
>>>>> d. special case: we may need some special handling logic when "current
>>>>> version" or "newest supported version" are not available in the protocol,
>>>>> i.e. for X as old as 0.10.0 and before 1.2.
>>>>>
>>>>>
>>>>> under the above scenarios, how many rolling bounces users need to
>>>> execute?
>>>>> how they should set the configs in each rolling bounce? and how Streams
>>>>> library will execute in these cases?
>>>>>
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Mar 9, 2018 at 4:01 PM, Matthias J. Sax <matth...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Ted,
>>>>>>
>>>>>> I still consider changing the KIP to include it right away -- if not,
>>>>>> I'll create a JIRA. Need to think it through in more detail first.
>>>>>>
>>>>>> (Same for other open questions like interface names -- I collect
>>>>>> feedback and update the KIP after we reach consensus :))
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 3/9/18 3:35 PM, Ted Yu wrote:
>>>>>>> Thanks for the details, Matthias.
>>>>>>>
>>>>>>> bq. change the metadata protocol only in a future release, encoding
>>>> both
>>>>>> used
>>>>>>> and supported version might be an advantage
>>>>>>>
>>>>>>> Looks like encoding both versions wouldn't be implemented in this KIP.
>>>>>>>
>>>>>>> Please consider logging a JIRA with the encoding proposal.
>>>>>>>
>>>>>>> Cheers
>>>>>>>
>>>>>>> On Fri, Mar 9, 2018 at 2:27 PM, Matthias J. Sax <matth...@confluent.io
>>>>>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> @Bill: I think a filter predicate should be part of user code. And
>>>> even
>>>>>>>> if we want to add something like this, I would prefer to do it in a
>>>>>>>> separate KIP.
>>>>>>>>
>>>>>>>>
>>>>>>>> @James: I would love to avoid a second rolling bounce. But from my
>>>>>>>> understanding it would not be possible.
>>>>>>>>
>>>>>>>> The purpose of the second rolling bounce is indeed to switch from
>>>>>>>> version 2 to 3. It also has a second purpose, to switch from the old
>>>>>>>> store to the new store (this happens after the last instance bounces a
>>>>>>>> second time).
>>>>>>>>
>>>>>>>> The problem with one round of rolling bounces is, that it's unclear
>>>> when
>>>>>>>> to switch from version 2 to version 3. The StreamsPartitionAssignor is
>>>>>>>> stateless by design, and thus, the information which version it should
>>>>>>>> use must be passed in from externally -- and we want to use the
>>>>>>>> StreamsConfig to pass in this information.
>>>>>>>>
>>>>>>>> During upgrade, all new instances have no information about the
>>>> progress
>>>>>>>> of the upgrade (ie, how many other instances got upgraded already).
>>>>>>>> Therefore, it's not safe for them to send a version 3 subscription.
>>>> The
>>>>>>>> leader also has this limited view on the world and can only send
>>>> version
>>>>>>>> 2 assignments back.
>>>>>>>>
>>>>>>>> Thus, for the 1.2 upgrade, I don't think we can simplify the upgrade.
>>>>>>>>
>>>>>>>> We did consider to change the metadata to make later upgrades (ie,
>>>> from
>>>>>>>> 1.2 to 1.x) simpler though (for the case we change the metadata or
>>>>>>>> storage format again -- as long as we don't change it, a single
>>>> rolling
>>>>>>>> bounce is sufficient), by encoding "used version" and "supported
>>>>>>>> version". This would allow the leader to switch to the new version
>>>>>>>> earlier and without a second rebalance: leader would receive "used
>>>>>>>> version == old" and "supported version = old/new" -- as long as at
>>>> least
>>>>>>>> one instance sends a "supported version = old" leader sends old
>>>> version
>>>>>>>> assignment back. However, encoding both versions would allow that the
>>>>>>>> leader can send a new version assignment back, right after the first
>>>>>>>> round of rebalances finished (all instances send "supported version =
>>>>>>>> new"). However, there are still two issues with this:
>>>>>>>>
>>>>>>>> 1) if we switch to the new format right after the last instance
>>>> bounced,
>>>>>>>> the new stores might not be ready to be used -- this could lead to
>>>>>>>> "downtime" as store must be restored before processing can resume.
>>>>>>>>
>>>>>>>> 2) Assume an instance fails and is restarted again. At this point, the
>>>>>>>> instance will still have "upgrade mode" enabled and thus sends the old
>>>>>>>> protocol data. However, it would be desirable to never fall back to
>>>> the
>>>>>>>> old protocol after the switch to the new protocol.
>>>>>>>>
>>>>>>>> The second issue is minor and I guess if users set-up the instance
>>>>>>>> properly it could be avoided. However, the first issue would prevent
>>>>>>>> "zero downtime" upgrades. Having said this, if we consider that we
>>>> might
>>>>>>>> change the metadata protocol only in a future release, encoding both
>>>>>>>> used and supported version might be an advantage in the future and we
>>>>>>>> could consider to add this information in 1.2 release to prepare for
>>>>>> this.
>>>>>>>>
>>>>>>>> Btw: monitoring the log, is also only required to give the instances
>>>>>>>> enough time to prepare the stores in new format. If you would do the
>>>>>>>> second rolling bounce before this, it would still work -- however, you
>>>>>>>> might see app "downtime" as the new store must be fully restored
>>>> before
>>>>>>>> processing can resume.
>>>>>>>>
>>>>>>>>
>>>>>>>> Does this make sense?
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 3/9/18 11:36 AM, James Cheng wrote:
>>>>>>>>> Matthias,
>>>>>>>>>
>>>>>>>>> For all the upgrade paths, is it possible to get rid of the 2nd
>>>> rolling
>>>>>>>> bounce?
>>>>>>>>>
>>>>>>>>> For the in-place upgrade, it seems like the primary difference between
>>>> the
>>>>>>>> 1st rolling bounce and the 2nd rolling bounce is to decide whether to
>>>>>> send
>>>>>>>> Subscription Version 2 or Subscription Version 3.  (Actually, there is
>>>>>>>> another difference mentioned in that the KIP says that the 2nd rolling
>>>>>>>> bounce should happen after all new state stores are created by the
>>>>>>>> background thread. However, within the 2nd rolling bounce, we say that
>>>>>>>> there is still a background thread, so it seems like there is no actual
>>>>>>>> requirement to wait for the new state stores to be created.)
>>>>>>>>>
>>>>>>>>> The 2nd rolling bounce already knows how to deal with mixed-mode
>>>>>> (having
>>>>>>>> both Version 2 and Version 3 in the same consumer group). It seems
>>>> like
>>>>>> we
>>>>>>>> could get rid of the 2nd bounce if we added logic (somehow/somewhere)
>>>>>> such
>>>>>>>> that:
>>>>>>>>> * Instances send Subscription Version 2 until all instances are
>>>> running
>>>>>>>> the new code.
>>>>>>>>> * Once all the instances are running the new code, then one at a
>>>> time,
>>>>>>>> the instances start sending Subscription V3. Leader still hands out
>>>>>>>> Assignment Version 2, until all new state stores are ready.
>>>>>>>>> * Once all instances report that new stores are ready, Leader sends
>>>> out
>>>>>>>> Assignment Version 3.
>>>>>>>>> * Once an instance receives an Assignment Version 3, it can delete
>>>> the
>>>>>>>> old state store.
>>>>>>>>>
>>>>>>>>> Doing it that way seems like it would reduce a lot of
>>>>>>>> operator/deployment overhead. No need to do 2 rolling restarts. No
>>>> need
>>>>>> to
>>>>>>>> monitor logs for state store rebuild. You just deploy it, and the
>>>>>> instances
>>>>>>>> update themselves.
>>>>>>>>>
>>>>>>>>> What do you think?
>>>>>>>>>
>>>>>>>>> The thing that made me think of this is that the "2 rolling bounces"
>>>> is
>>>>>>>> similar to what Kafka brokers have to do for changes in
>>>>>>>> inter.broker.protocol.version and log.message.format.version. And in
>>>> the
>>>>>>>> broker case, it seems like it would be possible (with some work of
>>>>>> course)
>>>>>>>> to modify kafka to allow us to do similar auto-detection of broker
>>>>>>>> capabilities and automatically do a switchover from old/new versions.
>>>>>>>>>
>>>>>>>>> -James
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> On Mar 9, 2018, at 10:38 AM, Bill Bejeck <bbej...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Matthias,
>>>>>>>>>>
>>>>>>>>>> Thanks for the KIP, it's a +1 from me.
>>>>>>>>>>
>>>>>>>>>> I do have one question regarding the retrieval methods on the new
>>>>>>>>>> interfaces.
>>>>>>>>>>
>>>>>>>>>> Would you want to consider adding one method with a Predicate that would
>>>>>>>> allow
>>>>>>>>>> for filtering records by the timestamp stored with the record?  Or
>>>> is
>>>>>>>> this
>>>>>>>>>> better left for users to implement themselves once the data has been
>>>>>>>>>> retrieved?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Bill
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 8, 2018 at 7:14 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Matthias:
>>>>>>>>>>> For my point #1, I don't have preference as to which separator is
>>>>>>>> chosen.
>>>>>>>>>>> Given the background you mentioned, current choice is good.
>>>>>>>>>>>
>>>>>>>>>>> For #2, I think my proposal is better since it is closer to English
>>>>>>>>>>> grammar.
>>>>>>>>>>>
>>>>>>>>>>> Would be good to listen to what other people think.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax <
>>>>>> matth...@confluent.io
>>>>>>>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the comments!
>>>>>>>>>>>>
>>>>>>>>>>>> @Guozhang:
>>>>>>>>>>>>
>>>>>>>>>>>> So far, there is one PR for the rebalance metadata upgrade fix
>>>>>>>>>>>> (addressing the mentioned
>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6054). It gives a first
>>>>>>>>>>>> impression how the metadata upgrade works including a system test:
>>>>>>>>>>>> https://github.com/apache/kafka/pull/4636
>>>>>>>>>>>>
>>>>>>>>>>>> I can share other PRs as soon as they are ready. I agree that the
>>>>>> KIP
>>>>>>>> is
>>>>>>>>>>>> complex and I am ok with putting out more code to give better
>>>> discussion
>>>>>>>>>>>> context.
>>>>>>>>>>>>
>>>>>>>>>>>> @Ted:
>>>>>>>>>>>>
>>>>>>>>>>>> I picked `_` instead of `-` to align with the
>>>> `processing.guarantee`
>>>>>>>>>>>> parameter that accepts `at_least_once` and `exactly_once` as
>>>> values.
>>>>>>>>>>>> Personally, I don't care about underscore vs dash but I prefer
>>>>>>>>>>>> consistency. If you feel strongly about it, we can also change it to
>>>>>>>> `-`.
>>>>>>>>>>>>
>>>>>>>>>>>> About the interface name: I am fine either way -- I stripped the
>>>>>>>> `With`
>>>>>>>>>>>> to keep the name a little shorter. Would be good to get feedback
>>>>>> from
>>>>>>>>>>>> others and pick the name the majority prefers.
>>>>>>>>>>>>
>>>>>>>>>>>> @John:
>>>>>>>>>>>>
>>>>>>>>>>>> We can certainly change it. I agree that it would not make a
>>>>>>>> difference.
>>>>>>>>>>>> I'll dig into the code to see if either of the two versions might
>>>>>>>> introduce
>>>>>>>>>>>> undesired complexity and update the KIP if I don't hit an issue
>>>> with
>>>>>>>>>>>> putting the `-v2` to the store directory instead of `rocksdb-v2`
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 3/8/18 2:44 PM, John Roesler wrote:
>>>>>>>>>>>>> Hey Matthias,
>>>>>>>>>>>>>
>>>>>>>>>>>>> The KIP looks good to me. I had several questions queued up, but
>>>>>> they
>>>>>>>>>>>> were
>>>>>>>>>>>>> all in the "rejected alternatives" section... oh, well.
>>>>>>>>>>>>>
>>>>>>>>>>>>> One very minor thought re changing the state directory from
>>>>>>>>>>>> "/<state.dir>/<
>>>>>>>>>>>>> application.id>/<task.id>/rocksdb/storeName/" to "/<state.dir>/<
>>>>>>>>>>>>> application.id>/<task.id>/rocksdb-v2/storeName/": if you put the
>>>>>>>> "v2"
>>>>>>>>>>>>> marker on the storeName part of the path (i.e., "/<state.dir>/<
>>>>>>>>>>>>> application.id>/<task.id>/rocksdb/storeName-v2/"), then you get
>>>>>> the
>>>>>>>>>>> same
>>>>>>>>>>>>> benefits without altering the high-level directory structure.
>>>>>>>>>>>>>
>>>>>>>>>>>>> It may not matter, but I could imagine people running scripts to
>>>>>>>>>>> monitor
>>>>>>>>>>>>> rocksdb disk usage for each task, or other such use cases.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> -John
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu <yuzhih...@gmail.com>
>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Matthias:
>>>>>>>>>>>>>> Nicely written KIP.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> "in_place" : can this be "in-place" ? Underscore may sometimes be
>>>>>>>>>>>>>> mistyped (as '-'). I think using '-' is more friendly to users.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public interface ReadOnlyKeyValueTimestampStore<K, V> {
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is ReadOnlyKeyValueStoreWithTimestamp better name for the
>>>> class ?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang <
>>>> wangg...@gmail.com
>>>>>>>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello Matthias, thanks for the KIP.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I've read through the upgrade path section and it looks good
>>>> to
>>>>>>>> me,
>>>>>>>>>>> if
>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>> already have a WIP PR for it could you also share it here so
>>>> that
>>>>>>>>>>>> people
>>>>>>>>>>>>>>> can take a look?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm +1 on the KIP itself. But for large KIPs like this there is
>>>>>> always
>>>>>>>>>>>> some
>>>>>>>>>>>>>>> devil hidden in the details, so I think it is better to have
>>>> the
>>>>>>>>>>>>>>> implementation in parallel along with the design discussion :)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax <
>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I want to propose KIP-258 for the Streams API to allow storing
>>>>>>>>>>>>>>>> timestamps in RocksDB. This feature is the basis to resolve
>>>>>>>> multiple
>>>>>>>>>>>>>>>> tickets (issues and feature requests).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Looking forward to your comments about this!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>>>>>>>>> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
> 
