Thanks Matthias, that sounds like what I was thinking. I think we should
always be
able to figure out what to do in various scenarios as outlined in the
previous email.

>  For the same reason, I wouldn't want to combine global restoring and
normal restoring
> because then it would make all the restorings independent but we don't
want that. We
> want global stores to be available before any processing starts on the
active tasks.

I'm not sure I follow this specific point, but I don't think I did a good
job of explaining my
proposal so it's probably my own fault. When I say that we should merge
RESTORING
and GLOBAL_RESTORING, I just mean that we should provide a single
user-facing
state to encompass any ongoing restoration. The point of the KafkaStreams
RESTORING
state is to alert users that their state may be unavailable for IQ, and
active tasks may be
idle. This is true for both global and non-global restoration. I think the
ultimate question
is whether as a user, I would react any differently to a GLOBAL_RESTORING
state vs
the regular RESTORING. My take is "no", in which case we should just
provide a single
unified state for the minimal public API. But if anyone can think of a
reason for the user
to need to distinguish between different types of restoration, that would
be a good
argument to keep them separate.

Internally, we do need to keep track of a "global restore" flag to
determine the course
of action -- for example if a StreamThread transitions to RUNNING but sees
that the
KafkaStreams state is RESTORING, should it start processing or not? The
answer
depends on whether the state is RESTORING due to any global stores. But the
KafkaStreams state is a public interface, not an internal bookkeeper, so we
shouldn't
try to push our internal logic into the user-facing API.


On Thu, Sep 3, 2020 at 7:36 AM Matthias J. Sax <mj...@apache.org> wrote:

> I think this issue can actually be resolved.
>
>  - We need a flag on the stream-threads if global-restore is in
> progress; for this case, the stream-thread may go into RUNNING state,
> but it's not allowed to actually process data -- it will be allowed to
> update standby-task though.
>
>  - If a stream-thread restores, its own state is RESTORING and it does
> not need to care about the "global restore flag".
>
>  - The global-thread just does was we discussed, including using state
> RESTORING.
>
>  - The KafkaStreams client state is in RESTORING, if at least one thread
> (stream-thread or global-thread) is in state RESTORING.
>
>  - On startup, if there is a global-thread, the just set the
> global-restore flag upfront before we start the stream-threads (we can
> actually still do the rebalance and potential restore in stream-thread
> in parallel to global restore) and rely on the global-thread to unset
> the flag.
>
>  - The tricky thing is, to "stop" processing in stream-threads if we
> need to wipe the global-store and rebuilt it. For this, we should set
> the "global restore flag" on the stream-threads, but we also need to
> "lock down" the global store in question and throw an exception if the
> stream-thread tries to access it; if the stream-thread get this
> exception, it need to cleanup itself, and wait until the "global restore
> flag" is unset before it can continue.
>
>
> Do we think this would work? -- Of course, the devil is in the details
> but it seems to become a PR discussion, and there is no reason to make
> it part of the KIP.
>
>
> -Matthias
>
> On 9/3/20 3:41 AM, Navinder Brar wrote:
> > Hi,
> >
> > Thanks, John, Matthias and Sophie for great feedback.
> >
> > On the point raised by Sophie that maybe we should allow normal
> restoring during GLOBAL_RESTORING, I think it makes sense but the challenge
> would be what happens when normal restoring(on actives) has finished but
> GLOBAL_RESTORINGis still going on. Currently, all restorings are
> independent of each other i.e. restoring happening on one task/thread
> doesn't affect another. But if we do go ahead with allowing normal
> restoring during GLOBAL_RESTORING then we willstill have to pause the
> active tasks from going to RUNNING if GLOBAL_RESTORING has not finished and
> normal restorings have finished. For the same reason, I wouldn't want to
> combine global restoring and normal restoring because then it would make
> all the restorings independent but we don't want that. We want global
> stores to be available before any processing starts on the active tasks.
> >
> > Although I think restoring of replicas can still take place while global
> stores arerestoring because in replicas there is no danger of them starting
> processing.
> >
> > Also, one point to bring up is that currently during application startup
> global stores restore first and then normal stream threads start.
> >
> > Regards,Navinder
> >
> >     On Thursday, 3 September, 2020, 06:58:40 am IST, Matthias J. Sax <
> mj...@apache.org> wrote:
> >
> >  Thanks for the input Sophie. Those are all good points and I fully agree
> > with them.
> >
> > When saying "pausing the processing threads" I only considered them in
> > `RUNNING` and thought we figure out the detail on the PR... Excellent
> catch!
> >
> > Changing state transitions is to some extend backward incompatible, but
> > I think (IIRC) we did it in the past and I personally tend to find it
> > ok. That's why we cover those changes in a KIP.
> >
> > -Matthias
> >
> > On 9/2/20 6:18 PM, Sophie Blee-Goldman wrote:
> >> If we're going to add a new GLOBAL_RESTORING state to the KafkaStreams
> FSM,
> >> maybe it would make sense to add a new plain RESTORING state that we
> >> transition
> >> to when restoring non-global state stores following a rebalance. Right
> now
> >> all restoration
> >> occurs within the REBALANCING state, which is pretty misleading.
> >> Applications that
> >> have large amounts of state to restore will appear to be stuck
> rebalancing
> >> according to
> >> the state listener, when in fact the rebalance has completed long ago.
> >> Given that there
> >> are very much real scenarios where you actually *are *stuck
> rebalancing, it
> >> seems useful to
> >> distinguish plain restoration from more insidious cases that may require
> >> investigation and/or
> >> intervention.
> >>
> >> I don't mean to hijack this KIP, I just think it would be odd to
> introduce
> >> GLOBAL_RESTORING
> >> when there is no other kind of RESTORING state. One question this brings
> >> up, and I
> >> apologize if this has already been addressed, is what to do when we are
> >> restoring
> >> both normal and global state stores? It sounds like we plan to pause the
> >> StreamThreads
> >> entirely, but there doesn't seem to be any reason not to allow regular
> >> state restoration -- or
> >> even standby processing -- while the global state is restoring.Given the
> >> current effort to move
> >> restoration & standbys to a separate thread, allowing them to continue
> >> while pausing
> >> only the StreamThread seems quite natural.
> >>
> >> Assuming that we actually do allow both types of restoration to occur at
> >> the same time,
> >> and if we did add a plain RESTORING state as well, which state should we
> >> end up in?
> >> AFAICT the main reason for having a distinct {GLOBAL_}RESTORING state
> is to
> >> alert
> >> users of the non-progress of their active tasks. In both cases, the
> active
> >> task is unable
> >> to continue until restoration has complete, so why distinguish between
> the
> >> two at all?
> >> Would it make sense to avoid a special GLOBAL_RESTORING state and just
> >> introduce
> >> a single unified RESTORING state to cover both the regular and global
> case?
> >> Just a thought
> >>
> >> My only concern is that this might be considered a breaking change:
> users
> >> might be
> >> looking for the REBALANCING -> RUNNING transition specifically in order
> to
> >> alert when
> >> the application has started up, and we would no long go directly from
> >> REBALANCING to
> >>   RUNNING. I think we actually did/do this ourselves in a number of
> >> integration tests and
> >> possibly in some examples. That said, it seems more appropriate to just
> >> listen for
> >> the RUNNING state rather than for a specific transition, and we should
> >> encourage users
> >> to do so rather than go out of our way to support transition-type state
> >> listeners.
> >>
> >> Cheers,
> >> Sophie
> >>
> >> On Wed, Sep 2, 2020 at 5:53 PM Matthias J. Sax <mj...@apache.org>
> wrote:
> >>
> >>> I think this makes sense.
> >>>
> >>> When we introduce this new state, we might also tackle the jira a
> >>> mentioned. If there is a global thread, on startup of a `KafakStreams`
> >>> client we should not transit to `REBALANCING` but to the new state, and
> >>> maybe also make the "bootstrapping" non-blocking.
> >>>
> >>> I guess it's worth to mention this in the KIP.
> >>>
> >>> Btw: The new state for KafkaStreams should also be part of the KIP as
> it
> >>> is a public API change, too.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 8/29/20 9:37 AM, John Roesler wrote:
> >>>> Hi Navinder,
> >>>>
> >>>> Thanks for the ping. Yes, that all sounds right to me. The name
> >>> “RESTORING_GLOBAL” sounds fine, too.
> >>>>
> >>>> I think as far as warnings go, we’d just propose to mention it in the
> >>> javadoc of the relevant methods that the given topics should be
> compacted.
> >>>>
> >>>> Thanks!
> >>>> -John
> >>>>
> >>>> On Fri, Aug 28, 2020, at 12:42, Navinder Brar wrote:
> >>>>> Gentle ping.
> >>>>>
> >>>>> ~ Navinder
> >>>>>     On Wednesday, 19 August, 2020, 06:59:58 pm IST, Navinder Brar
> >>>>> <navinder_b...@yahoo.com.invalid> wrote:
> >>>>>
> >>>>>
> >>>>> Thanks Matthias & John,
> >>>>>
> >>>>>
> >>>>>
> >>>>> I am glad we are converging towards an understanding. So, to
> summarize,
> >>>>>
> >>>>> we will still keep treating this change in KIP and instead of
> providing
> >>> a reset
> >>>>>
> >>>>> strategy, we will cleanup, and reset to earliest and build the state.
> >>>>>
> >>>>> When we hit the exception and we are building the state, we will stop
> >>> all
> >>>>>
> >>>>> processing and change the state of KafkaStreams to something like
> >>>>>
> >>>>> “RESTORING_GLOBAL” or the like.
> >>>>>
> >>>>>
> >>>>>
> >>>>> How do we plan to educate users on the non desired effects of using
> >>>>>
> >>>>> non-compacted global topics? (via the KIP itself?)
> >>>>>
> >>>>>
> >>>>> +1 on changing the KTable behavior, reset policy for global,
> connecting
> >>>>> processors to global for a later stage when demanded.
> >>>>>
> >>>>> Regards,
> >>>>> Navinder
> >>>>>     On Wednesday, 19 August, 2020, 01:00:58 pm IST, Matthias J. Sax
> >>>>> <mj...@apache.org> wrote:
> >>>>>
> >>>>>   Your observation is correct. Connecting (regular) stores to
> processors
> >>>>> is necessary to "merge" sub-topologies into single ones if a store is
> >>>>> shared. -- For global stores, the structure of the program does not
> >>>>> change and thus connecting srocessors to global stores is not
> required.
> >>>>>
> >>>>> Also given our experience with restoring regular state stores (ie,
> >>>>> partial processing of task that don't need restore), it seems better
> to
> >>>>> pause processing and move all CPU and network resources to the global
> >>>>> thread to rebuild the global store as soon as possible instead of
> >>>>> potentially slowing down the restore in order to make progress on
> some
> >>>>> tasks.
> >>>>>
> >>>>> Of course, if we collect real world experience and it becomes an
> issue,
> >>>>> we could still try to change it?
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>>
> >>>>> On 8/18/20 3:31 PM, John Roesler wrote:
> >>>>>> Thanks Matthias,
> >>>>>>
> >>>>>> Sounds good. I'm on board with no public API change and just
> >>>>>> recovering instead of crashing.
> >>>>>>
> >>>>>> Also, to be clear, I wouldn't drag KTables into it; I was
> >>>>>> just trying to wrap my head around the congruity of our
> >>>>>> choice for GlobalKTable with respect to KTable.
> >>>>>>
> >>>>>> I agree that whatever we decide to do would probably also
> >>>>>> resolve KAFKA-7380.
> >>>>>>
> >>>>>> Moving on to discuss the behavior change, I'm wondering if
> >>>>>> we really need to block all the StreamThreads. It seems like
> >>>>>> we only need to prevent processing on any task that's
> >>>>>> connected to the GlobalStore.
> >>>>>>
> >>>>>> I just took a look at the topology building code, and it
> >>>>>> actually seems that connections to global stores don't need
> >>>>>> to be declared. That's a bummer, since it means that we
> >>>>>> really do have to stop all processing while the global
> >>>>>> thread catches up.
> >>>>>>
> >>>>>> Changing this seems like it'd be out of scope right now, but
> >>>>>> I bring it up in case I'm wrong and it actually is possible
> >>>>>> to know which specific tasks need to be synchronized with
> >>>>>> which global state stores. If we could know that, then we'd
> >>>>>> only have to block some of the tasks, not all of the
> >>>>>> threads.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> -John
> >>>>>>
> >>>>>>
> >>>>>> On Tue, 2020-08-18 at 14:10 -0700, Matthias J. Sax wrote:
> >>>>>>> Thanks for the discussion.
> >>>>>>>
> >>>>>>> I agree that this KIP is justified in any case -- even if we don't
> >>>>>>> change public API, as the change in behavior is significant.
> >>>>>>>
> >>>>>>> A better documentation for cleanup policy is always good (even if
> I am
> >>>>>>> not aware of any concrete complaints atm that users were not aware
> of
> >>>>>>> the implications). Of course, for a regular KTable, one can
> >>>>>>> enable/disable the source-topic-changelog optimization and thus can
> >>> use
> >>>>>>> a non-compacted topic for this case, what is quite a difference to
> >>>>>>> global stores/tables; so maybe it's worth to point out this
> difference
> >>>>>>> explicitly.
> >>>>>>>
> >>>>>>> As mentioned before, the main purpose of the original Jira was to
> >>> avoid
> >>>>>>> the crash situation but to allow for auto-recovering while it was
> an
> >>>>>>> open question if it makes sense / would be useful to allow users to
> >>>>>>> specify a custom reset policy instead of using a hard-coded
> "earliest"
> >>>>>>> strategy. -- It seem it's still unclear if it would be useful and
> thus
> >>>>>>> it might be best to not add it for now -- we can still add it
> later if
> >>>>>>> there are concrete use-cases that need this feature.
> >>>>>>>
> >>>>>>> @John: I actually agree that it's also questionable to allow a
> custom
> >>>>>>> reset policy for KTables... Not sure if we want to drag this
> question
> >>>>>>> into this KIP though?
> >>>>>>>
> >>>>>>> So it seem, we all agree that we actually don't need any public API
> >>>>>>> changes, but we only want to avoid crashing?
> >>>>>>>
> >>>>>>> For this case, to preserve the current behavior that guarantees
> that
> >>> the
> >>>>>>> global store/table is always loaded first, it seems we need to
> have a
> >>>>>>> stop-the-world mechanism for the main `StreamThreads` for this
> case --
> >>>>>>> do we need to add a new state to KafkaStreams client for this case?
> >>>>>>>
> >>>>>>> Having a new state might also be helpful for
> >>>>>>> https://issues.apache.org/jira/browse/KAFKA-7380 ?
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On 8/17/20 7:34 AM, John Roesler wrote:
> >>>>>>>> Hi Navinder,
> >>>>>>>>
> >>>>>>>> I see what you mean about the global consumer being similar
> >>>>>>>> to the restore consumer.
> >>>>>>>>
> >>>>>>>> I also agree that automatically performing the recovery
> >>>>>>>> steps should be strictly an improvement over the current
> >>>>>>>> situation.
> >>>>>>>>
> >>>>>>>> Also, yes, it would be a good idea to make it clear that the
> >>>>>>>> global topic should be compacted in order to ensure correct
> >>>>>>>> semantics. It's the same way with input topics for KTables;
> >>>>>>>> we rely on users to ensure the topics are compacted, and if
> >>>>>>>> they aren't, then the execution semantics will be broken.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> -John
> >>>>>>>>
> >>>>>>>> On Sun, 2020-08-16 at 11:44 +0000, Navinder Brar wrote:
> >>>>>>>>> Hi John,
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Thanks for your inputs. Since, global topics are in a way their
> own
> >>> changelog, wouldn’t the global consumers be more akin to restore
> consumers
> >>> than the main consumer?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> I am also +1 on catching the exception and setting it to the
> >>> earliest for now. Whenever an instance starts, currently global stream
> >>> thread(if available) goes to RUNNING before stream threads are started
> so
> >>> that means the global state is available when the processing by stream
> >>> threads start. So, with the new change of catching the exception,
> cleaning
> >>> store and resetting to earlier would probably be “stop the world” as
> you
> >>> said John, as I think we will have to pause the stream threads till the
> >>> whole global state is recovered. I assume it is "stop the world" right
> now
> >>> as well, since now also if an InvalidOffsetException comes, we throw
> >>> streams exception and the user has to clean up and handle all this
> manually
> >>> and when that instance will start, it will restore global state first.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> I had an additional thought to this whole problem, would it be
> >>> helpful to educate the users that global topics should have cleanup
> policy
> >>> as compact, so that this invalid offset exception never arises for
> them.
> >>> Assume for example, that the cleanup policy in global topic is
> "delete" and
> >>> it has deleted k1, k2 keys(via retention.ms) although all the
> instances
> >>> had already consumed them so they are in all global stores and all
> other
> >>> instances are up to date on the global data(so no
> InvalidOffsetException).
> >>> Now, a new instance is added to the cluster, and we have already lost
> k1,
> >>> k2 from the global topic so it will start consuming from the earliest
> point
> >>> in the global topic. So, wouldn’t this global store on the new
> instance has
> >>> 2 keys less than all the other global stores already available in the
> >>> cluster? Please let me know if I am missing something. Thanks.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>>
> >>>>>>>>> Navinder
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>     On Friday, 14 August, 2020, 10:03:42 am IST, John Roesler <
> >>> vvcep...@apache.org> wrote:
> >>>>>>>>>
> >>>>>>>>>   Hi all,
> >>>>>>>>>
> >>>>>>>>> It seems like the main motivation for this proposal is satisfied
> if
> >>> we just implement some recovery mechanism instead of crashing. If the
> >>> mechanism is going to be pausing all the threads until the state is
> >>> recovered, then it still seems like a big enough behavior change to
> warrant
> >>> a KIP still.
> >>>>>>>>>
> >>>>>>>>> I have to confess I’m a little unclear on why a custom reset
> policy
> >>> for a global store, table, or even consumer might be considered wrong.
> It’s
> >>> clearly wrong for the restore consumer, but the global consumer seems
> more
> >>> semantically akin to the main consumer than the restore consumer.
> >>>>>>>>>
> >>>>>>>>> In other words, if it’s wrong to reset a GlobalKTable from
> latest,
> >>> shouldn’t it also be wrong for a KTable, for exactly the same reason?
> It
> >>> certainly seems like it would be an odd choice, but I’ve seen many
> choices
> >>> I thought were odd turn out to have perfectly reasonable use cases.
> >>>>>>>>>
> >>>>>>>>> As far as the PAPI global store goes, I could see adding the
> option
> >>> to configure it, since as Matthias pointed out, there’s really no
> specific
> >>> semantics for the PAPI. But if automatic recovery is really all
> Navinder
> >>> wanted, the I could also see deferring this until someone specifically
> >>> wants it.
> >>>>>>>>>
> >>>>>>>>> So the tl;dr is, if we just want to catch the exception and
> rebuild
> >>> the store by seeking to earliest with no config or API changes, then
> I’m +1.
> >>>>>>>>>
> >>>>>>>>> I’m wondering if we can improve on the “stop the world” effect of
> >>> rebuilding the global store, though. It seems like we could put our
> heads
> >>> together and come up with a more fine-grained approach to maintaining
> the
> >>> right semantics during recovery while still making some progress.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> John
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Sun, Aug 9, 2020, at 02:04, Navinder Brar wrote:
> >>>>>>>>>> Hi Matthias,
> >>>>>>>>>>
> >>>>>>>>>> IMHO, now as you explained using
> >>> ‘global.consumer.auto.offset.reset’ is
> >>>>>>>>>> not as straightforward
> >>>>>>>>>> as it seems and it might change the existing behavior for users
> >>> without
> >>>>>>>>>> they releasing it, I also
> >>>>>>>>>>
> >>>>>>>>>> think that we should change the behavior inside global stream
> >>> thread to
> >>>>>>>>>> not die on
> >>>>>>>>>>
> >>>>>>>>>> InvalidOffsetException and instead clean and rebuild the state
> >>> from the
> >>>>>>>>>> earliest. On this, as you
> >>>>>>>>>>
> >>>>>>>>>> mentioned that we would need to pause the stream threads till
> the
> >>>>>>>>>> global store is completely restored.
> >>>>>>>>>>
> >>>>>>>>>> Without it, there will be incorrect processing results if they
> are
> >>>>>>>>>> utilizing a global store during processing.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> So, basically we can divide the use-cases into 4 parts.
> >>>>>>>>>>
> >>>>>>>>>>     - PAPI based global stores (will have the earliest
> hardcoded)
> >>>>>>>>>>     - PAPI based state stores (already has auto.reset.config)
> >>>>>>>>>>     - DSL based GlobalKTables (will have earliest hardcoded)
> >>>>>>>>>>     - DSL based KTables (will continue with auto.reset.config)
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> So, this would mean that we are not changing any existing
> >>> behaviors
> >>>>>>>>>> with this if I am right.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> I guess we could improve the code to actually log a warning for
> >>> this
> >>>>>>>>>>
> >>>>>>>>>> case, similar to what we do for some configs already (cf
> >>>>>>>>>>
> >>>>>>>>>> StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS).
> >>>>>>>>>>
> >>>>>>>>>>>> I like this idea. In case we go ahead with the above approach
> >>> and if we can’t
> >>>>>>>>>>
> >>>>>>>>>> deprecate it, we should educate users that this config doesn’t
> >>> work.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Looking forward to hearing thoughts from others as well.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> - Navinder    On Tuesday, 4 August, 2020, 05:07:59 am IST,
> >>> Matthias J.
> >>>>>>>>>> Sax <mj...@apache.org> wrote:
> >>>>>>>>>>
> >>>>>>>>>>   Navinder,
> >>>>>>>>>>
> >>>>>>>>>> thanks for updating the KIP. I think the motivation section is
> not
> >>>>>>>>>> totally accurate (what is not your fault though, as the history
> of
> >>> how
> >>>>>>>>>> we handle this case is intertwined...) For example,
> >>> "auto.offset.reset"
> >>>>>>>>>> is hard-coded for the global consumer to "none" and using
> >>>>>>>>>> "global.consumer.auto.offset.reset" has no effect (cf
> >>>>>>>>>>
> >>>
> https://kafka.apache.org/25/documentation/streams/developer-guide/config-streams.html#default-values
> >>> )
> >>>>>>>>>>
> >>>>>>>>>> Also, we could not even really deprecate the config as
> mentioned in
> >>>>>>>>>> rejected alternatives sections, because we need
> >>> `auto.offset.reset` for
> >>>>>>>>>> the main consumer -- and adding a prefix is independent of it.
> >>> Also,
> >>>>>>>>>> because we ignore the config, it's is also deprecated/removed if
> >>> you wish.
> >>>>>>>>>>
> >>>>>>>>>> I guess we could improve the code to actually log a warning for
> >>> this
> >>>>>>>>>> case, similar to what we do for some configs already (cf
> >>>>>>>>>> StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS).
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> The other question is about compatibility with regard to default
> >>>>>>>>>> behavior: if we want to reintroduce
> >>> `global.consumer.auto.offset.reset`
> >>>>>>>>>> this basically implies that we need to respect
> >>> `auto.offset.reset`, too.
> >>>>>>>>>> Remember, that any config without prefix is applied to all
> clients
> >>> that
> >>>>>>>>>> support this config. Thus, if a user does not limit the scope of
> >>> the
> >>>>>>>>>> config to the main consumer (via
> >>> `main.consumer.auto.offset.reset`) but
> >>>>>>>>>> uses the non-prefix versions and sets it to "latest" (and relies
> >>> on the
> >>>>>>>>>> current behavior that `auto.offset.reset` is "none", and
> >>> effectively
> >>>>>>>>>> "earliest" on the global consumer), the user might end up with a
> >>>>>>>>>> surprise as the global consumer behavior would switch from
> >>> "earliest" to
> >>>>>>>>>> "latest" (most likely unintentionally). Bottom line is, that
> users
> >>> might
> >>>>>>>>>> need to change configs to preserve the old behavior...
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> However, before we discuss those details, I think we should
> >>> discuss the
> >>>>>>>>>> topic in a broader context first:
> >>>>>>>>>>
> >>>>>>>>>>   - for a GlobalKTable, does it even make sense from a
> correctness
> >>> point
> >>>>>>>>>> of view, to allow users to set a custom reset policy? It seems
> you
> >>>>>>>>>> currently don't propose this in the KIP, but as you don't
> mention
> >>> it
> >>>>>>>>>> explicitly it's unclear if that on purpose of an oversight?
> >>>>>>>>>>
> >>>>>>>>>>   - Should we treat global stores differently to GlobalKTables
> and
> >>> allow
> >>>>>>>>>> for more flexibility (as the PAPI does not really provide any
> >>> semantic
> >>>>>>>>>> contract). It seems that is what you propose in the KIP. We
> should
> >>>>>>>>>> discuss if this flexibility does make sense or not for the PAPI,
> >>> or if
> >>>>>>>>>> we should apply the same reasoning about correctness we use for
> >>> KTables
> >>>>>>>>>> to global stores? To what extend are/should they be different?
> >>>>>>>>>>
> >>>>>>>>>>   - If we support auto.offset.reset for global store, how
> should we
> >>>>>>>>>> handle the initial bootstrapping of the store/table (that is
> >>> hard-coded
> >>>>>>>>>> atm)? Should we skip it if the policy is "latest" and start
> with an
> >>>>>>>>>> empty state? Note that we did consider this behavior incorrect
> via
> >>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6121 and thus I am
> >>> wondering
> >>>>>>>>>> why should we change it back again?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Finally, the main motivation for the Jira ticket was to let the
> >>> runtime
> >>>>>>>>>> auto-recover instead of dying as it does currently. If we decide
> >>> that a
> >>>>>>>>>> custom reset policy does actually not make sense, we can just
> >>> change the
> >>>>>>>>>> global-thread to not die any longer on an
> `InvalidOffsetException`
> >>> but
> >>>>>>>>>> rebuild the state automatically. This would be "only" a behavior
> >>> change
> >>>>>>>>>> but does not require any public API changes. -- For this case,
> we
> >>> should
> >>>>>>>>>> also think about the synchronization with the main processing
> >>> threads?
> >>>>>>>>>> On startup we bootstrap the global stores before processing
> >>> happens.
> >>>>>>>>>> Thus, if an `InvalidOffsetException` happen and the global
> thread
> >>> dies,
> >>>>>>>>>> the main threads cannot access the global stores any longer an
> >>> also die.
> >>>>>>>>>> If we re-build the state though, do we need to pause the main
> >>> thread
> >>>>>>>>>> during this phase?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 8/2/20 8:48 AM, Navinder Brar wrote:
> >>>>>>>>>>> Hi John,
> >>>>>>>>>>>
> >>>>>>>>>>> I have updated the KIP to make the motivation more clear. In a
> >>> nutshell, we will use the already existing config
> >>> "global.consumer.auto.offset.reset" for users to set a blanket reset
> policy
> >>> for all global topics and add a new interface to set per-topic reset
> policy
> >>> for each global topic(for which we specifically need this KIP). There
> was a
> >>> point raised from Matthias above to always reset to earliest by
> cleaning
> >>> the stores and seekToBeginning in case of InvalidOffsetException. We
> can go
> >>> with that route as well and I don't think it would need a KIP as if we
> are
> >>> not providing users an option to have blanket reset policy on global
> >>> topics, then a per-topic override would also not be required(the KIP is
> >>> required basically for that). Although, I think if users have an
> option to
> >>> choose reset policy for StreamThread then the option should be
> provided for
> >>> GlobalStreamThread as well and if we don't want to use the
> >>> "global.consumer.auto.offset.reset" then we would need to deprecate it
> >>> because currently it's not serving any purpose. For now, I have added
> it in
> >>> rejected alternatives but we can discuss this.
> >>>>>>>>>>>
> >>>>>>>>>>> On the query that I had for Guozhang, thanks to Matthias we
> have
> >>> fixed it last week as part of KAFKA-10306.
> >>>>>>>>>>>
> >>>>>>>>>>> ~Navinder
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>     On Sunday, 26 July, 2020, 07:37:34 pm IST, Navinder Brar <
> >>> navinder_b...@yahoo.com.invalid> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Sorry, it took some time to respond back.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> “but I thought we would pass the config through to the client.”
> >>>>>>>>>>>
> >>>>>>>>>>>>> @John, sure we can use the config in GloablStreamThread, that
> >>> could be one of the way to solve it.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> @Matthias, sure cleaning the store and recreating is one way
> but
> >>> since we are giving an option to reset in StreamThread why the
> >>> implementation should be different in GlobalStreamThread. I think we
> should
> >>> use the global.consumer.auto.offset.reset config to accept the reset
> >>> strategy opted by the user although I would be ok with just cleaning
> and
> >>> resetting to the latest as well for now. Currently, we throw a
> >>> StreamsException in case of InvalidOffsetException in
> GlobalStreamThread so
> >>> just resetting would still be better than what happens currently.
> >>>>>>>>>>>
> >>>>>>>>>>> Matthias, I found this comment in StreamBuilder for
> GlobalKTable
> >>> ‘* Note that {@link GlobalKTable} always applies {@code
> >>> "auto.offset.reset"} strategy {@code "earliest"} regardless of the
> >>> specified value in {@link StreamsConfig} or {@link Consumed}.’
> >>>>>>>>>>> So, I guess we are already cleaning up and recreating for
> >>> GlobalKTable from earliest offset.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> @Guozhan while looking at the code, I also noticed a TODO:
> >>> pending in GlobalStateManagerImpl, when InvalidOffsetException is
> thrown.
> >>> Earlier, we were directly clearing the store here and recreating from
> >>> scratch but that code piece is removed now. Are you working on a
> follow-up
> >>> PR for this or just handling the reset in GlobalStreamThread should be
> >>> sufficient?
> >>>>>>>>>>>
> >>>>>>>>>>> Regards,
> >>>>>>>>>>> Navinder
> >>>>>>>>>>>
> >>>>>>>>>>>     On Tuesday, 7 July, 2020, 12:53:36 am IST, Matthias J. Sax
> <
> >>> mj...@apache.org> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>   Atm, the config should be ignored and the global-consumer
> >>> should use
> >>>>>>>>>>> "none" in a hard-coded way.
> >>>>>>>>>>>
> >>>>>>>>>>> However, if am still wondering if we actually want/need to
> allow
> >>> users
> >>>>>>>>>>> to specify the reset policy? It might be worth to consider, to
> >>> just
> >>>>>>>>>>> change the behavior: catch the exception, log an ERROR (for
> >>> information
> >>>>>>>>>>> purpose), wipe the store, seekToBeginning(), and recreate the
> >>> store?
> >>>>>>>>>>>
> >>>>>>>>>>> Btw: if we want to allow users to set the reset policy, this
> >>> should be
> >>>>>>>>>>> possible via the config, or via overwriting the config in the
> >>> method
> >>>>>>>>>>> itself. Thus, we would need to add the new overloaded method to
> >>>>>>>>>>> `Topology` and `StreamsBuilder`.
> >>>>>>>>>>>
> >>>>>>>>>>> Another question to ask: what about GlobalKTables? Should they
> >>> behave
> >>>>>>>>>>> the same? An alternative design could be, to allow users to
> >>> specify a
> >>>>>>>>>>> flexible reset policy for global-stores, but not for
> >>> GlobalKTables and
> >>>>>>>>>>> use the strategy suggested above for this case.
> >>>>>>>>>>>
> >>>>>>>>>>> Thoughts?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 7/2/20 2:14 PM, John Roesler wrote:
> >>>>>>>>>>>> Hi Navinder,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the response. I’m sorry if I’m being dense... You
> >>> said we are not currently using the config, but I thought we would
> pass the
> >>> config through to the client.  Can you confirm whether or not the
> existing
> >>> config works for your use case?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> John
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Sun, Jun 28, 2020, at 14:09, Navinder Brar wrote:
> >>>>>>>>>>>>> Sorry my bad. Found it.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Prefix used to override {@link KafkaConsumer consumer}
> configs
> >>> for the
> >>>>>>>>>>>>> global consumer client from
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> * the general consumer client configs. The override
> precedence
> >>> is the
> >>>>>>>>>>>>> following (from highest to lowest precedence):
> >>>>>>>>>>>>> * 1. global.consumer.[config-name]..
> >>>>>>>>>>>>> public static final String GLOBAL_CONSUMER_PREFIX =
> >>> "global.consumer.";
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> So, that's great. We already have a config exposed to reset
> >>> offsets for
> >>>>>>>>>>>>> global topics via global.consumer.auto.offset.reset just that
> >>> we are
> >>>>>>>>>>>>> not actually using it inside GlobalStreamThread to reset.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> -Navinder
> >>>>>>>>>>>>>     On Monday, 29 June, 2020, 12:24:21 am IST, Navinder Brar
> >>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>   Hi John,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for your feedback.
> >>>>>>>>>>>>> 1. I think there is some confusion on my first point, the
> enum
> >>> I am
> >>>>>>>>>>>>> sure we can use the same one but the external config which
> >>> controls the
> >>>>>>>>>>>>> resetting in global stream thread either we can the same one
> >>> which
> >>>>>>>>>>>>> users use for source topics(StreamThread) or we can provide a
> >>> new one
> >>>>>>>>>>>>> which specifically controls global topics. For e.g. currently
> >>> if I get
> >>>>>>>>>>>>> an InvalidOffsetException in any of my source topics, I can
> >>> choose
> >>>>>>>>>>>>> whether to reset from Earliest or Latest(with
> >>> auto.offset.reset). Now
> >>>>>>>>>>>>> either we can use the same option and say if I get the same
> >>> exception
> >>>>>>>>>>>>> for global topics I will follow same resetting. Or some users
> >>> might
> >>>>>>>>>>>>> want to have totally different setting for both source and
> >>> global
> >>>>>>>>>>>>> topics, like for source topic I want resetting from Latest
> but
> >>> for
> >>>>>>>>>>>>> global topics I want resetting from Earliest so in that case
> >>> adding a
> >>>>>>>>>>>>> new config might be better.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2. I couldn't find this config currently
> >>>>>>>>>>>>> "global.consumer.auto.offset.reset". Infact in
> >>> GlobalStreamThread.java
> >>>>>>>>>>>>> we are throwing a StreamsException for InvalidOffsetException
> >>> and there
> >>>>>>>>>>>>> is a test as
> >>>>>>>>>>>>> well
> >>> GlobalStreamThreadTest#shouldDieOnInvalidOffsetException(), so I
> >>>>>>>>>>>>> think this is the config we are trying to introduce with this
> >>> KIP.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> -Navinder  On Saturday, 27 June, 2020, 07:03:04 pm IST, John
> >>> Roesler
> >>>>>>>>>>>>> <j...@vvcephei.org> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>   Hi Navinder,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for this proposal!
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regarding your question about whether to use the same policy
> >>>>>>>>>>>>> enum or not, the underlying mechanism is the same, so I think
> >>>>>>>>>>>>> we can just use the same AutoOffsetReset enum.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Can you confirm whether setting the reset policy config on
> the
> >>>>>>>>>>>>> global consumer currently works or not? Based on my reading
> >>>>>>>>>>>>> of StreamsConfig, it looks like it would be:
> >>>>>>>>>>>>> "global.consumer.auto.offset.reset".
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> If that does work, would you still propose to augment the
> >>>>>>>>>>>>> Java API?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> -John
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Fri, Jun 26, 2020, at 23:52, Navinder Brar wrote:
> >>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> KIP:
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I have taken over this KIP since it has been dormant for a
> >>> long time
> >>>>>>>>>>>>>> and this looks important for use-cases that have large
> global
> >>> data, so
> >>>>>>>>>>>>>> rebuilding global stores from scratch might seem overkill in
> >>> case of
> >>>>>>>>>>>>>> InvalidOffsetExecption.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> We want to give users the control to use reset policy(as we
> do
> >>> in
> >>>>>>>>>>>>>> StreamThread) in case they hit invalid offsets. I have still
> >>> not
> >>>>>>>>>>>>>> decided whether to restrict this option to the same reset
> >>> policy being
> >>>>>>>>>>>>>> used by StreamThread(using auto.offset.reset config) or add
> >>> another
> >>>>>>>>>>>>>> reset config specifically for global stores
> >>>>>>>>>>>>>> "global.auto.offset.reset" which gives users more control to
> >>> choose
> >>>>>>>>>>>>>> separate policies for global and stream threads.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I would like to hear your opinions on the KIP.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> -Navinder
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>
> >>>>>
> >>>
> >>>
> >>
> >
> >
>
>

Reply via email to