I synced with John in person, and he emphasized his concerns about
breaking code if we change the state machine. From an implementation
point of view, I am concerned that maintaining two state machines at the
same time might be very complex. John had the idea, though, that we could
do an internal translation: internally, we switch the state machine to
the new one, but translate the new state to the old state before doing
the callback. (We would only need two separate "state enums"; we would
add a new method to register callbacks for the new state enum and
deprecate the existing method.)
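
To make the translation idea concrete, here is a minimal sketch. It
assumes a simplified old enum (the real `KafkaStreams.State` has more
states) and a hypothetical new enum that adds RESTORING. All class and
method names below are made up for illustration and are not part of any
KIP.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical new client-level states (illustrative only).
enum NewState { CREATED, REBALANCING, RESTORING, RUNNING }

// Simplified stand-in for the existing public state enum.
enum OldState { CREATED, REBALANCING, RUNNING }

interface OldStateListener { void onChange(OldState newState, OldState oldState); }
interface NewStateListener { void onChange(NewState newState, NewState oldState); }

final class StateTranslator {
    private final List<OldStateListener> legacyListeners = new ArrayList<>();
    private final List<NewStateListener> listeners = new ArrayList<>();
    private NewState current = NewState.CREATED;

    // Maps each new state onto the state the old machine would have reported.
    static OldState toOld(NewState s) {
        switch (s) {
            case CREATED:
                return OldState.CREATED;
            case RESTORING: // the old machine reports restoration as REBALANCING
            case REBALANCING:
                return OldState.REBALANCING;
            case RUNNING:
                return OldState.RUNNING;
            default:
                throw new IllegalStateException("unmapped state " + s);
        }
    }

    // Existing registration method; this is the one that would be deprecated.
    void addLegacyListener(OldStateListener l) { legacyListeners.add(l); }

    // New registration method for the new state enum.
    void addListener(NewStateListener l) { listeners.add(l); }

    // Only the new machine is tracked internally.
    void transitionTo(NewState next) {
        NewState previous = current;
        current = next;
        for (NewStateListener l : listeners) {
            l.onChange(next, previous);
        }
        OldState oldNext = toOld(next);
        OldState oldPrevious = toOld(previous);
        // Skip transitions that are invisible in the old machine.
        if (oldNext != oldPrevious) {
            for (OldStateListener l : legacyListeners) {
                l.onChange(oldNext, oldPrevious);
            }
        }
    }
}
```

With this mapping, a legacy listener that waits for REBALANCING ->
RUNNING would still see exactly that transition, even though the client
internally went REBALANCING -> RESTORING -> RUNNING.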

However, also with regard to the work Guozhang pointed out, I am
wondering if we should split out an independent KIP just for the state
machine changes? It seems complex enough by itself. We would hold off on
this KIP until the state machine change is done and resume it afterwards.

Thoughts?

-Matthias

On 10/6/20 8:55 PM, Guozhang Wang wrote:
> Sorry I'm late to the party.
> 
> Matthias raised a point to me regarding the recent development of moving
> restoration from stream threads to separate restore threads and allowing
> the stream threads to process any processible tasks even when some other
> tasks are still being restored by the restore threads:
> 
> https://issues.apache.org/jira/browse/KAFKA-10526
> https://issues.apache.org/jira/browse/KAFKA-10577
> 
> That would cause the restoration of non-global states to be more similar to
> global states such that some tasks would be processed even though the state
> of the stream thread is not yet in RUNNING (because today we only transit
> to it when ALL assigned tasks have completed restoration and are
> processible).
> 
> Also, as Sophie already mentioned, today during REBALANCING (at the stream
> thread level, it is PARTITIONS_REVOKED -> PARTITIONS_ASSIGNED) some tasks
> may still be processed, and because of KIP-429 the RUNNING ->
> PARTITIONS_REVOKED -> PARTITIONS_ASSIGNED can be within a single call and
> hence be very "transient", whereas PARTITIONS_ASSIGNED -> RUNNING could
> still take time as the thread only makes that transition when all tasks
> are processible.
> 
> So I think it makes sense to add a RESTORING state at the stream client
> level, defined as "at least one of the state stores assigned to this
> client, either global or non-global, is still restoring", and emphasize
> that during this state the client may still be able to process records,
> just probably not at full speed.
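
A minimal sketch of the rule above, assuming we can see a per-store
restore status. The enums and the class name are hypothetical and only
illustrate the "at least one store is still restoring" definition:

```java
import java.util.Collection;

// Hypothetical per-store status, covering both global and non-global stores.
enum StoreStatus { RESTORING, READY }

// Hypothetical, reduced client state; the real FSM has more states.
enum ClientState { RESTORING, RUNNING }

final class ClientStateRule {
    // RESTORING as soon as any assigned store is still restoring, else RUNNING.
    static ClientState derive(Collection<StoreStatus> assignedStores) {
        for (StoreStatus status : assignedStores) {
            if (status == StoreStatus.RESTORING) {
                return ClientState.RESTORING;
            }
        }
        return ClientState.RUNNING;
    }
}
```

Note that RESTORING here says nothing about whether records are being
processed; it only says that processing may not be at full speed yet.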
> 
> As for REBALANCING, I think it is a bit less relevant to this KIP, but
> here's a dump of my thoughts: if we can capture the period when "some tasks
> do not belong to any clients and hence processing is not at full speed", it
> would still be valuable. Unfortunately, since onPartitionsRevoked is not
> triggered each time on all clients, today's transitions would just produce
> a lot of very short REBALANCING periods, which is not really useful. So if
> we still want to keep that state, maybe we can consider the following
> tweak: at the thread level, we replace PARTITIONS_REVOKED /
> PARTITIONS_ASSIGNED with just a single REBALANCING state. We would transit
> to this state upon onPartitionsRevoked, but we would only transit out of
> it upon onAssignment when the assignor decides there's no immediate
> follow-up rebalance (note we also schedule future rebalances for workload
> balancing, but that would still trigger transiting out of it). On the
> client level, we would enter REBALANCING when any thread enters
> REBALANCING, and we would transit out of it when all threads have
> transited out of it. In this case, it is possible that during a rebalance,
> only those clients that have to revoke some partitions would enter the
> REBALANCING state, while others that only get additional tasks would not
> enter this state at all.
> 
> With all that being said, I think the discussion around REBALANCING is less
> relevant to this KIP, and even for RESTORING I honestly think maybe we can
> do it in another KIP separate from 406. It will, admittedly, leave us in a
> temporary phase where the FSM of Kafka Streams is not perfect, but that
> helps make incremental development progress on 406 itself.
> 
> 
> Guozhang
> 
> 
> On Mon, Oct 5, 2020 at 2:37 PM Sophie Blee-Goldman <sop...@confluent.io>
> wrote:
> 
>> It seems a little misleading, but I actually have no real qualms about
>> transitioning to the REBALANCING state *after* RESTORING. One of the side
>> effects of KIP-429 was that in most cases we actually don't transition to
>> REBALANCING at all until the very end of the rebalance, so REBALANCING
>> doesn't really mean all that much any more. These days the majority of the
>> time an instance spends in the REBALANCING state is actually spent on
>> restoration anyways.
>>
>> If users are listening in on the REBALANCING -> RUNNING transition, then
>> they might also be listening for the RUNNING -> REBALANCING transition, so
>> we may need to actually go RUNNING -> REBALANCING -> RESTORING ->
>> REBALANCING -> RUNNING. This feels a bit unwieldy but I don't think there's
>> anything specifically wrong with it.
>>
>> That said, it makes me question the value of having a REBALANCING state at
>> all. In the pre-KIP-429 days it made sense, because all tasks were paused
>> and unavailable for IQ for the duration of the rebalance. But these days,
>> the threads can continue processing any tasks they own during a rebalance,
>> so the only time that tasks are truly unavailable is during the
>> restoration phase.
>>
>> So, I find the idea of getting rid of the REBALANCING state altogether to
>> be pretty appealing, in which case we'd probably need to introduce a new
>> state listener and deprecate the current one as John proposed. I also
>> wonder if this is the sort of thing we can just swallow as a breaking
>> change in the upcoming 3.0 release.
>>
>> On Sat, Oct 3, 2020 at 11:02 PM Navinder Brar
>> <navinder_b...@yahoo.com.invalid> wrote:
>>
>>> Thanks a lot, Matthias, for the detailed feedback. I tend to agree with
>>> changing the state machine itself if required. I think at the end of the
>>> day InvalidOffsetException is a rare event and is not as frequent as
>>> rebalancing. So, pausing all tasks once in a while should be ok from a
>>> processing standpoint.
>>>
>>> I was also wondering if, instead of adding the RESTORING state between
>>> REBALANCING & RUNNING, we can add it before REBALANCING. Whenever an
>>> application starts, there is no need for active/replica tasks to be
>>> present for us to build global stores anyway. We can restore global
>>> stores first and then trigger a rebalance to get the tasks assigned. This
>>> might help us in shielding the users from changing what they listen to
>>> currently (which is REBALANCING -> RUNNING). So, we go RESTORING ->
>>> REBALANCING -> RUNNING. The only drawback here might be that replicas
>>> would also be paused while we are restoring global stores, but as
>>> Matthias said, we would want to give complete bandwidth to restoring
>>> global stores in such a case, and considering it is a rare event this
>>> should be ok. On the plus side, this would not lead to any race condition
>>> and we would not need to change the behavior of any stores. But this also
>>> means that this RESTORING state is only for global stores, like the
>>> GLOBAL_RESTORING state we discussed before :) as regular tasks still
>>> restore inside REBALANCING.
>>>
>>> @John, @Sophie do you think this would work?
>>>
>>> Regards,
>>> Navinder
>>>
>>>     On Wednesday, 30 September, 2020, 09:39:07 pm IST, Matthias J. Sax <
>>> mj...@apache.org> wrote:
>>>
>>>  I guess we need to have some cleanup mechanism for this case anyway,
>>> because, the global thread can enter RESTORING state at any point in
>>> time, and thus, even if we set a flag to pause processing on the
>>> StreamThreads we are subject to a race condition.
>>>
>>> Besides that, on a high level I am fine with either "busy waiting" (ie,
>>> just lock the global-store and retry) or setting a flag. However, there
>>> are some trade-offs to consider:
>>>
>>> As we need a cleanup mechanism anyway, it might be ok to just use a
>>> single mechanism. -- We should consider the impact in EOS though, as we
>>> might need to wipe out the store of regular tasks for this case. Thus,
>>> setting a flag might actually help to prevent us from repeatedly wiping
>>> the store on retries... On the other hand, we plan to avoid wiping the
>>> store in case of error for EOS anyway, and if we get this improvement,
>>> we might not need the flag.
>>>
>>> For the client state machine: I would actually prefer to have a
>>> RESTORING state and I would also prefer to pause _all_ tasks. This might
>>> imply that we want a flag. In the past, we allowed interleaving restore
>>> and processing in StreamThread (for regular tasks), which slowed down
>>> restoring, and we changed it back to not process any tasks until all
>>> tasks are restored. Of course, in our case we have two different
>>> threads (not a single one). However, the network is still shared, so it
>>> might be desirable to give the full network bandwidth to the global
>>> consumer to restore as fast as possible (maybe an improvement we could
>>> add to `StreamThread`s too, if we have multiple threads)? And as a side
>>> effect, it does not muddy the waters about what each client state means.
>>>
>>> Thus, overall, I tend to prefer a flag on `StreamThread` as it seems to
>>> provide a cleaner end-to-end solution (and we avoid the dependency on
>>> improving EOS state management).
>>>
>>> Btw: I am not sure if we actually need to preserve compatibility for the
>>> state machine? To me, it seems not to be a strict contract, and I would
>>> personally be ok to just change it.
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 9/22/20 11:08 PM, Navinder Brar wrote:
>>>> Thanks a lot, John, for these suggestions. @Matthias, can you share your
>>>> thoughts on the last two comments made in this chain?
>>>>
>>>> Thanks,
>>>> Navinder
>>>>
>>>>    On Monday, 14 September, 2020, 09:03:32 pm IST, John Roesler
>>>> <vvcep...@apache.org> wrote:
>>>>
>>>>  Hi Navinder,
>>>>
>>>> Thanks for the reply.
>>>>
>>>> I wasn't thinking of an _exponential_ backoff, but
>>>> otherwise, yes, that was the basic idea. Note, the mechanism
>>>> would be similar (if not the same) to what Matthias is
>>>> implementing for KIP-572:
>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams
>>>>
>>>> Regarding whether we'd stay in RUNNING during global
>>>> restoration or not, I can see your point. It seems like we
>>>> have three choices with how we set the state during global
>>>> restoration:
>>>> 1. stay in RUNNING: Users might get confused, since
>>>> processing could get stopped for some tasks. On the other
>>>> hand, processing for tasks not blocked by the global
>>>> restoration could proceed (if we adopt the other idea), so
>>>> maybe it still makes sense.
>>>> 2. transition to REBALANCING: Users might get confused,
>>>> since there is no actual rebalance. However, the current
>>>> state for Kafka Streams during state restoration is actually
>>>> REBALANCING, so it seems people already should understand
>>>> that REBALANCING really means REBALANCING|RESTORING. This
>>>> choice would preserve the existing state machine as well as
>>>> the existing meaning of all states.
>>>> 3. add RESTORING: This could clarify the state machine, at
>>>> the expense of breaking compatibility. We could implement a
>>>> migration path by adding a new "state listener" interface
>>>> for the new state machine.
>>>>
>>>> It seems like option 3 results in the most sensible system,
>>>> but I'm not sure if it's worth the hassle. It certainly
>>>> seems orthogonal to the goal of this KIP. Option 2 is
>>>> probably the best practical choice.
>>>>
>>>>
>>>> Regarding _how_ the global state restoration could set a
>>>> flag preventing access to the store... This is indeed the
>>>> central challenge to this new idea. Just throwing out one
>>>> possibility: Once the global thread marks the store for
>>>> restoration, it would throw an exception, such as
>>>> "StoreIsRestoringException" on any access. The processor
>>>> would _not_ catch this exception. Instead, the StreamThread
>>>> would catch it, put this record/task on ice, and re-try it
>>>> later.
>>>>
>>>> That last mechanism is actually pretty complicated. For
>>>> example, what if the record is already partially processed
>>>> in the topology? We'd have to remember which ProcessorNode
>>>> to resume from when we re-try later.
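
The flag-and-retry idea described above can be sketched roughly as
follows. `StoreIsRestoringException` is the hypothetical name from the
message and not an existing Kafka Streams class; `GuardedGlobalStore` and
`RetryingProcessor` are toy stand-ins invented for this sketch. It
side-steps the partial-processing problem by retrying the whole record
from the start, which would only be safe if processing had no side
effects before the store access.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical exception thrown on any access while the store is restoring.
class StoreIsRestoringException extends RuntimeException {
    StoreIsRestoringException(String storeName) {
        super("global store " + storeName + " is restoring");
    }
}

// Toy global store; the global thread would set and clear the flag.
final class GuardedGlobalStore {
    private final Map<String, String> data = new ConcurrentHashMap<>();
    private final String name;
    private volatile boolean restoring;

    GuardedGlobalStore(String name) { this.name = name; }

    void markRestoring(boolean value) { restoring = value; }

    void put(String key, String value) { data.put(key, value); }

    String get(String key) {
        if (restoring) {
            throw new StoreIsRestoringException(name);
        }
        return data.get(key);
    }
}

// Toy stand-in for the StreamThread loop: catches the exception,
// parks the record, and retries it later.
final class RetryingProcessor {
    private final Deque<String> parked = new ArrayDeque<>();
    private final GuardedGlobalStore store;
    private int processed;

    RetryingProcessor(GuardedGlobalStore store) { this.store = store; }

    // Returns true if the record was processed, false if it was parked.
    boolean process(String key) {
        try {
            store.get(key); // the "processor" reads the global store
            processed++;
            return true;
        } catch (StoreIsRestoringException e) {
            parked.addLast(key);
            return false;
        }
    }

    // Retries each parked record once; records that fail again are re-parked.
    void retryParked() {
        int pending = parked.size();
        for (int i = 0; i < pending; i++) {
            process(parked.pollFirst());
        }
    }

    int parkedCount() { return parked.size(); }

    int processedCount() { return processed; }
}
```

Resuming from a specific ProcessorNode, as discussed above, would need
additional bookkeeping that this sketch deliberately leaves out.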
>>>>
>>>> This is really where the spiritual overlap with KIP-572
>>>> comes in. Maybe Matthias can share some thoughts.
>>>>
>>>> Thanks,
>>>> -John
>>>>
>>>> On Sun, 2020-09-13 at 07:50 +0000, Navinder Brar wrote:
>>>>> Hi John,
>>>>>
>>>>> If I understand this correctly, you are proposing to use exponential
>>>>> backoff in globalStore.get() to keep polling the global thread (whether
>>>>> it has restored completely or not) while the processor pauses the
>>>>> processing of a particular message which required querying the global
>>>>> store. That is, stream threads are still in RUNNING state but kind of
>>>>> paused until the global thread restores and gives a go-ahead that the
>>>>> complete state has been restored. I like the idea for the first two
>>>>> reasons that you have mentioned, but thinking from a semantics point of
>>>>> view, stream threads will be in RUNNING but still not processing events;
>>>>> will it be misleading for the users? Or do you think we are doing it at
>>>>> enough places already and an exception should suffice? They will not
>>>>> understand why the stream thread is not processing and how much longer
>>>>> it will not process for. If the state explicitly stated RESTORING, users
>>>>> might have clearly understood why it is happening.
>>>>>
>>>>> Also, to achieve what we are discussing above, the store.get() on which
>>>>> the call is made has to understand whether it is a global store or not,
>>>>> and if it is a global store, check whether it is restoring or not,
>>>>> because both might be happening simultaneously with the above approach.
>>>>> With KIP-535 we have started serving normal stores in restoring state,
>>>>> but those are just interactive queries, whereas here globalStore.get()
>>>>> might be called while processing, which we don’t want. So, restore for
>>>>> the global store and get() might have to be exclusive. Is there a way
>>>>> for a store to know if it is a global store or not, because now
>>>>> internally global and normal stores will behave differently? Although
>>>>> if everyone is fine with the above approach, we can discuss this in the
>>>>> PR as well.
>>>>>
>>>>> Regards,
>>>>> Navinder
>>>>>
>>>>>     On Saturday, 5 September, 2020, 02:09:07 am IST, John Roesler
>>>>> <vvcep...@apache.org> wrote:
>>>>>
>>>>>   Hi all,
>>>>>
>>>>> This conversation sounds good to me so far.
>>>>>
>>>>> Sophie raised a concern before that changing the state
>>>>> machine would break state restore listeners. This is true,
>>>>> and we actually have not changed the main state machine in a
>>>>> long time. The last change I remember was that we used to go
>>>>> "CREATED -> RUNNING -> REBALANCING -> RUNNING", and now we
>>>>> just go "CREATED -> REBALANCING -> RUNNING". This is
>>>>> actually the reason why many state listeners check for
>>>>> "REBALANCING -> RUNNING", to filter out the old "phantom
>>>>> running" transition from "CREATED -> RUNNING".
>>>>>
>>>>> Anyway, the observation is that dropping the "phantom
>>>>> running" state didn't break any real use case we were aware
>>>>> of. But adding RESTORING between REBALANCING and RUNNING
>>>>> certainly would break the common pattern that we're aware
>>>>> of. This would indeed be the first time we introduce a
>>>>> practically breaking change to the state machine at least
>>>>> since 2.0, and maybe since 1.0 too. We should probably
>>>>> consider the impact.
>>>>>
>>>>> One alternative is to consider the state machine and the
>>>>> state listener to be coupled APIs. We can deprecate and
>>>>> replace the current state listener, and also introduce a new
>>>>> state machine enum with our desired new state and
>>>>> transitions, while leaving the existing one alone and
>>>>> deprecating it. Then, no existing code would break, only get
>>>>> deprecation warnings.
>>>>>
>>>>>
>>>>>
>>>>> Matthias gave me an idea a few messages back with his note
>>>>> about setting/checking "flags". What if we flip it around
>>>>> and set the flags on the global stores themselves? Then, we
>>>>> throw an exception when a processor queries the store while
>>>>> it's restoring. When they get that exception, they just put
>>>>> that task on the back burner for a while and try again later
>>>>> (similar to Matthias's timeout handling KIP). The global
>>>>> thread sets the flag on a particular store when it realizes
>>>>> it needs to be re-created and unsets it when the restore
>>>>> completes.
>>>>>
>>>>> Then:
>>>>> 1. Only the global stores that actually need to be restored
>>>>> block anything
>>>>> 2. Only the tasks that access the stores get blocked
>>>>> 3. No new states need to be introduced
>>>>>
>>>>> WDYT?
>>>>> -John
>>>>>
>>>>> On Fri, 2020-09-04 at 13:18 +0000, Navinder Brar wrote:
>>>>>> Hi Sophie,
>>>>>>
>>>>>> Thanks for the detailed explanation. I agree that, from a user
>>>>>> standpoint, I don't think there is any use-case to take any separate
>>>>>> action in case of the GLOBAL_RESTORING and RESTORING phases.
>>>>>>
>>>>>> So, internally in the code we can handle the cases as Matthias
>>>>>> explained above, and we can discuss those in the PR. I will update the
>>>>>> KIP based on what we have converged towards, including having an uber
>>>>>> RESTORING (rather than GLOBAL_RESTORING) state which takes stream and
>>>>>> global threads into consideration.
>>>>>>
>>>>>> I will update the KIP soon and share it again, as a lot has changed
>>>>>> from where we started this KIP.
>>>>>>
>>>>>> Regards,
>>>>>> Navinder
>>>>>>
>>>>>>     On Friday, 4 September, 2020, 04:19:20 am IST, Sophie Blee-Goldman
>>>>>> <sop...@confluent.io> wrote:
>>>>>>
>>>>>>   Thanks Matthias, that sounds like what I was thinking. I think we
>>>>>> should always be able to figure out what to do in various scenarios as
>>>>>> outlined in the previous email.
>>>>>>
>>>>>>> For the same reason, I wouldn't want to combine global restoring and
>>>>>>> normal restoring because then it would make all the restorings
>>>>>>> independent but we don't want that. We want global stores to be
>>>>>>> available before any processing starts on the active tasks.
>>>>>>
>>>>>> I'm not sure I follow this specific point, but I don't think I did a
>>>>>> good job of explaining my proposal so it's probably my own fault. When
>>>>>> I say that we should merge RESTORING and GLOBAL_RESTORING, I just mean
>>>>>> that we should provide a single user-facing state to encompass any
>>>>>> ongoing restoration. The point of the KafkaStreams RESTORING state is
>>>>>> to alert users that their state may be unavailable for IQ, and active
>>>>>> tasks may be idle. This is true for both global and non-global
>>>>>> restoration. I think the ultimate question is whether as a user, I
>>>>>> would react any differently to a GLOBAL_RESTORING state vs the regular
>>>>>> RESTORING. My take is "no", in which case we should just provide a
>>>>>> single unified state for the minimal public API. But if anyone can
>>>>>> think of a reason for the user to need to distinguish between different
>>>>>> types of restoration, that would be a good argument to keep them
>>>>>> separate.
>>>>>>
>>>>>> Internally, we do need to keep track of a "global restore" flag to
>>>>>> determine the course of action -- for example if a StreamThread
>>>>>> transitions to RUNNING but sees that the KafkaStreams state is
>>>>>> RESTORING, should it start processing or not? The answer depends on
>>>>>> whether the state is RESTORING due to any global stores. But the
>>>>>> KafkaStreams state is a public interface, not an internal bookkeeper,
>>>>>> so we shouldn't try to push our internal logic into the user-facing
>>>>>> API.
>>>>>>
>>>>>>
>>>>>> On Thu, Sep 3, 2020 at 7:36 AM Matthias J. Sax <mj...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> I think this issue can actually be resolved.
>>>>>>>
>>>>>>>   - We need a flag on the stream-threads if global-restore is in
>>>>>>> progress; for this case, the stream-thread may go into RUNNING state,
>>>>>>> but it's not allowed to actually process data -- it will be allowed to
>>>>>>> update standby-tasks though.
>>>>>>>
>>>>>>>   - If a stream-thread restores, its own state is RESTORING and it does
>>>>>>> not need to care about the "global restore flag".
>>>>>>>
>>>>>>>   - The global-thread just does what we discussed, including using
>>>>>>> state RESTORING.
>>>>>>>
>>>>>>>   - The KafkaStreams client state is in RESTORING, if at least one
>>>>>>> thread (stream-thread or global-thread) is in state RESTORING.
>>>>>>>
>>>>>>>   - On startup, if there is a global-thread, we just set the
>>>>>>> global-restore flag upfront before we start the stream-threads (we can
>>>>>>> actually still do the rebalance and potential restore in stream-thread
>>>>>>> in parallel to global restore) and rely on the global-thread to unset
>>>>>>> the flag.
>>>>>>>
>>>>>>>   - The tricky thing is to "stop" processing in stream-threads if we
>>>>>>> need to wipe the global-store and rebuild it. For this, we should set
>>>>>>> the "global restore flag" on the stream-threads, but we also need to
>>>>>>> "lock down" the global store in question and throw an exception if the
>>>>>>> stream-thread tries to access it; if the stream-thread gets this
>>>>>>> exception, it needs to clean up itself, and wait until the "global
>>>>>>> restore flag" is unset before it can continue.
>>>>>>>
>>>>>>>
>>>>>>> Do we think this would work? -- Of course, the devil is in the details
>>>>>>> but it seems to become a PR discussion, and there is no reason to make
>>>>>>> it part of the KIP.
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 9/3/20 3:41 AM, Navinder Brar wrote:
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Thanks, John, Matthias and Sophie for great feedback.
>>>>>>>>
>>>>>>>> On the point raised by Sophie that maybe we should allow normal
>>>>>>>> restoring during GLOBAL_RESTORING, I think it makes sense, but the
>>>>>>>> challenge would be what happens when normal restoring (on actives) has
>>>>>>>> finished but GLOBAL_RESTORING is still going on. Currently, all
>>>>>>>> restorings are independent of each other, i.e. restoring happening on
>>>>>>>> one task/thread doesn't affect another. But if we do go ahead with
>>>>>>>> allowing normal restoring during GLOBAL_RESTORING, then we will still
>>>>>>>> have to pause the active tasks from going to RUNNING if
>>>>>>>> GLOBAL_RESTORING has not finished and normal restorings have finished.
>>>>>>>> For the same reason, I wouldn't want to combine global restoring and
>>>>>>>> normal restoring because then it would make all the restorings
>>>>>>>> independent but we don't want that. We want global stores to be
>>>>>>>> available before any processing starts on the active tasks.
>>>>>>>>
>>>>>>>> Although I think restoring of replicas can still take place while
>>>>>>>> global stores are restoring because in replicas there is no danger of
>>>>>>>> them starting processing.
>>>>>>>>
>>>>>>>> Also, one point to bring up is that currently during application
>>>>>>>> startup global stores restore first and then normal stream threads
>>>>>>>> start.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Navinder
>>>>>>>>
>>>>>>>>     On Thursday, 3 September, 2020, 06:58:40 am IST, Matthias J. Sax
>>>>>>>> <mj...@apache.org> wrote:
>>>>>>>>
>>>>>>>>   Thanks for the input Sophie. Those are all good points and I fully
>>>>>>>> agree with them.
>>>>>>>>
>>>>>>>> When saying "pausing the processing threads" I only considered them in
>>>>>>>> `RUNNING` and thought we figure out the detail on the PR... Excellent
>>>>>>>> catch!
>>>>>>>>
>>>>>>>> Changing state transitions is to some extent backward incompatible,
>>>>>>>> but I think (IIRC) we did it in the past and I personally tend to find
>>>>>>>> it ok. That's why we cover those changes in a KIP.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 9/2/20 6:18 PM, Sophie Blee-Goldman wrote:
>>>>>>>>> If we're going to add a new GLOBAL_RESTORING state to the KafkaStreams
>>>>>>>>> FSM, maybe it would make sense to add a new plain RESTORING state that
>>>>>>>>> we transition to when restoring non-global state stores following a
>>>>>>>>> rebalance. Right now all restoration occurs within the REBALANCING
>>>>>>>>> state, which is pretty misleading. Applications that have large
>>>>>>>>> amounts of state to restore will appear to be stuck rebalancing
>>>>>>>>> according to the state listener, when in fact the rebalance has
>>>>>>>>> completed long ago. Given that there are very much real scenarios
>>>>>>>>> where you actually *are* stuck rebalancing, it seems useful to
>>>>>>>>> distinguish plain restoration from more insidious cases that may
>>>>>>>>> require investigation and/or intervention.
>>>>>>>>>
>>>>>>>>> I don't mean to hijack this KIP, I just think it would be odd to
>>>>>>>>> introduce GLOBAL_RESTORING when there is no other kind of RESTORING
>>>>>>>>> state. One question this brings up, and I apologize if this has
>>>>>>>>> already been addressed, is what to do when we are restoring both
>>>>>>>>> normal and global state stores? It sounds like we plan to pause the
>>>>>>>>> StreamThreads entirely, but there doesn't seem to be any reason not to
>>>>>>>>> allow regular state restoration -- or even standby processing -- while
>>>>>>>>> the global state is restoring. Given the current effort to move
>>>>>>>>> restoration & standbys to a separate thread, allowing them to continue
>>>>>>>>> while pausing only the StreamThread seems quite natural.
>>>>>>>>>
>>>>>>>>> Assuming that we actually do allow both types of restoration to occur
>>>>>>>>> at the same time, and if we did add a plain RESTORING state as well,
>>>>>>>>> which state should we end up in? AFAICT the main reason for having a
>>>>>>>>> distinct {GLOBAL_}RESTORING state is to alert users of the
>>>>>>>>> non-progress of their active tasks. In both cases, the active task is
>>>>>>>>> unable to continue until restoration has completed, so why distinguish
>>>>>>>>> between the two at all? Would it make sense to avoid a special
>>>>>>>>> GLOBAL_RESTORING state and just introduce a single unified RESTORING
>>>>>>>>> state to cover both the regular and global case? Just a thought.
>>>>>>>>>
>>>>>>>>> My only concern is that this might be considered a breaking change:
>>>>>>>>> users might be looking for the REBALANCING -> RUNNING transition
>>>>>>>>> specifically in order to alert when the application has started up,
>>>>>>>>> and we would no longer go directly from REBALANCING to RUNNING. I
>>>>>>>>> think we actually did/do this ourselves in a number of integration
>>>>>>>>> tests and possibly in some examples. That said, it seems more
>>>>>>>>> appropriate to just listen for the RUNNING state rather than for a
>>>>>>>>> specific transition, and we should encourage users to do so rather
>>>>>>>>> than go out of our way to support transition-type state listeners.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Sophie
>>>>>>>>>
>>>>>>>>> On Wed, Sep 2, 2020 at 5:53 PM Matthias J. Sax <mj...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>> I think this makes sense.
>>>>>>>>>>
>>>>>>>>>> When we introduce this new state, we might also tackle the jira I
>>>>>>>>>> mentioned. If there is a global thread, on startup of a
>>>>>>>>>> `KafkaStreams` client we should not transit to `REBALANCING` but to
>>>>>>>>>> the new state, and maybe also make the "bootstrapping" non-blocking.
>>>>>>>>>>
>>>>>>>>>> I guess it's worth mentioning this in the KIP.
>>>>>>>>>>
>>>>>>>>>> Btw: The new state for KafkaStreams should also be part of the KIP
>>>>>>>>>> as it is a public API change, too.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 8/29/20 9:37 AM, John Roesler wrote:
>>>>>>>>>>> Hi Navinder,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the ping. Yes, that all sounds right to me. The name
>>>>>>>>>>> “RESTORING_GLOBAL” sounds fine, too.
>>>>>>>>>>>
>>>>>>>>>>> I think as far as warnings go, we’d just propose to mention it in
>>>>>>>>>>> the javadoc of the relevant methods that the given topics should be
>>>>>>>>>>> compacted.
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>> -John
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 28, 2020, at 12:42, Navinder Brar wrote:
>>>>>>>>>>>> Gentle ping.
>>>>>>>>>>>>
>>>>>>>>>>>> ~ Navinder
>>>>>>>>>>>>     On Wednesday, 19 August, 2020, 06:59:58 pm IST, Navinder Brar
>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks Matthias & John,
>>>>>>>>>>>>
>>>>>>>>>>>> I am glad we are converging towards an understanding. So, to
>>>>>>>>>>>> summarize, we will still keep treating this change in the KIP, and
>>>>>>>>>>>> instead of providing a reset strategy, we will clean up, reset to
>>>>>>>>>>>> earliest, and build the state.
>>>>>>>>>>>>
>>>>>>>>>>>> When we hit the exception and we are building the state, we will
>>>>>>>>>>>> stop all processing and change the state of KafkaStreams to
>>>>>>>>>>>> something like “RESTORING_GLOBAL” or the like.
>>>>>>>>>>>>
>>>>>>>>>>>> How do we plan to educate users on the undesired effects of using
>>>>>>>>>>>> non-compacted global topics? (via the KIP itself?)
>>>>>>>>>>>>
>>>>>>>>>>>> +1 on changing the KTable behavior, reset policy for global,
>>>>>>>>>>>> connecting processors to global for a later stage when demanded.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Navinder
>>>>>>>>>>>>
>>>>>>>>>>>>     On Wednesday, 19 August, 2020, 01:00:58 pm IST, Matthias J. Sax
>>>>>>>>>>>> <mj...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>   Your observation is correct. Connecting (regular) stores to
>>>>>>>>>>>> processors is necessary to "merge" sub-topologies into single ones
>>>>>>>>>>>> if a store is shared. -- For global stores, the structure of the
>>>>>>>>>>>> program does not change and thus connecting processors to global
>>>>>>>>>>>> stores is not required.
>>>>>>>>>>>>
>>>>>>>>>>>> Also given our experience with restoring regular state stores (ie,
>>>>>>>>>>>> partial processing of tasks that don't need restore), it seems
>>>>>>>>>>>> better to pause processing and move all CPU and network resources
>>>>>>>>>>>> to the global thread to rebuild the global store as soon as
>>>>>>>>>>>> possible instead of potentially slowing down the restore in order
>>>>>>>>>>>> to make progress on some tasks.
>>>>>>>>>>>>
>>>>>>>>>>>> Of course, if we collect real world experience and it becomes an
>>>>>>>>>>>> issue, we could still try to change it?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 8/18/20 3:31 PM, John Roesler wrote:
>>>>>>>>>>>>> Thanks Matthias,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Sounds good. I'm on board with no public API change and just
>>>>>>>>>>>>> recovering instead of crashing.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Also, to be clear, I wouldn't drag KTables into it; I was
>>>>>>>>>>>>> just trying to wrap my head around the congruity of our
>>>>>>>>>>>>> choice for GlobalKTable with respect to KTable.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I agree that whatever we decide to do would probably also
>>>>>>>>>>>>> resolve KAFKA-7380.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Moving on to discuss the behavior change, I'm wondering if
>>>>>>>>>>>>> we really need to block all the StreamThreads. It seems like
>>>>>>>>>>>>> we only need to prevent processing on any task that's
>>>>>>>>>>>>> connected to the GlobalStore.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I just took a look at the topology building code, and it
>>>>>>>>>>>>> actually seems that connections to global stores don't need
>>>>>>>>>>>>> to be declared. That's a bummer, since it means that we
>>>>>>>>>>>>> really do have to stop all processing while the global
>>>>>>>>>>>>> thread catches up.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Changing this seems like it'd be out of scope right now, but
>>>>>>>>>>>>> I bring it up in case I'm wrong and it actually is possible
>>>>>>>>>>>>> to know which specific tasks need to be synchronized with
>>>>>>>>>>>>> which global state stores. If we could know that, then we'd
>>>>>>>>>>>>> only have to block some of the tasks, not all of the
>>>>>>>>>>>>> threads.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> -John
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, 2020-08-18 at 14:10 -0700, Matthias J. Sax wrote:
>>>>>>>>>>>>>> Thanks for the discussion.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I agree that this KIP is justified in any case -- even if we
>>> don't
>>>>>>>>>>>>>> change public API, as the change in behavior is significant.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> A better documentation for cleanup policy is always good
>> (even
>>> if
>>>>>>> I am
>>>>>>>>>>>>>> not aware of any concrete complaints atm that users were not
>>> aware
>>>>>>> of
>>>>>>>>>>>>>> the implications). Of course, for a regular KTable, one can
>>>>>>>>>>>>>> enable/disable the source-topic-changelog optimization and
>>> thus can
>>>>>>>>>> use
>>>>>>>>>>>>>> a non-compacted topic for this case, what is quite a
>>> difference to
>>>>>>>>>>>>>> global stores/tables; so maybe it's worth to point out this
>>>>>>> difference
>>>>>>>>>>>>>> explicitly.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As mentioned before, the main purpose of the original Jira
>> was
>>> to
>>>>>>>>>> avoid
>>>>>>>>>>>>>> the crash situation but to allow for auto-recovering while it
>>> was
>>>>>>> an
>>>>>>>>>>>>>> open question if it makes sense / would be useful to allow
>>> users to
>>>>>>>>>>>>>> specify a custom reset policy instead of using a hard-coded
>>>>>>> "earliest"
>>>>>>>>>>>>>> strategy. -- It seems it's still unclear if it would be useful
>>> and
>>>>>>> thus
>>>>>>>>>>>>>> it might be best to not add it for now -- we can still add it
>>>>>>> later if
>>>>>>>>>>>>>> there are concrete use-cases that need this feature.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> @John: I actually agree that it's also questionable to allow
>> a
>>>>>>> custom
>>>>>>>>>>>>>> reset policy for KTables... Not sure if we want to drag this
>>>>>>> question
>>>>>>>>>>>>>> into this KIP though?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So it seems we all agree that we actually don't need any
>>> public API
>>>>>>>>>>>>>> changes, but we only want to avoid crashing?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For this case, to preserve the current behavior that
>> guarantees
>>>>>>> that
>>>>>>>>>> the
>>>>>>>>>>>>>> global store/table is always loaded first, it seems we need
>> to
>>>>>>> have a
>>>>>>>>>>>>>> stop-the-world mechanism for the main `StreamThreads` for
>> this
>>>>>>> case --
>>>>>>>>>>>>>> do we need to add a new state to KafkaStreams client for this
>>> case?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Having a new state might also be helpful for
>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-7380 ?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 8/17/20 7:34 AM, John Roesler wrote:
>>>>>>>>>>>>>>> Hi Navinder,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I see what you mean about the global consumer being similar
>>>>>>>>>>>>>>> to the restore consumer.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I also agree that automatically performing the recovery
>>>>>>>>>>>>>>> steps should be strictly an improvement over the current
>>>>>>>>>>>>>>> situation.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Also, yes, it would be a good idea to make it clear that the
>>>>>>>>>>>>>>> global topic should be compacted in order to ensure correct
>>>>>>>>>>>>>>> semantics. It's the same way with input topics for KTables;
>>>>>>>>>>>>>>> we rely on users to ensure the topics are compacted, and if
>>>>>>>>>>>>>>> they aren't, then the execution semantics will be broken.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, 2020-08-16 at 11:44 +0000, Navinder Brar wrote:
>>>>>>>>>>>>>>>> Hi John,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for your inputs. Since global topics are in a way their
>>>>>>>>>>>>>>>> own changelog, wouldn’t the global consumers be more akin to
>>>>>>>>>>>>>>>> restore consumers than to the main consumer?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am also +1 on catching the exception and resetting to earliest
>>>>>>>>>>>>>>>> for now. Currently, whenever an instance starts, the global
>>>>>>>>>>>>>>>> stream thread (if available) goes to RUNNING before the stream
>>>>>>>>>>>>>>>> threads are started, which means the global state is available
>>>>>>>>>>>>>>>> when processing by the stream threads starts. So, with the new
>>>>>>>>>>>>>>>> change, catching the exception, cleaning the store, and
>>>>>>>>>>>>>>>> resetting to earliest would probably be “stop the world” as you
>>>>>>>>>>>>>>>> said, John, as I think we will have to pause the stream threads
>>>>>>>>>>>>>>>> until the whole global state is recovered. I assume it is "stop
>>>>>>>>>>>>>>>> the world" right now as well: if an InvalidOffsetException
>>>>>>>>>>>>>>>> occurs, we throw a StreamsException, the user has to clean up
>>>>>>>>>>>>>>>> and handle all this manually, and when that instance restarts,
>>>>>>>>>>>>>>>> it restores the global state first.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I had an additional thought on this whole problem: would it be
>>>>>>>>>>>>>>>> helpful to educate users that global topics should have the
>>>>>>>>>>>>>>>> cleanup policy set to compact, so that this invalid offset
>>>>>>>>>>>>>>>> exception never arises for them? Assume, for example, that the
>>>>>>>>>>>>>>>> cleanup policy of the global topic is "delete" and it has
>>>>>>>>>>>>>>>> deleted keys k1 and k2 (via retention.ms), although all the
>>>>>>>>>>>>>>>> instances had already consumed them, so they are in all global
>>>>>>>>>>>>>>>> stores and all other instances are up to date on the global
>>>>>>>>>>>>>>>> data (so no InvalidOffsetException). Now a new instance is
>>>>>>>>>>>>>>>> added to the cluster, and since we have already lost k1 and k2
>>>>>>>>>>>>>>>> from the global topic, it will start consuming from the
>>>>>>>>>>>>>>>> earliest point that is still in the global topic. So, wouldn’t
>>>>>>>>>>>>>>>> the global store on the new instance have 2 fewer keys than all
>>>>>>>>>>>>>>>> the other global stores already available in the cluster?
>>>>>>>>>>>>>>>> Please let me know if I am missing something. Thanks.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Navinder
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     On Friday, 14 August, 2020, 10:03:42 am IST, John
>>> Roesler <
>>>>>>>>>> vvcep...@apache.org> wrote:
>>>>>>>>>>>>>>>>   Hi all,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It seems like the main motivation for this proposal is
>>> satisfied
>>>>>>> if
>>>>>>>>>> we just implement some recovery mechanism instead of crashing. If
>>> the
>>>>>>>>>> mechanism is going to be pausing all the threads until the state
>> is
>>>>>>>>>> recovered, then it still seems like a big enough behavior change
>> to
>>>>>>> warrant
>>>>>>>>>> a KIP still.
>>>>>>>>>>>>>>>> I have to confess I’m a little unclear on why a custom
>> reset
>>>>>>> policy
>>>>>>>>>> for a global store, table, or even consumer might be considered
>>> wrong.
>>>>>>> It’s
>>>>>>>>>> clearly wrong for the restore consumer, but the global consumer
>>> seems
>>>>>>> more
>>>>>>>>>> semantically akin to the main consumer than the restore consumer.
>>>>>>>>>>>>>>>> In other words, if it’s wrong to reset a GlobalKTable from
>>>>>>> latest,
>>>>>>>>>> shouldn’t it also be wrong for a KTable, for exactly the same
>>> reason?
>>>>>>> It
>>>>>>>>>> certainly seems like it would be an odd choice, but I’ve seen
>> many
>>>>>>> choices
>>>>>>>>>> I thought were odd turn out to have perfectly reasonable use
>> cases.
>>>>>>>>>>>>>>>> As far as the PAPI global store goes, I could see adding
>> the
>>>>>>> option
>>>>>>>>>> to configure it, since as Matthias pointed out, there’s really no
>>>>>>> specific
>>>>>>>>>> semantics for the PAPI. But if automatic recovery is really all
>>>>>>> Navinder
>>>>>>>>>> wanted, then I could also see deferring this until someone
>>> specifically
>>>>>>>>>> wants it.
>>>>>>>>>>>>>>>> So the tl;dr is, if we just want to catch the exception and
>>>>>>> rebuild
>>>>>>>>>> the store by seeking to earliest with no config or API changes,
>>> then
>>>>>>> I’m +1.
>>>>>>>>>>>>>>>> I’m wondering if we can improve on the “stop the world”
>>> effect of
>>>>>>>>>> rebuilding the global store, though. It seems like we could put
>> our
>>>>>>> heads
>>>>>>>>>> together and come up with a more fine-grained approach to
>>> maintaining
>>>>>>> the
>>>>>>>>>> right semantics during recovery while still making some progress.
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> John
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Aug 9, 2020, at 02:04, Navinder Brar wrote:
>>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> IMHO, now as you explained using
>>>>>>>>>> ‘global.consumer.auto.offset.reset’ is
>>>>>>>>>>>>>>>>> not as straightforward
>>>>>>>>>>>>>>>>> as it seems and it might change the existing behavior for
>>> users
>>>>>>>>>> without
>>>>>>>>>>>>>>>>> them realizing it, I also
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> think that we should change the behavior inside global
>>> stream
>>>>>>>>>> thread to
>>>>>>>>>>>>>>>>> not die on
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> InvalidOffsetException and instead clean and rebuild the
>>> state
>>>>>>>>>> from the
>>>>>>>>>>>>>>>>> earliest. On this, as you
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> mentioned that we would need to pause the stream threads
>>> till
>>>>>>> the
>>>>>>>>>>>>>>>>> global store is completely restored.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Without it, there will be incorrect processing results if
>>> they
>>>>>>> are
>>>>>>>>>>>>>>>>> utilizing a global store during processing.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> So, basically we can divide the use-cases into 4 parts.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     - PAPI based global stores (will have the earliest
>>>>>>> hardcoded)
>>>>>>>>>>>>>>>>>     - PAPI based state stores (already has
>>> auto.reset.config)
>>>>>>>>>>>>>>>>>     - DSL based GlobalKTables (will have earliest
>> hardcoded)
>>>>>>>>>>>>>>>>>     - DSL based KTables (will continue with
>>> auto.reset.config)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> So, this would mean that we are not changing any existing
>>>>>>>>>> behaviors
>>>>>>>>>>>>>>>>> with this if I am right.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I guess we could improve the code to actually log a
>> warning
>>> for
>>>>>>>>>> this
>>>>>>>>>>>>>>>>> case, similar to what we do for some configs already (cf
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I like this idea. In case we go ahead with the above
>>> approach
>>>>>>>>>> and if we can’t
>>>>>>>>>>>>>>>>> deprecate it, we should educate users that this config
>>> doesn’t
>>>>>>>>>> work.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Looking forward to hearing thoughts from others as well.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> - Navinder    On Tuesday, 4 August, 2020, 05:07:59 am IST,
>>>>>>>>>> Matthias J.
>>>>>>>>>>>>>>>>> Sax <mj...@apache.org> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>   Navinder,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> thanks for updating the KIP. I think the motivation
>> section
>>> is
>>>>>>> not
>>>>>>>>>>>>>>>>> totally accurate (what is not your fault though, as the
>>> history
>>>>>>> of
>>>>>>>>>> how
>>>>>>>>>>>>>>>>> we handle this case is intertwined...) For example,
>>>>>>>>>> "auto.offset.reset"
>>>>>>>>>>>>>>>>> is hard-coded for the global consumer to "none" and using
>>>>>>>>>>>>>>>>> "global.consumer.auto.offset.reset" has no effect (cf
>>>>>>>>>>>>>>>>>
>>>>>>>
>>>
>> https://kafka.apache.org/25/documentation/streams/developer-guide/config-streams.html#default-values
>>>>>>>>>> )
>>>>>>>>>>>>>>>>> Also, we could not even really deprecate the config as
>>>>>>> mentioned in
>>>>>>>>>>>>>>>>> rejected alternatives sections, because we need
>>>>>>>>>> `auto.offset.reset` for
>>>>>>>>>>>>>>>>> the main consumer -- and adding a prefix is independent of
>>> it.
>>>>>>>>>> Also,
>>>>>>>>>>>>>>>>> because we ignore the config, it is also
>>> deprecated/removed if
>>>>>>>>>> you wish.
>>>>>>>>>>>>>>>>> I guess we could improve the code to actually log a
>> warning
>>> for
>>>>>>>>>> this
>>>>>>>>>>>>>>>>> case, similar to what we do for some configs already (cf
>>>>>>>>>>>>>>>>> StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The other question is about compatibility with regard to
>>> default
>>>>>>>>>>>>>>>>> behavior: if we want to reintroduce
>>>>>>>>>> `global.consumer.auto.offset.reset`
>>>>>>>>>>>>>>>>> this basically implies that we need to respect
>>>>>>>>>> `auto.offset.reset`, too.
>>>>>>>>>>>>>>>>> Remember, that any config without prefix is applied to all
>>>>>>> clients
>>>>>>>>>> that
>>>>>>>>>>>>>>>>> support this config. Thus, if a user does not limit the
>>> scope of
>>>>>>>>>> the
>>>>>>>>>>>>>>>>> config to the main consumer (via
>>>>>>>>>> `main.consumer.auto.offset.reset`) but
>>>>>>>>>>>>>>>>> uses the non-prefix versions and sets it to "latest" (and
>>> relies
>>>>>>>>>> on the
>>>>>>>>>>>>>>>>> current behavior that `auto.offset.reset` is "none", and
>>>>>>>>>> effectively
>>>>>>>>>>>>>>>>> "earliest" on the global consumer), the user might end up
>>> with a
>>>>>>>>>>>>>>>>> surprise as the global consumer behavior would switch from
>>>>>>>>>> "earliest" to
>>>>>>>>>>>>>>>>> "latest" (most likely unintentionally). Bottom line is,
>> that
>>>>>>> users
>>>>>>>>>> might
>>>>>>>>>>>>>>>>> need to change configs to preserve the old behavior...
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> However, before we discuss those details, I think we
>> should
>>>>>>>>>> discuss the
>>>>>>>>>>>>>>>>> topic in a broader context first:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>   - for a GlobalKTable, does it even make sense from a
>>>>>>> correctness
>>>>>>>>>> point
>>>>>>>>>>>>>>>>> of view, to allow users to set a custom reset policy? It
>>> seems
>>>>>>> you
>>>>>>>>>>>>>>>>> currently don't propose this in the KIP, but as you don't
>>>>>>> mention
>>>>>>>>>> it
>>>>>>>>>>>>>>>>> explicitly it's unclear if that is on purpose or an
>> oversight?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>   - Should we treat global stores differently to
>>> GlobalKTables
>>>>>>> and
>>>>>>>>>> allow
>>>>>>>>>>>>>>>>> for more flexibility (as the PAPI does not really provide
>>> any
>>>>>>>>>> semantic
>>>>>>>>>>>>>>>>> contract). It seems that is what you propose in the KIP.
>> We
>>>>>>> should
>>>>>>>>>>>>>>>>> discuss if this flexibility does make sense or not for the
>>> PAPI,
>>>>>>>>>> or if
>>>>>>>>>>>>>>>>> we should apply the same reasoning about correctness we
>> use
>>> for
>>>>>>>>>> KTables
>>>>>>>>>>>>>>>>> to global stores? To what extent are/should they be
>>> different?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>   - If we support auto.offset.reset for global store, how
>>>>>>> should we
>>>>>>>>>>>>>>>>> handle the initial bootstrapping of the store/table (that
>> is
>>>>>>>>>> hard-coded
>>>>>>>>>>>>>>>>> atm)? Should we skip it if the policy is "latest" and
>> start
>>>>>>> with an
>>>>>>>>>>>>>>>>> empty state? Note that we did consider this behavior
>>> incorrect
>>>>>>> via
>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6121 and thus
>>> I am
>>>>>>>>>> wondering
>>>>>>>>>>>>>>>>> why should we change it back again?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Finally, the main motivation for the Jira ticket was to
>> let
>>> the
>>>>>>>>>> runtime
>>>>>>>>>>>>>>>>> auto-recover instead of dying as it does currently. If we
>>> decide
>>>>>>>>>> that a
>>>>>>>>>>>>>>>>> custom reset policy does actually not make sense, we can
>>> just
>>>>>>>>>> change the
>>>>>>>>>>>>>>>>> global-thread to not die any longer on an
>>>>>>> `InvalidOffsetException`
>>>>>>>>>> but
>>>>>>>>>>>>>>>>> rebuild the state automatically. This would be "only" a
>>> behavior
>>>>>>>>>> change
>>>>>>>>>>>>>>>>> but does not require any public API changes. -- For this
>>> case,
>>>>>>> we
>>>>>>>>>> should
>>>>>>>>>>>>>>>>> also think about the synchronization with the main
>>> processing
>>>>>>>>>> threads?
>>>>>>>>>>>>>>>>> On startup we bootstrap the global stores before
>> processing
>>>>>>>>>> happens.
>>>>>>>>>>>>>>>>> Thus, if an `InvalidOffsetException` happens and the global
>>>>>>> thread
>>>>>>>>>> dies,
>>>>>>>>>>>>>>>>> the main threads cannot access the global stores any
>> longer
>>> and
>>>>>>>>>> also die.
>>>>>>>>>>>>>>>>> If we re-build the state though, do we need to pause the
>>> main
>>>>>>>>>> thread
>>>>>>>>>>>>>>>>> during this phase?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 8/2/20 8:48 AM, Navinder Brar wrote:
>>>>>>>>>>>>>>>>>> Hi John,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I have updated the KIP to make the motivation more clear.
>>> In a
>>>>>>>>>> nutshell, we will use the already existing config
>>>>>>>>>> "global.consumer.auto.offset.reset" for users to set a blanket
>>> reset
>>>>>>> policy
>>>>>>>>>> for all global topics and add a new interface to set per-topic
>>> reset
>>>>>>> policy
>>>>>>>>>> for each global topic(for which we specifically need this KIP).
>>> There
>>>>>>> was a
>>>>>>>>>> point raised from Matthias above to always reset to earliest by
>>>>>>> cleaning
>>>>>>>>>> the stores and seekToBeginning in case of InvalidOffsetException.
>>> We
>>>>>>> can go
>>>>>>>>>> with that route as well and I don't think it would need a KIP as
>>> if we
>>>>>>> are
>>>>>>>>>> not providing users an option to have blanket reset policy on
>>> global
>>>>>>>>>> topics, then a per-topic override would also not be required(the
>>> KIP is
>>>>>>>>>> required basically for that). Although, I think if users have an
>>>>>>> option to
>>>>>>>>>> choose reset policy for StreamThread then the option should be
>>>>>>> provided for
>>>>>>>>>> GlobalStreamThread as well and if we don't want to use the
>>>>>>>>>> "global.consumer.auto.offset.reset" then we would need to
>>> deprecate it
>>>>>>>>>> because currently it's not serving any purpose. For now, I have
>>> added
>>>>>>> it in
>>>>>>>>>> rejected alternatives but we can discuss this.
>>>>>>>>>>>>>>>>>> On the query that I had for Guozhang, thanks to Matthias
>> we
>>>>>>> have
>>>>>>>>>> fixed it last week as part of KAFKA-10306.
>>>>>>>>>>>>>>>>>> ~Navinder
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     On Sunday, 26 July, 2020, 07:37:34 pm IST, Navinder
>>> Brar <
>>>>>>>>>> navinder_b...@yahoo.com.invalid> wrote:
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Sorry, it took some time to respond back.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> “but I thought we would pass the config through to the
>>> client.”
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> @John, sure we can use the config in
>> GlobalStreamThread,
>>> that
>>>>>>>>>> could be one of the way to solve it.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> @Matthias, sure cleaning the store and recreating is one
>>> way
>>>>>>> but
>>>>>>>>>> since we are giving an option to reset in StreamThread why the
>>>>>>>>>> implementation should be different in GlobalStreamThread. I think
>>> we
>>>>>>> should
>>>>>>>>>> use the global.consumer.auto.offset.reset config to accept the
>>> reset
>>>>>>>>>> strategy opted by the user although I would be ok with just
>>> cleaning
>>>>>>> and
>>>>>>>>>> resetting to the latest as well for now. Currently, we throw a
>>>>>>>>>> StreamsException in case of InvalidOffsetException in
>>>>>>> GlobalStreamThread so
>>>>>>>>>> just resetting would still be better than what happens currently.
>>>>>>>>>>>>>>>>>> Matthias, I found this comment in StreamsBuilder for
>>>>>>> GlobalKTable
>>>>>>>>>> ‘* Note that {@link GlobalKTable} always applies {@code
>>>>>>>>>> "auto.offset.reset"} strategy {@code "earliest"} regardless of
>> the
>>>>>>>>>> specified value in {@link StreamsConfig} or {@link Consumed}.’
>>>>>>>>>>>>>>>>>> So, I guess we are already cleaning up and recreating for
>>>>>>>>>> GlobalKTable from earliest offset.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> @Guozhang, while looking at the code, I also noticed a
>> TODO:
>>>>>>>>>> pending in GlobalStateManagerImpl, when InvalidOffsetException is
>>>>>>> thrown.
>>>>>>>>>> Earlier, we were directly clearing the store here and recreating
>>> from
>>>>>>>>>> scratch but that code piece is removed now. Are you working on a
>>>>>>> follow-up
>>>>>>>>>> PR for this or just handling the reset in GlobalStreamThread
>>> should be
>>>>>>>>>> sufficient?
>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>> Navinder
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     On Tuesday, 7 July, 2020, 12:53:36 am IST, Matthias
>> J.
>>> Sax
>>>>>>> <
>>>>>>>>>> mj...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>   Atm, the config should be ignored and the
>> global-consumer
>>>>>>>>>> should use
>>>>>>>>>>>>>>>>>> "none" in a hard-coded way.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> However, I am still wondering if we actually want/need
>> to
>>>>>>> allow
>>>>>>>>>> users
>>>>>>>>>>>>>>>>>> to specify the reset policy? It might be worth to
>>> consider, to
>>>>>>>>>> just
>>>>>>>>>>>>>>>>>> change the behavior: catch the exception, log an ERROR
>> (for
>>>>>>>>>> information
>>>>>>>>>>>>>>>>>> purpose), wipe the store, seekToBeginning(), and recreate
>>> the
>>>>>>>>>> store?
>>>>>>>>>>>>>>>>>> Btw: if we want to allow users to set the reset policy,
>>> this
>>>>>>>>>> should be
>>>>>>>>>>>>>>>>>> possible via the config, or via overwriting the config in
>>> the
>>>>>>>>>> method
>>>>>>>>>>>>>>>>>> itself. Thus, we would need to add the new overloaded
>>> method to
>>>>>>>>>>>>>>>>>> `Topology` and `StreamsBuilder`.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Another question to ask: what about GlobalKTables? Should
>>> they
>>>>>>>>>> behave
>>>>>>>>>>>>>>>>>> the same? An alternative design could be, to allow users
>> to
>>>>>>>>>> specify a
>>>>>>>>>>>>>>>>>> flexible reset policy for global-stores, but not for
>>>>>>>>>> GlobalKTables and
>>>>>>>>>>>>>>>>>> use the strategy suggested above for this case.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thoughts?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 7/2/20 2:14 PM, John Roesler wrote:
>>>>>>>>>>>>>>>>>>> Hi Navinder,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for the response. I’m sorry if I’m being dense...
>>> You
>>>>>>>>>> said we are not currently using the config, but I thought we
>> would
>>>>>>> pass the
>>>>>>>>>> config through to the client.  Can you confirm whether or not the
>>>>>>> existing
>>>>>>>>>> config works for your use case?
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> John
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Sun, Jun 28, 2020, at 14:09, Navinder Brar wrote:
>>>>>>>>>>>>>>>>>>>> Sorry my bad. Found it.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Prefix used to override {@link KafkaConsumer consumer}
>>>>>>> configs
>>>>>>>>>> for the
>>>>>>>>>>>>>>>>>>>> global consumer client from
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> * the general consumer client configs. The override
>>>>>>> precedence
>>>>>>>>>> is the
>>>>>>>>>>>>>>>>>>>> following (from highest to lowest precedence):
>>>>>>>>>>>>>>>>>>>> * 1. global.consumer.[config-name]..
>>>>>>>>>>>>>>>>>>>> public static final String GLOBAL_CONSUMER_PREFIX =
>>>>>>>>>> "global.consumer.";
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> So, that's great. We already have a config exposed to
>>> reset
>>>>>>>>>> offsets for
>>>>>>>>>>>>>>>>>>>> global topics via global.consumer.auto.offset.reset
>> just
>>> that
>>>>>>>>>> we are
>>>>>>>>>>>>>>>>>>>> not actually using it inside GlobalStreamThread to
>> reset.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> -Navinder
>>>>>>>>>>>>>>>>>>>>     On Monday, 29 June, 2020, 12:24:21 am IST, Navinder
>>> Brar
>>>>>>>>>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>   Hi John,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks for your feedback.
>>>>>>>>>>>>>>>>>>>> 1. I think there is some confusion on my first point,
>> the
>>>>>>> enum
>>>>>>>>>> I am
>>>>>>>>>>>>>>>>>>>> sure we can use the same one but the external config
>>> which
>>>>>>>>>> controls the
>>>>>>>>>>>>>>>>>>>> resetting in global stream thread either we can use the
>> same
>>> one
>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>>> users use for source topics(StreamThread) or we can
>>> provide a
>>>>>>>>>> new one
>>>>>>>>>>>>>>>>>>>> which specifically controls global topics. For e.g.
>>> currently
>>>>>>>>>> if I get
>>>>>>>>>>>>>>>>>>>> an InvalidOffsetException in any of my source topics, I
>>> can
>>>>>>>>>> choose
>>>>>>>>>>>>>>>>>>>> whether to reset from Earliest or Latest(with
>>>>>>>>>> auto.offset.reset). Now
>>>>>>>>>>>>>>>>>>>> either we can use the same option and say if I get the
>>> same
>>>>>>>>>> exception
>>>>>>>>>>>>>>>>>>>> for global topics I will follow same resetting. Or some
>>> users
>>>>>>>>>> might
>>>>>>>>>>>>>>>>>>>> want to have totally different setting for both source
>>> and
>>>>>>>>>> global
>>>>>>>>>>>>>>>>>>>> topics, like for source topic I want resetting from
>>> Latest
>>>>>>> but
>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>> global topics I want resetting from Earliest so in that
>>> case
>>>>>>>>>> adding a
>>>>>>>>>>>>>>>>>>>> new config might be better.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> 2. I couldn't find this config currently
>>>>>>>>>>>>>>>>>>>> "global.consumer.auto.offset.reset". In fact, in
>>>>>>>>>> GlobalStreamThread.java
>>>>>>>>>>>>>>>>>>>> we are throwing a StreamsException for
>>> InvalidOffsetException
>>>>>>>>>> and there
>>>>>>>>>>>>>>>>>>>> is a test as
>>>>>>>>>>>>>>>>>>>> well
>>>>>>>>>> GlobalStreamThreadTest#shouldDieOnInvalidOffsetException(), so I
>>>>>>>>>>>>>>>>>>>> think this is the config we are trying to introduce
>> with
>>> this
>>>>>>>>>> KIP.
>>>>>>>>>>>>>>>>>>>> -Navinder  On Saturday, 27 June, 2020, 07:03:04 pm IST,
>>> John
>>>>>>>>>> Roesler
>>>>>>>>>>>>>>>>>>>> <j...@vvcephei.org> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>   Hi Navinder,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks for this proposal!
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Regarding your question about whether to use the same
>>> policy
>>>>>>>>>>>>>>>>>>>> enum or not, the underlying mechanism is the same, so I
>>> think
>>>>>>>>>>>>>>>>>>>> we can just use the same AutoOffsetReset enum.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Can you confirm whether setting the reset policy config
>>> on
>>>>>>> the
>>>>>>>>>>>>>>>>>>>> global consumer currently works or not? Based on my
>>> reading
>>>>>>>>>>>>>>>>>>>> of StreamsConfig, it looks like it would be:
>>>>>>>>>>>>>>>>>>>> "global.consumer.auto.offset.reset".
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> If that does work, would you still propose to augment
>> the
>>>>>>>>>>>>>>>>>>>> Java API?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Fri, Jun 26, 2020, at 23:52, Navinder Brar wrote:
>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> KIP:
>>>>>>>
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy
>>>>>>>>>>>>>>>>>>>>> I have taken over this KIP since it has been dormant
>>> for a
>>>>>>>>>> long time
>>>>>>>>>>>>>>>>>>>>> and this looks important for use-cases that have large
>>>>>>> global
>>>>>>>>>> data, so
>>>>>>>>>>>>>>>>>>>>> rebuilding global stores from scratch might seem
>>> overkill in
>>>>>>>>>> case of
>>>>>>>>>>>>>>>>>>>>> InvalidOffsetException.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> We want to give users the control to use reset
>>> policy(as we
>>>>>>> do
>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>> StreamThread) in case they hit invalid offsets. I have
>>> still
>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>> decided whether to restrict this option to the same
>>> reset
>>>>>>>>>> policy being
>>>>>>>>>>>>>>>>>>>>> used by StreamThread(using auto.offset.reset config)
>> or
>>> add
>>>>>>>>>> another
>>>>>>>>>>>>>>>>>>>>> reset config specifically for global stores
>>>>>>>>>>>>>>>>>>>>> "global.auto.offset.reset" which gives users more
>>> control to
>>>>>>>>>> choose
>>>>>>>>>>>>>>>>>>>>> separate policies for global and stream threads.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I would like to hear your opinions on the KIP.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> -Navinder
>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>
> 
> 
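
For readers of the thread, the recovery behavior discussed above (catch the
exception, log an ERROR, wipe the store, seekToBeginning(), and recreate the
store while the stream threads are paused) might be sketched roughly as
follows. This is only a toy model under stated assumptions, not the
GlobalStreamThread implementation: ToyTopic, GlobalStoreUpdater,
InvalidOffsetSketchException and the processingPaused flag are hypothetical
names invented for illustration, and no Kafka classes are used.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch only; none of these types are Kafka or Kafka Streams classes. */
final class GlobalStoreRecoverySketch {

    /** Hypothetical stand-in for the consumer's InvalidOffsetException. */
    static final class InvalidOffsetSketchException extends RuntimeException {
        InvalidOffsetSketchException(String message) {
            super(message);
        }
    }

    /** Toy global topic: records keep absolute offsets, and a prefix can be truncated. */
    static final class ToyTopic {
        private final List<String[]> records = new ArrayList<>(); // each entry is {key, value}
        private long logStartOffset = 0L;

        void append(String key, String value) {
            records.add(new String[] {key, value});
        }

        /** Simulates retention removing every record below newStartOffset. */
        void truncateBefore(long newStartOffset) {
            logStartOffset = newStartOffset;
        }

        long beginningOffset() {
            return logStartOffset;
        }

        long endOffset() {
            return records.size();
        }

        String[] read(long offset) {
            if (offset < logStartOffset) {
                throw new InvalidOffsetSketchException(
                    "offset " + offset + " is below log start " + logStartOffset);
            }
            return records.get((int) offset);
        }
    }

    /** Toy global-thread loop body: on an invalid offset, recover instead of dying. */
    static final class GlobalStoreUpdater {
        final Map<String, String> store = new LinkedHashMap<>();
        long position = 0L;
        boolean processingPaused = false; // stands in for the "stop the world" signal
        int rebuilds = 0;

        void pollOnce(ToyTopic topic) {
            try {
                consume(topic);
            } catch (InvalidOffsetSketchException e) {
                System.err.println("ERROR: " + e.getMessage() + "; rebuilding global store");
                processingPaused = true;            // pause the stream threads
                store.clear();                      // wipe the store
                position = topic.beginningOffset(); // the seekToBeginning() step
                consume(topic);                     // rebuild from earliest
                processingPaused = false;           // resume processing
                rebuilds++;
            }
        }

        private void consume(ToyTopic topic) {
            while (position < topic.endOffset()) {
                String[] record = topic.read(position);
                store.put(record[0], record[1]);
                position++;
            }
        }
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.append("k1", "a");
        topic.append("k2", "b");
        GlobalStoreUpdater updater = new GlobalStoreUpdater();
        updater.pollOnce(topic);   // normal bootstrap
        topic.append("k3", "c");
        topic.truncateBefore(3);   // retention drops offsets 0..2
        updater.pollOnce(topic);   // position 2 is now invalid, so the store is rebuilt
        System.out.println(updater.store + " rebuilds=" + updater.rebuilds);
    }
}
```

Note that after the rebuild the toy store only holds what is still in the
topic, which is the same effect Navinder describes for non-compacted global
topics.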

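The config handling discussed in the quoted thread (unprefixed configs apply
to all clients, "global.consumer." overrides take precedence for the global
consumer, "auto.offset.reset" is then forced to "none", and a warning could be
logged when a user-supplied value is ignored) might look roughly like the
sketch below. It is an assumption-laden illustration, not the StreamsConfig
code: the class, the method globalConsumerConfigs, and the warnings list are
hypothetical, and only three client prefixes are modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch only; this is not how StreamsConfig is actually implemented. */
final class GlobalConsumerConfigSketch {
    static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";
    static final String AUTO_OFFSET_RESET = "auto.offset.reset";
    // Simplification: only these client scopes are modelled in this sketch.
    static final String[] SCOPED_PREFIXES = {
        "main.consumer.", "restore.consumer.", GLOBAL_CONSUMER_PREFIX
    };

    /** Resolves the configs the global consumer would see; appends a warning if a reset policy is ignored. */
    static Map<String, String> globalConsumerConfigs(Map<String, String> userConfigs,
                                                     List<String> warnings) {
        Map<String, String> resolved = new HashMap<>();

        // 1. Unprefixed configs apply to every client that supports them.
        for (Map.Entry<String, String> entry : userConfigs.entrySet()) {
            if (!isScoped(entry.getKey())) {
                resolved.put(entry.getKey(), entry.getValue());
            }
        }

        // 2. "global.consumer."-prefixed configs override the unprefixed ones.
        for (Map.Entry<String, String> entry : userConfigs.entrySet()) {
            if (entry.getKey().startsWith(GLOBAL_CONSUMER_PREFIX)) {
                String name = entry.getKey().substring(GLOBAL_CONSUMER_PREFIX.length());
                resolved.put(name, entry.getValue());
            }
        }

        // 3. The reset policy is not configurable for the global consumer: force "none".
        String requested = resolved.put(AUTO_OFFSET_RESET, "none");
        if (requested != null && !"none".equals(requested)) {
            warnings.add("Ignoring " + AUTO_OFFSET_RESET + "=" + requested
                + " for the global consumer; using none");
        }
        return resolved;
    }

    private static boolean isScoped(String key) {
        for (String prefix : SCOPED_PREFIXES) {
            if (key.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, String> user = new HashMap<>();
        user.put("auto.offset.reset", "latest");
        user.put("global.consumer.max.poll.records", "500");
        List<String> warnings = new ArrayList<>();
        System.out.println(globalConsumerConfigs(user, warnings));
        System.out.println(warnings);
    }
}
```

In this sketch a user who sets the unprefixed "auto.offset.reset" to "latest"
keeps the current global-consumer behavior and only sees a warning, which is
the compatibility concern Matthias raises about honoring the config instead.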