Hey,
looking into the API, I am wondering why we would need to add an
overload talking a `Named` parameter?
StreamsBuilder.addGlobalStore() (and .addGlobalTable()) already takes a
`Consumed` parameter that allows to set a name.
2.
I do not understand what you mean with "maximum flexibility". The
built-in processor needs to assume a given state store interface. That
means, users have to provide a state store that offers that
interface. If
they do not they will get a runtime exception. If we require a store
builder for a given interface, we can catch the mistake at compile time.
Let me know whether I misunderstood something.
Yes, we could catch it at runtime. But I guess what I was trying to say
is different: I was trying to say, we should not limit the API to always
require a specific store, such that global stores can only be of a
certain type. Global Stores should be allowed to be of any type. Hence,
if we add a built-in processor, it can only be one option, and we always
need to support custom processor, and might also want to try to allow
the restore optimization for custom processor (and thus other store
types), not just for our built-in processor (and our built-in stores).
Coupling the optimization to built-in stores would prevent us to apply
the optimization to custom stores.
@Almog: interesting idea. I tend to think that both issues are
orthogonal. If users pick to apply the optimization "added" by this KIP,
the bug you mentioned would still apply to global stores, and thus this
KIP is not addressing the issue you mentioned.
Personally, I also think that we don't need a KIP to fix the ticket you
mentioned? In the end, we need to skip records during restore, and it
seems it does not make sense to make this configurable?
-Matthias
On 3/26/24 4:24 PM, Almog Gavra wrote:
Thanks for the thoughts Bruno!
Do you mean a API to configure restoration instead of boolean flag
reprocessOnRestore?
Yes, this is exactly the type of thing I was musing (but I don't
have any
concrete suggestions). It feels like that would give the flexibility to
do
things like the motivation section of the KIP (allow bulk loading of
records without reprocessing) while also solving other limitations.
I'm supportive of the KIP as-is but was hoping somebody with more
experience would have a sudden inspiration for how to solve both issues
with one API! Anyway, I'll slide back into the lurking shadows for now
and
let the discussion continue :)
Cheers,
Almog
On Tue, Mar 26, 2024 at 4:22 AM Bruno Cadonna <cado...@apache.org>
wrote:
Hi Almog,
Do you mean a API to configure restoration instead of boolean flag
reprocessOnRestore?
Do you already have an idea?
The proposal in the KIP is focused on the processor that updates the
global state whereas in the case of GlobalKTable and source KTable the
issues lies in the deserialization of records from the input
topics, but
only if the deserialization error handler is configured to drop the
problematic record. Additionally, for source KTable the source topic
optimization must be turned on to run into the issue. I am
wondering how
a unified API for global stores, GlobalKTable, and source KTable might
look like.
While it is an interesting question, I am in favor of deferring
this to
a separate KIP.
Best,
Bruno
On 3/26/24 12:49 AM, Almog Gavra wrote:
Hello Folk!
Glad to see improvements to the GlobalKTables in discussion! I think
they
deserve more love :)
Scope creep alert (which I'm generally against and certainly still
support
this KIP without but I want to see if there's an elegant way to
address
both problems). The KIP mentions that "Now the restore is done by
reprocessing using an instance from the customer processor supplier"
which
I suppose fixed a long-standing bug (
https://issues.apache.org/jira/browse/KAFKA-8037) but only for
GlobalKTables and not for normal KTables that use the
source-changelog
optimization. Since this API could be used to signal "I want to
reprocess
on restore" I'm wondering whether it makes sense to design this
API in
a
way that could be extended for KTables as well so a fix for
KAFKA-8037
would be possible with the same mechanism. Thoughts?
Cheers,
Almog
On Mon, Mar 25, 2024 at 11:06 AM Walker Carlson
<wcarl...@confluent.io.invalid> wrote:
Hey Bruno,
1) I'm actually not sure why that is in there. It certainly doesn't
match
the convention. Best to remove it and match the other methods.
2) Yeah, I thought about it but I'm not convinced it is a necessary
restriction. It might be useful for the already defined
processors but
then
they might as well use the `globalTable` method. I think the add
state
store option should go for maximum flexibility.
Best,
Walker
On Fri, Mar 22, 2024 at 10:01 AM Bruno Cadonna <cado...@apache.org>
wrote:
Hi Walker,
A couple of follow-up questions.
1.
Why do you propose to explicitly pass a parameter "storeName" in
StreamsBuilder#addGlobalStore?
The StoreBuilder should already provide a name for the store, if I
understand the code correctly.
I would avoid using the same name for the source node and the state
store, because it limits the flexibility in naming. Why do you not
use
Named for the name of the source node?
2.
Did you consider Matthias' proposal to restrict the type of the
store
builder to `StoreBuilder<TimestampedKeyValueStore>` (or even
`StoreBuilder<? extends TimestampedKeyValueStore>`) for the case
where
the processor is built-in?
Best,
Bruno
On 3/13/24 11:05 PM, Walker Carlson wrote:
Thanks for the feedback Bruno, Matthias, and Lucas!
There is a decent amount but I'm going to try and just hit the
major
points
as I would like to keep this change simple.
I've made corrections for the mistakes pointed out. Thanks for the
suggestions everyone.
The main sticking point seems to be with the method of signalling
the
restore behavior. It seems we can all agree with how the API
should
look
with the default option we are adding. I think keeping the
option to
load
directly from the topic into the store is a good idea. It is much
more
performant and could make a simple metric collector processor much
simpler.
I think something that Matthais said about creating a special
class
of
processors for the global stores helps me think about the issue. I
tend
to
fall into the category that we should keep global stores open
to the
possibility of having child nodes in the future. I don't really
see
the
downside of having that as an option. It might not be best for
a lot
of
cases, but something simple could be very useful to put in the
PAPI.
I like the idea of having a `GlobalStoreParameters` but only if we
decide
to make the processor need to extend an interface like
'GobalStoreProcessor`. If not that seems excessive.
As of right now I don't see a better option than having a boolean
flag
for
the reprocessOnRestore option. I expanded the description in the
docs
so
I
hope that helps.
I am more than willing to take other ideas on it.
thanks,
Walker