I think this makes sense. When we introduce this new state, we might also tackle the Jira I mentioned: if there is a global thread, on startup of a `KafkaStreams` client we should not transition to `REBALANCING` but to the new state, and maybe also make the "bootstrapping" non-blocking.
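To make the idea concrete: from a user's perspective the new state would simply show up via the existing state listener. A minimal sketch, assuming the state ends up being named `RESTORING_GLOBAL` (the name is still under discussion) and using a placeholder topic name:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    import java.util.Properties;

    public class RestoringGlobalListenerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "restoring-global-demo");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            StreamsBuilder builder = new StreamsBuilder();
            builder.globalTable("global-topic"); // placeholder global topic

            KafkaStreams streams = new KafkaStreams(builder.build(), props);

            // Today the client reports CREATED -> REBALANCING -> RUNNING on startup;
            // under the proposal it would report the new state while the global
            // thread rebuilds the global store and processing is paused.
            streams.setStateListener((newState, oldState) ->
                System.out.println("transition: " + oldState + " -> " + newState));

            streams.start();
        }
    }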
I guess it's worth mentioning this in the KIP. Btw: The new state for KafkaStreams should also be part of the KIP as it is a public API change, too. -Matthias On 8/29/20 9:37 AM, John Roesler wrote: > Hi Navinder, > > Thanks for the ping. Yes, that all sounds right to me. The name > “RESTORING_GLOBAL” sounds fine, too. > > I think as far as warnings go, we’d just propose to mention it in the javadoc > of the relevant methods that the given topics should be compacted. > > Thanks! > -John > > On Fri, Aug 28, 2020, at 12:42, Navinder Brar wrote: >> Gentle ping. >> >> ~ Navinder >> On Wednesday, 19 August, 2020, 06:59:58 pm IST, Navinder Brar >> <navinder_b...@yahoo.com.invalid> wrote: >> >> >> Thanks Matthias & John, >> >> >> >> I am glad we are converging towards an understanding. So, to summarize, >> >> we will still keep treating this change in KIP and instead of providing a >> reset >> >> strategy, we will cleanup, and reset to earliest and build the state. >> >> When we hit the exception and we are building the state, we will stop all >> >> processing and change the state of KafkaStreams to something like >> >> “RESTORING_GLOBAL” or the like. >> >> >> >> How do we plan to educate users on the non desired effects of using >> >> non-compacted global topics? (via the KIP itself?) >> >> >> +1 on changing the KTable behavior, reset policy for global, connecting >> processors to global for a later stage when demanded. >> >> Regards, >> Navinder >> On Wednesday, 19 August, 2020, 01:00:58 pm IST, Matthias J. Sax >> <mj...@apache.org> wrote: >> >> Your observation is correct. Connecting (regular) stores to processors >> is necessary to "merge" sub-topologies into single ones if a store is >> shared. -- For global stores, the structure of the program does not >> change and thus connecting processors to global stores is not required. >> >> Also given our experience with restoring regular state stores (ie, >> partial processing of tasks that don't need restore), it seems better to >> pause processing and move all CPU and network resources to the global >> thread to rebuild the global store as soon as possible instead of >> potentially slowing down the restore in order to make progress on some >> tasks. >> >> Of course, if we collect real world experience and it becomes an issue, >> we could still try to change it? >> >> >> -Matthias >> >> >> On 8/18/20 3:31 PM, John Roesler wrote: >>> Thanks Matthias, >>> >>> Sounds good. I'm on board with no public API change and just >>> recovering instead of crashing. >>> >>> Also, to be clear, I wouldn't drag KTables into it; I was >>> just trying to wrap my head around the congruity of our >>> choice for GlobalKTable with respect to KTable. >>> >>> I agree that whatever we decide to do would probably also >>> resolve KAFKA-7380. >>> >>> Moving on to discuss the behavior change, I'm wondering if >>> we really need to block all the StreamThreads. It seems like >>> we only need to prevent processing on any task that's >>> connected to the GlobalStore. >>> >>> I just took a look at the topology building code, and it >>> actually seems that connections to global stores don't need >>> to be declared. That's a bummer, since it means that we >>> really do have to stop all processing while the global >>> thread catches up. >>> >>> Changing this seems like it'd be out of scope right now, but >>> I bring it up in case I'm wrong and it actually is possible >>> to know which specific tasks need to be synchronized with >>> which global state stores.
If we could know that, then we'd >>> only have to block some of the tasks, not all of the >>> threads. >>> >>> Thanks, >>> -John >>> >>> >>> On Tue, 2020-08-18 at 14:10 -0700, Matthias J. Sax wrote: >>>> Thanks for the discussion. >>>> >>>> I agree that this KIP is justified in any case -- even if we don't >>>> change public API, as the change in behavior is significant. >>>> >>>> A better documentation for cleanup policy is always good (even if I am >>>> not aware of any concrete complaints atm that users were not aware of >>>> the implications). Of course, for a regular KTable, one can >>>> enable/disable the source-topic-changelog optimization and thus can use >>>> a non-compacted topic for this case, what is quite a difference to >>>> global stores/tables; so maybe it's worth to point out this difference >>>> explicitly. >>>> >>>> As mentioned before, the main purpose of the original Jira was to avoid >>>> the crash situation but to allow for auto-recovering while it was an >>>> open question if it makes sense / would be useful to allow users to >>>> specify a custom reset policy instead of using a hard-coded "earliest" >>>> strategy. -- It seem it's still unclear if it would be useful and thus >>>> it might be best to not add it for now -- we can still add it later if >>>> there are concrete use-cases that need this feature. >>>> >>>> @John: I actually agree that it's also questionable to allow a custom >>>> reset policy for KTables... Not sure if we want to drag this question >>>> into this KIP though? >>>> >>>> So it seem, we all agree that we actually don't need any public API >>>> changes, but we only want to avoid crashing? >>>> >>>> For this case, to preserve the current behavior that guarantees that the >>>> global store/table is always loaded first, it seems we need to have a >>>> stop-the-world mechanism for the main `StreamThreads` for this case -- >>>> do we need to add a new state to KafkaStreams client for this case? >>>> >>>> Having a new state might also be helpful for >>>> https://issues.apache.org/jira/browse/KAFKA-7380 ? >>>> >>>> >>>> >>>> -Matthias >>>> >>>> >>>> >>>> >>>> On 8/17/20 7:34 AM, John Roesler wrote: >>>>> Hi Navinder, >>>>> >>>>> I see what you mean about the global consumer being similar >>>>> to the restore consumer. >>>>> >>>>> I also agree that automatically performing the recovery >>>>> steps should be strictly an improvement over the current >>>>> situation. >>>>> >>>>> Also, yes, it would be a good idea to make it clear that the >>>>> global topic should be compacted in order to ensure correct >>>>> semantics. It's the same way with input topics for KTables; >>>>> we rely on users to ensure the topics are compacted, and if >>>>> they aren't, then the execution semantics will be broken. >>>>> >>>>> Thanks, >>>>> -John >>>>> >>>>> On Sun, 2020-08-16 at 11:44 +0000, Navinder Brar wrote: >>>>>> Hi John, >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> Thanks for your inputs. Since, global topics are in a way their own >>>>>> changelog, wouldn’t the global consumers be more akin to restore >>>>>> consumers than the main consumer? >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> I am also +1 on catching the exception and setting it to the earliest >>>>>> for now. Whenever an instance starts, currently global stream thread(if >>>>>> available) goes to RUNNING before stream threads are started so that >>>>>> means the global state is available when the processing by stream >>>>>> threads start. 
So, with the new change of catching the exception, >>>>>> cleaning store and resetting to earlier would probably be “stop the >>>>>> world” as you said John, as I think we will have to pause the stream >>>>>> threads till the whole global state is recovered. I assume it is "stop >>>>>> the world" right now as well, since now also if an >>>>>> InvalidOffsetException comes, we throw streams exception and the user >>>>>> has to clean up and handle all this manually and when that instance will >>>>>> start, it will restore global state first. >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> I had an additional thought to this whole problem, would it be helpful >>>>>> to educate the users that global topics should have cleanup policy as >>>>>> compact, so that this invalid offset exception never arises for them. >>>>>> Assume for example, that the cleanup policy in global topic is "delete" >>>>>> and it has deleted k1, k2 keys(via retention.ms) although all the >>>>>> instances had already consumed them so they are in all global stores and >>>>>> all other instances are up to date on the global data(so no >>>>>> InvalidOffsetException). Now, a new instance is added to the cluster, >>>>>> and we have already lost k1, k2 from the global topic so it will start >>>>>> consuming from the earliest point in the global topic. So, wouldn’t this >>>>>> global store on the new instance has 2 keys less than all the other >>>>>> global stores already available in the cluster? Please let me know if I >>>>>> am missing something. Thanks. >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> Regards, >>>>>> >>>>>> Navinder >>>>>> >>>>>> >>>>>> On Friday, 14 August, 2020, 10:03:42 am IST, John Roesler >>>>>> <vvcep...@apache.org> wrote: >>>>>> >>>>>> Hi all, >>>>>> >>>>>> It seems like the main motivation for this proposal is satisfied if we >>>>>> just implement some recovery mechanism instead of crashing. If the >>>>>> mechanism is going to be pausing all the threads until the state is >>>>>> recovered, then it still seems like a big enough behavior change to >>>>>> warrant a KIP still. >>>>>> >>>>>> I have to confess I’m a little unclear on why a custom reset policy for >>>>>> a global store, table, or even consumer might be considered wrong. It’s >>>>>> clearly wrong for the restore consumer, but the global consumer seems >>>>>> more semantically akin to the main consumer than the restore consumer. >>>>>> >>>>>> In other words, if it’s wrong to reset a GlobalKTable from latest, >>>>>> shouldn’t it also be wrong for a KTable, for exactly the same reason? It >>>>>> certainly seems like it would be an odd choice, but I’ve seen many >>>>>> choices I thought were odd turn out to have perfectly reasonable use >>>>>> cases. >>>>>> >>>>>> As far as the PAPI global store goes, I could see adding the option to >>>>>> configure it, since as Matthias pointed out, there’s really no specific >>>>>> semantics for the PAPI. But if automatic recovery is really all Navinder >>>>>> wanted, the I could also see deferring this until someone specifically >>>>>> wants it. >>>>>> >>>>>> So the tl;dr is, if we just want to catch the exception and rebuild the >>>>>> store by seeking to earliest with no config or API changes, then I’m +1. >>>>>> >>>>>> I’m wondering if we can improve on the “stop the world” effect of >>>>>> rebuilding the global store, though. 
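To make the compaction point above concrete: the k1/k2 scenario can only occur if the global topic is not compacted, so the guidance to users would simply be to create global topics with cleanup.policy=compact. A minimal sketch using the Admin client; the topic name and sizing below are made up for illustration:

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.config.TopicConfig;

    import java.util.Collections;
    import java.util.Properties;

    public class CreateCompactedGlobalTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (Admin admin = Admin.create(props)) {
                // Compaction keeps the latest value per key, so a bootstrapping
                // instance can always rebuild the complete global state from the
                // beginning of the topic.
                NewTopic globalTopic = new NewTopic("global-topic", 1, (short) 3)
                        .configs(Collections.singletonMap(
                                TopicConfig.CLEANUP_POLICY_CONFIG,
                                TopicConfig.CLEANUP_POLICY_COMPACT));
                admin.createTopics(Collections.singleton(globalTopic)).all().get();
            }
        }
    }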
It seems like we could put our >>>>>> heads together and come up with a more fine-grained approach to >>>>>> maintaining the right semantics during recovery while still making some >>>>>> progress. >>>>>> >>>>>> Thanks, >>>>>> John >>>>>> >>>>>> >>>>>> On Sun, Aug 9, 2020, at 02:04, Navinder Brar wrote: >>>>>>> Hi Matthias, >>>>>>> >>>>>>> IMHO, now as you explained using ‘global.consumer.auto.offset.reset’ is >>>>>>> not as straightforward >>>>>>> as it seems and it might change the existing behavior for users without >>>>>>> they releasing it, I also >>>>>>> >>>>>>> think that we should change the behavior inside global stream thread to >>>>>>> not die on >>>>>>> >>>>>>> InvalidOffsetException and instead clean and rebuild the state from the >>>>>>> earliest. On this, as you >>>>>>> >>>>>>> mentioned that we would need to pause the stream threads till the >>>>>>> global store is completely restored. >>>>>>> >>>>>>> Without it, there will be incorrect processing results if they are >>>>>>> utilizing a global store during processing. >>>>>>> >>>>>>> >>>>>>> >>>>>>> So, basically we can divide the use-cases into 4 parts. >>>>>>> >>>>>>> - PAPI based global stores (will have the earliest hardcoded) >>>>>>> - PAPI based state stores (already has auto.reset.config) >>>>>>> - DSL based GlobalKTables (will have earliest hardcoded) >>>>>>> - DSL based KTables (will continue with auto.reset.config) >>>>>>> >>>>>>> >>>>>>> >>>>>>> So, this would mean that we are not changing any existing behaviors >>>>>>> with this if I am right. >>>>>>> >>>>>>> >>>>>>> >>>>>>> I guess we could improve the code to actually log a warning for this >>>>>>> >>>>>>> case, similar to what we do for some configs already (cf >>>>>>> >>>>>>> StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS). >>>>>>> >>>>>>>>> I like this idea. In case we go ahead with the above approach and if >>>>>>>>> we can’t >>>>>>> >>>>>>> deprecate it, we should educate users that this config doesn’t work. >>>>>>> >>>>>>> >>>>>>> >>>>>>> Looking forward to hearing thoughts from others as well. >>>>>>> >>>>>>> >>>>>>> - Navinder On Tuesday, 4 August, 2020, 05:07:59 am IST, Matthias J. >>>>>>> Sax <mj...@apache.org> wrote: >>>>>>> >>>>>>> Navinder, >>>>>>> >>>>>>> thanks for updating the KIP. I think the motivation section is not >>>>>>> totally accurate (what is not your fault though, as the history of how >>>>>>> we handle this case is intertwined...) For example, "auto.offset.reset" >>>>>>> is hard-coded for the global consumer to "none" and using >>>>>>> "global.consumer.auto.offset.reset" has no effect (cf >>>>>>> https://kafka.apache.org/25/documentation/streams/developer-guide/config-streams.html#default-values) >>>>>>> >>>>>>> Also, we could not even really deprecate the config as mentioned in >>>>>>> rejected alternatives sections, because we need `auto.offset.reset` for >>>>>>> the main consumer -- and adding a prefix is independent of it. Also, >>>>>>> because we ignore the config, it's is also deprecated/removed if you >>>>>>> wish. >>>>>>> >>>>>>> I guess we could improve the code to actually log a warning for this >>>>>>> case, similar to what we do for some configs already (cf >>>>>>> StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS). >>>>>>> >>>>>>> >>>>>>> The other question is about compatibility with regard to default >>>>>>> behavior: if we want to reintroduce `global.consumer.auto.offset.reset` >>>>>>> this basically implies that we need to respect `auto.offset.reset`, too. 
>>>>>>> Remember, that any config without prefix is applied to all clients that >>>>>>> support this config. Thus, if a user does not limit the scope of the >>>>>>> config to the main consumer (via `main.consumer.auto.offset.reset`) but >>>>>>> uses the non-prefix versions and sets it to "latest" (and relies on the >>>>>>> current behavior that `auto.offset.reset` is "none", and effectively >>>>>>> "earliest" on the global consumer), the user might end up with a >>>>>>> surprise as the global consumer behavior would switch from "earliest" to >>>>>>> "latest" (most likely unintentionally). Bottom line is, that users might >>>>>>> need to change configs to preserve the old behavior... >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> However, before we discuss those details, I think we should discuss the >>>>>>> topic in a broader context first: >>>>>>> >>>>>>> - for a GlobalKTable, does it even make sense from a correctness point >>>>>>> of view, to allow users to set a custom reset policy? It seems you >>>>>>> currently don't propose this in the KIP, but as you don't mention it >>>>>>> explicitly it's unclear if that on purpose of an oversight? >>>>>>> >>>>>>> - Should we treat global stores differently to GlobalKTables and allow >>>>>>> for more flexibility (as the PAPI does not really provide any semantic >>>>>>> contract). It seems that is what you propose in the KIP. We should >>>>>>> discuss if this flexibility does make sense or not for the PAPI, or if >>>>>>> we should apply the same reasoning about correctness we use for KTables >>>>>>> to global stores? To what extend are/should they be different? >>>>>>> >>>>>>> - If we support auto.offset.reset for global store, how should we >>>>>>> handle the initial bootstrapping of the store/table (that is hard-coded >>>>>>> atm)? Should we skip it if the policy is "latest" and start with an >>>>>>> empty state? Note that we did consider this behavior incorrect via >>>>>>> https://issues.apache.org/jira/browse/KAFKA-6121 and thus I am wondering >>>>>>> why should we change it back again? >>>>>>> >>>>>>> >>>>>>> Finally, the main motivation for the Jira ticket was to let the runtime >>>>>>> auto-recover instead of dying as it does currently. If we decide that a >>>>>>> custom reset policy does actually not make sense, we can just change the >>>>>>> global-thread to not die any longer on an `InvalidOffsetException` but >>>>>>> rebuild the state automatically. This would be "only" a behavior change >>>>>>> but does not require any public API changes. -- For this case, we should >>>>>>> also think about the synchronization with the main processing threads? >>>>>>> On startup we bootstrap the global stores before processing happens. >>>>>>> Thus, if an `InvalidOffsetException` happen and the global thread dies, >>>>>>> the main threads cannot access the global stores any longer an also die. >>>>>>> If we re-build the state though, do we need to pause the main thread >>>>>>> during this phase? >>>>>>> >>>>>>> >>>>>>> >>>>>>> -Matthias >>>>>>> >>>>>>> >>>>>>> >>>>>>> On 8/2/20 8:48 AM, Navinder Brar wrote: >>>>>>>> Hi John, >>>>>>>> >>>>>>>> I have updated the KIP to make the motivation more clear. In a >>>>>>>> nutshell, we will use the already existing config >>>>>>>> "global.consumer.auto.offset.reset" for users to set a blanket reset >>>>>>>> policy for all global topics and add a new interface to set per-topic >>>>>>>> reset policy for each global topic(for which we specifically need this >>>>>>>> KIP). 
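As a side note on how these configs scope today: a minimal sketch of the three ways a user could express the reset policy, assuming the global consumer started honoring it (the values are only for illustration):

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    import java.util.Properties;

    public class ResetPolicyScopingSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "reset-scoping-demo");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // Un-prefixed: applies to every embedded client that supports the config,
            // so it would also hit the global consumer once that consumer respects it.
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

            // Prefixed for the main consumer only:
            props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");

            // Prefixed for the global consumer only (ignored today, because the global
            // consumer is hard-coded to "none" and bootstraps from the earliest offset):
            props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
        }
    }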
There was a point raised from Matthias above to always reset to >>>>>>>> earliest by cleaning the stores and seekToBeginning in case of >>>>>>>> InvalidOffsetException. We can go with that route as well and I don't >>>>>>>> think it would need a KIP as if we are not providing users an option >>>>>>>> to have blanket reset policy on global topics, then a per-topic >>>>>>>> override would also not be required(the KIP is required basically for >>>>>>>> that). Although, I think if users have an option to choose reset >>>>>>>> policy for StreamThread then the option should be provided for >>>>>>>> GlobalStreamThread as well and if we don't want to use the >>>>>>>> "global.consumer.auto.offset.reset" then we would need to deprecate it >>>>>>>> because currently it's not serving any purpose. For now, I have added >>>>>>>> it in rejected alternatives but we can discuss this. >>>>>>>> >>>>>>>> On the query that I had for Guozhang, thanks to Matthias we have fixed >>>>>>>> it last week as part of KAFKA-10306. >>>>>>>> >>>>>>>> ~Navinder >>>>>>>> >>>>>>>> >>>>>>>> On Sunday, 26 July, 2020, 07:37:34 pm IST, Navinder Brar >>>>>>>> <navinder_b...@yahoo.com.invalid> wrote: >>>>>>>> >>>>>>>> >>>>>>>> Hi, >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Sorry, it took some time to respond back. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> “but I thought we would pass the config through to the client.” >>>>>>>> >>>>>>>>>> @John, sure we can use the config in GloablStreamThread, that could >>>>>>>>>> be one of the way to solve it. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> @Matthias, sure cleaning the store and recreating is one way but since >>>>>>>> we are giving an option to reset in StreamThread why the >>>>>>>> implementation should be different in GlobalStreamThread. I think we >>>>>>>> should use the global.consumer.auto.offset.reset config to accept the >>>>>>>> reset strategy opted by the user although I would be ok with just >>>>>>>> cleaning and resetting to the latest as well for now. Currently, we >>>>>>>> throw a StreamsException in case of InvalidOffsetException in >>>>>>>> GlobalStreamThread so just resetting would still be better than what >>>>>>>> happens currently. >>>>>>>> >>>>>>>> Matthias, I found this comment in StreamBuilder for GlobalKTable ‘* >>>>>>>> Note that {@link GlobalKTable} always applies {@code >>>>>>>> "auto.offset.reset"} strategy {@code "earliest"} regardless of the >>>>>>>> specified value in {@link StreamsConfig} or {@link Consumed}.’ >>>>>>>> So, I guess we are already cleaning up and recreating for GlobalKTable >>>>>>>> from earliest offset. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> @Guozhan while looking at the code, I also noticed a TODO: pending in >>>>>>>> GlobalStateManagerImpl, when InvalidOffsetException is thrown. >>>>>>>> Earlier, we were directly clearing the store here and recreating from >>>>>>>> scratch but that code piece is removed now. Are you working on a >>>>>>>> follow-up PR for this or just handling the reset in GlobalStreamThread >>>>>>>> should be sufficient? >>>>>>>> >>>>>>>> Regards, >>>>>>>> Navinder >>>>>>>> >>>>>>>> On Tuesday, 7 July, 2020, 12:53:36 am IST, Matthias J. Sax >>>>>>>> <mj...@apache.org> wrote: >>>>>>>> >>>>>>>> Atm, the config should be ignored and the global-consumer should use >>>>>>>> "none" in a hard-coded way. >>>>>>>> >>>>>>>> However, if am still wondering if we actually want/need to allow users >>>>>>>> to specify the reset policy? 
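For reference, the DSL behavior the quoted javadoc describes looks roughly like this from the user side: a regular KTable honors the reset policy passed via Consumed, while a GlobalKTable always bootstraps from "earliest" no matter what is requested. Topic names and serdes below are placeholders:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;

    public class DslResetPolicySketch {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();

            // Regular KTable: the requested reset policy is honored.
            builder.table("ktable-input",
                    Consumed.with(Serdes.String(), Serdes.String())
                            .withOffsetResetPolicy(Topology.AutoOffsetReset.LATEST));

            // GlobalKTable: "earliest" is applied regardless of the Consumed settings.
            builder.globalTable("global-input",
                    Consumed.with(Serdes.String(), Serdes.String()));

            Topology topology = builder.build();
        }
    }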
It might be worth to consider, to just >>>>>>>> change the behavior: catch the exception, log an ERROR (for information >>>>>>>> purpose), wipe the store, seekToBeginning(), and recreate the store? >>>>>>>> >>>>>>>> Btw: if we want to allow users to set the reset policy, this should be >>>>>>>> possible via the config, or via overwriting the config in the method >>>>>>>> itself. Thus, we would need to add the new overloaded method to >>>>>>>> `Topology` and `StreamsBuilder`. >>>>>>>> >>>>>>>> Another question to ask: what about GlobalKTables? Should they behave >>>>>>>> the same? An alternative design could be, to allow users to specify a >>>>>>>> flexible reset policy for global-stores, but not for GlobalKTables and >>>>>>>> use the strategy suggested above for this case. >>>>>>>> >>>>>>>> Thoughts? >>>>>>>> >>>>>>>> >>>>>>>> -Matthias >>>>>>>> >>>>>>>> >>>>>>>> On 7/2/20 2:14 PM, John Roesler wrote: >>>>>>>>> Hi Navinder, >>>>>>>>> >>>>>>>>> Thanks for the response. I’m sorry if I’m being dense... You said we >>>>>>>>> are not currently using the config, but I thought we would pass the >>>>>>>>> config through to the client. Can you confirm whether or not the >>>>>>>>> existing config works for your use case? >>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> John >>>>>>>>> >>>>>>>>> On Sun, Jun 28, 2020, at 14:09, Navinder Brar wrote: >>>>>>>>>> Sorry my bad. Found it. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Prefix used to override {@link KafkaConsumer consumer} configs for >>>>>>>>>> the >>>>>>>>>> global consumer client from >>>>>>>>>> >>>>>>>>>> * the general consumer client configs. The override precedence is >>>>>>>>>> the >>>>>>>>>> following (from highest to lowest precedence): >>>>>>>>>> * 1. global.consumer.[config-name].. >>>>>>>>>> public static final String GLOBAL_CONSUMER_PREFIX = >>>>>>>>>> "global.consumer."; >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> So, that's great. We already have a config exposed to reset offsets >>>>>>>>>> for >>>>>>>>>> global topics via global.consumer.auto.offset.reset just that we are >>>>>>>>>> not actually using it inside GlobalStreamThread to reset. >>>>>>>>>> >>>>>>>>>> -Navinder >>>>>>>>>> On Monday, 29 June, 2020, 12:24:21 am IST, Navinder Brar >>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote: >>>>>>>>>> >>>>>>>>>> Hi John, >>>>>>>>>> >>>>>>>>>> Thanks for your feedback. >>>>>>>>>> 1. I think there is some confusion on my first point, the enum I am >>>>>>>>>> sure we can use the same one but the external config which controls >>>>>>>>>> the >>>>>>>>>> resetting in global stream thread either we can the same one which >>>>>>>>>> users use for source topics(StreamThread) or we can provide a new >>>>>>>>>> one >>>>>>>>>> which specifically controls global topics. For e.g. currently if I >>>>>>>>>> get >>>>>>>>>> an InvalidOffsetException in any of my source topics, I can choose >>>>>>>>>> whether to reset from Earliest or Latest(with auto.offset.reset). >>>>>>>>>> Now >>>>>>>>>> either we can use the same option and say if I get the same >>>>>>>>>> exception >>>>>>>>>> for global topics I will follow same resetting. Or some users might >>>>>>>>>> want to have totally different setting for both source and global >>>>>>>>>> topics, like for source topic I want resetting from Latest but for >>>>>>>>>> global topics I want resetting from Earliest so in that case adding >>>>>>>>>> a >>>>>>>>>> new config might be better. >>>>>>>>>> >>>>>>>>>> 2. I couldn't find this config currently >>>>>>>>>> "global.consumer.auto.offset.reset". 
Infact in >>>>>>>>>> GlobalStreamThread.java >>>>>>>>>> we are throwing a StreamsException for InvalidOffsetException and >>>>>>>>>> there >>>>>>>>>> is a test as >>>>>>>>>> well GlobalStreamThreadTest#shouldDieOnInvalidOffsetException(), so >>>>>>>>>> I >>>>>>>>>> think this is the config we are trying to introduce with this KIP. >>>>>>>>>> >>>>>>>>>> -Navinder On Saturday, 27 June, 2020, 07:03:04 pm IST, John Roesler >>>>>>>>>> <j...@vvcephei.org> wrote: >>>>>>>>>> >>>>>>>>>> Hi Navinder, >>>>>>>>>> >>>>>>>>>> Thanks for this proposal! >>>>>>>>>> >>>>>>>>>> Regarding your question about whether to use the same policy >>>>>>>>>> enum or not, the underlying mechanism is the same, so I think >>>>>>>>>> we can just use the same AutoOffsetReset enum. >>>>>>>>>> >>>>>>>>>> Can you confirm whether setting the reset policy config on the >>>>>>>>>> global consumer currently works or not? Based on my reading >>>>>>>>>> of StreamsConfig, it looks like it would be: >>>>>>>>>> "global.consumer.auto.offset.reset". >>>>>>>>>> >>>>>>>>>> If that does work, would you still propose to augment the >>>>>>>>>> Java API? >>>>>>>>>> >>>>>>>>>> Thanks, >>>>>>>>>> -John >>>>>>>>>> >>>>>>>>>> On Fri, Jun 26, 2020, at 23:52, Navinder Brar wrote: >>>>>>>>>>> Hi, >>>>>>>>>>> >>>>>>>>>>> KIP: >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy >>>>>>>>>>> >>>>>>>>>>> I have taken over this KIP since it has been dormant for a long >>>>>>>>>>> time >>>>>>>>>>> and this looks important for use-cases that have large global data, >>>>>>>>>>> so >>>>>>>>>>> rebuilding global stores from scratch might seem overkill in case >>>>>>>>>>> of >>>>>>>>>>> InvalidOffsetExecption. >>>>>>>>>>> >>>>>>>>>>> We want to give users the control to use reset policy(as we do in >>>>>>>>>>> StreamThread) in case they hit invalid offsets. I have still not >>>>>>>>>>> decided whether to restrict this option to the same reset policy >>>>>>>>>>> being >>>>>>>>>>> used by StreamThread(using auto.offset.reset config) or add another >>>>>>>>>>> reset config specifically for global stores >>>>>>>>>>> "global.auto.offset.reset" which gives users more control to choose >>>>>>>>>>> separate policies for global and stream threads. >>>>>>>>>>> >>>>>>>>>>> I would like to hear your opinions on the KIP. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> -Navinder >>>>>>>> >>>>>>>> >>>>>>> >>> >>
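Finally, for anyone skimming the thread, a rough sketch of the auto-recovery behavior being converged on: catch the InvalidOffsetException in the global thread, wipe the local store, seek to the beginning, and rebuild, while the main threads stay paused. The GlobalStore interface and method names here are hypothetical stand-ins for illustration, not the actual GlobalStreamThread internals:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.InvalidOffsetException;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.time.Duration;

    public class GlobalRestoreSketch {

        // Hypothetical stand-in for the global state store; not a real Streams interface.
        interface GlobalStore {
            void wipe();                        // delete the local state
            void put(byte[] key, byte[] value); // re-load one record
        }

        static void pollLoop(KafkaConsumer<byte[], byte[]> globalConsumer, GlobalStore store) {
            while (true) {
                try {
                    ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        store.put(record.key(), record.value());
                    }
                } catch (InvalidOffsetException e) {
                    // Instead of dying (the current behavior), log and auto-recover:
                    // wipe the local store, seek to the beginning of the affected
                    // partitions, and rebuild the state from scratch. The main
                    // StreamThreads would stay paused (e.g. in the proposed
                    // RESTORING_GLOBAL state) until the rebuild has caught up.
                    System.err.println("Invalid offsets for " + e.partitions() + "; rebuilding global store");
                    store.wipe();
                    globalConsumer.seekToBeginning(e.partitions());
                }
            }
        }
    }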