I think this issue can actually be resolved.

 - We need a flag on the stream-threads indicating whether a global-restore
is in progress; in this case, a stream-thread may go into RUNNING state,
but it is not allowed to actually process data -- it is still allowed to
update standby-tasks though.

 - If a stream-thread restores, its own state is RESTORING and it does
not need to care about the "global restore flag".

 - The global-thread just does what we discussed, including using state
RESTORING.

 - The KafkaStreams client state is RESTORING if at least one thread
(stream-thread or global-thread) is in state RESTORING.

 - On startup, if there is a global-thread, we just set the
global-restore flag upfront before we start the stream-threads (we can
actually still do the rebalance and potential restore in the stream-threads
in parallel to the global restore) and rely on the global-thread to unset
the flag.

 - The tricky part is to "stop" processing in the stream-threads if we
need to wipe the global-store and rebuild it. For this, we should set
the "global restore flag" on the stream-threads, but we also need to
"lock down" the global store in question and throw an exception if a
stream-thread tries to access it; if a stream-thread gets this
exception, it needs to clean up itself and wait until the "global restore
flag" is unset before it can continue.
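To make that last point concrete, here is a rough sketch of how such a shared "global restore flag" plus the store lock-down could look. All names here are hypothetical for illustration, not actual Kafka Streams internals:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Thrown when a stream-thread accesses a global store that is being rebuilt.
class GlobalStoreRestoringException extends RuntimeException {
    GlobalStoreRestoringException(String msg) { super(msg); }
}

// Shared between the global-thread (writer) and the stream-threads (readers).
class GlobalRestoreFlag {
    private final AtomicBoolean restoring = new AtomicBoolean(false);

    void set() { restoring.set(true); }     // global-thread: before wiping the store
    void unset() { restoring.set(false); }  // global-thread: after restore completes
    boolean isSet() { return restoring.get(); }

    // Stream-threads call this before each access to the global store.
    void checkAccess(final String storeName) {
        if (restoring.get()) {
            throw new GlobalStoreRestoringException(
                "Global store '" + storeName + "' is being restored");
        }
    }
}
```

A stream-thread that catches the exception would clean up its current task state and then wait until `isSet()` returns false before resuming processing.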


Do we think this would work? -- Of course, the devil is in the details,
but that seems to be a PR discussion, and there is no reason to make
it part of the KIP.


-Matthias

On 9/3/20 3:41 AM, Navinder Brar wrote:
> Hi,
> 
> Thanks, John, Matthias and Sophie for great feedback.
> 
> On the point raised by Sophie that maybe we should allow normal restoring 
> during GLOBAL_RESTORING, I think it makes sense but the challenge would be 
> what happens when normal restoring (on actives) has finished but 
> GLOBAL_RESTORING is still going on. Currently, all restorings are independent 
> of each other i.e. restoring happening on one task/thread doesn't affect 
> another. But if we do go ahead with allowing normal restoring during 
> GLOBAL_RESTORING then we will still have to pause the active tasks from going 
> to RUNNING if GLOBAL_RESTORING has not finished and normal restorings have 
> finished. For the same reason, I wouldn't want to combine global restoring 
> and normal restoring because then it would make all the restorings 
> independent but we don't want that. We want global stores to be available 
> before any processing starts on the active tasks.
> 
> Although I think restoring of replicas can still take place while global 
> stores are restoring because in replicas there is no danger of them starting 
> processing. 
> 
> Also, one point to bring up is that currently during application startup 
> global stores restore first and then normal stream threads start.
> 
> Regards,
> Navinder 
> 
>     On Thursday, 3 September, 2020, 06:58:40 am IST, Matthias J. Sax 
> <mj...@apache.org> wrote:  
>  
>  Thanks for the input Sophie. Those are all good points and I fully agree
> with them.
> 
> When saying "pausing the processing threads" I only considered them in
> `RUNNING` and thought we figure out the detail on the PR... Excellent catch!
> 
> Changing state transitions is to some extent backward incompatible, but
> I think (IIRC) we did it in the past and I personally tend to find it
> ok. That's why we cover those changes in a KIP.
> 
> -Matthias
> 
> On 9/2/20 6:18 PM, Sophie Blee-Goldman wrote:
>> If we're going to add a new GLOBAL_RESTORING state to the KafkaStreams FSM,
>> maybe it would make sense to add a new plain RESTORING state that we
>> transition
>> to when restoring non-global state stores following a rebalance. Right now
>> all restoration
>> occurs within the REBALANCING state, which is pretty misleading.
>> Applications that
>> have large amounts of state to restore will appear to be stuck rebalancing
>> according to
>> the state listener, when in fact the rebalance has completed long ago.
>> Given that there
>> are very much real scenarios where you actually *are* stuck rebalancing, it
>> seems useful to
>> distinguish plain restoration from more insidious cases that may require
>> investigation and/or
>> intervention.
>>
>> I don't mean to hijack this KIP, I just think it would be odd to introduce
>> GLOBAL_RESTORING
>> when there is no other kind of RESTORING state. One question this brings
>> up, and I
>> apologize if this has already been addressed, is what to do when we are
>> restoring
>> both normal and global state stores? It sounds like we plan to pause the
>> StreamThreads
>> entirely, but there doesn't seem to be any reason not to allow regular
>> state restoration -- or
>> even standby processing -- while the global state is restoring. Given the
>> current effort to move
>> restoration & standbys to a separate thread, allowing them to continue
>> while pausing
>> only the StreamThread seems quite natural.
>>
>> Assuming that we actually do allow both types of restoration to occur at
>> the same time,
>> and if we did add a plain RESTORING state as well, which state should we
>> end up in?
>> AFAICT the main reason for having a distinct {GLOBAL_}RESTORING state is to
>> alert
>> users of the non-progress of their active tasks. In both cases, the active
>> task is unable
>> to continue until restoration has completed, so why distinguish between the
>> two at all?
>> Would it make sense to avoid a special GLOBAL_RESTORING state and just
>> introduce
>> a single unified RESTORING state to cover both the regular and global case?
>> Just a thought
>>
>> My only concern is that this might be considered a breaking change: users
>> might be
>> looking for the REBALANCING -> RUNNING transition specifically in order to
>> alert when
>> the application has started up, and we would no longer go directly from
>> REBALANCING to
>> RUNNING. I think we actually did/do this ourselves in a number of
>> integration tests and
>> possibly in some examples. That said, it seems more appropriate to just
>> listen for
>> the RUNNING state rather than for a specific transition, and we should
>> encourage users
>> to do so rather than go out of our way to support transition-type state
>> listeners.
>>
>> Cheers,
>> Sophie
>>
>> On Wed, Sep 2, 2020 at 5:53 PM Matthias J. Sax <mj...@apache.org> wrote:
>>
>>> I think this makes sense.
>>>
>>> When we introduce this new state, we might also tackle the jira I
>>> mentioned. If there is a global thread, on startup of a `KafkaStreams`
>>> client we should not transit to `REBALANCING` but to the new state, and
>>> maybe also make the "bootstrapping" non-blocking.
>>>
>>> I guess it's worth to mention this in the KIP.
>>>
>>> Btw: The new state for KafkaStreams should also be part of the KIP as it
>>> is a public API change, too.
>>>
>>>
>>> -Matthias
>>>
>>> On 8/29/20 9:37 AM, John Roesler wrote:
>>>> Hi Navinder,
>>>>
>>>> Thanks for the ping. Yes, that all sounds right to me. The name
>>> “RESTORING_GLOBAL” sounds fine, too.
>>>>
>>>> I think as far as warnings go, we’d just propose to mention it in the
>>> javadoc of the relevant methods that the given topics should be compacted.
>>>>
>>>> Thanks!
>>>> -John
>>>>
>>>> On Fri, Aug 28, 2020, at 12:42, Navinder Brar wrote:
>>>>> Gentle ping.
>>>>>
>>>>> ~ Navinder
>>>>>     On Wednesday, 19 August, 2020, 06:59:58 pm IST, Navinder Brar
>>>>> <navinder_b...@yahoo.com.invalid> wrote:
>>>>>
>>>>>
>>>>> Thanks Matthias & John,
>>>>>
>>>>>
>>>>>
>>>>> I am glad we are converging towards an understanding. So, to summarize,
>>>>>
>>>>> we will still keep treating this change in KIP and instead of providing
>>> a reset
>>>>>
>>>>> strategy, we will clean up, reset to earliest, and rebuild the state.
>>>>>
>>>>> When we hit the exception and we are building the state, we will stop
>>> all
>>>>>
>>>>> processing and change the state of KafkaStreams to something like
>>>>>
>>>>> “RESTORING_GLOBAL” or the like.
>>>>>
>>>>>
>>>>>
>>>>> How do we plan to educate users on the undesired effects of using
>>>>>
>>>>> non-compacted global topics? (via the KIP itself?)
>>>>>
>>>>>
>>>>> +1 on changing the KTable behavior, reset policy for global, connecting
>>>>> processors to global for a later stage when demanded.
>>>>>
>>>>> Regards,
>>>>> Navinder
>>>>>     On Wednesday, 19 August, 2020, 01:00:58 pm IST, Matthias J. Sax
>>>>> <mj...@apache.org> wrote:
>>>>>
>>>>>   Your observation is correct. Connecting (regular) stores to processors
>>>>> is necessary to "merge" sub-topologies into single ones if a store is
>>>>> shared. -- For global stores, the structure of the program does not
>>>>> change and thus connecting processors to global stores is not required.
>>>>>
>>>>> Also given our experience with restoring regular state stores (ie,
>>>>> partial processing of tasks that don't need restore), it seems better to
>>>>> pause processing and move all CPU and network resources to the global
>>>>> thread to rebuild the global store as soon as possible instead of
>>>>> potentially slowing down the restore in order to make progress on some
>>>>> tasks.
>>>>>
>>>>> Of course, if we collect real world experience and it becomes an issue,
>>>>> we could still try to change it?
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 8/18/20 3:31 PM, John Roesler wrote:
>>>>>> Thanks Matthias,
>>>>>>
>>>>>> Sounds good. I'm on board with no public API change and just
>>>>>> recovering instead of crashing.
>>>>>>
>>>>>> Also, to be clear, I wouldn't drag KTables into it; I was
>>>>>> just trying to wrap my head around the congruity of our
>>>>>> choice for GlobalKTable with respect to KTable.
>>>>>>
>>>>>> I agree that whatever we decide to do would probably also
>>>>>> resolve KAFKA-7380.
>>>>>>
>>>>>> Moving on to discuss the behavior change, I'm wondering if
>>>>>> we really need to block all the StreamThreads. It seems like
>>>>>> we only need to prevent processing on any task that's
>>>>>> connected to the GlobalStore.
>>>>>>
>>>>>> I just took a look at the topology building code, and it
>>>>>> actually seems that connections to global stores don't need
>>>>>> to be declared. That's a bummer, since it means that we
>>>>>> really do have to stop all processing while the global
>>>>>> thread catches up.
>>>>>>
>>>>>> Changing this seems like it'd be out of scope right now, but
>>>>>> I bring it up in case I'm wrong and it actually is possible
>>>>>> to know which specific tasks need to be synchronized with
>>>>>> which global state stores. If we could know that, then we'd
>>>>>> only have to block some of the tasks, not all of the
>>>>>> threads.
>>>>>>
>>>>>> Thanks,
>>>>>> -John
>>>>>>
>>>>>>
>>>>>> On Tue, 2020-08-18 at 14:10 -0700, Matthias J. Sax wrote:
>>>>>>> Thanks for the discussion.
>>>>>>>
>>>>>>> I agree that this KIP is justified in any case -- even if we don't
>>>>>>> change public API, as the change in behavior is significant.
>>>>>>>
>>>>>>> A better documentation for cleanup policy is always good (even if I am
>>>>>>> not aware of any concrete complaints atm that users were not aware of
>>>>>>> the implications). Of course, for a regular KTable, one can
>>>>>>> enable/disable the source-topic-changelog optimization and thus can
>>> use
>>> a non-compacted topic for this case, which is quite a difference to
>>>>>>> global stores/tables; so maybe it's worth to point out this difference
>>>>>>> explicitly.
>>>>>>>
>>>>>>> As mentioned before, the main purpose of the original Jira was to
>>> avoid
>>>>>>> the crash situation but to allow for auto-recovering while it was an
>>>>>>> open question if it makes sense / would be useful to allow users to
>>>>>>> specify a custom reset policy instead of using a hard-coded "earliest"
>>>>>>> strategy. -- It seems it's still unclear if it would be useful and thus
>>>>>>> it might be best to not add it for now -- we can still add it later if
>>>>>>> there are concrete use-cases that need this feature.
>>>>>>>
>>>>>>> @John: I actually agree that it's also questionable to allow a custom
>>>>>>> reset policy for KTables... Not sure if we want to drag this question
>>>>>>> into this KIP though?
>>>>>>>
>>>>>>> So it seems, we all agree that we actually don't need any public API
>>>>>>> changes, but we only want to avoid crashing?
>>>>>>>
>>>>>>> For this case, to preserve the current behavior that guarantees that
>>> the
>>>>>>> global store/table is always loaded first, it seems we need to have a
>>>>>>> stop-the-world mechanism for the main `StreamThreads` for this case --
>>>>>>> do we need to add a new state to KafkaStreams client for this case?
>>>>>>>
>>>>>>> Having a new state might also be helpful for
>>>>>>> https://issues.apache.org/jira/browse/KAFKA-7380 ?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 8/17/20 7:34 AM, John Roesler wrote:
>>>>>>>> Hi Navinder,
>>>>>>>>
>>>>>>>> I see what you mean about the global consumer being similar
>>>>>>>> to the restore consumer.
>>>>>>>>
>>>>>>>> I also agree that automatically performing the recovery
>>>>>>>> steps should be strictly an improvement over the current
>>>>>>>> situation.
>>>>>>>>
>>>>>>>> Also, yes, it would be a good idea to make it clear that the
>>>>>>>> global topic should be compacted in order to ensure correct
>>>>>>>> semantics. It's the same way with input topics for KTables;
>>>>>>>> we rely on users to ensure the topics are compacted, and if
>>>>>>>> they aren't, then the execution semantics will be broken.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> -John
>>>>>>>>
>>>>>>>> On Sun, 2020-08-16 at 11:44 +0000, Navinder Brar wrote:
>>>>>>>>> Hi John,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks for your inputs. Since, global topics are in a way their own
>>> changelog, wouldn’t the global consumers be more akin to restore consumers
>>> than the main consumer?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I am also +1 on catching the exception and setting it to the
>>> earliest for now. Whenever an instance starts, currently global stream
>>> thread(if available) goes to RUNNING before stream threads are started so
>>> that means the global state is available when the processing by stream
>>> threads start. So, with the new change of catching the exception, cleaning
>>> the store and resetting to earliest would probably be “stop the world” as you
>>> said John, as I think we will have to pause the stream threads till the
>>> whole global state is recovered. I assume it is "stop the world" right now
>>> as well, since now also if an InvalidOffsetException comes, we throw
>>> streams exception and the user has to clean up and handle all this manually
>>> and when that instance will start, it will restore global state first.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I had an additional thought to this whole problem, would it be
>>> helpful to educate the users that global topics should have cleanup policy
>>> as compact, so that this invalid offset exception never arises for them.
>>> Assume for example, that the cleanup policy in global topic is "delete" and
>>> it has deleted k1, k2 keys(via retention.ms) although all the instances
>>> had already consumed them so they are in all global stores and all other
>>> instances are up to date on the global data(so no InvalidOffsetException).
>>> Now, a new instance is added to the cluster, and we have already lost k1,
>>> k2 from the global topic so it will start consuming from the earliest point
>>> in the global topic. So, wouldn’t this global store on the new instance have
>>> 2 keys less than all the other global stores already available in the
>>> cluster? Please let me know if I am missing something. Thanks.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>>
>>>>>>>>> Navinder
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>     On Friday, 14 August, 2020, 10:03:42 am IST, John Roesler <
>>> vvcep...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>   Hi all,
>>>>>>>>>
>>>>>>>>> It seems like the main motivation for this proposal is satisfied if
>>> we just implement some recovery mechanism instead of crashing. If the
>>> mechanism is going to be pausing all the threads until the state is
>>> recovered, then it still seems like a big enough behavior change to warrant
>>> a KIP still.
>>>>>>>>>
>>>>>>>>> I have to confess I’m a little unclear on why a custom reset policy
>>> for a global store, table, or even consumer might be considered wrong. It’s
>>> clearly wrong for the restore consumer, but the global consumer seems more
>>> semantically akin to the main consumer than the restore consumer.
>>>>>>>>>
>>>>>>>>> In other words, if it’s wrong to reset a GlobalKTable from latest,
>>> shouldn’t it also be wrong for a KTable, for exactly the same reason? It
>>> certainly seems like it would be an odd choice, but I’ve seen many choices
>>> I thought were odd turn out to have perfectly reasonable use cases.
>>>>>>>>>
>>>>>>>>> As far as the PAPI global store goes, I could see adding the option
>>> to configure it, since as Matthias pointed out, there’s really no specific
>>> semantics for the PAPI. But if automatic recovery is really all Navinder
>>> wanted, then I could also see deferring this until someone specifically
>>> wants it.
>>>>>>>>>
>>>>>>>>> So the tl;dr is, if we just want to catch the exception and rebuild
>>> the store by seeking to earliest with no config or API changes, then I’m +1.
>>>>>>>>>
>>>>>>>>> I’m wondering if we can improve on the “stop the world” effect of
>>> rebuilding the global store, though. It seems like we could put our heads
>>> together and come up with a more fine-grained approach to maintaining the
>>> right semantics during recovery while still making some progress.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> John
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sun, Aug 9, 2020, at 02:04, Navinder Brar wrote:
>>>>>>>>>> Hi Matthias,
>>>>>>>>>>
>>>>>>>>>> IMHO, now as you explained using
>>> ‘global.consumer.auto.offset.reset’ is
>>>>>>>>>> not as straightforward
>>>>>>>>>> as it seems and it might change the existing behavior for users
>>> without
>>>>>>>>>> them realizing it, I also
>>>>>>>>>>
>>>>>>>>>> think that we should change the behavior inside global stream
>>> thread to
>>>>>>>>>> not die on
>>>>>>>>>>
>>>>>>>>>> InvalidOffsetException and instead clean and rebuild the state
>>> from the
>>>>>>>>>> earliest. On this, as you
>>>>>>>>>>
>>>>>>>>>> mentioned that we would need to pause the stream threads till the
>>>>>>>>>> global store is completely restored.
>>>>>>>>>>
>>>>>>>>>> Without it, there will be incorrect processing results if they are
>>>>>>>>>> utilizing a global store during processing.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> So, basically we can divide the use-cases into 4 parts.
>>>>>>>>>>
>>>>>>>>>>     - PAPI based global stores (will have the earliest hardcoded)
>>>>>>>>>>     - PAPI based state stores (already has auto.reset.config)
>>>>>>>>>>     - DSL based GlobalKTables (will have earliest hardcoded)
>>>>>>>>>>     - DSL based KTables (will continue with auto.reset.config)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> So, this would mean that we are not changing any existing
>>> behaviors
>>>>>>>>>> with this if I am right.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I guess we could improve the code to actually log a warning for
>>> this
>>>>>>>>>>
>>>>>>>>>> case, similar to what we do for some configs already (cf
>>>>>>>>>>
>>>>>>>>>> StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS).
>>>>>>>>>>
>>>>>>>>>>>> I like this idea. In case we go ahead with the above approach
>>> and if we can’t
>>>>>>>>>>
>>>>>>>>>> deprecate it, we should educate users that this config doesn’t
>>> work.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Looking forward to hearing thoughts from others as well.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> - Navinder    On Tuesday, 4 August, 2020, 05:07:59 am IST,
>>> Matthias J.
>>>>>>>>>> Sax <mj...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>   Navinder,
>>>>>>>>>>
>>>>>>>>>> thanks for updating the KIP. I think the motivation section is not
>>>>>>>>>> totally accurate (what is not your fault though, as the history of
>>> how
>>>>>>>>>> we handle this case is intertwined...) For example,
>>> "auto.offset.reset"
>>>>>>>>>> is hard-coded for the global consumer to "none" and using
>>>>>>>>>> "global.consumer.auto.offset.reset" has no effect (cf
>>>>>>>>>>
>>> https://kafka.apache.org/25/documentation/streams/developer-guide/config-streams.html#default-values
>>> )
>>>>>>>>>>
>>>>>>>>>> Also, we could not even really deprecate the config as mentioned in
>>>>>>>>>> rejected alternatives sections, because we need
>>> `auto.offset.reset` for
>>>>>>>>>> the main consumer -- and adding a prefix is independent of it.
>>> Also,
>>>>>>>>>> because we ignore the config, it is also deprecated/removed if
>>> you wish.
>>>>>>>>>>
>>>>>>>>>> I guess we could improve the code to actually log a warning for
>>> this
>>>>>>>>>> case, similar to what we do for some configs already (cf
>>>>>>>>>> StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS).
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> The other question is about compatibility with regard to default
>>>>>>>>>> behavior: if we want to reintroduce
>>> `global.consumer.auto.offset.reset`
>>>>>>>>>> this basically implies that we need to respect
>>> `auto.offset.reset`, too.
>>>>>>>>>> Remember, that any config without prefix is applied to all clients
>>> that
>>>>>>>>>> support this config. Thus, if a user does not limit the scope of
>>> the
>>>>>>>>>> config to the main consumer (via
>>> `main.consumer.auto.offset.reset`) but
>>>>>>>>>> uses the non-prefix versions and sets it to "latest" (and relies
>>> on the
>>>>>>>>>> current behavior that `auto.offset.reset` is "none", and
>>> effectively
>>>>>>>>>> "earliest" on the global consumer), the user might end up with a
>>>>>>>>>> surprise as the global consumer behavior would switch from
>>> "earliest" to
>>>>>>>>>> "latest" (most likely unintentionally). Bottom line is, that users
>>> might
>>>>>>>>>> need to change configs to preserve the old behavior...
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> However, before we discuss those details, I think we should
>>> discuss the
>>>>>>>>>> topic in a broader context first:
>>>>>>>>>>
>>>>>>>>>>   - for a GlobalKTable, does it even make sense from a correctness
>>> point
>>>>>>>>>> of view, to allow users to set a custom reset policy? It seems you
>>>>>>>>>> currently don't propose this in the KIP, but as you don't mention
>>> it
>>>>>>>>>> explicitly it's unclear if that is on purpose or an oversight?
>>>>>>>>>>
>>>>>>>>>>   - Should we treat global stores differently to GlobalKTables and
>>> allow
>>>>>>>>>> for more flexibility (as the PAPI does not really provide any
>>> semantic
>>>>>>>>>> contract). It seems that is what you propose in the KIP. We should
>>>>>>>>>> discuss if this flexibility does make sense or not for the PAPI,
>>> or if
>>>>>>>>>> we should apply the same reasoning about correctness we use for
>>> KTables
>>>>>>>>>> to global stores? To what extend are/should they be different?
>>>>>>>>>>
>>>>>>>>>>   - If we support auto.offset.reset for global store, how should we
>>>>>>>>>> handle the initial bootstrapping of the store/table (that is
>>> hard-coded
>>>>>>>>>> atm)? Should we skip it if the policy is "latest" and start with an
>>>>>>>>>> empty state? Note that we did consider this behavior incorrect via
>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6121 and thus I am
>>> wondering
>>>>>>>>>> why should we change it back again?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Finally, the main motivation for the Jira ticket was to let the
>>> runtime
>>>>>>>>>> auto-recover instead of dying as it does currently. If we decide
>>> that a
>>>>>>>>>> custom reset policy does actually not make sense, we can just
>>> change the
>>>>>>>>>> global-thread to not die any longer on an `InvalidOffsetException`
>>> but
>>>>>>>>>> rebuild the state automatically. This would be "only" a behavior
>>> change
>>>>>>>>>> but does not require any public API changes. -- For this case, we
>>> should
>>>>>>>>>> also think about the synchronization with the main processing
>>> threads?
>>>>>>>>>> On startup we bootstrap the global stores before processing
>>> happens.
>>>>>>>>>> Thus, if an `InvalidOffsetException` happen and the global thread
>>> dies,
>>>>>>>>>> the main threads cannot access the global stores any longer and
>>> also die.
>>>>>>>>>> If we re-build the state though, do we need to pause the main
>>> thread
>>>>>>>>>> during this phase?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 8/2/20 8:48 AM, Navinder Brar wrote:
>>>>>>>>>>> Hi John,
>>>>>>>>>>>
>>>>>>>>>>> I have updated the KIP to make the motivation more clear. In a
>>> nutshell, we will use the already existing config
>>> "global.consumer.auto.offset.reset" for users to set a blanket reset policy
>>> for all global topics and add a new interface to set per-topic reset policy
>>> for each global topic(for which we specifically need this KIP). There was a
>>> point raised from Matthias above to always reset to earliest by cleaning
>>> the stores and seekToBeginning in case of InvalidOffsetException. We can go
>>> with that route as well and I don't think it would need a KIP as if we are
>>> not providing users an option to have blanket reset policy on global
>>> topics, then a per-topic override would also not be required(the KIP is
>>> required basically for that). Although, I think if users have an option to
>>> choose reset policy for StreamThread then the option should be provided for
>>> GlobalStreamThread as well and if we don't want to use the
>>> "global.consumer.auto.offset.reset" then we would need to deprecate it
>>> because currently it's not serving any purpose. For now, I have added it in
>>> rejected alternatives but we can discuss this.
>>>>>>>>>>>
>>>>>>>>>>> On the query that I had for Guozhang, thanks to Matthias we have
>>> fixed it last week as part of KAFKA-10306.
>>>>>>>>>>>
>>>>>>>>>>> ~Navinder
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>     On Sunday, 26 July, 2020, 07:37:34 pm IST, Navinder Brar <
>>> navinder_b...@yahoo.com.invalid> wrote:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Sorry, it took some time to respond back.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> “but I thought we would pass the config through to the client.”
>>>>>>>>>>>
>>>>>>>>>>>>> @John, sure we can use the config in GloablStreamThread, that
>>> could be one of the way to solve it.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> @Matthias, sure cleaning the store and recreating is one way but
>>> since we are giving an option to reset in StreamThread why the
>>> implementation should be different in GlobalStreamThread. I think we should
>>> use the global.consumer.auto.offset.reset config to accept the reset
>>> strategy opted by the user although I would be ok with just cleaning and
>>> resetting to the latest as well for now. Currently, we throw a
>>> StreamsException in case of InvalidOffsetException in GlobalStreamThread so
>>> just resetting would still be better than what happens currently.
>>>>>>>>>>>
>>>>>>>>>>> Matthias, I found this comment in StreamBuilder for GlobalKTable
>>> ‘* Note that {@link GlobalKTable} always applies {@code
>>> "auto.offset.reset"} strategy {@code "earliest"} regardless of the
>>> specified value in {@link StreamsConfig} or {@link Consumed}.’
>>>>>>>>>>> So, I guess we are already cleaning up and recreating for
>>> GlobalKTable from earliest offset.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> @Guozhan while looking at the code, I also noticed a TODO:
>>> pending in GlobalStateManagerImpl, when InvalidOffsetException is thrown.
>>> Earlier, we were directly clearing the store here and recreating from
>>> scratch but that code piece is removed now. Are you working on a follow-up
>>> PR for this or just handling the reset in GlobalStreamThread should be
>>> sufficient?
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Navinder
>>>>>>>>>>>
>>>>>>>>>>>     On Tuesday, 7 July, 2020, 12:53:36 am IST, Matthias J. Sax <
>>> mj...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>   Atm, the config should be ignored and the global-consumer
>>> should use
>>>>>>>>>>> "none" in a hard-coded way.
>>>>>>>>>>>
>>>>>>>>>>> However, I am still wondering if we actually want/need to allow
>>> users
>>>>>>>>>>> to specify the reset policy? It might be worth to consider, to
>>> just
>>>>>>>>>>> change the behavior: catch the exception, log an ERROR (for
>>> information
>>>>>>>>>>> purpose), wipe the store, seekToBeginning(), and recreate the
>>> store?
>>>>>>>>>>>
>>>>>>>>>>> Btw: if we want to allow users to set the reset policy, this
>>> should be
>>>>>>>>>>> possible via the config, or via overwriting the config in the
>>> method
>>>>>>>>>>> itself. Thus, we would need to add the new overloaded method to
>>>>>>>>>>> `Topology` and `StreamsBuilder`.
>>>>>>>>>>>
>>>>>>>>>>> Another question to ask: what about GlobalKTables? Should they
>>> behave
>>>>>>>>>>> the same? An alternative design could be, to allow users to
>>> specify a
>>>>>>>>>>> flexible reset policy for global-stores, but not for
>>> GlobalKTables and
>>>>>>>>>>> use the strategy suggested above for this case.
>>>>>>>>>>>
>>>>>>>>>>> Thoughts?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 7/2/20 2:14 PM, John Roesler wrote:
>>>>>>>>>>>> Hi Navinder,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the response. I’m sorry if I’m being dense... You
>>> said we are not currently using the config, but I thought we would pass the
>>> config through to the client.  Can you confirm whether or not the existing
>>> config works for your use case?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> John
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Jun 28, 2020, at 14:09, Navinder Brar wrote:
>>>>>>>>>>>>> Sorry my bad. Found it.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Prefix used to override {@link KafkaConsumer consumer} configs
>>> for the
>>>>>>>>>>>>> global consumer client from
>>>>>>>>>>>>>
>>>>>>>>>>>>> * the general consumer client configs. The override precedence
>>> is the
>>>>>>>>>>>>> following (from highest to lowest precedence):
>>>>>>>>>>>>> * 1. global.consumer.[config-name]..
>>>>>>>>>>>>> public static final String GLOBAL_CONSUMER_PREFIX =
>>> "global.consumer.";
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> So, that's great. We already have a config exposed to reset
>>> offsets for
>>>>>>>>>>>>> global topics via global.consumer.auto.offset.reset just that
>>> we are
>>>>>>>>>>>>> not actually using it inside GlobalStreamThread to reset.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Navinder
>>>>>>>>>>>>>     On Monday, 29 June, 2020, 12:24:21 am IST, Navinder Brar
>>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>   Hi John,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for your feedback.
>>>>>>>>>>>>> 1. I think there is some confusion on my first point, the enum
>>> I am
>>>>>>>>>>>>> sure we can use the same one but the external config which
>>> controls the
>>>>>>>>>>>>> resetting in global stream thread either we can use the same one
>>> which
>>>>>>>>>>>>> users use for source topics(StreamThread) or we can provide a
>>> new one
>>>>>>>>>>>>> which specifically controls global topics. For e.g. currently
>>> if I get
>>>>>>>>>>>>> an InvalidOffsetException in any of my source topics, I can
>>> choose
>>>>>>>>>>>>> whether to reset from Earliest or Latest(with
>>> auto.offset.reset). Now
>>>>>>>>>>>>> either we can use the same option and say if I get the same
>>> exception
>>>>>>>>>>>>> for global topics I will follow same resetting. Or some users
>>> might
>>>>>>>>>>>>> want to have totally different setting for both source and
>>> global
>>>>>>>>>>>>> topics, like for source topic I want resetting from Latest but
>>> for
>>>>>>>>>>>>> global topics I want resetting from Earliest so in that case
>>> adding a
>>>>>>>>>>>>> new config might be better.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2. I couldn't find this config currently
>>>>>>>>>>>>> "global.consumer.auto.offset.reset". In fact in
>>> GlobalStreamThread.java
>>>>>>>>>>>>> we are throwing a StreamsException for InvalidOffsetException
>>> and there
>>>>>>>>>>>>> is a test as
>>>>>>>>>>>>> well
>>> GlobalStreamThreadTest#shouldDieOnInvalidOffsetException(), so I
>>>>>>>>>>>>> think this is the config we are trying to introduce with this
>>> KIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Navinder  On Saturday, 27 June, 2020, 07:03:04 pm IST, John
>>> Roesler
>>>>>>>>>>>>> <j...@vvcephei.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>   Hi Navinder,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for this proposal!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regarding your question about whether to use the same policy
>>>>>>>>>>>>> enum or not, the underlying mechanism is the same, so I think
>>>>>>>>>>>>> we can just use the same AutoOffsetReset enum.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Can you confirm whether setting the reset policy config on the
>>>>>>>>>>>>> global consumer currently works or not? Based on my reading
>>>>>>>>>>>>> of StreamsConfig, it looks like it would be:
>>>>>>>>>>>>> "global.consumer.auto.offset.reset".
>>>>>>>>>>>>>
>>>>>>>>>>>>> If that does work, would you still propose to augment the
>>>>>>>>>>>>> Java API?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> -John
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jun 26, 2020, at 23:52, Navinder Brar wrote:
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> KIP:
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have taken over this KIP since it has been dormant for a
>>> long time
>>>>>>>>>>>>>> and this looks important for use-cases that have large global
>>> data, so
>>>>>>>>>>>>>> rebuilding global stores from scratch might seem overkill in
>>> case of
>>>>>>>>>>>>>> InvalidOffsetException.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We want to give users the control to use reset policy(as we do
>>> in
>>>>>>>>>>>>>> StreamThread) in case they hit invalid offsets. I have still
>>> not
>>>>>>>>>>>>>> decided whether to restrict this option to the same reset
>>> policy being
>>>>>>>>>>>>>> used by StreamThread(using auto.offset.reset config) or add
>>> another
>>>>>>>>>>>>>> reset config specifically for global stores
>>>>>>>>>>>>>> "global.auto.offset.reset" which gives users more control to
>>> choose
>>>>>>>>>>>>>> separate policies for global and stream threads.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I would like to hear your opinions on the KIP.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Navinder
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>
>>>>>
>>>
>>>
>>
>   
> 
