Updated the KIP to be more aligned with the global state store function names. If I remember correctly, the processor is not used during restore, right? That might cause issues if your processor projects the data. Either way, I would not add that to this KIP, since it is a specific use-case pattern.
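This is a minimal, stdlib-only sketch of the projection concern. It assumes, as Daan recalls, that restore copies input-topic records straight into the store without invoking the processor. `ProjectionRestoreSketch`, `process` and `restore` are hypothetical names, not Kafka Streams APIs. Records are modelled as `Map.Entry` key/value pairs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical model, not Kafka Streams code: a read-only store whose input
// topic doubles as its changelog, filled by two different paths.
class ProjectionRestoreSketch {

    // Processing path: every record passes through the user's processor,
    // which here projects the value before writing it to the store.
    static Map<String, String> process(List<Map.Entry<String, String>> topic,
                                       UnaryOperator<String> projection) {
        Map<String, String> store = new LinkedHashMap<>();
        for (Map.Entry<String, String> rec : topic) {
            store.put(rec.getKey(), projection.apply(rec.getValue()));
        }
        return store;
    }

    // Restore path (assumed): records are copied into the store as-is; the
    // processor, and therefore the projection, is never invoked.
    static Map<String, String> restore(List<Map.Entry<String, String>> topic) {
        Map<String, String> store = new LinkedHashMap<>();
        for (Map.Entry<String, String> rec : topic) {
            store.put(rec.getKey(), rec.getValue());
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> topic =
                List.of(Map.entry("k1", "alice,42"), Map.entry("k2", "bob,7"));
        // Projection that keeps only the name field.
        UnaryOperator<String> nameOnly = v -> v.split(",")[0];
        System.out.println(process(topic, nameOnly)); // {k1=alice, k2=bob}
        System.out.println(restore(topic));           // {k1=alice,42, k2=bob,7}
    }
}
```

Under that assumption, a store built by a projecting processor and the same store rebuilt by restore end up with different contents. This is why the processor for a read-only store is expected to write each record unmodified.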
Unless there is anything more to add or change, I propose moving to a vote.

Cheers!
D.

From: Matthias J. Sax <mj...@apache.org>
Date: Friday, 18 February 2022 at 03:29
To: dev@kafka.apache.org <dev@kafka.apache.org>
Subject: Re: [DISCUSS] KIP-813 Shared State Stores

Thanks for updating the KIP!

I am wondering if we would need two overloads of `addReadOnlyStateStore`, one with and one without a `TimestampExtractor` argument, to effectively make it an "optional" parameter?

Also wondering if we need to pass in `String sourceName` and `String processorName` parameters (similar to `addGlobalStore()`) instead of re-using the store name as currently proposed? -- In general I don't have a strong opinion either way, but it seems to introduce some API inconsistency if we don't follow the `addGlobalStore()` pattern?

> Another thing we were confronted with was restoring state when the actual local storage is gone. For example, we host on K8s with ephemeral pods, so there is no persisted storage between pod restarts. However, the consumer group will already be at the latest offset, which prevents previous data from being restored into the new pod’s state store.

We already have code in place in the runtime to do the right thing for this case (ie, via the DSL source-table changelog optimization). We can re-use this part. It's nothing we need to discuss on the KIP, but we can discuss it on the PR later.

-Matthias

On 2/17/22 10:09 AM, Guozhang Wang wrote:
> Hi Daan,
>
> I think for the read-only state stores you'd need to slightly augment the checkpointing logic so that it would still write the checkpointed offsets while restoring from the changelogs.
>
>
> Guozhang
>
> On Thu, Feb 17, 2022 at 7:02 AM Daan Gertis <dger...@korfinancial.com> wrote:
>
>>> Could you add more details about the signature of `addReadOnlyStateStore()` -- What parameters does it take? Are there any overloads taking different parameters?
>>> The KIP only contains some verbal description in the "Implementation Plan" section, which is hard to find and hard to read.
>>>
>>> The KIP mentions a `ProcessorProvider` -- do you mean `ProcessorSupplier`?
>>>
>>> About timestamp synchronization: why do you propose to disable timestamp synchronization (similar to global state stores)? It seems to be an unnecessary limitation? -- Given that we could re-use the new method for source `KTables` (ie, the `StreamsBuilder#table()` implementation), having timestamp synchronization enabled seems to be important?
>>
>> Yup, will do these updates. I’ll overload `addReadOnlyStateStore` to allow for timestamp synchronization.
>>
>> Another thing we were confronted with was restoring state when the actual local storage is gone. For example, we host on K8s with ephemeral pods, so there is no persisted storage between pod restarts. However, the consumer group will already be at the latest offset, which prevents previous data from being restored into the new pod’s state store.
>>
>> If I remember correctly, there was some checkpoint logic available when restoring, but we are bypassing that since logging is disabled on the state store, no?
>>
>> As always, thanks for your insights.
>>
>> Cheers,
>> D.
>>
>>
>> From: Matthias J. Sax <mj...@apache.org>
>> Date: Wednesday, 16 February 2022 at 02:09
>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>> Thanks for updating the KIP.
>>
>> Could you add more details about the signature of `addReadOnlyStateStore()` -- What parameters does it take? Are there any overloads taking different parameters? The KIP only contains some verbal description in the "Implementation Plan" section, which is hard to find and hard to read.
>>
>> The KIP mentions a `ProcessorProvider` -- do you mean `ProcessorSupplier`?
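For the ephemeral-pod case raised above, here is a rough model of checkpoint-driven restore. It follows Guozhang's and Matthias's replies: restoration starts from a local checkpoint, or from the beginning of the input topic when no checkpoint exists. It then replays up to the consumer group's committed offset; that upper bound is an assumption of this sketch. All names are hypothetical. Offsets are list indices and the store is an in-memory `TreeMap`.

```java
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical model of restoring a read-only store whose changelog is the
// input topic. Offsets are list indices. Not Kafka Streams code.
class CheckpointRestoreSketch {

    static final class LocalState {
        final Map<String, String> store = new TreeMap<>();
        // Offset of the next record to apply; empty when no checkpoint file
        // exists, e.g. after an ephemeral pod restarts with an empty disk.
        OptionalLong checkpoint = OptionalLong.empty();
    }

    // Replays topic[from, committedOffset) into the store, then writes the
    // checkpoint. Where restore starts depends only on local state, so a
    // consumer group that is already at the latest offset does not prevent
    // rebuilding a lost store. The committed-offset upper bound is assumed.
    static void restore(LocalState state,
                        List<Map.Entry<String, String>> topic,
                        long committedOffset) {
        long from = state.checkpoint.orElse(0L); // no checkpoint: start at the beginning
        for (long offset = from; offset < committedOffset; offset++) {
            Map.Entry<String, String> rec = topic.get((int) offset);
            state.store.put(rec.getKey(), rec.getValue());
        }
        state.checkpoint = OptionalLong.of(committedOffset);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> topic =
                List.of(Map.entry("a", "1"), Map.entry("b", "2"), Map.entry("a", "3"));

        LocalState freshPod = new LocalState();  // local storage lost on restart
        restore(freshPod, topic, 3);             // group has already committed offset 3
        System.out.println(freshPod.store);      // {a=3, b=2}
        System.out.println(freshPod.checkpoint); // OptionalLong[3]

        LocalState warmPod = new LocalState();   // checkpoint survived: replay only offset 2
        warmPod.store.put("a", "1");
        warmPod.store.put("b", "2");
        warmPod.checkpoint = OptionalLong.of(2);
        restore(warmPod, topic, 3);
        System.out.println(warmPod.store);       // {a=3, b=2}
    }
}
```

The design point is that the restore start position comes from the local checkpoint, not from the consumer group. Writing the checkpoint during restore, as Guozhang suggests, is what lets a surviving pod skip already-applied records.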
>>
>> About timestamp synchronization: why do you propose to disable timestamp synchronization (similar to global state stores)? It seems to be an unnecessary limitation? -- Given that we could re-use the new method for source `KTables` (ie, the `StreamsBuilder#table()` implementation), having timestamp synchronization enabled seems to be important?
>>
>>
>> -Matthias
>>
>>
>> On 2/8/22 11:01 PM, Guozhang Wang wrote:
>>> Daan,
>>>
>>> Thanks for the replies, those make sense to me.
>>>
>>> On Tue, Feb 8, 2022 at 7:24 AM Daan Gertis <dger...@korfinancial.com> wrote:
>>>
>>>> I just updated the KIP to reflect the things discussed in this thread.
>>>>
>>>> As for your questions, Guozhang:
>>>>
>>>>> 1) How do we handle the case where the num.partitions of app A's store changelog differs from the num.tasks of app B's sub-topology with that read-only store? Or are we going to let each task of B keep a whole copy of the store of A by reading all of its changelog partitions, like global stores?
>>>>
>>>> Good question. Both need to be co-partitioned to have the data available. Another option would be to use IQ to make the request, but that seems far from ideal.
>>>>
>>>>> 2) Are we trying to synchronize the store updates from the changelog to app B's processing timelines, or, like what we do for global stores, just update the read-only stores async?
>>>>
>>>> Pretty much the same as we do for global stores.
>>>>
>>>>> 3) If the answer to both of the above questions is the latter, then what's the main difference between adding a read-only store and adding a global store?
>>>>
>>>> I think, because of the first answer, the behavior differs from global stores.
>>>>
>>>> Makes sense?
>>>>
>>>> Cheers,
>>>>
>>>> D.
>>>>
>>>> From: Matthias J. Sax <mj...@apache.org>
>>>> Date: Thursday, 20 January 2022 at 21:12
>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>>> Any processor that would use that materialized, read-only state store would need to wait for the store to be restored. I can't find a way to make that possible since processors can't wait for the state store to be restored.
>>>>
>>>> This is built into the runtime already. Nothing to worry about. It's part of the regular restore logic -- as long as any store is restoring, all processing is blocked.
>>>>
>>>>> Also, since the state store would have logging disabled, it means there is no initial restoration going on.
>>>>
>>>> No. When we hook up the input topic as the changelog (as the DSL does), we restore from the input topic during the regular restore phase. The restore logic does not even know it's reading from the input topic rather than from a "*-changelog" topic.
>>>>
>>>> Disabling changelogging only affects the write path (ie, `store.put()`), not the restore path, due to the internal "hookup" of the input topic inside the restore logic.
>>>>
>>>> It's not easy to find/understand by reverse engineering, I guess, but it's there.
>>>>
>>>> One pointer to where the actual hookup happens (it might help if you want to dig into it more):
>>>>
>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L353-L356
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 1/20/22 10:04 AM, Guozhang Wang wrote:
>>>>> Hello Daan,
>>>>>
>>>>> Thanks for writing the KIP.
>>>>> I just read through it, and here are my 2c: to me it seems that one of the goals would be to "externalize" the internal changelog topic of an application (say A) so that other consumers can read it directly --- though technically, without any auth, anyone knowing the topic name would be able to write to it too, conceptually we would just assume that app A is the only writer of that topic --- The question I had is how much we want to externalize the topic. For example, orthogonally to this KIP, we could just allow users to pass in a customized topic name when constructing a state store, telling application A to use that as the changelog; since that topic is created outside of A and is publicly visible on that cluster, anyone, including any consumers or Streams apps, could read it. This is probably the most flexible option for sharing, but we would have even less assurance that application A is the only writer to that external topic unless we have explicit auth for A on that topic.
>>>>>
>>>>> Aside from that, here are a few more detailed comments about the implementation design itself, following your current proposal:
>>>>>
>>>>> 1) How do we handle the case where the num.partitions of app A's store changelog differs from the num.tasks of app B's sub-topology with that read-only store? Or are we going to let each task of B keep a whole copy of the store of A by reading all of its changelog partitions, like global stores?
>>>>> 2) Are we trying to synchronize the store updates from the changelog to app B's processing timelines, or, like what we do for global stores, just update the read-only stores async?
>>>>> 3) If the answer to both of the above questions is the latter, then what's the main difference between adding a read-only store and adding a global store?
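Guozhang's first question can be made concrete with a small, hypothetical assignment helper. Under the co-partitioned design Daan describes, task i of app B reads only changelog partition i of app A's store, which only works when the counts match. The global-store-style alternative gives every reader all partitions. Matching partition counts are necessary but not sufficient for co-partitioning, since both sides must also partition keys the same way. The class and method names are illustrative, not Kafka Streams APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka Streams: which changelog partitions
// of app A's store each task of app B's sub-topology would read.
class CoPartitioningSketch {

    // Co-partitioned read-only store: task i reads only changelog partition i,
    // which is only sound if the partition and task counts match.
    static List<List<Integer>> coPartitioned(int changelogPartitions, int numTasks) {
        if (changelogPartitions != numTasks) {
            throw new IllegalStateException("changelog has " + changelogPartitions
                    + " partitions but the sub-topology has " + numTasks
                    + " tasks; the store is not co-partitioned");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int task = 0; task < numTasks; task++) {
            assignment.add(List.of(task));
        }
        return assignment;
    }

    // Global-store style: every reader keeps a full copy by consuming all
    // changelog partitions. (Kafka Streams actually keeps one global store
    // copy per application instance rather than per task.)
    static List<List<Integer>> fullCopy(int changelogPartitions, int numReaders) {
        List<Integer> all = new ArrayList<>();
        for (int p = 0; p < changelogPartitions; p++) {
            all.add(p);
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int reader = 0; reader < numReaders; reader++) {
            assignment.add(List.copyOf(all));
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(coPartitioned(3, 3)); // [[0], [1], [2]]
        System.out.println(fullCopy(3, 2));      // [[0, 1, 2], [0, 1, 2]]
        try {
            coPartitioned(4, 3);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing fast on a count mismatch is one possible design choice for the read-only store. The full-copy variant trades that restriction for storing all of A's data on every reader.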
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Jan 20, 2022 at 6:27 AM Daan Gertis <dger...@korfinancial.com> wrote:
>>>>>
>>>>>> Hey Matthias,
>>>>>>
>>>>>> Thank you for that feedback, certainly some things to think about. Let me add my thoughts:
>>>>>>
>>>>>> +1 on simplifying the motivation. I was aiming to add more context, but I think you're right; bringing it back to the essence makes more sense.
>>>>>>
>>>>>> I also follow the reasoning of not having a leader and a follower. It makes sense to view it from a single app's point of view.
>>>>>>
>>>>>> As for the API method and its parameters, I wanted to stay close to the API for adding a regular state store, but I'm perfectly fine with defining an `addReadOnlyStateStore()` method instead.
>>>>>>
>>>>>> I agree the processor approach would be the most flexible one, and it surely allows you to use a processor to base the state store off an existing topic. From what I understood from the codebase, there might be a problem when using that state store. Any processor that would use that materialized, read-only state store would need to wait for the store to be restored. I can't find a way to make that possible, since processors can't wait for the state store to be restored. Also, since the state store would have logging disabled, there is no initial restoration going on. As you wrote, the DSL is already doing this, so I'm pretty sure I'm missing something; I'm just unable to find what exactly.
>>>>>>
>>>>>> I will rewrite the parts of the KIP to make the processor-based approach the preferred choice, along with the changes to the motivation etc. The only thing left to figure out is the restore behavior, to be sure processors of the read-only state store aren't working with stale data.
>>>>>>
>>>>>> D.
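Matthias's 20 January reply answers Daan's concern: processing is blocked while stores restore, and disabling logging affects only the write path. The following is a simplified, hypothetical model of that behaviour, not Kafka Streams internals. `Store`, `Task`, `canProcess()` and `restoreFrom()` are invented for illustration, and the input topic stands in for the registered changelog.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model (not Kafka Streams internals): logging only
// affects the write path, restore replays the topic registered as the store's
// changelog (here, the input topic), and a task processes nothing until its
// stores are restored.
class RestoreGatingSketch {

    static final class Store {
        final Map<String, String> data = new TreeMap<>();
        final List<Map.Entry<String, String>> changelogWrites = new ArrayList<>();
        final boolean loggingEnabled;

        Store(boolean loggingEnabled) {
            this.loggingEnabled = loggingEnabled;
        }

        // Write path: only a logging-enabled store appends to its own changelog.
        void put(String key, String value) {
            data.put(key, value);
            if (loggingEnabled) {
                changelogWrites.add(Map.entry(key, value));
            }
        }

        // Restore path: independent of loggingEnabled; it replays whatever
        // topic was registered as this store's changelog.
        void restoreFrom(List<Map.Entry<String, String>> registeredChangelog) {
            for (Map.Entry<String, String> rec : registeredChangelog) {
                data.put(rec.getKey(), rec.getValue());
            }
        }
    }

    static final class Task {
        final Store store;
        private boolean restored = false;

        Task(Store store) {
            this.store = store;
        }

        void restore(List<Map.Entry<String, String>> inputTopic) {
            store.restoreFrom(inputTopic);
            restored = true;
        }

        // The runtime only hands records to a task whose stores are restored,
        // so no processor observes a partially restored store.
        boolean canProcess() {
            return restored;
        }
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> inputTopic =
                List.of(Map.entry("k", "v1"), Map.entry("k", "v2"));
        Task task = new Task(new Store(false)); // logging disabled, as for the read-only store
        System.out.println(task.canProcess());  // false
        task.restore(inputTopic);
        System.out.println(task.canProcess() + " " + task.store.data); // true {k=v2}
        task.store.put("x", "y");
        System.out.println(task.store.changelogWrites.isEmpty());      // true
    }
}
```

In this model, turning logging off never stops the store from being rebuilt, because restore reads the registered topic rather than anything the store itself wrote.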
>>>>>>
>>>>>> -----Original Message-----
>>>>>> From: Matthias J. Sax <mj...@apache.org>
>>>>>> Sent: 19 January 2022 21:31
>>>>>> To: dev@kafka.apache.org
>>>>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>>>>
>>>>>> Daan,
>>>>>>
>>>>>> thanks for the KIP. I personally find the motivation section a little bit confusing. If I understand the KIP correctly, you want to read a topic into a state store (ie, materialize it). This is already possible today.
>>>>>>
>>>>>> Of course, today a "second" changelog topic would be created. It seems the KIP aims to avoid the additional changelog topic and to allow re-using the original input topic (this optimization is already available for the DSL, but not for the PAPI).
>>>>>>
>>>>>> If my observation is correct, we can simplify the motivation accordingly (the fact that you want to use this feature to share state across different applications more efficiently seems to be secondary, and we could omit it IMHO to keep the motivation focused).
>>>>>>
>>>>>> As a result, we also don't need the concept of "leader" and "follower". In the end, Kafka Streams cannot reason about or enforce any usage patterns across different apps; we can only guarantee things within a single application (ie, don't create a changelog but reuse an input topic as the changelog). It would simplify the KIP if we removed these parts.
>>>>>>
>>>>>>
>>>>>>
>>>>>> For the API, I am wondering why you propose to pass in `processorNames`? To me, it seems more reasonable to pass a `ProcessorSupplier` instead (similar to what we do for `addGlobalStore`)? The provided `Processor` must implement a certain pattern, ie, take each input record and apply it unmodified to the state store (ie, the Processor will be solely responsible for maintaining the state store).
>>>>>> We might also need to pass other arguments into this method, similar to `addGlobalStore`. (More below.)
>>>>>>
>>>>>> If other processors need to read the state store, they can be connected to it explicitly via `connectProcessorAndStateStores()`? I guess a hybrid approach that keeps `processorName` would also be possible, but IMHO all those processors should only _read_ the state store (but not modify it), to keep a clear conceptual separation.
>>>>>>
>>>>>> About the method name: wondering if we should use a different name to be more explicit about what the method does? Maybe `addReadOnlyStateStore`?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Btw: please omit any code snippets and only put the newly added method signature in the KIP.
>>>>>>
>>>>>> What I don't yet understand is the section "Allow state stores to continue listening for changes from their changelog". Can you elaborate?
>>>>>>
>>>>>> About:
>>>>>>
>>>>>>> Since a changelog topic is created with the application id in its name, it would allow us to check in the follower if the changelog topic starts with our application id. If it doesn't, we are not allowed to send a log.
>>>>>>
>>>>>> The DSL implements this differently and just disables the changelog for the state store (ie, for the "follower"). We could do the same thing (either by enforcing that the provided `StoreBuilder` has changelogging disabled, or by ignoring it and disabling it hard-coded).
>>>>>>
>>>>>>
>>>>>> Ie, overall I would prefer the "source-processor approach" that you put into rejected alternatives. Note that the problem you call out, namely
>>>>>>
>>>>>>> Problem with this approach is the lack of having restoring support within the state store
>>>>>>
>>>>>> does not apply. A restore is absolutely possible, and the DSL already supports it.
>>>>>>
>>>>>>
>>>>>> Or is your concern with regard to performance?
>>>>>> The "source-processor approach" would have the disadvantage that input data is first deserialized, fed into the Processor, and then serialized again when put into the state store. Re-using the state restore code is a good idea from a performance point of view, but it might require quite some internal changes (your proposal to "not stop restoring" might not work, as it could trigger quite some undesired side effects given the current architecture of Kafka Streams).
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 1/16/22 11:52 PM, Daan Gertis wrote:
>>>>>>> Hey everyone,
>>>>>>>
>>>>>>> I just created a KIP on sharing state store state across multiple applications without duplicating the data on multiple changelog topics. Have a look and tell me what you think or what to improve. This is my first one, so please be gentle 😉
>>>>>>>
>>>>>>> https://cwiki.apache.org/confluence/x/q53kCw
>>>>>>>
>>>>>>> Cheers!
>>>>>>> D.
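To make the performance trade-off in Matthias's 19 January reply concrete, the sketch below fills the same byte-keyed store two ways and counts serde calls. The source-processor path deserializes each record, hands it to a processor that writes it unmodified, and re-serializes it. The restore path copies the raw bytes. `ByteBuffer` stands in for Kafka's byte wrappers and `CountingSerde` is a hypothetical helper. The count is illustrative, not a benchmark.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical comparison (not Kafka Streams code) of two ways to fill a
// byte-oriented read-only store from its input topic.
class WritePathCostSketch {

    // Stand-in for a String serde that counts how often it is invoked.
    static final class CountingSerde {
        int calls = 0;

        String deserialize(byte[] bytes) {
            calls++;
            return new String(bytes, StandardCharsets.UTF_8);
        }

        byte[] serialize(String value) {
            calls++;
            return value.getBytes(StandardCharsets.UTF_8);
        }
    }

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Source-processor approach: deserialize, let the processor apply the
    // record unmodified, then serialize again for the byte-oriented store.
    static Map<ByteBuffer, ByteBuffer> viaProcessor(List<Map.Entry<byte[], byte[]>> topic,
                                                    CountingSerde serde) {
        Map<ByteBuffer, ByteBuffer> store = new HashMap<>();
        for (Map.Entry<byte[], byte[]> rec : topic) {
            String key = serde.deserialize(rec.getKey());
            String value = serde.deserialize(rec.getValue());
            store.put(ByteBuffer.wrap(serde.serialize(key)), ByteBuffer.wrap(serde.serialize(value)));
        }
        return store;
    }

    // Restore-style path: raw bytes are copied into the store, no serde involved.
    static Map<ByteBuffer, ByteBuffer> viaRestore(List<Map.Entry<byte[], byte[]>> topic) {
        Map<ByteBuffer, ByteBuffer> store = new HashMap<>();
        for (Map.Entry<byte[], byte[]> rec : topic) {
            store.put(ByteBuffer.wrap(rec.getKey()), ByteBuffer.wrap(rec.getValue()));
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<byte[], byte[]>> topic = List.of(
                Map.entry(bytes("k1"), bytes("v1")),
                Map.entry(bytes("k2"), bytes("v2")));
        CountingSerde serde = new CountingSerde();
        Map<ByteBuffer, ByteBuffer> processed = viaProcessor(topic, serde);
        Map<ByteBuffer, ByteBuffer> restored = viaRestore(topic);
        // Same contents, but only the processor path paid for serde calls.
        System.out.println(processed.equals(restored) + " " + serde.calls); // true 8
    }
}
```

Both paths produce the same store contents. Only the processor path pays four serde calls per record (key and value, each deserialized and re-serialized), which is the disadvantage described above; restore avoids it by working on bytes.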