Hi Navinder, Thanks for the ping. Yes, that all sounds right to me. The name “RESTORING_GLOBAL” sounds fine, too.
I think as far as warnings go, we’d just propose to mention it in the javadoc of the relevant methods that the given topics should be compacted. Thanks! -John On Fri, Aug 28, 2020, at 12:42, Navinder Brar wrote: > Gentle ping. > > ~ Navinder > On Wednesday, 19 August, 2020, 06:59:58 pm IST, Navinder Brar > <navinder_b...@yahoo.com.invalid> wrote: > > > Thanks Matthias & John, > > > > I am glad we are converging towards an understanding. So, to summarize, > > we will still keep treating this change as a KIP and, instead of providing a > > reset strategy, we will clean up, reset to earliest, and build the state. > > When we hit the exception and we are building the state, we will stop all > > processing and change the state of KafkaStreams to something like > > “RESTORING_GLOBAL” or the like. > > > > How do we plan to educate users on the undesired effects of using > > non-compacted global topics? (via the KIP itself?) > > > +1 on changing the KTable behavior, reset policy for global, connecting > processors to global for a later stage when demanded. > > Regards, > Navinder > On Wednesday, 19 August, 2020, 01:00:58 pm IST, Matthias J. Sax > <mj...@apache.org> wrote: > > Your observation is correct. Connecting (regular) stores to processors > is necessary to "merge" sub-topologies into single ones if a store is > shared. -- For global stores, the structure of the program does not > change and thus connecting processors to global stores is not required. > > Also given our experience with restoring regular state stores (i.e., > partial processing of tasks that don't need restore), it seems better to > pause processing and move all CPU and network resources to the global > thread to rebuild the global store as soon as possible instead of > potentially slowing down the restore in order to make progress on some > tasks. > > Of course, if we collect real world experience and it becomes an issue, > we could still try to change it?
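[Editor's note: the recovery behavior being converged on above — catch the exception, wipe the store, seek to the beginning, and rebuild while processing is paused — can be sketched with a toy, self-contained simulation. This is purely illustrative: the class and method names below are our own inventions, not Kafka's actual internals.]

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of "catch InvalidOffsetException, wipe, seekToBeginning,
// rebuild" for a global store. All names here are hypothetical.
class GlobalStoreRecoverySketch {
    static class InvalidOffsetException extends RuntimeException {}

    // The "global topic" is modeled as an append-only log of records.
    record Record(String key, String value) {}

    final List<Record> topic = new ArrayList<>();      // the global topic
    final Map<String, String> store = new HashMap<>(); // the global store
    long position = 0; // consumer offset into `topic`

    // Apply all records from `position` to the end of the log.
    void poll() {
        if (position > topic.size()) {
            throw new InvalidOffsetException(); // offset out of range
        }
        while (position < topic.size()) {
            Record r = topic.get((int) position++);
            store.put(r.key(), r.value());
        }
    }

    // One iteration of the global thread's loop: instead of dying on an
    // invalid offset, wipe local state and replay from the beginning.
    void runOnce() {
        try {
            poll();
        } catch (InvalidOffsetException e) {
            store.clear();   // wipe the store
            position = 0;    // seekToBeginning()
            poll();          // rebuild the state
        }
    }
}
```

In the real system the equivalent logic would live in the global thread, with the StreamThreads paused (e.g. via the proposed "RESTORING_GLOBAL"-like state) until the rebuild finishes; and the rebuild only recovers the full state if the topic is compacted, which is exactly the javadoc warning discussed above.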
> > > -Matthias > > > On 8/18/20 3:31 PM, John Roesler wrote: > > Thanks Matthias, > > > > Sounds good. I'm on board with no public API change and just > > recovering instead of crashing. > > > > Also, to be clear, I wouldn't drag KTables into it; I was > > just trying to wrap my head around the congruity of our > > choice for GlobalKTable with respect to KTable. > > > > I agree that whatever we decide to do would probably also > > resolve KAFKA-7380. > > > > Moving on to discuss the behavior change, I'm wondering if > > we really need to block all the StreamThreads. It seems like > > we only need to prevent processing on any task that's > > connected to the GlobalStore. > > > > I just took a look at the topology building code, and it > > actually seems that connections to global stores don't need > > to be declared. That's a bummer, since it means that we > > really do have to stop all processing while the global > > thread catches up. > > > > Changing this seems like it'd be out of scope right now, but > > I bring it up in case I'm wrong and it actually is possible > > to know which specific tasks need to be synchronized with > > which global state stores. If we could know that, then we'd > > only have to block some of the tasks, not all of the > > threads. > > > > Thanks, > > -John > > > > > > On Tue, 2020-08-18 at 14:10 -0700, Matthias J. Sax wrote: > >> Thanks for the discussion. > >> > >> I agree that this KIP is justified in any case -- even if we don't > >> change public API, as the change in behavior is significant. > >> > >> A better documentation for cleanup policy is always good (even if I am > >> not aware of any concrete complaints atm that users were not aware of > >> the implications). 
Of course, for a regular KTable, one can > >> enable/disable the source-topic-changelog optimization and thus can use > >> a non-compacted topic for this case, which is quite a difference from > >> global stores/tables; so maybe it's worth pointing out this difference > >> explicitly. > >> > >> As mentioned before, the main purpose of the original Jira was to avoid > >> the crash situation but to allow for auto-recovering, while it was an > >> open question if it makes sense / would be useful to allow users to > >> specify a custom reset policy instead of using a hard-coded "earliest" > >> strategy. -- It seems it's still unclear if it would be useful and thus > >> it might be best to not add it for now -- we can still add it later if > >> there are concrete use-cases that need this feature. > >> > >> @John: I actually agree that it's also questionable to allow a custom > >> reset policy for KTables... Not sure if we want to drag this question > >> into this KIP though? > >> > >> So it seems we all agree that we actually don't need any public API > >> changes, but we only want to avoid crashing? > >> > >> For this case, to preserve the current behavior that guarantees that the > >> global store/table is always loaded first, it seems we need to have a > >> stop-the-world mechanism for the main `StreamThreads` -- > >> do we need to add a new state to the KafkaStreams client for this case? > >> > >> Having a new state might also be helpful for > >> https://issues.apache.org/jira/browse/KAFKA-7380 ? > >> > >> -Matthias > >> > >> On 8/17/20 7:34 AM, John Roesler wrote: > >>> Hi Navinder, > >>> > >>> I see what you mean about the global consumer being similar > >>> to the restore consumer. > >>> > >>> I also agree that automatically performing the recovery > >>> steps should be strictly an improvement over the current > >>> situation.
> >>> > >>> Also, yes, it would be a good idea to make it clear that the > >>> global topic should be compacted in order to ensure correct > >>> semantics. It's the same way with input topics for KTables; > >>> we rely on users to ensure the topics are compacted, and if > >>> they aren't, then the execution semantics will be broken. > >>> > >>> Thanks, > >>> -John > >>> > >>> On Sun, 2020-08-16 at 11:44 +0000, Navinder Brar wrote: > >>>> Hi John, > >>>> > >>>> Thanks for your inputs. Since global topics are in a way their own > >>>> changelog, wouldn’t the global consumers be more akin to restore > >>>> consumers than the main consumer? > >>>> > >>>> I am also +1 on catching the exception and setting it to the earliest > >>>> for now. Whenever an instance starts, currently the global stream thread (if > >>>> available) goes to RUNNING before stream threads are started, so that > >>>> means the global state is available when the processing by stream > >>>> threads starts. So, with the new change of catching the exception, > >>>> cleaning the store and resetting to earliest would probably be “stop the > >>>> world” as you said John, as I think we will have to pause the stream > >>>> threads till the whole global state is recovered. I assume it is "stop > >>>> the world" right now as well, since now also if an > >>>> InvalidOffsetException comes, we throw a streams exception and the user > >>>> has to clean up and handle all this manually, and when that instance > >>>> starts, it will restore the global state first. > >>>> > >>>> I had an additional thought on this whole problem: would it be helpful > >>>> to educate the users that global topics should have cleanup policy as > >>>> compact, so that this invalid offset exception never arises for them.
> >>>> Assume for example, that the cleanup policy in the global topic is "delete" > >>>> and it has deleted keys k1, k2 (via retention.ms) although all the > >>>> instances had already consumed them, so they are in all global stores and > >>>> all other instances are up to date on the global data (so no > >>>> InvalidOffsetException). Now, a new instance is added to the cluster, > >>>> and we have already lost k1, k2 from the global topic, so it will start > >>>> consuming from the earliest point in the global topic. So, wouldn’t this > >>>> global store on the new instance have 2 fewer keys than all the other > >>>> global stores already available in the cluster? Please let me know if I > >>>> am missing something. Thanks. > >>>> > >>>> Regards, > >>>> > >>>> Navinder > >>>> > >>>> On Friday, 14 August, 2020, 10:03:42 am IST, John Roesler > >>>><vvcep...@apache.org> wrote: > >>>> > >>>> Hi all, > >>>> > >>>> It seems like the main motivation for this proposal is satisfied if we > >>>> just implement some recovery mechanism instead of crashing. If the > >>>> mechanism is going to be pausing all the threads until the state is > >>>> recovered, then it still seems like a big enough behavior change to > >>>> warrant a KIP. > >>>> > >>>> I have to confess I’m a little unclear on why a custom reset policy for > >>>> a global store, table, or even consumer might be considered wrong. It’s > >>>> clearly wrong for the restore consumer, but the global consumer seems > >>>> more semantically akin to the main consumer than the restore consumer. > >>>> > >>>> In other words, if it’s wrong to reset a GlobalKTable from latest, > >>>> shouldn’t it also be wrong for a KTable, for exactly the same reason? It > >>>> certainly seems like it would be an odd choice, but I’ve seen many > >>>> choices I thought were odd turn out to have perfectly reasonable use > >>>> cases.
> >>>> > >>>> As far as the PAPI global store goes, I could see adding the option to > >>>> configure it, since as Matthias pointed out, there’s really no specific > >>>> semantics for the PAPI. But if automatic recovery is really all Navinder > >>>> wanted, then I could also see deferring this until someone specifically > >>>> wants it. > >>>> > >>>> So the tl;dr is, if we just want to catch the exception and rebuild the > >>>> store by seeking to earliest with no config or API changes, then I’m +1. > >>>> > >>>> I’m wondering if we can improve on the “stop the world” effect of > >>>> rebuilding the global store, though. It seems like we could put our > >>>> heads together and come up with a more fine-grained approach to > >>>> maintaining the right semantics during recovery while still making some > >>>> progress. > >>>> > >>>> Thanks, > >>>> John > >>>> > >>>> On Sun, Aug 9, 2020, at 02:04, Navinder Brar wrote: > >>>>> Hi Matthias, > >>>>> > >>>>> IMHO, now as you explained, using ‘global.consumer.auto.offset.reset’ is > >>>>> not as straightforward > >>>>> as it seems and it might change the existing behavior for users without > >>>>> them realizing it. I also > >>>>> think that we should change the behavior inside the global stream thread to > >>>>> not die on > >>>>> InvalidOffsetException and instead clean and rebuild the state from the > >>>>> earliest. On this, as you > >>>>> mentioned, we would need to pause the stream threads till the > >>>>> global store is completely restored. > >>>>> Without it, there will be incorrect processing results if they are > >>>>> utilizing a global store during processing. > >>>>> > >>>>> So, basically we can divide the use-cases into 4 parts.
> >>>>> > >>>>> - PAPI based global stores (will have the earliest hardcoded) > >>>>> - PAPI based state stores (already has auto.reset.config) > >>>>> - DSL based GlobalKTables (will have earliest hardcoded) > >>>>> - DSL based KTables (will continue with auto.reset.config) > >>>>> > >>>>> So, this would mean that we are not changing any existing behaviors > >>>>> with this, if I am right. > >>>>> > >>>>> I guess we could improve the code to actually log a warning for this > >>>>> case, similar to what we do for some configs already (cf > >>>>> StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS). > >>>>> > >>>>>>> I like this idea. In case we go ahead with the above approach and if > >>>>>>> we can’t > >>>>> deprecate it, we should educate users that this config doesn’t work. > >>>>> > >>>>> Looking forward to hearing thoughts from others as well. > >>>>> > >>>>> - Navinder On Tuesday, 4 August, 2020, 05:07:59 am IST, Matthias J. > >>>>> Sax <mj...@apache.org> wrote: > >>>>> > >>>>> Navinder, > >>>>> > >>>>> thanks for updating the KIP. I think the motivation section is not > >>>>> totally accurate (which is not your fault though, as the history of how > >>>>> we handle this case is intertwined...) For example, "auto.offset.reset" > >>>>> is hard-coded for the global consumer to "none" and using > >>>>> "global.consumer.auto.offset.reset" has no effect (cf > >>>>> https://kafka.apache.org/25/documentation/streams/developer-guide/config-streams.html#default-values) > >>>>> > >>>>> Also, we could not even really deprecate the config as mentioned in > >>>>> the rejected alternatives section, because we need `auto.offset.reset` for > >>>>> the main consumer -- and adding a prefix is independent of it. Also, > >>>>> because we ignore the config, it is effectively deprecated/removed already, if you > >>>>> wish.
> >>>>> > >>>>> I guess we could improve the code to actually log a warning for this > >>>>> case, similar to what we do for some configs already (cf > >>>>> StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS). > >>>>> > >>>>> The other question is about compatibility with regard to default > >>>>> behavior: if we want to reintroduce `global.consumer.auto.offset.reset`, > >>>>> this basically implies that we need to respect `auto.offset.reset`, too. > >>>>> Remember that any config without prefix is applied to all clients that > >>>>> support this config. Thus, if a user does not limit the scope of the > >>>>> config to the main consumer (via `main.consumer.auto.offset.reset`) but > >>>>> uses the non-prefix version and sets it to "latest" (and relies on the > >>>>> current behavior that `auto.offset.reset` is "none", and effectively > >>>>> "earliest" on the global consumer), the user might end up with a > >>>>> surprise as the global consumer behavior would switch from "earliest" to > >>>>> "latest" (most likely unintentionally). Bottom line is that users might > >>>>> need to change configs to preserve the old behavior... > >>>>> > >>>>> However, before we discuss those details, I think we should discuss the > >>>>> topic in a broader context first: > >>>>> > >>>>> - for a GlobalKTable, does it even make sense from a correctness point > >>>>> of view to allow users to set a custom reset policy? It seems you > >>>>> currently don't propose this in the KIP, but as you don't mention it > >>>>> explicitly, it's unclear if that is on purpose or an oversight? > >>>>> > >>>>> - Should we treat global stores differently from GlobalKTables and allow > >>>>> for more flexibility (as the PAPI does not really provide any semantic > >>>>> contract)? It seems that is what you propose in the KIP.
We should > >>>>> discuss if this flexibility does make sense or not for the PAPI, or if > >>>>> we should apply the same reasoning about correctness we use for KTables > >>>>> to global stores? To what extent are/should they be different? > >>>>> > >>>>> - If we support auto.offset.reset for global stores, how should we > >>>>> handle the initial bootstrapping of the store/table (that is hard-coded > >>>>> atm)? Should we skip it if the policy is "latest" and start with an > >>>>> empty state? Note that we did consider this behavior incorrect via > >>>>> https://issues.apache.org/jira/browse/KAFKA-6121 and thus I am wondering > >>>>> why we should change it back again? > >>>>> > >>>>> Finally, the main motivation for the Jira ticket was to let the runtime > >>>>> auto-recover instead of dying as it does currently. If we decide that a > >>>>> custom reset policy does actually not make sense, we can just change the > >>>>> global-thread to not die any longer on an `InvalidOffsetException` but > >>>>> rebuild the state automatically. This would be "only" a behavior change > >>>>> but does not require any public API changes. -- For this case, we should > >>>>> also think about the synchronization with the main processing threads. > >>>>> On startup we bootstrap the global stores before processing happens. > >>>>> Thus, if an `InvalidOffsetException` happens and the global thread dies, > >>>>> the main threads cannot access the global stores any longer and also die. > >>>>> If we re-build the state though, do we need to pause the main threads > >>>>> during this phase? > >>>>> > >>>>> -Matthias > >>>>> > >>>>> On 8/2/20 8:48 AM, Navinder Brar wrote: > >>>>>> Hi John, > >>>>>> > >>>>>> I have updated the KIP to make the motivation more clear.
In a > >>>>>> nutshell, we will use the already existing config > >>>>>> "global.consumer.auto.offset.reset" for users to set a blanket reset > >>>>>> policy for all global topics and add a new interface to set a per-topic > >>>>>> reset policy for each global topic (for which we specifically need this > >>>>>> KIP). There was a point raised by Matthias above to always reset to > >>>>>> earliest by cleaning the stores and seekToBeginning in case of > >>>>>> InvalidOffsetException. We can go with that route as well and I don't > >>>>>> think it would need a KIP, as if we are not providing users an option > >>>>>> to have a blanket reset policy on global topics, then a per-topic > >>>>>> override would also not be required (the KIP is required basically for > >>>>>> that). Although, I think if users have an option to choose the reset > >>>>>> policy for StreamThread then the option should be provided for > >>>>>> GlobalStreamThread as well, and if we don't want to use > >>>>>> "global.consumer.auto.offset.reset" then we would need to deprecate it, > >>>>>> because currently it's not serving any purpose. For now, I have added > >>>>>> it in rejected alternatives but we can discuss this. > >>>>>> > >>>>>> On the query that I had for Guozhang, thanks to Matthias we have fixed > >>>>>> it last week as part of KAFKA-10306. > >>>>>> > >>>>>> ~Navinder > >>>>>> > >>>>>> On Sunday, 26 July, 2020, 07:37:34 pm IST, Navinder Brar > >>>>>><navinder_b...@yahoo.com.invalid> wrote: > >>>>>> > >>>>>> Hi, > >>>>>> > >>>>>> Sorry, it took some time to respond back. > >>>>>> > >>>>>> “but I thought we would pass the config through to the client.” > >>>>>> > >>>>>>>> @John, sure we can use the config in GlobalStreamThread; that could > >>>>>>>> be one of the ways to solve it.
> >>>>>> > >>>>>> @Matthias, sure cleaning the store and recreating is one way, but since > >>>>>> we are giving an option to reset in StreamThread, why should the > >>>>>> implementation be different in GlobalStreamThread? I think we > >>>>>> should use the global.consumer.auto.offset.reset config to accept the > >>>>>> reset strategy opted by the user, although I would be ok with just > >>>>>> cleaning and resetting to the latest as well for now. Currently, we > >>>>>> throw a StreamsException in case of InvalidOffsetException in > >>>>>> GlobalStreamThread, so just resetting would still be better than what > >>>>>> happens currently. > >>>>>> > >>>>>> Matthias, I found this comment in StreamsBuilder for GlobalKTable: ‘* > >>>>>> Note that {@link GlobalKTable} always applies {@code > >>>>>> "auto.offset.reset"} strategy {@code "earliest"} regardless of the > >>>>>> specified value in {@link StreamsConfig} or {@link Consumed}.’ > >>>>>> So, I guess we are already cleaning up and recreating for GlobalKTable > >>>>>> from the earliest offset. > >>>>>> > >>>>>> @Guozhang, while looking at the code, I also noticed a TODO: pending in > >>>>>> GlobalStateManagerImpl, when InvalidOffsetException is thrown. > >>>>>> Earlier, we were directly clearing the store here and recreating from > >>>>>> scratch, but that code piece is removed now. Are you working on a > >>>>>> follow-up PR for this, or would just handling the reset in GlobalStreamThread > >>>>>> be sufficient? > >>>>>> > >>>>>> Regards, > >>>>>> Navinder > >>>>>> > >>>>>> On Tuesday, 7 July, 2020, 12:53:36 am IST, Matthias J. Sax > >>>>>><mj...@apache.org> wrote: > >>>>>> > >>>>>> Atm, the config should be ignored and the global-consumer should use > >>>>>> "none" in a hard-coded way. > >>>>>> > >>>>>> However, I am still wondering if we actually want/need to allow users > >>>>>> to specify the reset policy?
It might be worth considering to just > >>>>>> change the behavior: catch the exception, log an ERROR (for informational > >>>>>> purposes), wipe the store, seekToBeginning(), and recreate the store? > >>>>>> > >>>>>> Btw: if we want to allow users to set the reset policy, this should be > >>>>>> possible via the config, or via overwriting the config in the method > >>>>>> itself. Thus, we would need to add the new overloaded method to > >>>>>> `Topology` and `StreamsBuilder`. > >>>>>> > >>>>>> Another question to ask: what about GlobalKTables? Should they behave > >>>>>> the same? An alternative design could be to allow users to specify a > >>>>>> flexible reset policy for global-stores, but not for GlobalKTables, and > >>>>>> use the strategy suggested above for this case. > >>>>>> > >>>>>> Thoughts? > >>>>>> > >>>>>> -Matthias > >>>>>> > >>>>>> On 7/2/20 2:14 PM, John Roesler wrote: > >>>>>>> Hi Navinder, > >>>>>>> > >>>>>>> Thanks for the response. I’m sorry if I’m being dense... You said we > >>>>>>> are not currently using the config, but I thought we would pass the > >>>>>>> config through to the client. Can you confirm whether or not the > >>>>>>> existing config works for your use case? > >>>>>>> > >>>>>>> Thanks, > >>>>>>> John > >>>>>>> > >>>>>>> On Sun, Jun 28, 2020, at 14:09, Navinder Brar wrote: > >>>>>>>> Sorry, my bad. Found it. > >>>>>>>> > >>>>>>>> Prefix used to override {@link KafkaConsumer consumer} configs for the > >>>>>>>> global consumer client from > >>>>>>>> * the general consumer client configs. The override precedence is the > >>>>>>>> following (from highest to lowest precedence): > >>>>>>>> * 1. global.consumer.[config-name].. > >>>>>>>> public static final String GLOBAL_CONSUMER_PREFIX = > >>>>>>>> "global.consumer."; > >>>>>>>> > >>>>>>>> So, that's great.
We already have a config exposed to reset offsets for > >>>>>>>> global topics via global.consumer.auto.offset.reset; it's just that we are > >>>>>>>> not actually using it inside GlobalStreamThread to reset. > >>>>>>>> > >>>>>>>> -Navinder > >>>>>>>> On Monday, 29 June, 2020, 12:24:21 am IST, Navinder Brar > >>>>>>>> <navinder_b...@yahoo.com.invalid> wrote: > >>>>>>>> > >>>>>>>> Hi John, > >>>>>>>> > >>>>>>>> Thanks for your feedback. > >>>>>>>> 1. I think there is some confusion on my first point: the enum, I am > >>>>>>>> sure, we can use the same one, but for the external config which controls the > >>>>>>>> resetting in the global stream thread, either we can use the same one which > >>>>>>>> users use for source topics (StreamThread) or we can provide a new one > >>>>>>>> which specifically controls global topics. For example, currently if I get > >>>>>>>> an InvalidOffsetException in any of my source topics, I can choose > >>>>>>>> whether to reset from Earliest or Latest (with auto.offset.reset). Now, > >>>>>>>> either we can use the same option and say if I get the same exception > >>>>>>>> for global topics I will follow the same resetting. Or some users might > >>>>>>>> want to have a totally different setting for both source and global > >>>>>>>> topics, like for source topics I want resetting from Latest but for > >>>>>>>> global topics I want resetting from Earliest, so in that case adding a > >>>>>>>> new config might be better. > >>>>>>>> > >>>>>>>> 2. I couldn't find this config currently: > >>>>>>>> "global.consumer.auto.offset.reset". In fact, in GlobalStreamThread.java > >>>>>>>> we are throwing a StreamsException for InvalidOffsetException and there > >>>>>>>> is a test as well, GlobalStreamThreadTest#shouldDieOnInvalidOffsetException(), so I > >>>>>>>> think this is the config we are trying to introduce with this KIP.
> >>>>>>>> > >>>>>>>> -Navinder On Saturday, 27 June, 2020, 07:03:04 pm IST, John Roesler > >>>>>>>> <j...@vvcephei.org> wrote: > >>>>>>>> > >>>>>>>> Hi Navinder, > >>>>>>>> > >>>>>>>> Thanks for this proposal! > >>>>>>>> > >>>>>>>> Regarding your question about whether to use the same policy > >>>>>>>> enum or not, the underlying mechanism is the same, so I think > >>>>>>>> we can just use the same AutoOffsetReset enum. > >>>>>>>> > >>>>>>>> Can you confirm whether setting the reset policy config on the > >>>>>>>> global consumer currently works or not? Based on my reading > >>>>>>>> of StreamsConfig, it looks like it would be: > >>>>>>>> "global.consumer.auto.offset.reset". > >>>>>>>> > >>>>>>>> If that does work, would you still propose to augment the > >>>>>>>> Java API? > >>>>>>>> > >>>>>>>> Thanks, > >>>>>>>> -John > >>>>>>>> > >>>>>>>> On Fri, Jun 26, 2020, at 23:52, Navinder Brar wrote: > >>>>>>>>> Hi, > >>>>>>>>> > >>>>>>>>> KIP: > >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy > >>>>>>>>> > >>>>>>>>> I have taken over this KIP since it has been dormant for a long time > >>>>>>>>> and this looks important for use-cases that have large global data, so > >>>>>>>>> rebuilding global stores from scratch might seem overkill in case of > >>>>>>>>> InvalidOffsetException. > >>>>>>>>> > >>>>>>>>> We want to give users the control to use a reset policy (as we do in > >>>>>>>>> StreamThread) in case they hit invalid offsets. I have still not > >>>>>>>>> decided whether to restrict this option to the same reset policy being > >>>>>>>>> used by StreamThread (using the auto.offset.reset config) or add another > >>>>>>>>> reset config specifically for global stores, > >>>>>>>>> "global.auto.offset.reset", which gives users more control to choose > >>>>>>>>> separate policies for global and stream threads.
> >>>>>>>>> > >>>>>>>>> I would like to hear your opinions on the KIP. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> -Navinder > >>>>>> > >>>>>> > >>>>> > > >