Hey Matthias,

Thank you for that feedback, certainly some things to think about. Let me add 
my thoughts:

+1 on simplifying the motivation. Was aiming to add more context but I think 
you're right, bringing it back to the essence makes more sense.

I also follow the reasoning of not having leader and follower. Makes sense to 
view it from a single app point of view.

As for the API method and its parameters, I wanted to stay close to the API for 
adding a regular statestore, but I'm perfectly fine with defining an 
addReadOnlyStateStore() method instead.

I agree the processor approach would be the most flexible one, and surely it 
allows you to use a processor to base the statestore off an existing topic. 
From what I understood from the codebase, there might be a problem when using 
that statestore. Any processor that would use that materialized, read-only 
statestore would need to wait for the store to be restored. I can't find a way 
to make that possible since processors can't wait for the statestore to be 
restored. Also, since the statestore would have logging disabled, it means 
there is no initial restoration going on. As you wrote, the DSL is already 
doing this, so I'm pretty sure I'm missing something, just unable to find what 
exactly.

I will rewrite the parts in the KIP to make processor-based the preferred 
choice, along with the changes to the motivation etc. Only thing to figure out 
is that restoring behavior to be sure processors of the readonly statestore 
aren't working with stale data.

D.

-----Original Message-----
From: Matthias J. Sax <mj...@apache.org> 
Sent: 19 January 2022 21:31
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-813 Shared State Stores

Daan,

thanks for the KIP. I personally find the motivation section a little bit 
confusing. If I understand the KIP correctly, you want to read a topic into a 
state store (ie, materialize it). This is already possible today.

Of course, today a "second" changelog topic would be created. It seems the KIP 
aims to avoid the additional changelog topic, and to allow re-using the 
original input topic as the changelog (this optimization is already available 
for the DSL, but not for the PAPI).

If my observation is correct, we can simplify the motivation accordingly (the 
fact that you want to use this feature to share state across different 
applications more efficiently seems to be secondary and we could omit it IMHO 
to keep the motivation focused).

As a result, we also don't need the concept of "leader" and "follower". 
In the end, Kafka Streams cannot reason/enforce any usage patterns across 
different apps, but we can only guarantee stuff within a single application 
(ie, don't create a changelog but reuse an input topic as changelog). It would 
simplify the KIP if we remove these parts.



For the API, I am wondering why you propose to pass in `processorNames`? 
To me, it seems more reasonable to pass a `ProcessorSupplier` instead (similar 
to what we do for `addGlobalStore`)? The provided `Processor` must implement a 
certain pattern, ie, take each input record and apply it unmodified to the state 
store (ie, the Processor will be solely responsible for maintaining the state 
store). We might also need to pass other arguments, similar to 
`addGlobalStore`, into this method. (More below.)

If other processors need to read the state store, they can be connected to it 
explicitly via `connectProcessorAndStateStores()`? I guess a hybrid approach to 
keep `processorName` would also be possible, but IMHO all those should only 
_read_ the state store (but not modify it), to keep a clear conceptual 
separation.
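
To make the single-writer pattern above concrete, here is a minimal plain-Java 
sketch. It deliberately does not use the Kafka Streams API: `InMemoryStore`, 
`ReadOnlyView` and `StoreUpdater` are hypothetical names invented for 
illustration, and treating a null value as a delete is an assumption borrowed 
from how changelog tombstones usually behave.

```java
import java.util.HashMap;
import java.util.Map;

public class ReadOnlyStoreSketch {

    /** Read-only view: the only thing "other" processors get to see. */
    interface ReadOnlyView<K, V> {
        V get(K key);
    }

    /** Toy in-memory store standing in for a real state store (hypothetical). */
    static final class InMemoryStore<K, V> implements ReadOnlyView<K, V> {
        private final Map<K, V> data = new HashMap<>();

        @Override
        public V get(K key) { return data.get(key); }

        void put(K key, V value) { data.put(key, value); }
        void delete(K key) { data.remove(key); }
        int size() { return data.size(); }
    }

    /** The single writer: applies every input record to the store unmodified. */
    static final class StoreUpdater<K, V> {
        private final InMemoryStore<K, V> store;

        StoreUpdater(InMemoryStore<K, V> store) { this.store = store; }

        void process(K key, V value) {
            if (value == null) {
                store.delete(key);   // assumption: null value acts as a tombstone
            } else {
                store.put(key, value);
            }
        }
    }

    public static void main(String[] args) {
        InMemoryStore<String, String> store = new InMemoryStore<>();
        StoreUpdater<String, String> updater = new StoreUpdater<>(store);
        ReadOnlyView<String, String> view = store; // readers only hold this type

        updater.process("a", "1");
        updater.process("b", "2");
        updater.process("a", null);

        System.out.println(view.get("a")); // null
        System.out.println(view.get("b")); // 2
    }
}
```

Readers that only hold a `ReadOnlyView` reference cannot call `put` or 
`delete`, which mirrors the "only _read_ the state store" separation.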

About the method name: wondering if we should use a different name to be more 
explicit what the method does? Maybe `addReadOnlyStateStore`?



Btw: please omit any code snippets and only put the newly added method 
signature in the KIP.

What I don't yet understand is the section "Allow state stores to 
continue listening for changes from their changelog". Can you elaborate?

About:

> Since a changelog topic is created with the application id in its name, it 
> would allow us to check in the follower if the changelog topic starts with 
> our application id. If it doesn’t, we are not allowed to send a log.

The DSL implements this differently, and just disables the changelog for 
the state store (ie, for the "follower"). We could do the same thing 
(either enforcing that the provided `StoreBuilder` has changelogging 
disabled, or by just ignoring the setting and hard-coding it to disabled).
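
The two options can be compared with a small plain-Java model. `StoreConfig` 
and both helper methods are hypothetical stand-ins, not Kafka Streams classes; 
the real `StoreBuilder` exposes `loggingEnabled()` and `withLoggingDisabled()`, 
which this sketch only imitates.

```java
public class ChangelogPolicySketch {

    /** Hypothetical stand-in for the logging-related part of a store builder. */
    static final class StoreConfig {
        final String name;
        final boolean loggingEnabled;

        StoreConfig(String name, boolean loggingEnabled) {
            this.name = name;
            this.loggingEnabled = loggingEnabled;
        }

        StoreConfig withLoggingDisabled() {
            return new StoreConfig(name, false);
        }
    }

    /** Option 1: reject builders that still have changelogging enabled. */
    static StoreConfig requireLoggingDisabled(StoreConfig config) {
        if (config.loggingEnabled) {
            throw new IllegalArgumentException(
                "Store " + config.name + " must have changelogging disabled");
        }
        return config;
    }

    /** Option 2: ignore the caller's setting and always disable changelogging. */
    static StoreConfig forceLoggingDisabled(StoreConfig config) {
        return config.loggingEnabled ? config.withLoggingDisabled() : config;
    }

    public static void main(String[] args) {
        StoreConfig logged = new StoreConfig("shared-store", true);

        // Option 2 silently corrects the configuration.
        System.out.println(forceLoggingDisabled(logged).loggingEnabled); // false

        // Option 1 surfaces the misconfiguration to the caller.
        try {
            requireLoggingDisabled(logged);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Option 1 fails fast on a misconfigured builder; option 2 is more forgiving but 
hides the fact that the caller's setting was overridden.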


Ie, overall I would prefer the "source-processor approach" that you put 
into rejected alternatives. Note that the problem you call out, namely

> Problem with this approach is the lack of having restoring support within the 
> state store

does not apply. A restore is absolutely possible and the DSL already 
supports it.


Or is your concern with regard to performance? The "source-processor 
approach" would have the disadvantage that input data is first 
deserialized, fed into the Processor, and then serialized again when put 
into the state store. Re-using the state restore code is a good idea 
from a performance point of view, but it might require quite some 
internal changes (your proposal to "not stop restoring" might not work 
as it could trigger quite some undesired side-effects given the current 
architecture of Kafka Streams).
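
The performance difference can be illustrated with a plain-Java sketch that 
models the store as a map of raw bytes. This is not Kafka Streams code: the 
method names are hypothetical, and UTF-8 strings are assumed as the record 
format purely for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class RestorePathSketch {

    /** Source-processor path: deserialize, hand to processor, serialize again. */
    static void viaProcessor(Map<ByteBuffer, byte[]> store, byte[] rawKey, byte[] rawValue) {
        String key = new String(rawKey, StandardCharsets.UTF_8);     // deserialize
        String value = new String(rawValue, StandardCharsets.UTF_8); // deserialize
        // The processor applies the typed record unmodified, so the store
        // has to serialize it right back into bytes.
        store.put(ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8)),
                  value.getBytes(StandardCharsets.UTF_8));
    }

    /** Restore path: copy the raw bytes straight into the store. */
    static void viaRestore(Map<ByteBuffer, byte[]> store, byte[] rawKey, byte[] rawValue) {
        store.put(ByteBuffer.wrap(rawKey), rawValue);
    }

    public static void main(String[] args) {
        byte[] k = "user-1".getBytes(StandardCharsets.UTF_8);
        byte[] v = "hello".getBytes(StandardCharsets.UTF_8);

        Map<ByteBuffer, byte[]> processed = new HashMap<>();
        Map<ByteBuffer, byte[]> restored = new HashMap<>();
        viaProcessor(processed, k, v);
        viaRestore(restored, k, v);

        // Both paths end with identical store contents; only the work differs.
        System.out.println(Arrays.equals(
            processed.get(ByteBuffer.wrap(k)), restored.get(ByteBuffer.wrap(k)))); // true
    }
}
```

Both paths leave the same bytes in the store; the processor path simply pays 
for one extra decode and encode per record.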


-Matthias




On 1/16/22 11:52 PM, Daan Gertis wrote:
> Hey everyone,
> 
> Just created a KIP on sharing statestore state across multiple applications 
> without duplicating the data on multiple changelog topics. Have a look and 
> tell me what you think or what to improve. This is my first one, so please be 
> gentle 😉
> 
> https://cwiki.apache.org/confluence/x/q53kCw
> 
> Cheers!
> D.
