I think this issue can actually be resolved.

 - We need a flag on the stream-threads indicating whether a global restore is in progress; in this case, a stream-thread may go into RUNNING state, but it's not allowed to actually process data -- it is still allowed to update standby tasks though.
 - If a stream-thread restores, its own state is RESTORING and it does not need to care about the "global restore flag".
 - The global-thread just does what we discussed, including using state RESTORING.
 - The KafkaStreams client state is RESTORING if at least one thread (stream-thread or global-thread) is in state RESTORING.
 - On startup, if there is a global-thread, we just set the global-restore flag upfront before we start the stream-threads (we can actually still do the rebalance and potential restore in the stream-threads in parallel to the global restore) and rely on the global-thread to unset the flag.
 - The tricky thing is to "stop" processing in stream-threads if we need to wipe the global store and rebuild it. For this, we should set the "global restore flag" on the stream-threads, but we also need to "lock down" the global store in question and throw an exception if a stream-thread tries to access it; if a stream-thread gets this exception, it needs to clean itself up and wait until the "global restore flag" is unset before it can continue.

Do we think this would work? -- Of course, the devil is in the details, but it seems to become a PR discussion, and there is no reason to make it part of the KIP.

-Matthias

On 9/3/20 3:41 AM, Navinder Brar wrote:
> Hi,
>
> Thanks, John, Matthias and Sophie for the great feedback.
>
> On the point raised by Sophie that maybe we should allow normal restoring
> during GLOBAL_RESTORING, I think it makes sense, but the challenge would be
> what happens when normal restoring (on actives) has finished but
> GLOBAL_RESTORING is still going on. Currently, all restorings are independent
> of each other, i.e. restoring happening on one task/thread doesn't affect
> another. But if we do go ahead with allowing normal restoring during
> GLOBAL_RESTORING, then we will still have to pause the active tasks from going
> to RUNNING if GLOBAL_RESTORING has not finished and normal restorings have
> finished.
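[Editorial sketch: the "global restore flag" gating that Matthias proposes at the top of the thread could look roughly like the following; all names (the flag, the counters, the run loop) are illustrative stand-ins, not actual Kafka Streams internals.]

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class GlobalRestoreFlagSketch {
    // Set by the global thread before wiping/rebuilding the global store,
    // unset by the global thread once the store is fully restored.
    static final AtomicBoolean globalRestoreInProgress = new AtomicBoolean(false);

    static int processedRecords = 0;
    static int standbyUpdates = 0;

    // One iteration of a stream thread's run loop: in RUNNING state the
    // thread checks the flag and, while a global restore is in progress,
    // only maintains standby tasks instead of processing active tasks.
    static void runOnce() {
        if (globalRestoreInProgress.get()) {
            standbyUpdates++;      // standby maintenance is still allowed
        } else {
            processedRecords++;    // normal active-task processing
        }
    }

    public static void main(String[] args) {
        globalRestoreInProgress.set(true);   // global thread starts restoring
        runOnce();                           // stream thread may not process
        globalRestoreInProgress.set(false);  // restore finished
        runOnce();                           // processing resumes
        System.out.println(standbyUpdates + "/" + processedRecords); // 1/1
    }
}
```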
> For the same reason, I wouldn't want to combine global restoring
> and normal restoring, because then it would make all the restorings
> independent, but we don't want that. We want global stores to be available
> before any processing starts on the active tasks.
>
> Although I think restoring of replicas can still take place while global
> stores are restoring, because for replicas there is no danger of them
> starting processing.
>
> Also, one point to bring up is that currently during application startup
> global stores restore first and then normal stream threads start.
>
> Regards,
> Navinder
>
> On Thursday, 3 September, 2020, 06:58:40 am IST, Matthias J. Sax
> <mj...@apache.org> wrote:
>
> Thanks for the input Sophie. Those are all good points and I fully agree
> with them.
>
> When saying "pausing the processing threads" I only considered them in
> `RUNNING` and thought we'd figure out the details on the PR... Excellent
> catch!
>
> Changing state transitions is to some extent backward incompatible, but
> I think (IIRC) we did it in the past and I personally tend to find it
> ok. That's why we cover those changes in a KIP.
>
> -Matthias
>
> On 9/2/20 6:18 PM, Sophie Blee-Goldman wrote:
>> If we're going to add a new GLOBAL_RESTORING state to the KafkaStreams FSM,
>> maybe it would make sense to add a new plain RESTORING state that we
>> transition to when restoring non-global state stores following a rebalance.
>> Right now all restoration occurs within the REBALANCING state, which is
>> pretty misleading. Applications that have large amounts of state to restore
>> will appear to be stuck rebalancing according to the state listener, when
>> in fact the rebalance completed long ago. Given that there are very much
>> real scenarios where you actually *are* stuck rebalancing, it seems useful
>> to distinguish plain restoration from more insidious cases that may require
>> investigation and/or intervention.
>>
>> I don't mean to hijack this KIP, I just think it would be odd to introduce
>> GLOBAL_RESTORING when there is no other kind of RESTORING state. One
>> question this brings up, and I apologize if this has already been
>> addressed, is what to do when we are restoring both normal and global
>> state stores? It sounds like we plan to pause the StreamThreads entirely,
>> but there doesn't seem to be any reason not to allow regular state
>> restoration -- or even standby processing -- while the global state is
>> restoring. Given the current effort to move restoration & standbys to a
>> separate thread, allowing them to continue while pausing only the
>> StreamThread seems quite natural.
>>
>> Assuming that we actually do allow both types of restoration to occur at
>> the same time, and if we did add a plain RESTORING state as well, which
>> state should we end up in? AFAICT the main reason for having a distinct
>> {GLOBAL_}RESTORING state is to alert users of the non-progress of their
>> active tasks. In both cases, the active task is unable to continue until
>> restoration has completed, so why distinguish between the two at all?
>> Would it make sense to avoid a special GLOBAL_RESTORING state and just
>> introduce a single unified RESTORING state to cover both the regular and
>> global case? Just a thought.
>>
>> My only concern is that this might be considered a breaking change: users
>> might be looking for the REBALANCING -> RUNNING transition specifically in
>> order to alert when the application has started up, and we would no longer
>> go directly from REBALANCING to RUNNING. I think we actually did/do this
>> ourselves in a number of integration tests and possibly in some examples.
>> That said, it seems more appropriate to just listen for the RUNNING state
>> rather than for a specific transition, and we should encourage users to do
>> so rather than go out of our way to support transition-type state
>> listeners.
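[Editorial sketch: Sophie's last point, listening for the target state rather than for a specific transition, can be illustrated with a small self-contained example. The State enum here only loosely mirrors the KafkaStreams FSM, and RESTORING is the *proposed*, not yet existing, state.]

```java
import java.util.List;

public class StartupListenerSketch {
    enum State { CREATED, REBALANCING, RESTORING, RUNNING }

    // Transition-based check: breaks once RESTORING is inserted, because
    // the app then goes REBALANCING -> RESTORING -> RUNNING and the
    // REBALANCING -> RUNNING edge is never observed.
    static boolean sawRebalancingToRunning(List<State[]> transitions) {
        return transitions.stream().anyMatch(
            t -> t[0] == State.REBALANCING && t[1] == State.RUNNING);
    }

    // Target-state check: robust to new intermediate states.
    static boolean reachedRunning(List<State[]> transitions) {
        return transitions.stream().anyMatch(t -> t[1] == State.RUNNING);
    }

    public static void main(String[] args) {
        // Simulated startup path with the proposed extra state;
        // each entry is {oldState, newState} as a listener would see it.
        List<State[]> startup = List.of(
            new State[]{State.CREATED, State.REBALANCING},
            new State[]{State.REBALANCING, State.RESTORING},
            new State[]{State.RESTORING, State.RUNNING});

        System.out.println(sawRebalancingToRunning(startup)); // false
        System.out.println(reachedRunning(startup));          // true
    }
}
```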
>>
>> Cheers,
>> Sophie
>>
>> On Wed, Sep 2, 2020 at 5:53 PM Matthias J. Sax <mj...@apache.org> wrote:
>>
>>> I think this makes sense.
>>>
>>> When we introduce this new state, we might also tackle the Jira I
>>> mentioned. If there is a global thread, on startup of a `KafkaStreams`
>>> client we should not transition to `REBALANCING` but to the new state,
>>> and maybe also make the "bootstrapping" non-blocking.
>>>
>>> I guess it's worth mentioning this in the KIP.
>>>
>>> Btw: The new state for KafkaStreams should also be part of the KIP as it
>>> is a public API change, too.
>>>
>>> -Matthias
>>>
>>> On 8/29/20 9:37 AM, John Roesler wrote:
>>>> Hi Navinder,
>>>>
>>>> Thanks for the ping. Yes, that all sounds right to me. The name
>>>> "RESTORING_GLOBAL" sounds fine, too.
>>>>
>>>> I think as far as warnings go, we'd just propose to mention it in the
>>>> javadoc of the relevant methods that the given topics should be
>>>> compacted.
>>>>
>>>> Thanks!
>>>> -John
>>>>
>>>> On Fri, Aug 28, 2020, at 12:42, Navinder Brar wrote:
>>>>> Gentle ping.
>>>>>
>>>>> ~ Navinder
>>>>>
>>>>> On Wednesday, 19 August, 2020, 06:59:58 pm IST, Navinder Brar
>>>>> <navinder_b...@yahoo.com.invalid> wrote:
>>>>>
>>>>> Thanks Matthias & John,
>>>>>
>>>>> I am glad we are converging towards an understanding. So, to
>>>>> summarize, we will still keep treating this change in the KIP, and
>>>>> instead of providing a reset strategy, we will clean up, reset to
>>>>> earliest, and build the state. When we hit the exception and we are
>>>>> building the state, we will stop all processing and change the state
>>>>> of KafkaStreams to something like "RESTORING_GLOBAL" or the like.
>>>>>
>>>>> How do we plan to educate users on the undesired effects of using
>>>>> non-compacted global topics? (via the KIP itself?)
>>>>>
>>>>> +1 on changing the KTable behavior, the reset policy for global
>>>>> stores, and connecting processors to global stores for a later stage
>>>>> when demanded.
>>>>>
>>>>> Regards,
>>>>> Navinder
>>>>>
>>>>> On Wednesday, 19 August, 2020, 01:00:58 pm IST, Matthias J. Sax
>>>>> <mj...@apache.org> wrote:
>>>>>
>>>>> Your observation is correct. Connecting (regular) stores to processors
>>>>> is necessary to "merge" sub-topologies into single ones if a store is
>>>>> shared. -- For global stores, the structure of the program does not
>>>>> change and thus connecting processors to global stores is not required.
>>>>>
>>>>> Also, given our experience with restoring regular state stores (ie,
>>>>> partial processing of tasks that don't need restore), it seems better
>>>>> to pause processing and move all CPU and network resources to the
>>>>> global thread to rebuild the global store as soon as possible, instead
>>>>> of potentially slowing down the restore in order to make progress on
>>>>> some tasks.
>>>>>
>>>>> Of course, if we collect real-world experience and it becomes an
>>>>> issue, we could still try to change it.
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 8/18/20 3:31 PM, John Roesler wrote:
>>>>>> Thanks Matthias,
>>>>>>
>>>>>> Sounds good. I'm on board with no public API change and just
>>>>>> recovering instead of crashing.
>>>>>>
>>>>>> Also, to be clear, I wouldn't drag KTables into it; I was
>>>>>> just trying to wrap my head around the congruity of our
>>>>>> choice for GlobalKTable with respect to KTable.
>>>>>>
>>>>>> I agree that whatever we decide to do would probably also
>>>>>> resolve KAFKA-7380.
>>>>>>
>>>>>> Moving on to discuss the behavior change, I'm wondering if
>>>>>> we really need to block all the StreamThreads. It seems like
>>>>>> we only need to prevent processing on any task that's
>>>>>> connected to the GlobalStore.
>>>>>>
>>>>>> I just took a look at the topology building code, and it
>>>>>> actually seems that connections to global stores don't need
>>>>>> to be declared. That's a bummer, since it means that we
>>>>>> really do have to stop all processing while the global
>>>>>> thread catches up.
>>>>>>
>>>>>> Changing this seems like it'd be out of scope right now, but
>>>>>> I bring it up in case I'm wrong and it actually is possible
>>>>>> to know which specific tasks need to be synchronized with
>>>>>> which global state stores. If we could know that, then we'd
>>>>>> only have to block some of the tasks, not all of the
>>>>>> threads.
>>>>>>
>>>>>> Thanks,
>>>>>> -John
>>>>>>
>>>>>> On Tue, 2020-08-18 at 14:10 -0700, Matthias J. Sax wrote:
>>>>>>> Thanks for the discussion.
>>>>>>>
>>>>>>> I agree that this KIP is justified in any case -- even if we don't
>>>>>>> change the public API, as the change in behavior is significant.
>>>>>>>
>>>>>>> Better documentation for the cleanup policy is always good (even if
>>>>>>> I am not aware of any concrete complaints atm that users were not
>>>>>>> aware of the implications). Of course, for a regular KTable, one can
>>>>>>> enable/disable the source-topic-changelog optimization and thus can
>>>>>>> use a non-compacted topic for this case, which is quite a difference
>>>>>>> to global stores/tables; so maybe it's worth pointing out this
>>>>>>> difference explicitly.
>>>>>>>
>>>>>>> As mentioned before, the main purpose of the original Jira was to
>>>>>>> avoid the crash situation and to allow for auto-recovery, while it
>>>>>>> was an open question if it makes sense / would be useful to allow
>>>>>>> users to specify a custom reset policy instead of using a hard-coded
>>>>>>> "earliest" strategy. -- It seems it's still unclear if it would be
>>>>>>> useful, and thus it might be best to not add it for now -- we can
>>>>>>> still add it later if there are concrete use-cases that need this
>>>>>>> feature.
>>>>>>>
>>>>>>> @John: I actually agree that it's also questionable to allow a
>>>>>>> custom reset policy for KTables... Not sure if we want to drag this
>>>>>>> question into this KIP though?
>>>>>>>
>>>>>>> So it seems we all agree that we actually don't need any public API
>>>>>>> changes, but we only want to avoid crashing?
>>>>>>>
>>>>>>> For this case, to preserve the current behavior that guarantees that
>>>>>>> the global store/table is always loaded first, it seems we need to
>>>>>>> have a stop-the-world mechanism for the main `StreamThreads` -- do
>>>>>>> we need to add a new state to the KafkaStreams client for this case?
>>>>>>>
>>>>>>> Having a new state might also be helpful for
>>>>>>> https://issues.apache.org/jira/browse/KAFKA-7380 ?
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 8/17/20 7:34 AM, John Roesler wrote:
>>>>>>>> Hi Navinder,
>>>>>>>>
>>>>>>>> I see what you mean about the global consumer being similar
>>>>>>>> to the restore consumer.
>>>>>>>>
>>>>>>>> I also agree that automatically performing the recovery
>>>>>>>> steps should be strictly an improvement over the current
>>>>>>>> situation.
>>>>>>>>
>>>>>>>> Also, yes, it would be a good idea to make it clear that the
>>>>>>>> global topic should be compacted in order to ensure correct
>>>>>>>> semantics. It's the same way with input topics for KTables;
>>>>>>>> we rely on users to ensure the topics are compacted, and if
>>>>>>>> they aren't, then the execution semantics will be broken.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> -John
>>>>>>>>
>>>>>>>> On Sun, 2020-08-16 at 11:44 +0000, Navinder Brar wrote:
>>>>>>>>> Hi John,
>>>>>>>>>
>>>>>>>>> Thanks for your inputs. Since global topics are in a way their own
>>>>>>>>> changelog, wouldn't the global consumers be more akin to restore
>>>>>>>>> consumers than the main consumer?
>>>>>>>>>
>>>>>>>>> I am also +1 on catching the exception and resetting to the
>>>>>>>>> earliest for now. Whenever an instance starts, currently the
>>>>>>>>> global stream thread (if available) goes to RUNNING before stream
>>>>>>>>> threads are started, so that means the global state is available
>>>>>>>>> when the processing by stream threads starts. So, with the new
>>>>>>>>> change of catching the exception, cleaning the store and resetting
>>>>>>>>> to earliest would probably be "stop the world" as you said, John,
>>>>>>>>> as I think we will have to pause the stream threads till the whole
>>>>>>>>> global state is recovered. I assume it is "stop the world" right
>>>>>>>>> now as well, since now also if an InvalidOffsetException comes, we
>>>>>>>>> throw a streams exception and the user has to clean up and handle
>>>>>>>>> all this manually, and when that instance starts, it will restore
>>>>>>>>> the global state first.
>>>>>>>>>
>>>>>>>>> I had an additional thought on this whole problem: would it be
>>>>>>>>> helpful to educate the users that global topics should have the
>>>>>>>>> cleanup policy "compact", so that this invalid offset exception
>>>>>>>>> never arises for them? Assume, for example, that the cleanup
>>>>>>>>> policy on a global topic is "delete" and it has deleted the keys
>>>>>>>>> k1, k2 (via retention.ms), although all the instances had already
>>>>>>>>> consumed them, so they are in all global stores and all other
>>>>>>>>> instances are up to date on the global data (so no
>>>>>>>>> InvalidOffsetException). Now, a new instance is added to the
>>>>>>>>> cluster, and we have already lost k1, k2 from the global topic, so
>>>>>>>>> it will start consuming from the earliest point in the global
>>>>>>>>> topic. So, wouldn't this global store on the new instance have 2
>>>>>>>>> keys less than all the other global stores already available in
>>>>>>>>> the cluster? Please let me know if I am missing something. Thanks.
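[Editorial sketch: Navinder's delete-vs-compact scenario above can be simulated in a few lines; the topic and store shapes are simplified stand-ins, just to show how a new instance ends up with a divergent global store.]

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DeleteRetentionSketch {
    // Builds a global store by consuming every record currently in the topic.
    static Map<String, String> bootstrap(List<String[]> topic) {
        Map<String, String> store = new HashMap<>();
        for (String[] record : topic) {
            store.put(record[0], record[1]);
        }
        return store;
    }

    public static void main(String[] args) {
        List<String[]> globalTopic = new ArrayList<>(List.of(
            new String[]{"k1", "v1"},
            new String[]{"k2", "v2"},
            new String[]{"k3", "v3"}));

        // An existing instance consumed everything before retention kicked in.
        Map<String, String> oldInstance = bootstrap(globalTopic);

        // With cleanup.policy=delete, retention.ms eventually expires the
        // oldest records; with "compact" the latest value per key would
        // survive instead.
        globalTopic.subList(0, 2).clear();

        // A newly added instance can only bootstrap from what is left,
        // so it silently misses k1 and k2.
        Map<String, String> newInstance = bootstrap(globalTopic);

        System.out.println(oldInstance.size() + " keys vs " + newInstance.size() + " key");
    }
}
```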
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Navinder
>>>>>>>>>
>>>>>>>>> On Friday, 14 August, 2020, 10:03:42 am IST, John Roesler
>>>>>>>>> <vvcep...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> It seems like the main motivation for this proposal is satisfied
>>>>>>>>> if we just implement some recovery mechanism instead of crashing.
>>>>>>>>> If the mechanism is going to be pausing all the threads until the
>>>>>>>>> state is recovered, then it still seems like a big enough behavior
>>>>>>>>> change to warrant a KIP.
>>>>>>>>>
>>>>>>>>> I have to confess I'm a little unclear on why a custom reset
>>>>>>>>> policy for a global store, table, or even consumer might be
>>>>>>>>> considered wrong. It's clearly wrong for the restore consumer, but
>>>>>>>>> the global consumer seems more semantically akin to the main
>>>>>>>>> consumer than the restore consumer.
>>>>>>>>>
>>>>>>>>> In other words, if it's wrong to reset a GlobalKTable from latest,
>>>>>>>>> shouldn't it also be wrong for a KTable, for exactly the same
>>>>>>>>> reason? It certainly seems like it would be an odd choice, but
>>>>>>>>> I've seen many choices I thought were odd turn out to have
>>>>>>>>> perfectly reasonable use cases.
>>>>>>>>>
>>>>>>>>> As far as the PAPI global store goes, I could see adding the
>>>>>>>>> option to configure it, since, as Matthias pointed out, there are
>>>>>>>>> really no specific semantics for the PAPI. But if automatic
>>>>>>>>> recovery is really all Navinder wanted, then I could also see
>>>>>>>>> deferring this until someone specifically wants it.
>>>>>>>>>
>>>>>>>>> So the tl;dr is: if we just want to catch the exception and
>>>>>>>>> rebuild the store by seeking to earliest, with no config or API
>>>>>>>>> changes, then I'm +1.
>>>>>>>>>
>>>>>>>>> I'm wondering if we can improve on the "stop the world" effect of
>>>>>>>>> rebuilding the global store, though.
>>>>>>>>> It seems like we could put our heads together and come up with a
>>>>>>>>> more fine-grained approach to maintaining the right semantics
>>>>>>>>> during recovery while still making some progress.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> John
>>>>>>>>>
>>>>>>>>> On Sun, Aug 9, 2020, at 02:04, Navinder Brar wrote:
>>>>>>>>>> Hi Matthias,
>>>>>>>>>>
>>>>>>>>>> IMHO, now, as you explained, using
>>>>>>>>>> 'global.consumer.auto.offset.reset' is not as straightforward as
>>>>>>>>>> it seems, and it might change the existing behavior for users
>>>>>>>>>> without them realizing it. I also think that we should change the
>>>>>>>>>> behavior inside the global stream thread to not die on
>>>>>>>>>> InvalidOffsetException and instead clean and rebuild the state
>>>>>>>>>> from the earliest. On this, as you mentioned, we would need to
>>>>>>>>>> pause the stream threads till the global store is completely
>>>>>>>>>> restored. Without it, there will be incorrect processing results
>>>>>>>>>> if they are utilizing a global store during processing.
>>>>>>>>>>
>>>>>>>>>> So, basically, we can divide the use-cases into 4 parts:
>>>>>>>>>>
>>>>>>>>>> - PAPI based global stores (will have earliest hardcoded)
>>>>>>>>>> - PAPI based state stores (already have auto.reset.config)
>>>>>>>>>> - DSL based GlobalKTables (will have earliest hardcoded)
>>>>>>>>>> - DSL based KTables (will continue with auto.reset.config)
>>>>>>>>>>
>>>>>>>>>> So, this would mean that we are not changing any existing
>>>>>>>>>> behaviors with this, if I am right.
>>>>>>>>>>
>>>>>>>>>>> I guess we could improve the code to actually log a warning for
>>>>>>>>>>> this case, similar to what we do for some configs already (cf
>>>>>>>>>>> StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS).
>>>>>>>>>> I like this idea.
>>>>>>>>>> In case we go ahead with the above approach and if we can't
>>>>>>>>>> deprecate it, we should educate users that this config doesn't
>>>>>>>>>> work.
>>>>>>>>>>
>>>>>>>>>> Looking forward to hearing thoughts from others as well.
>>>>>>>>>>
>>>>>>>>>> - Navinder
>>>>>>>>>>
>>>>>>>>>> On Tuesday, 4 August, 2020, 05:07:59 am IST, Matthias J. Sax
>>>>>>>>>> <mj...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>> Navinder,
>>>>>>>>>>
>>>>>>>>>> thanks for updating the KIP. I think the motivation section is
>>>>>>>>>> not totally accurate (which is not your fault though, as the
>>>>>>>>>> history of how we handle this case is intertwined...). For
>>>>>>>>>> example, "auto.offset.reset" is hard-coded for the global
>>>>>>>>>> consumer to "none", and using
>>>>>>>>>> "global.consumer.auto.offset.reset" has no effect (cf
>>>>>>>>>> https://kafka.apache.org/25/documentation/streams/developer-guide/config-streams.html#default-values)
>>>>>>>>>>
>>>>>>>>>> Also, we could not even really deprecate the config as mentioned
>>>>>>>>>> in the rejected alternatives section, because we need
>>>>>>>>>> `auto.offset.reset` for the main consumer -- and adding a prefix
>>>>>>>>>> is independent of it. Also, because we ignore the config, it is
>>>>>>>>>> effectively already deprecated/removed, if you wish.
>>>>>>>>>>
>>>>>>>>>> I guess we could improve the code to actually log a warning for
>>>>>>>>>> this case, similar to what we do for some configs already (cf
>>>>>>>>>> StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS).
>>>>>>>>>>
>>>>>>>>>> The other question is about compatibility with regard to default
>>>>>>>>>> behavior: if we want to reintroduce
>>>>>>>>>> `global.consumer.auto.offset.reset`, this basically implies that
>>>>>>>>>> we need to respect `auto.offset.reset`, too. Remember that any
>>>>>>>>>> config without a prefix is applied to all clients that support
>>>>>>>>>> this config.
>>>>>>>>>> Thus, if a user does not limit the scope of the config to the
>>>>>>>>>> main consumer (via `main.consumer.auto.offset.reset`) but uses
>>>>>>>>>> the non-prefixed version and sets it to "latest" (and relies on
>>>>>>>>>> the current behavior that `auto.offset.reset` is "none", and
>>>>>>>>>> effectively "earliest", on the global consumer), the user might
>>>>>>>>>> end up with a surprise, as the global consumer behavior would
>>>>>>>>>> switch from "earliest" to "latest" (most likely unintentionally).
>>>>>>>>>> The bottom line is that users might need to change configs to
>>>>>>>>>> preserve the old behavior...
>>>>>>>>>>
>>>>>>>>>> However, before we discuss those details, I think we should
>>>>>>>>>> discuss the topic in a broader context first:
>>>>>>>>>>
>>>>>>>>>> - For a GlobalKTable, does it even make sense, from a correctness
>>>>>>>>>> point of view, to allow users to set a custom reset policy? It
>>>>>>>>>> seems you currently don't propose this in the KIP, but as you
>>>>>>>>>> don't mention it explicitly, it's unclear if that is on purpose
>>>>>>>>>> or an oversight.
>>>>>>>>>>
>>>>>>>>>> - Should we treat global stores differently to GlobalKTables and
>>>>>>>>>> allow for more flexibility (as the PAPI does not really provide
>>>>>>>>>> any semantic contract)? It seems that is what you propose in the
>>>>>>>>>> KIP. We should discuss if this flexibility makes sense or not for
>>>>>>>>>> the PAPI, or if we should apply the same reasoning about
>>>>>>>>>> correctness we use for KTables to global stores. To what extent
>>>>>>>>>> are/should they be different?
>>>>>>>>>>
>>>>>>>>>> - If we support auto.offset.reset for global stores, how should
>>>>>>>>>> we handle the initial bootstrapping of the store/table (that is
>>>>>>>>>> hard-coded atm)? Should we skip it if the policy is "latest" and
>>>>>>>>>> start with an empty state?
>>>>>>>>>> Note that we did consider this behavior incorrect via
>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6121, and thus I am
>>>>>>>>>> wondering why we should change it back again?
>>>>>>>>>>
>>>>>>>>>> Finally, the main motivation for the Jira ticket was to let the
>>>>>>>>>> runtime auto-recover instead of dying as it does currently. If we
>>>>>>>>>> decide that a custom reset policy does actually not make sense,
>>>>>>>>>> we can just change the global-thread to not die any longer on an
>>>>>>>>>> `InvalidOffsetException` but rebuild the state automatically.
>>>>>>>>>> This would be "only" a behavior change but does not require any
>>>>>>>>>> public API changes. -- For this case, we should also think about
>>>>>>>>>> the synchronization with the main processing threads. On startup
>>>>>>>>>> we bootstrap the global stores before processing happens. Thus,
>>>>>>>>>> if an `InvalidOffsetException` happens and the global thread
>>>>>>>>>> dies, the main threads cannot access the global stores any longer
>>>>>>>>>> and also die. If we re-build the state though, do we need to
>>>>>>>>>> pause the main threads during this phase?
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 8/2/20 8:48 AM, Navinder Brar wrote:
>>>>>>>>>>> Hi John,
>>>>>>>>>>>
>>>>>>>>>>> I have updated the KIP to make the motivation more clear. In a
>>>>>>>>>>> nutshell, we will use the already existing config
>>>>>>>>>>> "global.consumer.auto.offset.reset" for users to set a blanket
>>>>>>>>>>> reset policy for all global topics, and add a new interface to
>>>>>>>>>>> set a per-topic reset policy for each global topic (for which we
>>>>>>>>>>> specifically need this KIP). There was a point raised by
>>>>>>>>>>> Matthias above to always reset to earliest by cleaning the
>>>>>>>>>>> stores and seekToBeginning in case of InvalidOffsetException.
>>>>>>>>>>> We can go with that route as well, and I don't think it would
>>>>>>>>>>> need a KIP, as if we are not providing users an option to have a
>>>>>>>>>>> blanket reset policy on global topics, then a per-topic override
>>>>>>>>>>> would also not be required (the KIP is required basically for
>>>>>>>>>>> that). Although, I think if users have an option to choose a
>>>>>>>>>>> reset policy for StreamThread then the option should be provided
>>>>>>>>>>> for GlobalStreamThread as well, and if we don't want to use
>>>>>>>>>>> "global.consumer.auto.offset.reset" then we would need to
>>>>>>>>>>> deprecate it, because currently it's not serving any purpose.
>>>>>>>>>>> For now, I have added it to rejected alternatives, but we can
>>>>>>>>>>> discuss this.
>>>>>>>>>>>
>>>>>>>>>>> On the query that I had for Guozhang, thanks to Matthias we have
>>>>>>>>>>> fixed it last week as part of KAFKA-10306.
>>>>>>>>>>>
>>>>>>>>>>> ~Navinder
>>>>>>>>>>>
>>>>>>>>>>> On Sunday, 26 July, 2020, 07:37:34 pm IST, Navinder Brar
>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Sorry, it took some time to respond back.
>>>>>>>>>>>
>>>>>>>>>>>> "but I thought we would pass the config through to the client."
>>>>>>>>>>> @John, sure, we can use the config in GlobalStreamThread; that
>>>>>>>>>>> could be one way to solve it.
>>>>>>>>>>>
>>>>>>>>>>> @Matthias, sure, cleaning the store and recreating is one way,
>>>>>>>>>>> but since we are giving an option to reset in StreamThread, why
>>>>>>>>>>> should the implementation be different in GlobalStreamThread? I
>>>>>>>>>>> think we should use the global.consumer.auto.offset.reset config
>>>>>>>>>>> to accept the reset strategy opted for by the user, although I
>>>>>>>>>>> would be ok with just cleaning and resetting to the latest as
>>>>>>>>>>> well for now.
>>>>>>>>>>> Currently, we throw a StreamsException in case of
>>>>>>>>>>> InvalidOffsetException in GlobalStreamThread, so just resetting
>>>>>>>>>>> would still be better than what happens currently.
>>>>>>>>>>>
>>>>>>>>>>> Matthias, I found this comment in StreamsBuilder for
>>>>>>>>>>> GlobalKTable: '* Note that {@link GlobalKTable} always applies
>>>>>>>>>>> {@code "auto.offset.reset"} strategy {@code "earliest"}
>>>>>>>>>>> regardless of the specified value in {@link StreamsConfig} or
>>>>>>>>>>> {@link Consumed}.'
>>>>>>>>>>> So, I guess we are already cleaning up and recreating for
>>>>>>>>>>> GlobalKTable from the earliest offset.
>>>>>>>>>>>
>>>>>>>>>>> @Guozhang, while looking at the code, I also noticed a "TODO:
>>>>>>>>>>> pending" in GlobalStateManagerImpl, when InvalidOffsetException
>>>>>>>>>>> is thrown. Earlier, we were directly clearing the store here and
>>>>>>>>>>> recreating from scratch, but that code piece is removed now. Are
>>>>>>>>>>> you working on a follow-up PR for this, or is just handling the
>>>>>>>>>>> reset in GlobalStreamThread sufficient?
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Navinder
>>>>>>>>>>>
>>>>>>>>>>> On Tuesday, 7 July, 2020, 12:53:36 am IST, Matthias J. Sax
>>>>>>>>>>> <mj...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Atm, the config should be ignored and the global-consumer should
>>>>>>>>>>> use "none" in a hard-coded way.
>>>>>>>>>>>
>>>>>>>>>>> However, I am still wondering if we actually want/need to allow
>>>>>>>>>>> users to specify the reset policy. It might be worth considering
>>>>>>>>>>> to just change the behavior: catch the exception, log an ERROR
>>>>>>>>>>> (for information purposes), wipe the store, seekToBeginning(),
>>>>>>>>>>> and recreate the store?
>>>>>>>>>>>
>>>>>>>>>>> Btw: if we want to allow users to set the reset policy, this
>>>>>>>>>>> should be possible via the config, or via overwriting the config
>>>>>>>>>>> in the method itself.
>>>>>>>>>>> Thus, we would need to add a new overloaded method to
>>>>>>>>>>> `Topology` and `StreamsBuilder`.
>>>>>>>>>>>
>>>>>>>>>>> Another question to ask: what about GlobalKTables? Should they
>>>>>>>>>>> behave the same? An alternative design could be to allow users
>>>>>>>>>>> to specify a flexible reset policy for global stores, but not
>>>>>>>>>>> for GlobalKTables, and use the strategy suggested above for that
>>>>>>>>>>> case.
>>>>>>>>>>>
>>>>>>>>>>> Thoughts?
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 7/2/20 2:14 PM, John Roesler wrote:
>>>>>>>>>>>> Hi Navinder,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the response. I'm sorry if I'm being dense... You
>>>>>>>>>>>> said we are not currently using the config, but I thought we
>>>>>>>>>>>> would pass the config through to the client. Can you confirm
>>>>>>>>>>>> whether or not the existing config works for your use case?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> John
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Jun 28, 2020, at 14:09, Navinder Brar wrote:
>>>>>>>>>>>>> Sorry, my bad. Found it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Prefix used to override {@link KafkaConsumer consumer} configs
>>>>>>>>>>>>> for the global consumer client from the general consumer
>>>>>>>>>>>>> client configs. The override precedence is the following (from
>>>>>>>>>>>>> highest to lowest precedence):
>>>>>>>>>>>>> * 1. global.consumer.[config-name]
>>>>>>>>>>>>> public static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";
>>>>>>>>>>>>>
>>>>>>>>>>>>> So, that's great. We already have a config exposed to reset
>>>>>>>>>>>>> offsets for global topics via
>>>>>>>>>>>>> global.consumer.auto.offset.reset; it's just that we are not
>>>>>>>>>>>>> actually using it inside GlobalStreamThread to reset.
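[Editorial sketch: the auto-recovery flow Matthias suggests in his 7 July note (catch the exception, log an ERROR, wipe the store, seekToBeginning(), recreate the store) amounts to control flow like the following. The consumer and exception types below are stand-ins for the real Kafka classes; only the control flow is the point.]

```java
import java.util.ArrayList;
import java.util.List;

public class GlobalRecoverySketch {
    static class InvalidOffsetException extends RuntimeException {}

    interface GlobalConsumer {
        void poll() throws InvalidOffsetException;
        void seekToBeginning();
    }

    // Returns the ordered recovery actions taken, for illustration.
    static List<String> pollOnce(GlobalConsumer consumer) {
        List<String> actions = new ArrayList<>();
        try {
            consumer.poll();
            actions.add("processed records");
        } catch (InvalidOffsetException e) {
            actions.add("log ERROR");      // inform the user, but don't die
            actions.add("wipe store");     // delete the local global state
            consumer.seekToBeginning();    // reset to the earliest offset
            actions.add("rebuild store");  // re-bootstrap from the topic
        }
        return actions;
    }

    public static void main(String[] args) {
        GlobalConsumer badOffsets = new GlobalConsumer() {
            public void poll() { throw new InvalidOffsetException(); }
            public void seekToBeginning() {}
        };
        System.out.println(pollOnce(badOffsets)); // [log ERROR, wipe store, rebuild store]
    }
}
```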
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Navinder
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Monday, 29 June, 2020, 12:24:21 am IST, Navinder Brar
>>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi John,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for your feedback.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. I think there is some confusion on my first point: for the
>>>>>>>>>>>>> enum, I am sure we can use the same one, but for the external
>>>>>>>>>>>>> config which controls the resetting in the global stream
>>>>>>>>>>>>> thread, we can either use the same one which users use for
>>>>>>>>>>>>> source topics (StreamThread), or we can provide a new one
>>>>>>>>>>>>> which specifically controls global topics. For example,
>>>>>>>>>>>>> currently, if I get an InvalidOffsetException in any of my
>>>>>>>>>>>>> source topics, I can choose whether to reset from earliest or
>>>>>>>>>>>>> latest (with auto.offset.reset). Now, either we can use the
>>>>>>>>>>>>> same option and say that if I get the same exception for
>>>>>>>>>>>>> global topics I will follow the same resetting, or some users
>>>>>>>>>>>>> might want to have totally different settings for source and
>>>>>>>>>>>>> global topics, like for source topics I want resetting from
>>>>>>>>>>>>> latest but for global topics I want resetting from earliest,
>>>>>>>>>>>>> in which case adding a new config might be better.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2. I couldn't find the config
>>>>>>>>>>>>> "global.consumer.auto.offset.reset" currently. In fact, in
>>>>>>>>>>>>> GlobalStreamThread.java we are throwing a StreamsException for
>>>>>>>>>>>>> InvalidOffsetException, and there is a test as well,
>>>>>>>>>>>>> GlobalStreamThreadTest#shouldDieOnInvalidOffsetException(), so
>>>>>>>>>>>>> I think this is the config we are trying to introduce with
>>>>>>>>>>>>> this KIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Navinder
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Saturday, 27 June, 2020, 07:03:04 pm IST, John Roesler
>>>>>>>>>>>>> <j...@vvcephei.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Navinder,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for this proposal!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regarding your question about whether to use the same policy
>>>>>>>>>>>>> enum or not: the underlying mechanism is the same, so I think
>>>>>>>>>>>>> we can just use the same AutoOffsetReset enum.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Can you confirm whether setting the reset policy config on the
>>>>>>>>>>>>> global consumer currently works or not? Based on my reading
>>>>>>>>>>>>> of StreamsConfig, it looks like it would be:
>>>>>>>>>>>>> "global.consumer.auto.offset.reset".
>>>>>>>>>>>>>
>>>>>>>>>>>>> If that does work, would you still propose to augment the
>>>>>>>>>>>>> Java API?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> -John
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jun 26, 2020, at 23:52, Navinder Brar wrote:
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> KIP:
>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have taken over this KIP since it has been dormant for a
>>>>>>>>>>>>>> long time, and this looks important for use-cases that have
>>>>>>>>>>>>>> large global data, so rebuilding global stores from scratch
>>>>>>>>>>>>>> might seem overkill in case of an InvalidOffsetException.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We want to give users the control to use a reset policy (as
>>>>>>>>>>>>>> we do in StreamThread) in case they hit invalid offsets.
>>>>>>>>>>>>>> I have still not decided whether to restrict this option to
>>>>>>>>>>>>>> the same reset policy being used by StreamThread (using the
>>>>>>>>>>>>>> auto.offset.reset config) or to add another reset config
>>>>>>>>>>>>>> specifically for global stores, "global.auto.offset.reset",
>>>>>>>>>>>>>> which gives users more control to choose separate policies
>>>>>>>>>>>>>> for global and stream threads.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I would like to hear your opinions on the KIP.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Navinder
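[Editorial sketch: for reference, the prefixed config discussed throughout the thread would be supplied as below. Values are illustrative, and note that, per Matthias's 4 August message, the global override is currently ignored because the global consumer hard-codes "none".]

```java
import java.util.Properties;

public class GlobalResetConfigSketch {
    // Assembles a StreamsConfig-style Properties object.
    static Properties streamsProps() {
        Properties props = new Properties();
        props.setProperty("application.id", "my-app");            // example values
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Prefixed override: intended to apply only to the global consumer.
        props.setProperty("global.consumer.auto.offset.reset", "earliest");
        // Limiting the plain config to the main consumer avoids the surprise
        // Matthias describes: an unprefixed auto.offset.reset applies to all
        // clients that support it, including (potentially) the global consumer.
        props.setProperty("main.consumer.auto.offset.reset", "latest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("global.consumer.auto.offset.reset"));
    }
}
```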