Hi Walker,
Thanks for the KIP!
Great that you are going to fix this long-standing issue!
1.
I was wondering whether we need the timestamp extractor and the key
and value deserializers in the Topology#addGlobalStore() overloads that
do not take a ProcessorSupplier. What about Consumed in
StreamsBuilder#addGlobalStore()? Since those methods set up a global
state store that does not process any records, do they still need to
deserialize records and extract timestamps? The name might still be
needed, right?
2.
From an API point of view, it might make sense to put all
processor-related arguments into a parameter object. Something like:
GlobalStoreParameters.globalStore().withKeySerde(keySerde).disableReprocessOnRestore()
Just an idea, open for discussion.
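To make the shape of such a parameter object a bit more concrete, here
is a rough, self-contained sketch. Everything in it is an assumption for
illustration only: GlobalStoreParameters does not exist in Kafka Streams,
SerdeStub merely stands in for the real Serde interface so the snippet
compiles on its own, and "reprocess on restore enabled" is only assumed
to be the default because the example above has a method to disable it.

```java
import java.util.Optional;

/** Stand-in for Kafka's Serde so the sketch compiles without dependencies. */
interface SerdeStub<T> { }

/** Hypothetical immutable parameter object; not an existing Kafka Streams class. */
final class GlobalStoreParameters<K, V> {
    private final SerdeStub<K> keySerde;
    private final SerdeStub<V> valueSerde;
    private final boolean reprocessOnRestore;

    private GlobalStoreParameters(SerdeStub<K> keySerde,
                                  SerdeStub<V> valueSerde,
                                  boolean reprocessOnRestore) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.reprocessOnRestore = reprocessOnRestore;
    }

    /** Entry point: no serdes set, reprocessing on restore enabled (assumed default). */
    static <K, V> GlobalStoreParameters<K, V> globalStore() {
        return new GlobalStoreParameters<>(null, null, true);
    }

    /** Each with-method returns a new instance, so instances can be shared safely. */
    GlobalStoreParameters<K, V> withKeySerde(SerdeStub<K> keySerde) {
        return new GlobalStoreParameters<>(keySerde, valueSerde, reprocessOnRestore);
    }

    GlobalStoreParameters<K, V> withValueSerde(SerdeStub<V> valueSerde) {
        return new GlobalStoreParameters<>(keySerde, valueSerde, reprocessOnRestore);
    }

    GlobalStoreParameters<K, V> disableReprocessOnRestore() {
        return new GlobalStoreParameters<>(keySerde, valueSerde, false);
    }

    Optional<SerdeStub<K>> keySerde() { return Optional.ofNullable(keySerde); }

    Optional<SerdeStub<V>> valueSerde() { return Optional.ofNullable(valueSerde); }

    boolean reprocessOnRestore() { return reprocessOnRestore; }
}
```

One wrinkle with a no-argument factory like this: the call chain needs
an explicit type witness, e.g.
GlobalStoreParameters.<String, Long>globalStore().withKeySerde(keySerde),
because Java cannot infer K and V from globalStore() alone. A factory
that takes the serdes as arguments would avoid that.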
3.
Could you please go over the KIP and correct the typos and other
mistakes?
Best,
Bruno
On 3/2/24 1:43 AM, Matthias J. Sax wrote:
Thanks for the KIP Walker.
Fixing this issue and giving users some flexibility to opt in to or out of
"restore reprocessing" is certainly a good improvement.
From an API design POV, I like the idea of not requiring a
ProcessorSupplier to begin with. Given the current implementation of the
restore process, this might have been the better API from the
beginning... Well, better late than never :)
For this new method w/o a supplier, I am wondering if we want to keep
`addGlobalStore` or name it `addGlobalReadOnlyStore` -- we do a similar
thing via KIP-813. Just an idea.
However, I am not convinced that adding a new boolean parameter is the
best way to design the API. Unfortunately, I don't have any elegant
proposal myself. Just a somewhat crazy idea to do a larger API change:
Taking a step back, a global store is by definition a terminal node --
we don't support adding child nodes. Hence, while we expose a full
`ProcessorContext` interface, we actually limit what functionality it
supports. Thus, I am wondering if we should stop using the generic
`Processor` interface to begin with, but design a new one which is
tailored to the needs of global stores? -- This would of course be of
much larger scope than originally intended by this KIP, but it might be
a great opportunity to kill two birds with one stone?
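A very rough sketch of what such a tailored interface could look like,
just to illustrate the idea. All names here (GlobalStoreWriter,
GlobalStoreUpdater, InMemoryGlobalStore) are made up and are not part of
Kafka Streams or the KIP; the map-backed store and the "null value means
delete" rule are assumptions that only exist to make the snippet
self-contained.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical: the only capability handed to a global-store processor is writing to its store. */
interface GlobalStoreWriter<K, V> {
    void put(K key, V value);
}

/** Hypothetical alternative to the generic Processor: terminal by construction, there is no forward(). */
interface GlobalStoreUpdater<K, V> {
    void update(K key, V value, GlobalStoreWriter<K, V> store);
}

/** Map-backed stand-in for a real state store, used only to exercise the sketch. */
final class InMemoryGlobalStore<K, V> implements GlobalStoreWriter<K, V> {
    private final Map<K, V> data = new HashMap<>();

    @Override
    public void put(K key, V value) {
        if (value == null) {
            data.remove(key);      // assumption: a null value is treated as a delete
        } else {
            data.put(key, value);
        }
    }

    V get(K key) {
        return data.get(key);
    }
}
```

With an interface like this, an updater is just a lambda such as
(key, value, store) -> store.put(key, value), and anything a global
store cannot do (forwarding to child nodes, for example) is simply not
expressible, instead of being rejected at runtime.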
The only other question to consider is: do we believe that global stores
will never have child nodes, or could we actually allow for child nodes
in the future? If the latter, it might not be smart to move off the
`Processor` interface... In general, I could imagine that, especially as
we now want to support "process on restore", we might allow simple
stateless operators like `map()` or `filter()` on a `GlobalKTable` (or
allow adding custom global processors) at some point in the future.
Just wanted to put this out to see what people think...
-Matthias
On 2/29/24 1:26 PM, Walker Carlson wrote:
Hello everybody,
I wanted to propose a change to our addGlobalStore methods so that the
restore behavior can be controlled on a per-processor level. This should
help Kafka Streams users tune global stores added with the
Processor API to better fit their needs.
Details are in the KIP here: https://cwiki.apache.org/confluence/x/E4t3EQ
Thanks,
Walker