I synced with John in person and he emphasized his concerns about breaking code if we change the state machine. From an implementation point of view, I am concerned that maintaining two state machines at the same time might be very complex. John had the idea, though, that we could do an internal translation: internally, we switch the state machine to the new one, but translate the new state to the old state before doing the callback? (We would only need two separate "state enums", and we would add a new method to register callbacks for the new state enum and deprecate the existing method.)
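To make the translation idea a bit more concrete, here is a minimal sketch in Java; the enum and class names are purely illustrative and not the actual Kafka Streams API (the "old" enum just roughly mirrors today's KafkaStreams.State values):

    // Hypothetical new FSM with a RESTORING state; legacy listeners keep seeing
    // the old FSM because the state is translated before the callback fires.
    enum OldState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }
    enum NewState { CREATED, REBALANCING, RESTORING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

    final class StateTranslator {
        // Called right before invoking listeners registered via the deprecated method.
        static OldState toOldState(final NewState state) {
            switch (state) {
                case RESTORING:        return OldState.REBALANCING; // old FSM folds restoration into REBALANCING
                case CREATED:          return OldState.CREATED;
                case REBALANCING:      return OldState.REBALANCING;
                case RUNNING:          return OldState.RUNNING;
                case PENDING_SHUTDOWN: return OldState.PENDING_SHUTDOWN;
                case NOT_RUNNING:      return OldState.NOT_RUNNING;
                default:               return OldState.ERROR;
            }
        }
    }

Listeners registered via the new (to-be-added) method would receive the new states directly; only the deprecated registration path would go through the translation.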
However, also with regard to the work Guozhang pointed out, I am wondering if we should split out an independent KIP just for the state machine changes? It seems complex enough by itself. We would hold off this KIP until the state machine change is done and resume it afterwards? Thoughts? -Matthias On 10/6/20 8:55 PM, Guozhang Wang wrote: > Sorry I'm late to the party. > > Matthias raised a point to me regarding the recent development of moving > restoration from stream threads to separate restore threads and allowing > the stream threads to process any processible tasks even when some other > tasks are still being restored by the restore threads: > > https://issues.apache.org/jira/browse/KAFKA-10526 > https://issues.apache.org/jira/browse/KAFKA-10577 > > That would cause the restoration of non-global states to be more similar to > global states such that some tasks would be processed even though the state > of the stream thread is not yet in RUNNING (because today we only transit > to it when ALL assigned tasks have completed restoration and are > processible). > > Also, as Sophie already mentioned, today during REBALANCING (in stream > thread level, it is PARTITION_REVOKED -> PARTITION_ASSIGNED) some tasks may > still be processed, and because of KIP-429 the RUNNING -> PARTITION_REVOKED > -> PARTITION_ASSIGNED can be within a single call and hence be very > "transient", whereas PARTITION_ASSIGNED -> RUNNING could still take time as > it only does the transition when all tasks are processible. > > So I think it makes sense to add a RESTORING state at the stream client > level, defined as "at least one of the state stores assigned to this > client, either global or non-global, is still restoring", and emphasize > that during this state the client may still be able to process records, > just probably not at full speed. > > As for REBALANCING, I think it is a bit less relevant to this KIP but > here's a dump of my thoughts: if we can capture the period when "some tasks > do not belong to any clients and hence processing is not full-speed" it > would still be valuable, but unfortunately right now since > onPartitionRevoked is not triggered each time on all clients, today's > transition would just make a lot of very short REBALANCING state periods > which are not very useful really. So if we still want to keep that state > maybe we can consider the following tweak: at the thread level, we replace > PARTITION_REVOKED / PARTITION_ASSIGNED with just a single REBALANCING > state, and we will transit to this state upon onPartitionRevoked, but we > will only transit out of this state upon onAssignment when the assignor > decides there's no follow-up rebalance immediately (note we also schedule > future rebalances for workload balancing, but that would still trigger > transiting out of it). On the client level, we would enter REBALANCING when > any thread enters REBALANCING and we would transit out of it when all > transit out of it. In this case, it is possible that during a rebalance, > only those clients that have to revoke some partition would enter the > REBALANCING state while others that only get additional tasks would not > enter this state at all. > > With all that being said, I think the discussion around REBALANCING is less > relevant to this KIP, and even for RESTORING I honestly think maybe we can > make it in another KIP out of 406.
It will, admittedly leave us in a > temporary phase where the FSM of Kafka Streams is not perfect, but that > helps making incremental development progress for 406 itself. > > > Guozhang > > > On Mon, Oct 5, 2020 at 2:37 PM Sophie Blee-Goldman <sop...@confluent.io> > wrote: > >> It seems a little misleading, but I actually have no real qualms about >> transitioning to the >> REBALANCING state *after* RESTORING. One of the side effects of KIP-429 was >> that in >> most cases we actually don't transition to REBALANCING at all until the >> very end of the >> rebalance, so REBALANCING doesn't really mean all that much any more. These >> days >> the majority of the time an instance spends in the REBALANCING state is >> actually spent >> on restoration anyways. >> >> If users are listening in on the REBALANCING -> RUNNING transition, then >> they might >> also be listening for the RUNNING -> REBALANCING transition, so we may need >> to actually >> go RUNNING -> REBALANCING -> RESTORING -> REBALANCING -> RUNNING. This >> feels a bit unwieldy but I don't think there's anything specifically wrong >> with it. >> >> That said, it makes me question the value of having a REBALANCING state at >> all. In the >> pre-KIP-429 days it made sense, because all tasks were paused and >> unavailable for IQ >> for the duration of the rebalance. But these days, the threads can continue >> processing >> any tasks they own during a rebalance, so the only time that tasks are >> truly unavailable >> is during the restoration phase. >> >> So, I find the idea of getting rid of the REBALANCING state altogether to >> be pretty >> appealing, in which case we'd probably need to introduce a new state >> listener and >> deprecate the current one as John proposed. I also wonder if this is the >> sort of thing >> we can just swallow as a breaking change in the upcoming 3.0 release. >> >> On Sat, Oct 3, 2020 at 11:02 PM Navinder Brar >> <navinder_b...@yahoo.com.invalid> wrote: >> >>> >>> >>> >>> Thanks a lot, Matthias for detailed feedback. I tend to agree with >>> changing the state machine >>> >>> itself if required. I think at the end of the day InvalidOffsetException >>> is a rare event and is not >>> >>> as frequent as rebalancing. So, pausing all tasks for once in while >> should >>> be ok from a processing >>> >>> standpoint. >>> >>> >>> >>> >>> >>> >>> >>> I was also wondering if instead of adding RESTORING state b/w REBALANCING >>> & RUNNING >>> >>> can we add it before REBALANCING. Whenever an application starts anyways >>> there is no need for >>> >>> active/replica tasks to be present there for us to build global stores >>> there. We can restore global stores first >>> >>> and then trigger a rebalancing to get the tasks assigned. This might help >>> us in shielding the users >>> >>> from changing what they listen to currently(which is REBALANCING -> >>> RUNNING). So, we go >>> >>> RESTORING -> REBALANCING -> RUNNING. The only drawback here might be that >>> replicas would >>> >>> also be paused while we are restoring global stores but as Matthias said >>> we would want to give >>> >>> complete bandwidth to restoring global stores in such a case and >>> considering it is a rare event this >>> >>> should be ok. On the plus side, this would not lead to any race condition >>> and we would not need to >>> >>> change the behavior of any stores. 
But this also means that this >> RESTORING >>> state is only for global stores >>> >>> like the GLOBAL_RESTORING state we discussed before :) as regular tasks >>> still restore inside REBALANCING. >>> >>> @John, @Sophie do you think this would work? >>> >>> >>> >>> >>> >>> >>> >>> Regards, >>> >>> >>> >>> >>> Navinder >>> >>> >>> >>> >>> On Wednesday, 30 September, 2020, 09:39:07 pm IST, Matthias J. Sax < >>> mj...@apache.org> wrote: >>> >>> I guess we need to have some cleanup mechanism for this case anyway, >>> because, the global thread can enter RESTORING state at any point in >>> time, and thus, even if we set a flag to pause processing on the >>> StreamThreads we are subject to a race condition. >>> >>> Beside that, on a high level I am fine with either "busy waiting" (ie, >>> just lock the global-store and retry) or setting a flag. However, there >>> are some trade-offs to consider: >>> >>> As we need a cleanup mechanism anyway, it might be ok to just use a >>> single mechanism. -- We should consider the impact in EOS though, as we >>> might need to wipe out the store of regular tasks for this case. Thus, >>> setting a flag might actually help to prevent that we repeatably wipe >>> the store on retries... On the other hand, we plan to avoid wiping the >>> store in case of error for EOS anyway, and if we get this improvement, >>> we might not need the flag. >>> >>> For the client state machine: I would actually prefer to have a >>> RESTORING state and I would also prefer to pause _all_ tasks. This might >>> imply that we want a flag. In the past, we allowed to interleave restore >>> and processing in StreamThread (for regular tasks) what slowed down >>> restoring and we changed it back to not process any tasks until all >>> tasks are restored). Of course, in our case we have two different >>> threads (not a single one). However, the network is still shared, so it >>> might be desirable to give the full network bandwidth to the global >>> consumer to restore as fast as possible (maybe an improvement we could >>> add to `StreamThreads` too, if we have multiple threads)? And as a side >>> effect, it does not muddy the waters what each client state means. >>> >>> Thus, overall, I tend to prefer a flag on `StreamThread` as it seems to >>> provide a cleaner end-to-end solution (and we avoid the dependency to >>> improve EOS state management). >>> >>> Btw: I am not sure if we actually need to preserve compatibility for the >>> state machine? To me, it seems not to be a strict contract, and I would >>> personally be ok to just change it. >>> >>> >>> -Matthias >>> >>> >>> On 9/22/20 11:08 PM, Navinder Brar wrote: >>>> Thanks a lot John for these suggestions. @Matthias can share your >>> thoughts on the last two comments made in this chain. >>>> >>>> Thanks,Navinder >>>> >>>> On Monday, 14 September, 2020, 09:03:32 pm IST, John Roesler < >>> vvcep...@apache.org> wrote: >>>> >>>> Hi Navinder, >>>> >>>> Thanks for the reply. >>>> >>>> I wasn't thinking of an _exponential_ backoff, but >>>> otherwise, yes, that was the basic idea. Note, the mechanism >>>> would be similar (if not the same) to what Matthias is >>>> implementing for KIP-572: >>>> >>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams >>>> >>>> Regarding whether we'd stay in RUNNING during global >>>> restoration or not, I can see your point. It seems like we >>>> have three choices with how we set the state during global >>>> restoration: >>>> 1. 
stay in RUNNING: Users might get confused, since >>>> processing could get stopped for some tasks. On the other >>>> hand, processing for tasks not blocked by the global >>>> restoration could proceed (if we adopt the other idea), so >>>> maybe it still makes sense. >>>> 2. transition to REBALANCING: Users might get confused, >>>> since there is no actual rebalance. However, the current >>>> state for Kafka Streams during state restoration is actually >>>> REBALANCING, so it seems people already should understand >>>> that REBALANCING really means REBALANCING|RESTORING. This >>>> choice would preseve the existing state machine as well as >>>> the existing meaning of all states >>>> 3. add RESTORING: This could clarify the state machine, at >>>> the expense of breaking compatibility. We could implement a >>>> migration path by adding a new "state listener" interface >>>> for the new state machine. >>>> >>>> It seems like option 3 results in the most sensible system, >>>> but I'm not sure if it's worth the hassle. It certainly >>>> seems orthogonal to the goal of this KIP. Option 2 is >>>> probably the best practical choice. >>>> >>>> >>>> Regarding _how_ the global state restoration could set a >>>> flag preventing access to the store... This is indeed the >>>> central challenge to this new idea. Just throwing out one >>>> possibility: Once the global thread marks the store for >>>> restoration, it would throw an exception, such as >>>> "StoreIsRestoringException" on any access. The processor >>>> would _not_ catch this exception. Instead, the StreamThread >>>> would catch it, put this record/task on ice, and re-try it >>>> later. >>>> >>>> That last mechanism is actually pretty complicated. For >>>> example, what if the record is already partially processed >>>> in the topology? We'd have to remember which ProcessorNode >>>> to resume from when we re-try later. >>>> >>>> This is really where the spiritual overlap with KIP-572 >>>> comes in. Maybe Matthias can share some thoughts. >>>> >>>> Thanks, >>>> -John >>>> >>>> On Sun, 2020-09-13 at 07:50 +0000, Navinder Brar wrote: >>>>> >>>>> Hi John, >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> If I understand this correctly, you are proposing to use exponential >>> backoff >>>>> >>>>> in globalStore.get() to keep polling the global thread (whether it has >>> restored >>>>> >>>>> completely or not) while the processor pauses the processing of a >>> particular >>>>> >>>>> message which required querying on global store. That is stream >> threads >>>>> >>>>> are still in RUNNING state but kind of paused till global thread >>> restores and >>>>> >>>>> gives a go-ahead that complete state has been restored. I like the >> idea >>> for >>>>> the first two reasons that you have mentioned but thinking from >>> semanticspoint of view stream threads will be in RUNNING but still not >>> processing events, >>>>> will it be misleading for the users? Or you think we are doing it at >>> enough >>>>> >>>>> places already and an exception should suffice. As they will not >>> understand >>>>> >>>>> why the stream thread is not processing and how much more time it will >>> not >>>>> >>>>> process for. If the state explicitly stated RESTORING, >>>>> >>>>> users might have clearly understood that why it is happening. 
>>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> Also, to achieve what we are discussing above, the store.get() on >> which >>> call is >>>>> >>>>> made has to understand whether it is a global store or not and if it >> is >>> a global store >>>>> >>>>> check whether it is restoring or not because both might be happening >>>>> >>>>> simultaneously with the above approach. With KIP-535 we have started >>> serving >>>>> >>>>> normal stores in restoring state but those are just interactive >> queries >>> but here >>>>> >>>>> globalStore.get() might be called while processing which we don’t >> want. >>> So, >>>>> >>>>> restore for global store and get() might have to be exclusive. Is >> there >>> a way for a >>>>> >>>>> store to know if it global store or not because now internally global >>> and normal >>>>> >>>>> stores will behave differently. Although if everyone is fine with the >>> above approach >>>>> >>>>> we can discuss this in PR as well. >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> Regards, >>>>> Navinder >>>>> >>>>> On Saturday, 5 September, 2020, 02:09:07 am IST, John Roesler < >>> vvcep...@apache.org> wrote: >>>>> >>>>> Hi all, >>>>> >>>>> This conversation sounds good to me so far. >>>>> >>>>> Sophie raised a concern before that changing the state >>>>> machine would break state restore listeners. This is true, >>>>> and we actually have not changed the main state machine in a >>>>> long time. The last change I remember was that we used to go >>>>> "CREATED -> RUNNING -> REBALANCING -> RUNNING", and now we >>>>> just go "CREATED -> REBALANCING -> RUNNING". This is >>>>> actually the reason why many state listeners check for >>>>> "REBALANCING -> RUNNING", to filter out the old "phantom >>>>> running" transition from "CREATED -> RUNNING". >>>>> >>>>> Anyway, the observation is that dropping the "phantom >>>>> running" state didn't break any real use case we were aware >>>>> of. But adding RESTORING between REBALACING and RUNNING >>>>> certainly would break the common pattern that we're aware >>>>> of. This would indeed be the first time we introduce a >>>>> practically breaking change to the state machine at least >>>>> since 2.0, and maybe since 1.0 too. We should probably >>>>> consider the impact. >>>>> >>>>> One alternative is to consider the state machine and the >>>>> state listener to be coupled APIs. We can deprecate and >>>>> replace the current state listener, and also introduce a new >>>>> state machine enum with our desired new state and >>>>> transitions, while leaving the existing one alone and >>>>> deprecating it. Then, no existing code would break, only get >>>>> deprecation warnings. >>>>> >>>>> >>>>> >>>>> Matthias gave me an idea a few messages back with his note >>>>> about setting/checking "flags". What if we flip it around, >>>>> and set the flags on the global stores themselves. Then, we >>>>> throw an exception when a processor queries the store while >>>>> it's restoring. When they get that exception, they just put >>>>> that task on the back burner for a while and try again later >>>>> (similar to Matthias's timeout handling KIP). The global >>>>> thread sets the flag on a particular store when it realizes >>>>> it needs to be re-created and unsets it when the restore >>>>> completes. >>>>> >>>>> Then: >>>>> 1. Only the global stores that actually need to be restored >>>>> block anything >>>>> 2. Only the tasks that access the stores get blocked >>>>> 3. No new states need to be introduced >>>>> >>>>> WDYT? 
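[Editor's note: a rough sketch of that store-level flag idea could look like the following; every class and method name here is hypothetical and only meant to illustrate the mechanism, not an actual Kafka Streams API.]

    // Hypothetical guard around a global store: the global thread flips the flag
    // while it wipes and rebuilds the store; any processor access during that
    // window fails fast so the StreamThread can retry the task later.
    final class StoreIsRestoringException extends RuntimeException { }

    final class GuardedGlobalStore<K, V> {
        private final java.util.concurrent.ConcurrentHashMap<K, V> inner = new java.util.concurrent.ConcurrentHashMap<>();
        private final java.util.concurrent.atomic.AtomicBoolean restoring = new java.util.concurrent.atomic.AtomicBoolean(false);

        void beginRestore() { restoring.set(true); }   // set by the global thread when it starts rebuilding
        void endRestore()   { restoring.set(false); }  // unset once the store is caught up

        V get(final K key) {
            if (restoring.get()) {
                throw new StoreIsRestoringException(); // StreamThread catches this and puts the task "on ice"
            }
            return inner.get(key);
        }

        void put(final K key, final V value) { inner.put(key, value); }
    }

Only the tasks that actually query a restoring store would hit the exception, which matches points 1 and 2 above.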
>>>>> -John >>>>> >>>>> On Fri, 2020-09-04 at 13:18 +0000, Navinder Brar wrote: >>>>>> Hi Sophie, >>>>>> >>>>>> Thanks for the detailed explanation. I agree from a user standpoint, >> I >>> don't think there is any use-case to take any separate action in case of >>> GLOBAL_RESTORING and RESTORING phase. >>>>>> >>>>>> So, internally in the code we can handle the cases as >>> Matthiasexplained above and we can discuss those in the PR. I will update >>> the KIP based on what all we have converged towards including having an >>> uber RESTORING(rather than GLOBAL_RESTORING)state which takes stream and >>> global threads into consideration. >>>>>> >>>>>> I will update the KIP soon and share it again as a lot has changed >>> from where we started this KIP from. >>>>>> >>>>>> Regards,Navinder >>>>>> >>>>>> On Friday, 4 September, 2020, 04:19:20 am IST, Sophie >> Blee-Goldman >>> <sop...@confluent.io> wrote: >>>>>> >>>>>> Thanks Matthias, that sounds like what I was thinking. I think we >>> should >>>>>> always be >>>>>> able to figure out what to do in various scenarios as outlined in the >>>>>> previous email. >>>>>> >>>>>>> For the same reason, I wouldn't want to combine global restoring >> and >>>>>> normal restoring >>>>>>> because then it would make all the restorings independent but we >> don't >>>>>> want that. We >>>>>>> want global stores to be available before any processing starts on >> the >>>>>> active tasks. >>>>>> >>>>>> I'm not sure I follow this specific point, but I don't think I did a >>> good >>>>>> job of explaining my >>>>>> proposal so it's probably my own fault. When I say that we should >> merge >>>>>> RESTORING >>>>>> and GLOBAL_RESTORING, I just mean that we should provide a single >>>>>> user-facing >>>>>> state to encompass any ongoing restoration. The point of the >>> KafkaStreams >>>>>> RESTORING >>>>>> state is to alert users that their state may be unavailable for IQ, >> and >>>>>> active tasks may be >>>>>> idle. This is true for both global and non-global restoration. I >> think >>> the >>>>>> ultimate question >>>>>> is whether as a user, I would react any differently to a >>> GLOBAL_RESTORING >>>>>> state vs >>>>>> the regular RESTORING. My take is "no", in which case we should just >>>>>> provide a single >>>>>> unified state for the minimal public API. But if anyone can think of >> a >>>>>> reason for the user >>>>>> to need to distinguish between different types of restoration, that >>> would >>>>>> be a good >>>>>> argument to keep them separate. >>>>>> >>>>>> Internally, we do need to keep track of a "global restore" flag to >>>>>> determine the course >>>>>> of action -- for example if a StreamThread transitions to RUNNING but >>> sees >>>>>> that the >>>>>> KafkaStreams state is RESTORING, should it start processing or not? >> The >>>>>> answer >>>>>> depends on whether the state is RESTORING due to any global stores. >>> But the >>>>>> KafkaStreams state is a public interface, not an internal bookkeeper, >>> so we >>>>>> shouldn't >>>>>> try to push our internal logic into the user-facing API. >>>>>> >>>>>> >>>>>> On Thu, Sep 3, 2020 at 7:36 AM Matthias J. Sax <mj...@apache.org> >>> wrote: >>>>>> >>>>>>> I think this issue can actually be resolved. >>>>>>> >>>>>>> - We need a flag on the stream-threads if global-restore is in >>>>>>> progress; for this case, the stream-thread may go into RUNNING >> state, >>>>>>> but it's not allowed to actually process data -- it will be allowed >> to >>>>>>> update standby-task though. 
>>>>>>> >>>>>>> - If a stream-thread restores, its own state is RESTORING and it >>> does >>>>>>> not need to care about the "global restore flag". >>>>>>> >>>>>>> - The global-thread just does was we discussed, including using >>> state >>>>>>> RESTORING. >>>>>>> >>>>>>> - The KafkaStreams client state is in RESTORING, if at least one >>> thread >>>>>>> (stream-thread or global-thread) is in state RESTORING. >>>>>>> >>>>>>> - On startup, if there is a global-thread, the just set the >>>>>>> global-restore flag upfront before we start the stream-threads (we >> can >>>>>>> actually still do the rebalance and potential restore in >> stream-thread >>>>>>> in parallel to global restore) and rely on the global-thread to >> unset >>>>>>> the flag. >>>>>>> >>>>>>> - The tricky thing is, to "stop" processing in stream-threads if >> we >>>>>>> need to wipe the global-store and rebuilt it. For this, we should >> set >>>>>>> the "global restore flag" on the stream-threads, but we also need to >>>>>>> "lock down" the global store in question and throw an exception if >> the >>>>>>> stream-thread tries to access it; if the stream-thread get this >>>>>>> exception, it need to cleanup itself, and wait until the "global >>> restore >>>>>>> flag" is unset before it can continue. >>>>>>> >>>>>>> >>>>>>> Do we think this would work? -- Of course, the devil is in the >> details >>>>>>> but it seems to become a PR discussion, and there is no reason to >> make >>>>>>> it part of the KIP. >>>>>>> >>>>>>> >>>>>>> -Matthias >>>>>>> >>>>>>> On 9/3/20 3:41 AM, Navinder Brar wrote: >>>>>>>> Hi, >>>>>>>> >>>>>>>> Thanks, John, Matthias and Sophie for great feedback. >>>>>>>> >>>>>>>> On the point raised by Sophie that maybe we should allow normal >>>>>>> restoring during GLOBAL_RESTORING, I think it makes sense but the >>> challenge >>>>>>> would be what happens when normal restoring(on actives) has finished >>> but >>>>>>> GLOBAL_RESTORINGis still going on. Currently, all restorings are >>>>>>> independent of each other i.e. restoring happening on one >> task/thread >>>>>>> doesn't affect another. But if we do go ahead with allowing normal >>>>>>> restoring during GLOBAL_RESTORING then we willstill have to pause >> the >>>>>>> active tasks from going to RUNNING if GLOBAL_RESTORING has not >>> finished and >>>>>>> normal restorings have finished. For the same reason, I wouldn't >> want >>> to >>>>>>> combine global restoring and normal restoring because then it would >>> make >>>>>>> all the restorings independent but we don't want that. We want >> global >>>>>>> stores to be available before any processing starts on the active >>> tasks. >>>>>>>> Although I think restoring of replicas can still take place while >>> global >>>>>>> stores arerestoring because in replicas there is no danger of them >>> starting >>>>>>> processing. >>>>>>>> Also, one point to bring up is that currently during application >>> startup >>>>>>> global stores restore first and then normal stream threads start. >>>>>>>> Regards,Navinder >>>>>>>> >>>>>>>> On Thursday, 3 September, 2020, 06:58:40 am IST, Matthias J. >> Sax >>> < >>>>>>> mj...@apache.org> wrote: >>>>>>>> Thanks for the input Sophie. Those are all good points and I >> fully >>> agree >>>>>>>> with them. >>>>>>>> >>>>>>>> When saying "pausing the processing threads" I only considered them >>> in >>>>>>>> `RUNNING` and thought we figure out the detail on the PR... >> Excellent >>>>>>> catch! 
>>>>>>>> Changing state transitions is to some extend backward incompatible, >>> but >>>>>>>> I think (IIRC) we did it in the past and I personally tend to find >> it >>>>>>>> ok. That's why we cover those changes in a KIP. >>>>>>>> >>>>>>>> -Matthias >>>>>>>> >>>>>>>> On 9/2/20 6:18 PM, Sophie Blee-Goldman wrote: >>>>>>>>> If we're going to add a new GLOBAL_RESTORING state to the >>> KafkaStreams >>>>>>> FSM, >>>>>>>>> maybe it would make sense to add a new plain RESTORING state that >> we >>>>>>>>> transition >>>>>>>>> to when restoring non-global state stores following a rebalance. >>> Right >>>>>>> now >>>>>>>>> all restoration >>>>>>>>> occurs within the REBALANCING state, which is pretty misleading. >>>>>>>>> Applications that >>>>>>>>> have large amounts of state to restore will appear to be stuck >>>>>>> rebalancing >>>>>>>>> according to >>>>>>>>> the state listener, when in fact the rebalance has completed long >>> ago. >>>>>>>>> Given that there >>>>>>>>> are very much real scenarios where you actually *are *stuck >>>>>>> rebalancing, it >>>>>>>>> seems useful to >>>>>>>>> distinguish plain restoration from more insidious cases that may >>> require >>>>>>>>> investigation and/or >>>>>>>>> intervention. >>>>>>>>> >>>>>>>>> I don't mean to hijack this KIP, I just think it would be odd to >>>>>>> introduce >>>>>>>>> GLOBAL_RESTORING >>>>>>>>> when there is no other kind of RESTORING state. One question this >>> brings >>>>>>>>> up, and I >>>>>>>>> apologize if this has already been addressed, is what to do when >> we >>> are >>>>>>>>> restoring >>>>>>>>> both normal and global state stores? It sounds like we plan to >>> pause the >>>>>>>>> StreamThreads >>>>>>>>> entirely, but there doesn't seem to be any reason not to allow >>> regular >>>>>>>>> state restoration -- or >>>>>>>>> even standby processing -- while the global state is >>> restoring.Given the >>>>>>>>> current effort to move >>>>>>>>> restoration & standbys to a separate thread, allowing them to >>> continue >>>>>>>>> while pausing >>>>>>>>> only the StreamThread seems quite natural. >>>>>>>>> >>>>>>>>> Assuming that we actually do allow both types of restoration to >>> occur at >>>>>>>>> the same time, >>>>>>>>> and if we did add a plain RESTORING state as well, which state >>> should we >>>>>>>>> end up in? >>>>>>>>> AFAICT the main reason for having a distinct {GLOBAL_}RESTORING >>> state >>>>>>> is to >>>>>>>>> alert >>>>>>>>> users of the non-progress of their active tasks. In both cases, >> the >>>>>>> active >>>>>>>>> task is unable >>>>>>>>> to continue until restoration has complete, so why distinguish >>> between >>>>>>> the >>>>>>>>> two at all? >>>>>>>>> Would it make sense to avoid a special GLOBAL_RESTORING state and >>> just >>>>>>>>> introduce >>>>>>>>> a single unified RESTORING state to cover both the regular and >>> global >>>>>>> case? >>>>>>>>> Just a thought >>>>>>>>> >>>>>>>>> My only concern is that this might be considered a breaking >> change: >>>>>>> users >>>>>>>>> might be >>>>>>>>> looking for the REBALANCING -> RUNNING transition specifically in >>> order >>>>>>> to >>>>>>>>> alert when >>>>>>>>> the application has started up, and we would no long go directly >>> from >>>>>>>>> REBALANCING to >>>>>>>>> RUNNING. I think we actually did/do this ourselves in a number >> of >>>>>>>>> integration tests and >>>>>>>>> possibly in some examples. 
That said, it seems more appropriate to >>> just >>>>>>>>> listen for >>>>>>>>> the RUNNING state rather than for a specific transition, and we >>> should >>>>>>>>> encourage users >>>>>>>>> to do so rather than go out of our way to support transition-type >>> state >>>>>>>>> listeners. >>>>>>>>> >>>>>>>>> Cheers, >>>>>>>>> Sophie >>>>>>>>> >>>>>>>>> On Wed, Sep 2, 2020 at 5:53 PM Matthias J. Sax <mj...@apache.org> >>>>>>> wrote: >>>>>>>>>> I think this makes sense. >>>>>>>>>> >>>>>>>>>> When we introduce this new state, we might also tackle the jira a >>>>>>>>>> mentioned. If there is a global thread, on startup of a >>> `KafakStreams` >>>>>>>>>> client we should not transit to `REBALANCING` but to the new >>> state, and >>>>>>>>>> maybe also make the "bootstrapping" non-blocking. >>>>>>>>>> >>>>>>>>>> I guess it's worth to mention this in the KIP. >>>>>>>>>> >>>>>>>>>> Btw: The new state for KafkaStreams should also be part of the >> KIP >>> as >>>>>>> it >>>>>>>>>> is a public API change, too. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> -Matthias >>>>>>>>>> >>>>>>>>>> On 8/29/20 9:37 AM, John Roesler wrote: >>>>>>>>>>> Hi Navinder, >>>>>>>>>>> >>>>>>>>>>> Thanks for the ping. Yes, that all sounds right to me. The name >>>>>>>>>> “RESTORING_GLOBAL” sounds fine, too. >>>>>>>>>>> I think as far as warnings go, we’d just propose to mention it >> in >>> the >>>>>>>>>> javadoc of the relevant methods that the given topics should be >>>>>>> compacted. >>>>>>>>>>> Thanks! >>>>>>>>>>> -John >>>>>>>>>>> >>>>>>>>>>> On Fri, Aug 28, 2020, at 12:42, Navinder Brar wrote: >>>>>>>>>>>> Gentle ping. >>>>>>>>>>>> >>>>>>>>>>>> ~ Navinder >>>>>>>>>>>> On Wednesday, 19 August, 2020, 06:59:58 pm IST, Navinder >> Brar >>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote: >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Thanks Matthias & John, >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> I am glad we are converging towards an understanding. So, to >>>>>>> summarize, >>>>>>>>>>>> we will still keep treating this change in KIP and instead of >>>>>>> providing >>>>>>>>>> a reset >>>>>>>>>>>> strategy, we will cleanup, and reset to earliest and build the >>> state. >>>>>>>>>>>> >>>>>>>>>>>> When we hit the exception and we are building the state, we >> will >>> stop >>>>>>>>>> all >>>>>>>>>>>> processing and change the state of KafkaStreams to something >> like >>>>>>>>>>>> >>>>>>>>>>>> “RESTORING_GLOBAL” or the like. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> How do we plan to educate users on the non desired effects of >>> using >>>>>>>>>>>> >>>>>>>>>>>> non-compacted global topics? (via the KIP itself?) >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> +1 on changing the KTable behavior, reset policy for global, >>>>>>> connecting >>>>>>>>>>>> processors to global for a later stage when demanded. >>>>>>>>>>>> >>>>>>>>>>>> Regards, >>>>>>>>>>>> Navinder >>>>>>>>>>>> On Wednesday, 19 August, 2020, 01:00:58 pm IST, Matthias J. >>> Sax >>>>>>>>>>>> <mj...@apache.org> wrote: >>>>>>>>>>>> >>>>>>>>>>>> Your observation is correct. Connecting (regular) stores to >>>>>>> processors >>>>>>>>>>>> is necessary to "merge" sub-topologies into single ones if a >>> store is >>>>>>>>>>>> shared. -- For global stores, the structure of the program does >>> not >>>>>>>>>>>> change and thus connecting srocessors to global stores is not >>>>>>> required. 
>>>>>>>>>>>> Also given our experience with restoring regular state stores >>> (ie, >>>>>>>>>>>> partial processing of task that don't need restore), it seems >>> better >>>>>>> to >>>>>>>>>>>> pause processing and move all CPU and network resources to the >>> global >>>>>>>>>>>> thread to rebuild the global store as soon as possible instead >> of >>>>>>>>>>>> potentially slowing down the restore in order to make progress >> on >>>>>>> some >>>>>>>>>>>> tasks. >>>>>>>>>>>> >>>>>>>>>>>> Of course, if we collect real world experience and it becomes >> an >>>>>>> issue, >>>>>>>>>>>> we could still try to change it? >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> -Matthias >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On 8/18/20 3:31 PM, John Roesler wrote: >>>>>>>>>>>>> Thanks Matthias, >>>>>>>>>>>>> >>>>>>>>>>>>> Sounds good. I'm on board with no public API change and just >>>>>>>>>>>>> recovering instead of crashing. >>>>>>>>>>>>> >>>>>>>>>>>>> Also, to be clear, I wouldn't drag KTables into it; I was >>>>>>>>>>>>> just trying to wrap my head around the congruity of our >>>>>>>>>>>>> choice for GlobalKTable with respect to KTable. >>>>>>>>>>>>> >>>>>>>>>>>>> I agree that whatever we decide to do would probably also >>>>>>>>>>>>> resolve KAFKA-7380. >>>>>>>>>>>>> >>>>>>>>>>>>> Moving on to discuss the behavior change, I'm wondering if >>>>>>>>>>>>> we really need to block all the StreamThreads. It seems like >>>>>>>>>>>>> we only need to prevent processing on any task that's >>>>>>>>>>>>> connected to the GlobalStore. >>>>>>>>>>>>> >>>>>>>>>>>>> I just took a look at the topology building code, and it >>>>>>>>>>>>> actually seems that connections to global stores don't need >>>>>>>>>>>>> to be declared. That's a bummer, since it means that we >>>>>>>>>>>>> really do have to stop all processing while the global >>>>>>>>>>>>> thread catches up. >>>>>>>>>>>>> >>>>>>>>>>>>> Changing this seems like it'd be out of scope right now, but >>>>>>>>>>>>> I bring it up in case I'm wrong and it actually is possible >>>>>>>>>>>>> to know which specific tasks need to be synchronized with >>>>>>>>>>>>> which global state stores. If we could know that, then we'd >>>>>>>>>>>>> only have to block some of the tasks, not all of the >>>>>>>>>>>>> threads. >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks, >>>>>>>>>>>>> -John >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On Tue, 2020-08-18 at 14:10 -0700, Matthias J. Sax wrote: >>>>>>>>>>>>>> Thanks for the discussion. >>>>>>>>>>>>>> >>>>>>>>>>>>>> I agree that this KIP is justified in any case -- even if we >>> don't >>>>>>>>>>>>>> change public API, as the change in behavior is significant. >>>>>>>>>>>>>> >>>>>>>>>>>>>> A better documentation for cleanup policy is always good >> (even >>> if >>>>>>> I am >>>>>>>>>>>>>> not aware of any concrete complaints atm that users were not >>> aware >>>>>>> of >>>>>>>>>>>>>> the implications). Of course, for a regular KTable, one can >>>>>>>>>>>>>> enable/disable the source-topic-changelog optimization and >>> thus can >>>>>>>>>> use >>>>>>>>>>>>>> a non-compacted topic for this case, what is quite a >>> difference to >>>>>>>>>>>>>> global stores/tables; so maybe it's worth to point out this >>>>>>> difference >>>>>>>>>>>>>> explicitly. 
>>>>>>>>>>>>>> >>>>>>>>>>>>>> As mentioned before, the main purpose of the original Jira >> was >>> to >>>>>>>>>> avoid >>>>>>>>>>>>>> the crash situation but to allow for auto-recovering while it >>> was >>>>>>> an >>>>>>>>>>>>>> open question if it makes sense / would be useful to allow >>> users to >>>>>>>>>>>>>> specify a custom reset policy instead of using a hard-coded >>>>>>> "earliest" >>>>>>>>>>>>>> strategy. -- It seem it's still unclear if it would be useful >>> and >>>>>>> thus >>>>>>>>>>>>>> it might be best to not add it for now -- we can still add it >>>>>>> later if >>>>>>>>>>>>>> there are concrete use-cases that need this feature. >>>>>>>>>>>>>> >>>>>>>>>>>>>> @John: I actually agree that it's also questionable to allow >> a >>>>>>> custom >>>>>>>>>>>>>> reset policy for KTables... Not sure if we want to drag this >>>>>>> question >>>>>>>>>>>>>> into this KIP though? >>>>>>>>>>>>>> >>>>>>>>>>>>>> So it seem, we all agree that we actually don't need any >>> public API >>>>>>>>>>>>>> changes, but we only want to avoid crashing? >>>>>>>>>>>>>> >>>>>>>>>>>>>> For this case, to preserve the current behavior that >> guarantees >>>>>>> that >>>>>>>>>> the >>>>>>>>>>>>>> global store/table is always loaded first, it seems we need >> to >>>>>>> have a >>>>>>>>>>>>>> stop-the-world mechanism for the main `StreamThreads` for >> this >>>>>>> case -- >>>>>>>>>>>>>> do we need to add a new state to KafkaStreams client for this >>> case? >>>>>>>>>>>>>> >>>>>>>>>>>>>> Having a new state might also be helpful for >>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-7380 ? >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On 8/17/20 7:34 AM, John Roesler wrote: >>>>>>>>>>>>>>> Hi Navinder, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I see what you mean about the global consumer being similar >>>>>>>>>>>>>>> to the restore consumer. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I also agree that automatically performing the recovery >>>>>>>>>>>>>>> steps should be strictly an improvement over the current >>>>>>>>>>>>>>> situation. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Also, yes, it would be a good idea to make it clear that the >>>>>>>>>>>>>>> global topic should be compacted in order to ensure correct >>>>>>>>>>>>>>> semantics. It's the same way with input topics for KTables; >>>>>>>>>>>>>>> we rely on users to ensure the topics are compacted, and if >>>>>>>>>>>>>>> they aren't, then the execution semantics will be broken. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>> -John >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Sun, 2020-08-16 at 11:44 +0000, Navinder Brar wrote: >>>>>>>>>>>>>>>> Hi John, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks for your inputs. Since, global topics are in a way >>> their >>>>>>> own >>>>>>>>>> changelog, wouldn’t the global consumers be more akin to restore >>>>>>> consumers >>>>>>>>>> than the main consumer? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I am also +1 on catching the exception and setting it to >> the >>>>>>>>>> earliest for now. Whenever an instance starts, currently global >>> stream >>>>>>>>>> thread(if available) goes to RUNNING before stream threads are >>> started >>>>>>> so >>>>>>>>>> that means the global state is available when the processing by >>> stream >>>>>>>>>> threads start. 
So, with the new change of catching the exception, >>>>>>> cleaning >>>>>>>>>> store and resetting to earlier would probably be “stop the world” >>> as >>>>>>> you >>>>>>>>>> said John, as I think we will have to pause the stream threads >>> till the >>>>>>>>>> whole global state is recovered. I assume it is "stop the world" >>> right >>>>>>> now >>>>>>>>>> as well, since now also if an InvalidOffsetException comes, we >>> throw >>>>>>>>>> streams exception and the user has to clean up and handle all >> this >>>>>>> manually >>>>>>>>>> and when that instance will start, it will restore global state >>> first. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I had an additional thought to this whole problem, would it >>> be >>>>>>>>>> helpful to educate the users that global topics should have >> cleanup >>>>>>> policy >>>>>>>>>> as compact, so that this invalid offset exception never arises >> for >>>>>>> them. >>>>>>>>>> Assume for example, that the cleanup policy in global topic is >>>>>>> "delete" and >>>>>>>>>> it has deleted k1, k2 keys(via retention.ms) although all the >>>>>>> instances >>>>>>>>>> had already consumed them so they are in all global stores and >> all >>>>>>> other >>>>>>>>>> instances are up to date on the global data(so no >>>>>>> InvalidOffsetException). >>>>>>>>>> Now, a new instance is added to the cluster, and we have already >>> lost >>>>>>> k1, >>>>>>>>>> k2 from the global topic so it will start consuming from the >>> earliest >>>>>>> point >>>>>>>>>> in the global topic. So, wouldn’t this global store on the new >>>>>>> instance has >>>>>>>>>> 2 keys less than all the other global stores already available in >>> the >>>>>>>>>> cluster? Please let me know if I am missing something. Thanks. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Navinder >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Friday, 14 August, 2020, 10:03:42 am IST, John >>> Roesler < >>>>>>>>>> vvcep...@apache.org> wrote: >>>>>>>>>>>>>>>> Hi all, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> It seems like the main motivation for this proposal is >>> satisfied >>>>>>> if >>>>>>>>>> we just implement some recovery mechanism instead of crashing. If >>> the >>>>>>>>>> mechanism is going to be pausing all the threads until the state >> is >>>>>>>>>> recovered, then it still seems like a big enough behavior change >> to >>>>>>> warrant >>>>>>>>>> a KIP still. >>>>>>>>>>>>>>>> I have to confess I’m a little unclear on why a custom >> reset >>>>>>> policy >>>>>>>>>> for a global store, table, or even consumer might be considered >>> wrong. >>>>>>> It’s >>>>>>>>>> clearly wrong for the restore consumer, but the global consumer >>> seems >>>>>>> more >>>>>>>>>> semantically akin to the main consumer than the restore consumer. >>>>>>>>>>>>>>>> In other words, if it’s wrong to reset a GlobalKTable from >>>>>>> latest, >>>>>>>>>> shouldn’t it also be wrong for a KTable, for exactly the same >>> reason? >>>>>>> It >>>>>>>>>> certainly seems like it would be an odd choice, but I’ve seen >> many >>>>>>> choices >>>>>>>>>> I thought were odd turn out to have perfectly reasonable use >> cases. >>>>>>>>>>>>>>>> As far as the PAPI global store goes, I could see adding >> the >>>>>>> option >>>>>>>>>> to configure it, since as Matthias pointed out, there’s really no >>>>>>> specific >>>>>>>>>> semantics for the PAPI. 
But if automatic recovery is really all >>>>>>> Navinder >>>>>>>>>> wanted, the I could also see deferring this until someone >>> specifically >>>>>>>>>> wants it. >>>>>>>>>>>>>>>> So the tl;dr is, if we just want to catch the exception and >>>>>>> rebuild >>>>>>>>>> the store by seeking to earliest with no config or API changes, >>> then >>>>>>> I’m +1. >>>>>>>>>>>>>>>> I’m wondering if we can improve on the “stop the world” >>> effect of >>>>>>>>>> rebuilding the global store, though. It seems like we could put >> our >>>>>>> heads >>>>>>>>>> together and come up with a more fine-grained approach to >>> maintaining >>>>>>> the >>>>>>>>>> right semantics during recovery while still making some progress. >>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>> John >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Sun, Aug 9, 2020, at 02:04, Navinder Brar wrote: >>>>>>>>>>>>>>>>> Hi Matthias, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> IMHO, now as you explained using >>>>>>>>>> ‘global.consumer.auto.offset.reset’ is >>>>>>>>>>>>>>>>> not as straightforward >>>>>>>>>>>>>>>>> as it seems and it might change the existing behavior for >>> users >>>>>>>>>> without >>>>>>>>>>>>>>>>> they releasing it, I also >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> think that we should change the behavior inside global >>> stream >>>>>>>>>> thread to >>>>>>>>>>>>>>>>> not die on >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> InvalidOffsetException and instead clean and rebuild the >>> state >>>>>>>>>> from the >>>>>>>>>>>>>>>>> earliest. On this, as you >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> mentioned that we would need to pause the stream threads >>> till >>>>>>> the >>>>>>>>>>>>>>>>> global store is completely restored. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Without it, there will be incorrect processing results if >>> they >>>>>>> are >>>>>>>>>>>>>>>>> utilizing a global store during processing. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> So, basically we can divide the use-cases into 4 parts. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> - PAPI based global stores (will have the earliest >>>>>>> hardcoded) >>>>>>>>>>>>>>>>> - PAPI based state stores (already has >>> auto.reset.config) >>>>>>>>>>>>>>>>> - DSL based GlobalKTables (will have earliest >> hardcoded) >>>>>>>>>>>>>>>>> - DSL based KTables (will continue with >>> auto.reset.config) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> So, this would mean that we are not changing any existing >>>>>>>>>> behaviors >>>>>>>>>>>>>>>>> with this if I am right. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I guess we could improve the code to actually log a >> warning >>> for >>>>>>>>>> this >>>>>>>>>>>>>>>>> case, similar to what we do for some configs already (cf >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS). >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I like this idea. In case we go ahead with the above >>> approach >>>>>>>>>> and if we can’t >>>>>>>>>>>>>>>>> deprecate it, we should educate users that this config >>> doesn’t >>>>>>>>>> work. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Looking forward to hearing thoughts from others as well. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> - Navinder On Tuesday, 4 August, 2020, 05:07:59 am IST, >>>>>>>>>> Matthias J. >>>>>>>>>>>>>>>>> Sax <mj...@apache.org> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Navinder, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> thanks for updating the KIP. 
I think the motivation >> section >>> is >>>>>>> not >>>>>>>>>>>>>>>>> totally accurate (what is not your fault though, as the >>> history >>>>>>> of >>>>>>>>>> how >>>>>>>>>>>>>>>>> we handle this case is intertwined...) For example, >>>>>>>>>> "auto.offset.reset" >>>>>>>>>>>>>>>>> is hard-coded for the global consumer to "none" and using >>>>>>>>>>>>>>>>> "global.consumer.auto.offset.reset" has no effect (cf >>>>>>>>>>>>>>>>> >>>>>>> >>> >> https://kafka.apache.org/25/documentation/streams/developer-guide/config-streams.html#default-values >>>>>>>>>> ) >>>>>>>>>>>>>>>>> Also, we could not even really deprecate the config as >>>>>>> mentioned in >>>>>>>>>>>>>>>>> rejected alternatives sections, because we need >>>>>>>>>> `auto.offset.reset` for >>>>>>>>>>>>>>>>> the main consumer -- and adding a prefix is independent of >>> it. >>>>>>>>>> Also, >>>>>>>>>>>>>>>>> because we ignore the config, it's is also >>> deprecated/removed if >>>>>>>>>> you wish. >>>>>>>>>>>>>>>>> I guess we could improve the code to actually log a >> warning >>> for >>>>>>>>>> this >>>>>>>>>>>>>>>>> case, similar to what we do for some configs already (cf >>>>>>>>>>>>>>>>> StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS). >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> The other question is about compatibility with regard to >>> default >>>>>>>>>>>>>>>>> behavior: if we want to reintroduce >>>>>>>>>> `global.consumer.auto.offset.reset` >>>>>>>>>>>>>>>>> this basically implies that we need to respect >>>>>>>>>> `auto.offset.reset`, too. >>>>>>>>>>>>>>>>> Remember, that any config without prefix is applied to all >>>>>>> clients >>>>>>>>>> that >>>>>>>>>>>>>>>>> support this config. Thus, if a user does not limit the >>> scope of >>>>>>>>>> the >>>>>>>>>>>>>>>>> config to the main consumer (via >>>>>>>>>> `main.consumer.auto.offset.reset`) but >>>>>>>>>>>>>>>>> uses the non-prefix versions and sets it to "latest" (and >>> relies >>>>>>>>>> on the >>>>>>>>>>>>>>>>> current behavior that `auto.offset.reset` is "none", and >>>>>>>>>> effectively >>>>>>>>>>>>>>>>> "earliest" on the global consumer), the user might end up >>> with a >>>>>>>>>>>>>>>>> surprise as the global consumer behavior would switch from >>>>>>>>>> "earliest" to >>>>>>>>>>>>>>>>> "latest" (most likely unintentionally). Bottom line is, >> that >>>>>>> users >>>>>>>>>> might >>>>>>>>>>>>>>>>> need to change configs to preserve the old behavior... >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> However, before we discuss those details, I think we >> should >>>>>>>>>> discuss the >>>>>>>>>>>>>>>>> topic in a broader context first: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> - for a GlobalKTable, does it even make sense from a >>>>>>> correctness >>>>>>>>>> point >>>>>>>>>>>>>>>>> of view, to allow users to set a custom reset policy? It >>> seems >>>>>>> you >>>>>>>>>>>>>>>>> currently don't propose this in the KIP, but as you don't >>>>>>> mention >>>>>>>>>> it >>>>>>>>>>>>>>>>> explicitly it's unclear if that on purpose of an >> oversight? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> - Should we treat global stores differently to >>> GlobalKTables >>>>>>> and >>>>>>>>>> allow >>>>>>>>>>>>>>>>> for more flexibility (as the PAPI does not really provide >>> any >>>>>>>>>> semantic >>>>>>>>>>>>>>>>> contract). It seems that is what you propose in the KIP. 
>> We >>>>>>> should >>>>>>>>>>>>>>>>> discuss if this flexibility does make sense or not for the >>> PAPI, >>>>>>>>>> or if >>>>>>>>>>>>>>>>> we should apply the same reasoning about correctness we >> use >>> for >>>>>>>>>> KTables >>>>>>>>>>>>>>>>> to global stores? To what extend are/should they be >>> different? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> - If we support auto.offset.reset for global store, how >>>>>>> should we >>>>>>>>>>>>>>>>> handle the initial bootstrapping of the store/table (that >> is >>>>>>>>>> hard-coded >>>>>>>>>>>>>>>>> atm)? Should we skip it if the policy is "latest" and >> start >>>>>>> with an >>>>>>>>>>>>>>>>> empty state? Note that we did consider this behavior >>> incorrect >>>>>>> via >>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6121 and thus >>> I am >>>>>>>>>> wondering >>>>>>>>>>>>>>>>> why should we change it back again? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Finally, the main motivation for the Jira ticket was to >> let >>> the >>>>>>>>>> runtime >>>>>>>>>>>>>>>>> auto-recover instead of dying as it does currently. If we >>> decide >>>>>>>>>> that a >>>>>>>>>>>>>>>>> custom reset policy does actually not make sense, we can >>> just >>>>>>>>>> change the >>>>>>>>>>>>>>>>> global-thread to not die any longer on an >>>>>>> `InvalidOffsetException` >>>>>>>>>> but >>>>>>>>>>>>>>>>> rebuild the state automatically. This would be "only" a >>> behavior >>>>>>>>>> change >>>>>>>>>>>>>>>>> but does not require any public API changes. -- For this >>> case, >>>>>>> we >>>>>>>>>> should >>>>>>>>>>>>>>>>> also think about the synchronization with the main >>> processing >>>>>>>>>> threads? >>>>>>>>>>>>>>>>> On startup we bootstrap the global stores before >> processing >>>>>>>>>> happens. >>>>>>>>>>>>>>>>> Thus, if an `InvalidOffsetException` happen and the global >>>>>>> thread >>>>>>>>>> dies, >>>>>>>>>>>>>>>>> the main threads cannot access the global stores any >> longer >>> an >>>>>>>>>> also die. >>>>>>>>>>>>>>>>> If we re-build the state though, do we need to pause the >>> main >>>>>>>>>> thread >>>>>>>>>>>>>>>>> during this phase? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On 8/2/20 8:48 AM, Navinder Brar wrote: >>>>>>>>>>>>>>>>>> Hi John, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I have updated the KIP to make the motivation more clear. >>> In a >>>>>>>>>> nutshell, we will use the already existing config >>>>>>>>>> "global.consumer.auto.offset.reset" for users to set a blanket >>> reset >>>>>>> policy >>>>>>>>>> for all global topics and add a new interface to set per-topic >>> reset >>>>>>> policy >>>>>>>>>> for each global topic(for which we specifically need this KIP). >>> There >>>>>>> was a >>>>>>>>>> point raised from Matthias above to always reset to earliest by >>>>>>> cleaning >>>>>>>>>> the stores and seekToBeginning in case of InvalidOffsetException. >>> We >>>>>>> can go >>>>>>>>>> with that route as well and I don't think it would need a KIP as >>> if we >>>>>>> are >>>>>>>>>> not providing users an option to have blanket reset policy on >>> global >>>>>>>>>> topics, then a per-topic override would also not be required(the >>> KIP is >>>>>>>>>> required basically for that). 
Although, I think if users have an >>>>>>> option to >>>>>>>>>> choose reset policy for StreamThread then the option should be >>>>>>> provided for >>>>>>>>>> GlobalStreamThread as well and if we don't want to use the >>>>>>>>>> "global.consumer.auto.offset.reset" then we would need to >>> deprecate it >>>>>>>>>> because currently it's not serving any purpose. For now, I have >>> added >>>>>>> it in >>>>>>>>>> rejected alternatives but we can discuss this. >>>>>>>>>>>>>>>>>> On the query that I had for Guozhang, thanks to Matthias >> we >>>>>>> have >>>>>>>>>> fixed it last week as part of KAFKA-10306. >>>>>>>>>>>>>>>>>> ~Navinder >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Sunday, 26 July, 2020, 07:37:34 pm IST, Navinder >>> Brar < >>>>>>>>>> navinder_b...@yahoo.com.invalid> wrote: >>>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Sorry, it took some time to respond back. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> “but I thought we would pass the config through to the >>> client.” >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> @John, sure we can use the config in >> GloablStreamThread, >>> that >>>>>>>>>> could be one of the way to solve it. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> @Matthias, sure cleaning the store and recreating is one >>> way >>>>>>> but >>>>>>>>>> since we are giving an option to reset in StreamThread why the >>>>>>>>>> implementation should be different in GlobalStreamThread. I think >>> we >>>>>>> should >>>>>>>>>> use the global.consumer.auto.offset.reset config to accept the >>> reset >>>>>>>>>> strategy opted by the user although I would be ok with just >>> cleaning >>>>>>> and >>>>>>>>>> resetting to the latest as well for now. Currently, we throw a >>>>>>>>>> StreamsException in case of InvalidOffsetException in >>>>>>> GlobalStreamThread so >>>>>>>>>> just resetting would still be better than what happens currently. >>>>>>>>>>>>>>>>>> Matthias, I found this comment in StreamBuilder for >>>>>>> GlobalKTable >>>>>>>>>> ‘* Note that {@link GlobalKTable} always applies {@code >>>>>>>>>> "auto.offset.reset"} strategy {@code "earliest"} regardless of >> the >>>>>>>>>> specified value in {@link StreamsConfig} or {@link Consumed}.’ >>>>>>>>>>>>>>>>>> So, I guess we are already cleaning up and recreating for >>>>>>>>>> GlobalKTable from earliest offset. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> @Guozhan while looking at the code, I also noticed a >> TODO: >>>>>>>>>> pending in GlobalStateManagerImpl, when InvalidOffsetException is >>>>>>> thrown. >>>>>>>>>> Earlier, we were directly clearing the store here and recreating >>> from >>>>>>>>>> scratch but that code piece is removed now. Are you working on a >>>>>>> follow-up >>>>>>>>>> PR for this or just handling the reset in GlobalStreamThread >>> should be >>>>>>>>>> sufficient? >>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>> Navinder >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Tuesday, 7 July, 2020, 12:53:36 am IST, Matthias >> J. >>> Sax >>>>>>> < >>>>>>>>>> mj...@apache.org> wrote: >>>>>>>>>>>>>>>>>> Atm, the config should be ignored and the >> global-consumer >>>>>>>>>> should use >>>>>>>>>>>>>>>>>> "none" in a hard-coded way. 
>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> However, if am still wondering if we actually want/need >> to >>>>>>> allow >>>>>>>>>> users >>>>>>>>>>>>>>>>>> to specify the reset policy? It might be worth to >>> consider, to >>>>>>>>>> just >>>>>>>>>>>>>>>>>> change the behavior: catch the exception, log an ERROR >> (for >>>>>>>>>> information >>>>>>>>>>>>>>>>>> purpose), wipe the store, seekToBeginning(), and recreate >>> the >>>>>>>>>> store? >>>>>>>>>>>>>>>>>> Btw: if we want to allow users to set the reset policy, >>> this >>>>>>>>>> should be >>>>>>>>>>>>>>>>>> possible via the config, or via overwriting the config in >>> the >>>>>>>>>> method >>>>>>>>>>>>>>>>>> itself. Thus, we would need to add the new overloaded >>> method to >>>>>>>>>>>>>>>>>> `Topology` and `StreamsBuilder`. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Another question to ask: what about GlobalKTables? Should >>> they >>>>>>>>>> behave >>>>>>>>>>>>>>>>>> the same? An alternative design could be, to allow users >> to >>>>>>>>>> specify a >>>>>>>>>>>>>>>>>> flexible reset policy for global-stores, but not for >>>>>>>>>> GlobalKTables and >>>>>>>>>>>>>>>>>> use the strategy suggested above for this case. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thoughts? >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On 7/2/20 2:14 PM, John Roesler wrote: >>>>>>>>>>>>>>>>>>> Hi Navinder, >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Thanks for the response. I’m sorry if I’m being dense... >>> You >>>>>>>>>> said we are not currently using the config, but I thought we >> would >>>>>>> pass the >>>>>>>>>> config through to the client. Can you confirm whether or not the >>>>>>> existing >>>>>>>>>> config works for your use case? >>>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>>> John >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On Sun, Jun 28, 2020, at 14:09, Navinder Brar wrote: >>>>>>>>>>>>>>>>>>>> Sorry my bad. Found it. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Prefix used to override {@link KafkaConsumer consumer} >>>>>>> configs >>>>>>>>>> for the >>>>>>>>>>>>>>>>>>>> global consumer client from >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> * the general consumer client configs. The override >>>>>>> precedence >>>>>>>>>> is the >>>>>>>>>>>>>>>>>>>> following (from highest to lowest precedence): >>>>>>>>>>>>>>>>>>>> * 1. global.consumer.[config-name].. >>>>>>>>>>>>>>>>>>>> public static final String GLOBAL_CONSUMER_PREFIX = >>>>>>>>>> "global.consumer."; >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> So, that's great. We already have a config exposed to >>> reset >>>>>>>>>> offsets for >>>>>>>>>>>>>>>>>>>> global topics via global.consumer.auto.offset.reset >> just >>> that >>>>>>>>>> we are >>>>>>>>>>>>>>>>>>>> not actually using it inside GlobalStreamThread to >> reset. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> -Navinder >>>>>>>>>>>>>>>>>>>> On Monday, 29 June, 2020, 12:24:21 am IST, Navinder >>> Brar >>>>>>>>>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Hi John, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Thanks for your feedback. >>>>>>>>>>>>>>>>>>>> 1. 
I think there is some confusion on my first point, >> the >>>>>>> enum >>>>>>>>>> I am >>>>>>>>>>>>>>>>>>>> sure we can use the same one but the external config >>> which >>>>>>>>>> controls the >>>>>>>>>>>>>>>>>>>> resetting in global stream thread either we can the >> same >>> one >>>>>>>>>> which >>>>>>>>>>>>>>>>>>>> users use for source topics(StreamThread) or we can >>> provide a >>>>>>>>>> new one >>>>>>>>>>>>>>>>>>>> which specifically controls global topics. For e.g. >>> currently >>>>>>>>>> if I get >>>>>>>>>>>>>>>>>>>> an InvalidOffsetException in any of my source topics, I >>> can >>>>>>>>>> choose >>>>>>>>>>>>>>>>>>>> whether to reset from Earliest or Latest(with >>>>>>>>>> auto.offset.reset). Now >>>>>>>>>>>>>>>>>>>> either we can use the same option and say if I get the >>> same >>>>>>>>>> exception >>>>>>>>>>>>>>>>>>>> for global topics I will follow same resetting. Or some >>> users >>>>>>>>>> might >>>>>>>>>>>>>>>>>>>> want to have totally different setting for both source >>> and >>>>>>>>>> global >>>>>>>>>>>>>>>>>>>> topics, like for source topic I want resetting from >>> Latest >>>>>>> but >>>>>>>>>> for >>>>>>>>>>>>>>>>>>>> global topics I want resetting from Earliest so in that >>> case >>>>>>>>>> adding a >>>>>>>>>>>>>>>>>>>> new config might be better. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> 2. I couldn't find this config currently >>>>>>>>>>>>>>>>>>>> "global.consumer.auto.offset.reset". Infact in >>>>>>>>>> GlobalStreamThread.java >>>>>>>>>>>>>>>>>>>> we are throwing a StreamsException for >>> InvalidOffsetException >>>>>>>>>> and there >>>>>>>>>>>>>>>>>>>> is a test as >>>>>>>>>>>>>>>>>>>> well >>>>>>>>>> GlobalStreamThreadTest#shouldDieOnInvalidOffsetException(), so I >>>>>>>>>>>>>>>>>>>> think this is the config we are trying to introduce >> with >>> this >>>>>>>>>> KIP. >>>>>>>>>>>>>>>>>>>> -Navinder On Saturday, 27 June, 2020, 07:03:04 pm IST, >>> John >>>>>>>>>> Roesler >>>>>>>>>>>>>>>>>>>> <j...@vvcephei.org> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Hi Navinder, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Thanks for this proposal! >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Regarding your question about whether to use the same >>> policy >>>>>>>>>>>>>>>>>>>> enum or not, the underlying mechanism is the same, so I >>> think >>>>>>>>>>>>>>>>>>>> we can just use the same AutoOffsetReset enum. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Can you confirm whether setting the reset policy config >>> on >>>>>>> the >>>>>>>>>>>>>>>>>>>> global consumer currently works or not? Based on my >>> reading >>>>>>>>>>>>>>>>>>>> of StreamsConfig, it looks like it would be: >>>>>>>>>>>>>>>>>>>> "global.consumer.auto.offset.reset". >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> If that does work, would you still propose to augment >> the >>>>>>>>>>>>>>>>>>>> Java API? 
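[Editor's note: for reference, this is roughly how the prefixed override would be written with today's config API; whether the global consumer actually honors it is exactly the open question in this thread. The application id and bootstrap servers are illustrative values.]

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    final class GlobalResetConfigSketch {
        static Properties config() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kip-406-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // resolves to "global.consumer.auto.offset.reset"
            props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
            // resolves to "main.consumer.auto.offset.reset", kept independent of the global one
            props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
            return props;
        }
    }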
>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>>>> -John >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On Fri, Jun 26, 2020, at 23:52, Navinder Brar wrote: >>>>>>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> KIP: >>>>>>> >>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy >>>>>>>>>>>>>>>>>>>>> I have taken over this KIP since it has been dormant >>> for a >>>>>>>>>> long time >>>>>>>>>>>>>>>>>>>>> and this looks important for use-cases that have large >>>>>>> global >>>>>>>>>> data, so >>>>>>>>>>>>>>>>>>>>> rebuilding global stores from scratch might seem >>> overkill in >>>>>>>>>> case of >>>>>>>>>>>>>>>>>>>>> InvalidOffsetExecption. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> We want to give users the control to use reset >>> policy(as we >>>>>>> do >>>>>>>>>> in >>>>>>>>>>>>>>>>>>>>> StreamThread) in case they hit invalid offsets. I have >>> still >>>>>>>>>> not >>>>>>>>>>>>>>>>>>>>> decided whether to restrict this option to the same >>> reset >>>>>>>>>> policy being >>>>>>>>>>>>>>>>>>>>> used by StreamThread(using auto.offset.reset config) >> or >>> add >>>>>>>>>> another >>>>>>>>>>>>>>>>>>>>> reset config specifically for global stores >>>>>>>>>>>>>>>>>>>>> "global.auto.offset.reset" which gives users more >>> control to >>>>>>>>>> choose >>>>>>>>>>>>>>>>>>>>> separate policies for global and stream threads. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> I would like to hear your opinions on the KIP. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> -Navinder >>>>>>> >>>>>>> >>>>> >>>>> >>>> >>>> >>>> >>> >> > >
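[Editor's note: as a closing illustration of the kind of per-source control being discussed: regular sources can already pick a reset policy per topic via Consumed, while global sources are bootstrapped from the earliest offset regardless of configuration. The sketch below uses the existing DSL with illustrative topic names; an equivalent override for global stores is what this KIP debates.]

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;

    final class ResetPolicyTodaySketch {
        static void build() {
            final StreamsBuilder builder = new StreamsBuilder();
            // Regular source topics can already choose a per-topic reset policy:
            builder.stream("input-topic",
                    Consumed.with(Serdes.String(), Serdes.String())
                            .withOffsetResetPolicy(Topology.AutoOffsetReset.LATEST));
            // Global sources bootstrap from the earliest offset today, regardless
            // of any configured reset policy:
            builder.globalTable("global-topic",
                    Consumed.with(Serdes.String(), Serdes.String()));
        }
    }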