Updated the KIP to be more aligned with global state store function names.

If I remember correctly, the processor is not used during restore, right? I
think this might cause issues if your processor is doing a projection of the
data. Either way, I would not add that to this KIP since it is a specific
use-case pattern.
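
To make the projection concern concrete, here is a minimal sketch in plain Java. It is a toy model, not Kafka Streams code: `process`, `restore`, and the CSV projection are hypothetical names made up for illustration, and it assumes the restore path copies records into the store as-is without calling the processor.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class RestoreVsProcess {

    // Regular processing path: the processor applies its projection before the put.
    static Map<String, String> process(List<String[]> topic, Function<String, String> projection) {
        Map<String, String> store = new HashMap<>();
        for (String[] record : topic) {
            store.put(record[0], projection.apply(record[1]));
        }
        return store;
    }

    // Restore path (assumption of this model): records are copied unmodified,
    // the processor is never invoked.
    static Map<String, String> restore(List<String[]> topic) {
        Map<String, String> store = new HashMap<>();
        for (String[] record : topic) {
            store.put(record[0], record[1]);
        }
        return store;
    }

    public static void main(String[] args) {
        // Each record is a {key, value} pair; the values are made-up CSV strings.
        List<String[]> topic = Arrays.asList(
            new String[] {"user-1", "alice,alice@example.com"},
            new String[] {"user-2", "bob,bob@example.com"});

        // Hypothetical projection: keep only the first CSV field.
        Function<String, String> nameOnly = value -> value.split(",")[0];

        Map<String, String> processed = process(topic, nameOnly);
        Map<String, String> restored = restore(topic);

        System.out.println("processed user-1 = " + processed.get("user-1"));
        System.out.println("restored  user-1 = " + restored.get("user-1"));
        System.out.println("stores equal: " + processed.equals(restored));
    }
}
```

In this model the two stores only match when the processor writes each record unmodified, which is the pattern the read-only store is meant for.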

Unless there is anything more to add or change, I would propose moving to a
vote.

Cheers!
D.

From: Matthias J. Sax <mj...@apache.org>
Date: Friday, 18 February 2022 at 03:29
To: dev@kafka.apache.org <dev@kafka.apache.org>
Subject: Re: [DISCUSS] KIP-813 Shared State Stores
Thanks for updating the KIP!

I am wondering if we would need two overloads of `addReadOnlyStateStore`,
one w/ and one w/o a `TimestampExtractor` argument, to effectively make it
an "optional" parameter?

Also wondering if we need to pass in `String sourceName` and `String
processorName` parameters (similar to `addGlobalStore()`) instead of
re-using the store name as currently proposed? -- In general I don't
have a strong opinion either way, but it seems to introduce some API
inconsistency if we don't follow the `addGlobalStore()` pattern?
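
As a rough illustration of the two points above (an "optional" extractor via overloads, plus explicit source and processor names), here is a sketch in plain Java. Everything in it is a stand-in invented for this example: `TimestampExtractorSketch`, `Registration`, and the parameter order are assumptions, not the Kafka Streams types and not the signature the KIP will end up with.

```java
import java.util.ArrayList;
import java.util.List;

public class ReadOnlyStoreOverloads {

    // Stand-in for a timestamp extractor; NOT the Kafka Streams interface.
    interface TimestampExtractorSketch {
        long extract(Object record, long partitionTime);
    }

    // Hypothetical record of what one registration call asked for.
    static final class Registration {
        final String storeName;
        final String sourceName;
        final String processorName;
        final String topic;
        final TimestampExtractorSketch extractor; // null means "use the configured default"

        Registration(String storeName, String sourceName, String processorName,
                     String topic, TimestampExtractorSketch extractor) {
            this.storeName = storeName;
            this.sourceName = sourceName;
            this.processorName = processorName;
            this.topic = topic;
            this.extractor = extractor;
        }
    }

    private final List<Registration> registrations = new ArrayList<>();

    // Overload without an extractor: delegates with null so both paths share one implementation.
    public ReadOnlyStoreOverloads addReadOnlyStateStore(String storeName, String sourceName,
                                                        String topic, String processorName) {
        return addReadOnlyStateStore(storeName, sourceName, null, topic, processorName);
    }

    // Overload with an explicit extractor.
    public ReadOnlyStoreOverloads addReadOnlyStateStore(String storeName, String sourceName,
                                                        TimestampExtractorSketch extractor,
                                                        String topic, String processorName) {
        registrations.add(new Registration(storeName, sourceName, processorName, topic, extractor));
        return this;
    }

    List<Registration> registrations() {
        return registrations;
    }

    public static void main(String[] args) {
        ReadOnlyStoreOverloads topology = new ReadOnlyStoreOverloads()
            .addReadOnlyStateStore("store-a", "source-a", "topic-a", "updater-a")
            .addReadOnlyStateStore("store-b", "source-b", (record, time) -> time, "topic-b", "updater-b");

        for (Registration r : topology.registrations()) {
            System.out.println(r.storeName + " custom extractor: " + (r.extractor != null));
        }
    }
}
```

Delegating from the shorter overload keeps a single code path, and passing the source and processor names explicitly means they are not silently derived from the store name.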


> Another thing we were confronted with was the restoring of state when the 
> actual local storage is gone. For example, we host on K8s with ephemeral 
> pods, so there is no persisted storage between pod restarts. However, the 
> consumer group will already be at the latest offset, preventing
> previous data from being restored within the new pod’s statestore.

We already have code in place in the runtime to do the right thing for
this case (ie, via DSL source-table changelog optimization). We can
re-use this part. It's nothing we need to discuss on the KIP, but we can
discuss on the PR later.
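
For readers following along, the ephemeral-pod case can be pictured with a small plain-Java model. This is only a sketch under stated assumptions, not the runtime's code: it assumes the store is rebuilt starting from a local checkpoint when one exists and from the beginning of the topic otherwise, and that the consumer group's committed offset only marks where regular processing resumes. `restoreRange` and its parameters are hypothetical names.

```java
import java.util.OptionalLong;

public class RestoreRangeSketch {

    // Returns {from, to}: offsets in [from, to) are replayed into the store before
    // regular processing resumes at the committed offset.
    // "checkpoint" is modeled as the next offset to read and is empty when the
    // local state directory (and its checkpoint) is gone.
    static long[] restoreRange(OptionalLong checkpoint, long beginningOffset, long committedOffset) {
        long from = checkpoint.orElse(beginningOffset);
        long to = Math.max(from, committedOffset);
        return new long[] {from, to};
    }

    public static void main(String[] args) {
        // Ephemeral pod: no local checkpoint, so the whole topic up to the committed offset is replayed.
        long[] freshPod = restoreRange(OptionalLong.empty(), 0L, 1000L);
        // Pod with surviving local state: only the tail after the checkpoint is replayed.
        long[] warmPod = restoreRange(OptionalLong.of(990L), 0L, 1000L);

        System.out.println("fresh pod restores " + freshPod[0] + ".." + freshPod[1]);
        System.out.println("warm pod restores " + warmPod[0] + ".." + warmPod[1]);
    }
}
```

Under these assumptions the committed offset never prevents older data from being replayed; only a surviving checkpoint shortens the restore.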


-Matthias


On 2/17/22 10:09 AM, Guozhang Wang wrote:
> Hi Daan,
>
> I think for the read-only state stores you'd need to slightly augment the
> checkpointing logic so that it would still write the checkpointed offsets
> while restoring from the changelogs.
>
>
> Guozhang
>
> On Thu, Feb 17, 2022 at 7:02 AM Daan Gertis <dger...@korfinancial.com>
> wrote:
>
>>> Could you add more details about the signature of
>>> `addReadOnlyStateStore()` -- What parameters does it take? Are there any
>>> overloads taking different parameters? The KIP only contains some verbal
>>> description on the "Implementation Plan" section, that is hard to find
>>> and hard to read.
>>>
>>> The KIP mentions a `ProcessorProvider` -- do you mean `ProcessorSupplier`?
>>>
>>> About timestamp synchronization: why do you propose to disable timestamp
>>> synchronization (similar to global state stores)? It seems to be an
>>> unnecessary limitation? -- Given that we could re-use the new method for
>>> source `KTables` (ie, `StreamsBuilder#table()` implementation), having
>>> timestamp synchronization enabled seems to be important?
>>
>> Yup, will do these updates. I’ll overload `addReadOnlyStateStore` to
>> allow for timestamp synchronization.
>>
>> Another thing we were confronted with was the restoring of state when the
>> actual local storage is gone. For example, we host on K8s with ephemeral
>> pods, so there is no persisted storage between pod restarts. However, the
>> consumer group will already be at the latest offset, preventing
>> previous data from being restored within the new pod’s statestore.
>>
>> If I remember correctly, there was some checkpoint logic available when
>> restoring, but we are bypassing that since logging is disabled on the
>> statestore, no?
>>
>> As always, thanks for your insights.
>>
>> Cheers,
>> D.
>>
>>
>> From: Matthias J. Sax <mj...@apache.org>
>> Date: Wednesday, 16 February 2022 at 02:09
>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>> Thanks for updating the KIP.
>>
>> Could you add more details about the signature of
>> `addReadOnlyStateStore()` -- What parameters does it take? Are there any
>> overloads taking different parameters? The KIP only contains some verbal
>> description on the "Implementation Plan" section, that is hard to find
>> and hard to read.
>>
>> The KIP mentions a `ProcessorProvider` -- do you mean `ProcessorSupplier`?
>>
>> About timestamp synchronization: why do you propose to disable timestamp
>> synchronization (similar to global state stores)? It seems to be an
>> unnecessary limitation? -- Given that we could re-use the new method for
>> source `KTables` (ie, `StreamsBuilder#table()` implementation), having
>> timestamp synchronization enabled seems to be important?
>>
>>
>> -Matthias
>>
>>
>> On 2/8/22 11:01 PM, Guozhang Wang wrote:
>>> Daan,
>>>
>>> Thanks for the replies, those make sense to me.
>>>
>>> On Tue, Feb 8, 2022 at 7:24 AM Daan Gertis <dger...@korfinancial.com> wrote:
>>>
>>>> I just updated the KIP to reflect the things discussed in this thread.
>>>>
>>>> As for your questions Guozhang:
>>>>
>>>>> 1) How do we handle if the num.partitions of app A's store changelog is
>>>>> different from the num.tasks of app B's sub-topology with that read-only
>>>>> store? Or are we going to let each task of B keep a whole copy of the store
>>>>> of A by reading all of its changelog partitions, like global stores?
>>>>
>>>> Good question. Both need to be co-partitioned to have the data available.
>>>> Another option would be to use IQ to make the request, but that seems far
>>>> from ideal.
>>>>
>>>>> 2) Are we trying to synchronize the store updates from the changelog to app
>>>>> B's processing timelines, or just like what we do for global stores that we
>>>>> just update the read-only stores async?
>>>>
>>>> Pretty much the same as we do for global stores.
>>>>
>>>>> 3) If the answer to both of the above questions is the latter, then what's
>>>>> the main difference of adding a read-only store vs. adding a global store?
>>>>
>>>> I think because of the first answer the behavior differs from global
>>>> stores.
>>>>
>>>> Makes sense?
>>>>
>>>> Cheers,
>>>>
>>>> D.
>>>>
>>>> From: Matthias J. Sax <mj...@apache.org>
>>>> Date: Thursday, 20 January 2022 at 21:12
>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>>> Any processor that would use that materialized, read-only statestore
>>>>> would need to wait for the store to be restored. I can't find a way to make
>>>>> that possible since processors can't wait for the statestore to be restored.
>>>>
>>>> This is built into the runtime already. Nothing to worry about. It's
>>>> part of the regular restore logic -- as long as any store is restoring,
>>>> all processing is blocked.
>>>>
>>>>> Also, since the statestore would have logging disabled, it means there
>>>>> is no initial restoration going on.
>>>>
>>>> No. When we hook up the input topic as changelog (as the DSL does), we
>>>> restore from the input topic during the regular restore phase. The restore
>>>> logic does not even know it's reading from the input topic and not from
>>>> a "*-changelog" topic.
>>>>
>>>> Disabling changelogging only affects the write path (ie,
>>>> `store.put()`) but not the restore path due to the internal "hookup" of
>>>> the input topic inside the restore logic.
>>>>
>>>> It's not easy to find/understand by reverse engineering I guess, but
>>>> it's there.
>>>>
>>>> One pointer where the actual hookup happens (might help to dig into it
>>>> more if you want):
>>>>
>>>>
>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L353-L356
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 1/20/22 10:04 AM, Guozhang Wang wrote:
>>>>> Hello Daan,
>>>>>
>>>>> Thanks for writing the KIP. I just read through it, and here are my 2c:
>>>>> to me it seems that one of the goals would be to "externalize" the internal
>>>>> changelog topic of an application (say A) so that other consumers can
>>>>> read it directly. Technically, without any auth, anyone knowing the topic
>>>>> name would be able to write to it too, but conceptually we would just
>>>>> assume that app A is the only writer of that topic. The question I had
>>>>> is how much we want to externalize the topic. For example we can,
>>>>> orthogonally to this KIP, just allow users to pass in a customized topic
>>>>> name when constructing a state store, telling application A to use
>>>>> that as the changelog. Since that topic is created outside of A and is
>>>>> publicly visible to anyone else on that cluster, anyone (including any
>>>>> consumers or streams apps) could read it. This is probably the most
>>>>> flexible option for sharing, but we are even less assured that application
>>>>> A is the only writer to that external topic unless we have explicit auth
>>>>> for A on that topic.
>>>>>
>>>>> Aside from that, here are a few more detailed comments about the
>>>>> implementation design itself following your current proposal:
>>>>>
>>>>> 1) How do we handle if the num.partitions of app A's store changelog is
>>>>> different from the num.tasks of app B's sub-topology with that read-only
>>>>> store? Or are we going to let each task of B keep a whole copy of the store
>>>>> of A by reading all of its changelog partitions, like global stores?
>>>>> 2) Are we trying to synchronize the store updates from the changelog to app
>>>>> B's processing timelines, or just like what we do for global stores that we
>>>>> just update the read-only stores async?
>>>>> 3) If the answer to both of the above questions is the latter, then what's
>>>>> the main difference of adding a read-only store vs. adding a global store?
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Jan 20, 2022 at 6:27 AM Daan Gertis <dger...@korfinancial.com>
>>>>> wrote:
>>>>>
>>>>>> Hey Matthias,
>>>>>>
>>>>>> Thank you for that feedback, certainly some things to think about. Let me
>>>>>> add my thoughts:
>>>>>>
>>>>>> +1 on simplifying the motivation. Was aiming to add more context but I
>>>>>> think you're right, bringing it back to the essence makes more sense.
>>>>>>
>>>>>> I also follow the reasoning of not having leader and follower. Makes sense
>>>>>> to view it from a single app point of view.
>>>>>>
>>>>>> As for the API method and its parameters, I wanted to stay close to the
>>>>>> API for adding a regular statestore, but I am perfectly fine with
>>>>>> defining an addReadOnlyStateStore() method instead.
>>>>>>
>>>>>> I agree the processor approach would be the most flexible one, and surely
>>>>>> it allows you to use a processor to base the statestore off an existing
>>>>>> topic. From what I understood from the codebase, there might be a problem
>>>>>> when using that statestore. Any processor that would use that materialized,
>>>>>> read-only statestore would need to wait for the store to be restored. I
>>>>>> can't find a way to make that possible since processors can't wait for the
>>>>>> statestore to be restored. Also, since the statestore would have logging
>>>>>> disabled, it means there is no initial restoration going on. As you wrote,
>>>>>> the DSL is already doing this, so I'm pretty sure I'm missing something,
>>>>>> just unable to find what exactly.
>>>>>>
>>>>>> I will rewrite the parts in the KIP to make processor-based the preferred
>>>>>> choice, along with the changes to the motivation etc. Only thing to figure
>>>>>> out is that restoring behavior to be sure processors of the readonly
>>>>>> statestore aren't working with stale data.
>>>>>>
>>>>>> D.
>>>>>>
>>>>>> -----Original Message-----
>>>>>> From: Matthias J. Sax <mj...@apache.org>
>>>>>> Sent: 19 January 2022 21:31
>>>>>> To: dev@kafka.apache.org
>>>>>> Subject: Re: [DISCUSS] KIP-813 Shared State Stores
>>>>>>
>>>>>> Daan,
>>>>>>
>>>>>> thanks for the KIP. I personally find the motivation section a little bit
>>>>>> confusing. If I understand the KIP correctly, you want to read a topic into
>>>>>> a state store (ie, materialize it). This is already possible today.
>>>>>>
>>>>>> Of course, today a "second" changelog topic would be created. It seems the
>>>>>> KIP aims to avoid the additional changelog topic, and to allow re-using
>>>>>> the original input topic (this optimization is already available for the
>>>>>> DSL, but not for the PAPI).
>>>>>>
>>>>>> If my observation is correct, we can simplify the motivation accordingly
>>>>>> (the fact that you want to use this feature to share state across different
>>>>>> applications more efficiently seems to be secondary and we could omit it
>>>>>> IMHO to keep the motivation focused).
>>>>>>
>>>>>> As a result, we also don't need the concept of "leader" and "follower".
>>>>>> In the end, Kafka Streams cannot reason about or enforce any usage patterns
>>>>>> across different apps; we can only guarantee stuff within a single
>>>>>> application (ie, don't create a changelog but reuse an input topic as
>>>>>> changelog). It would simplify the KIP if we remove these parts.
>>>>>>
>>>>>>
>>>>>>
>>>>>> For the API, I am wondering why you propose to pass in `processorNames`?
>>>>>> To me, it seems more reasonable to pass a `ProcessorSupplier` instead
>>>>>> (similar to what we do for `addGlobalStore`)? The provided `Processor` must
>>>>>> implement a certain pattern, ie, take each input record and apply it
>>>>>> unmodified to the state store (ie, the Processor will be solely responsible
>>>>>> for maintaining the state store). We might also need to pass other
>>>>>> arguments, similar to `addGlobalStore`, into this method. (More below.)
>>>>>>
>>>>>> If other processors need to read the state store, they can be connected to
>>>>>> it explicitly via `connectProcessorAndStateStores()`? I guess a hybrid
>>>>>> approach to keep `processorName` would also be possible, but IMHO all those
>>>>>> should only _read_ the state store (but not modify it), to keep a clear
>>>>>> conceptual separation.
>>>>>>
>>>>>> About the method name: wondering if we should use a different name to be
>>>>>> more explicit about what the method does? Maybe `addReadOnlyStateStore`?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Btw: please omit any code snippets and only put the newly added method
>>>>>> signature in the KIP.
>>>>>>
>>>>>> What I don't yet understand is the section "Allow state stores to
>>>>>> continue listening for changes from their changelog". Can you elaborate?
>>>>>>
>>>>>> About:
>>>>>>
>>>>>>> Since a changelog topic is created with the application id in its name,
>>>>>>> it would allow us to check in the follower if the changelog topic starts
>>>>>>> with our application id. If it doesn’t, we are not allowed to send a log.
>>>>>>
>>>>>> The DSL implements this differently, and just disables the changelog for
>>>>>> the state store (ie, for the "follower"). We could do the same thing
>>>>>> (either enforcing that the provided `StoreBuilder` has changelogging
>>>>>> disabled, or by just ignoring it and hard-coding it as disabled).
>>>>>>
>>>>>>
>>>>>> Ie, overall I would prefer the "source-processor approach" that you put
>>>>>> into rejected alternatives. Note that the problem you call out, namely
>>>>>>
>>>>>>> Problem with this approach is the lack of having restoring support
>>>>>>> within the state store
>>>>>>
>>>>>> does not apply. A restore is absolutely possible and the DSL already
>>>>>> supports it.
>>>>>>
>>>>>>
>>>>>> Or is your concern with regard to performance? The "source-processor
>>>>>> approach" would have the disadvantage that input data is first
>>>>>> deserialized, fed into the Processor, and then serialized again when put
>>>>>> into the state store. Re-using the state restore code is a good idea
>>>>>> from a performance point of view, but it might require quite some
>>>>>> internal changes (your proposal to "not stop restoring" might not work
>>>>>> as it could trigger quite some undesired side-effects given the current
>>>>>> architecture of Kafka Streams).
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 1/16/22 11:52 PM, Daan Gertis wrote:
>>>>>>> Hey everyone,
>>>>>>>
>>>>>>> Just created a KIP on sharing statestore state across multiple
>>>>>>> applications without duplicating the data on multiple changelog topics.
>>>>>>> Have a look and tell me what you think or what to improve. This is my first
>>>>>>> one, so please be gentle 😉
>>>>>>>
>>>>>>> https://cwiki.apache.org/confluence/x/q53kCw
>>>>>>>
>>>>>>> Cheers!
>>>>>>> D.
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>
>
