Hi,

another idea came to my mind: instead of using a compacted topic, the buffer could use a non-compacted topic and regularly delete records before a given offset, as Streams does for repartition topics.

Best,
Bruno

On 05.06.23 21:48, Bruno Cadonna wrote:
Hi Victoria,

that is a good point!

I think the topic needs to be a compacted topic to be able to get rid of records that are evicted from the buffer. So the topic key might be a combination of the record key, the timestamp, and a sequence number to distinguish between records with the same key and the same timestamp.
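
To make the composite-key idea concrete, here is a minimal Python sketch (not Kafka Streams code). `BufferKey` and `CompactedLog` are hypothetical names, and the log only models compaction as "latest value per key, with `None` acting as a tombstone"; the real changelog format is not specified in this thread.

```python
from dataclasses import dataclass
from itertools import count


@dataclass(frozen=True)
class BufferKey:
    """Hypothetical changelog key: record key + timestamp + sequence number."""
    record_key: str
    timestamp: int
    seq: int


class CompactedLog:
    """Toy model of a compacted topic: keeps only the latest value per key."""

    def __init__(self):
        self._latest = {}

    def append(self, key, value):
        if value is None:
            # A tombstone removes the key once compaction has run.
            self._latest.pop(key, None)
        else:
            self._latest[key] = value

    def restore(self):
        """Return what a restarted buffer would read back."""
        return dict(self._latest)


seq = count()
log = CompactedLog()

# Two stream records with the same key and the same timestamp stay distinct
# because the sequence number differs.
k1 = BufferKey("a", 5, next(seq))
k2 = BufferKey("a", 5, next(seq))
log.append(k1, "v1")
log.append(k2, "v2")

# Evicting the first record from the buffer writes a tombstone for its key only.
log.append(k1, None)
restored = log.restore()
```

After the tombstone, only the second record remains in the restored state, which is the behaviour a plain record key could not provide.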

Just an idea! Maybe Walker comes up with something better.

Best,
Bruno

On 05.06.23 20:38, Victoria Xia wrote:
Hi Walker,

Thanks for the latest updates! The KIP looks great. Just one question about
the changelog topic for the join buffer: The KIP says "When a failure
occurs the buffer will try to recover from an OffsetCheckpoint if possible.
If not it will reload the buffer from a compacted change-log topic." This
is a new changelog topic that will be introduced specifically for the join
buffer, right? Why is the changelog topic compacted? What are the keys? I
am confused because the buffer contains records from the stream-side of the
join, for which multiple records with the same key should be treated as
separate updates which all must be tracked in the buffer, rather than
updates which replace each other.

Thanks,
Victoria

On Mon, Jun 5, 2023 at 1:47 AM Bruno Cadonna <cado...@apache.org> wrote:

Hi Walker,

Thanks once more for the updates to the KIP!

Do you also plan to expose metrics for the buffer?

Best,
Bruno

On 02.06.23 17:16, Walker Carlson wrote:
Hello Bruno,

I think this covers your questions. Let me know what you think.

2.
We can use a changelog topic. I think we can treat it like any other
store
and recover in the usual manner. Also, the implementation is on disk.

3.
The description is in the public interfaces description. I will copy it
into the proposed changes as well.

This is a bit of an implementation detail that I didn't want to add into
the kip, but the record will be added to the buffer to keep the stream
time
consistent; it will just be ejected immediately. Of course, if this
causes performance issues we will skip this step and track stream time
separately. I will update the kip to say that stream time advances when a
stream record enters the node.

Also, yes, updated.

5.
No, there is no difference right now; everything gets processed as it comes
in and tries to find a record for its timestamp.

Walker

On Fri, Jun 2, 2023 at 6:41 AM Bruno Cadonna <cado...@apache.org> wrote:

Hi Walker,

Thanks for the updates!

2.
It is still not clear to me how a failure is handled. I do not
understand what you mean by "recover from an OffsetCheckpoint".

My understanding is that the buffer needs to be replicated into its own
Kafka topic. The input topic is not enough. The offset of a record is
added to the offsets to commit once the record is streamed through the
subtopology. That means once the record is added to the buffer its
offset is added to the offsets to commit -- independently of whether the
record was evicted from the buffer and sent to the join node or not.
Now, let's assume the following scenario
1. a record is read from the input topic and added to the buffer, but
not evicted to be processed by the join node.
2. When the processing of the subtopology finishes the offset of the
record is added to the offsets to commit.
3. A commit happens.
4. A failure happens

After the failure the buffer is empty but the record will not be read
anymore from the input topic since its offset has been already
committed. The record is lost.
One solution to avoid the loss is to recreate the buffer from a
compacted Kafka topic as we do for suppression buffers. I do not think we
need any offset checkpoint here since we keep the buffer in memory,
right? Or do you plan to back the buffer with a persistent store? Even
in that case, a compacted Kafka topic would be needed.
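
The four-step scenario above can be replayed with a small Python sketch. It is a toy model under stated assumptions: `restart_after_commit` is a hypothetical helper, the buffer is a plain list, and a second list stands in for the buffer's changelog topic.

```python
def restart_after_commit(records, use_changelog):
    """Buffer one record, commit its offset, crash, and restart.

    Returns (buffer_after_restart, next_offset_to_read).
    """
    buffer = []     # in-memory join buffer
    changelog = []  # stands in for the buffer's changelog topic

    # 1. A record is read and buffered, but not yet evicted to the join node.
    buffer.append(records[0])
    if use_changelog:
        changelog.append(records[0])
    position = 1

    # 2./3. The offset is added to the offsets to commit, and a commit happens.
    committed = position

    # 4. A failure happens: all in-memory state is gone.
    buffer = []

    # On restart, reading resumes at the committed offset; the buffer can
    # only be rebuilt from the changelog, if there is one.
    if use_changelog:
        buffer = list(changelog)
    return buffer, committed


lost = restart_after_commit(["r0", "r1"], use_changelog=False)
kept = restart_after_commit(["r0", "r1"], use_changelog=True)
```

Without the changelog the restarted task has an empty buffer and resumes at offset 1, so `r0` is never seen again; with the changelog `r0` is restored into the buffer.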


3.
From the KIP it is still not clear to me what happens if a record is
outside of the grace period. I guess the record that falls outside of
the grace period will not be added to the buffer, but will be sent to
the join node. Since it is outside of the grace period it will also not
increase stream time and it will not trigger an eviction. Also the head
of the buffer will not contain a record that needs to be evicted since
the timestamp of the head record will be within the interval stream
time minus grace period. Is this correct? Please add such a description
to the KIP.
Furthermore, I think there is a mistake in the text:
"... will dequeue when the record timestamp is greater than stream time
plus the grace period". I guess that should be "... will dequeue when
the record timestamp is less than (or equal?) stream time minus the
grace period"
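
The behaviour guessed at in this point can be written down as a short Python sketch. `GraceBuffer` is a hypothetical class, not the KIP's implementation, and the "less than or equal" eviction boundary is an assumption taken from the suggested wording above.

```python
import heapq


class GraceBuffer:
    """Toy stream-side buffer that releases records in timestamp order."""

    def __init__(self, grace):
        self.grace = grace
        self.stream_time = float("-inf")
        self._heap = []  # (timestamp, insertion order, value)
        self._seq = 0

    def add(self, ts, value):
        """Add a record; return the records forwarded to the join node."""
        if ts < self.stream_time - self.grace:
            # Outside the grace period: bypass the buffer, do not advance
            # stream time, and do not trigger an eviction.
            return [(ts, value)]

        self.stream_time = max(self.stream_time, ts)
        heapq.heappush(self._heap, (ts, self._seq, value))
        self._seq += 1

        # Evict while the head timestamp is <= stream time minus grace
        # (assumed boundary, see lead-in).
        forwarded = []
        while self._heap and self._heap[0][0] <= self.stream_time - self.grace:
            head_ts, _, head_value = heapq.heappop(self._heap)
            forwarded.append((head_ts, head_value))
        return forwarded


buf = GraceBuffer(grace=10)
out1 = buf.add(5, "a")   # buffered
out2 = buf.add(3, "b")   # out of order but within grace: buffered
out3 = buf.add(15, "c")  # stream time 15 evicts everything with ts <= 5
out4 = buf.add(1, "d")   # 1 < 15 - 10: skips the buffer entirely
```

Note that the later reply in this thread describes a variant where the late record is added to the buffer and ejected immediately; the forwarded output is the same in this sketch.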


5.
What is the difference between not setting the grace period and setting
it to zero? If there is a difference, why is there a difference?


Best,
Bruno


On 01.06.23 23:58, Walker Carlson wrote:
Hey Bruno thanks for the feedback.

1)
I will add this to the kip, but stream time only advances when the
buffer receives a new record.

2)
You are correct, I will add a failure section to the kip. Since the
records won't change in the buffer from when they are read from the
topic, they are replicated already.

3)
I see that I'm outvoted on the dropping of records thing. We will pass
them on and try to join them if possible. This might cause some null
results, but increasing the table history retention should help that.

4)
I can add some words to the kip. But it is pretty much directly adding whatever the
grace period is to the latency. I don't see a way around it.

Walker

On Thu, Jun 1, 2023 at 5:23 AM Bruno Cadonna <cado...@apache.org>
wrote:

Hi Walker,

thanks for the KIP!

Here is my feedback:

1.
It is still not clear to me when stream time for the buffer advances.
What is the event that lets the stream time advance? In the
discussion, I
do not understand what you mean by "The segment store already has an
observed stream time, we advance based on that. That should only
advance
based on records that enter the store." Where does this segment store come from? Anyways, I think it would be great to also state how stream
time advances in the KIP.

2.
How does the buffer behave in case of a failure? I think I understand
that the buffer will use an implementation of
TimeOrderedKeyValueBuffer
and therefore the records in the buffer will be replicated to a topic
in
Kafka, but I am not completely sure. Could you elaborate on this in
the
KIP?

3.
I agree with Matthias about dropping late records. We use grace periods
in scenarios where records are grouped, like in windowed aggregations
and windowed joins. The stream buffer you propose does not really group
any records. It rather delays records and reorders them. I am not sure
if grace period is the right naming/concept to apply here. Instead of
dropping records that fall outside of the buffer's time interval the
join should skip the buffer and try to join the record immediately. In
the end, a stream-table join is an unwindowed join, i.e., no grouping is
applied to the records.
What do you and other folks think about this proposal?

4.
How does the proposed buffer affect processing latency? Could you
please add some words about this to the KIP?


Best,
Bruno




On 31.05.23 01:49, Walker Carlson wrote:
Thanks for all the additional comments. I will either address them
here
or
update the kip accordingly.


I mentioned a follow-up kip to add extra features before and in the
responses. I will try to briefly summarize what options and optimizations
I plan to include. If a concern is not covered in this list I will for
sure talk about it below.

* Allowing non versioned tables to still use the stream buffer
* Automatically materializing tables instead of forcing the user to
do
it
* Configurable for in memory buffer
* Order the records in offset order or in time order
* Non memory use buffer (offset order, delayed pull from stream.)
* Time synced between stream and table side (maybe)
* Do not drop late records and process them as they come in instead.


First, Victoria.

1) (One of your nits covers this, but you are correct it doesn't make
sense. so I removed that part of the example.)
For those examples with the "bad" join results I said without
buffering
the
stream it would look like that, but that was incomplete. If the look
up
was
simply looking at the latest version of the table when the stream
records
came in then the results were possible. If we are using the point in
time
lookup that versioned tables let us then you are correct the future
results
are not possible.

2) I'll get to this later as Matthias brought up something related.

To your additional thoughts, I agree that we need to call those
things
out
in the documentation. I'm writing up a follow up kip with a lot of
the
ideas we have discussed so that we can improve this feature beyond
the
base
implementation if it's needed.

I addressed the nits in the kip. I somehow missed the table stream
table
join processor improvement, it makes your first question make a lot
more
sense.  Table history retention is a much cleaner way to describe it.

As to your mention of syncing the time for the table and stream:
Matthias mentioned that as well. I will address both here. I plan to
bring that up in the future, but for now we will leave it out. I suppose
it will be more useful after the table history retention is separable
from the table grace period.


To address Matthias's comments.

You are correct by saying the in memory store shouldn't cause any
semantic
concerns. My concern would be more with if we limited the number of
records
on the buffer and what we would do if we hit said limits, (emitting
those
records might be an issue, throwing an error and halting would not).
I
think we can leave this discussion to the follow up kip along with a
few
other options.

I will go through your proposals now.

      - don't support non-versioned KTables

Sure, we can always expand this later on. I will include it as part of
the improvement kip.

      - if grace period is added, users need to explicitly materialize
the table as versioned (either directly, or upstream. Upstream only
works if downstream tables "inherit" versioned semantics -- cf KIP-914)

again, that works for me for now, if we find a use we can always add
later.

      - the table's history retention time must be larger than the
grace
period (should be easy to check at runtime, when we build the
topology)

agreed

      - because switching from non-versioned to versioned stores is not
backward compatible (cf KIP-914), users need to take care of this
themselves, and this also implies that adding grace period is not a
backward compatible change (even only if via indirect means)

sure, this works

As to the dropping of late records, I'm not sure. On one hand I like
not
dropping things. But on the other I struggle to see how a user can
filter
out late records that might have incomplete join results. The point
in
time
look up will aggressively expire old data and if new data has been
replaced
it will return null if outside of the retention. This seems like it
could
corrupt the integrity of the join output. Seeing that we drop late
records
on the table side as well I would think it makes sense to drop late
records
on the stream buffer. I could be convinced otherwise I suppose, I
could
see
adding this as an option in a follow up kip. It would be very easy to
implement either way. For now, unless someone else objects, I'm going to
stick
with dropping the records for the sake of getting this kip passed. It
is
functionally a small change to make and we can update later if you
feel
strongly about it.

For the ordering: I have to say that it would be more complicated to
implement it to be in offset order, if the goal is to get as many of the
records validly joined as possible. Because we would process records as
they left the buffer, a sufficiently early record could hold up records
that would otherwise be valid past the table history retention. To fix
this
we
could process by timestamp then store in a second queue and emit by
offset,
but that would be a lot more complicated. If we didn't care about not
missing some valid joins we could just have no store and pull from
the
topic at a delay only caring about the timestamp of the next offset.
For
now I want to stick with the timestamp ordering as it makes much more
sense
to me, but would propose we add both of the other options I have laid
out
here in the follow up kip.

Lastly, I think having an empty store with zero grace period would be
super
simple and not costly, so we might as well make it even if nothing
gets
entered.

I hope that addresses all your concerns,

Walker

On Thu, May 25, 2023 at 9:50 AM Matthias J. Sax <mj...@apache.org>
wrote:

Walker,

thanks for the updates. The KIP itself reads fine (of course Victoria
made good comments about some phrases), but there are a couple of things
from your latest reply I don't understand, and that I still think need
some more discussion.

Lucas asked about the in-memory option and `WindowStoreSupplier` and
you
mention "semantic concerns". There should not be any semantic
difference
from the underlying buffer implementation, so I am not sure what you
mean here (also the relationship to suppress() is unclear to me)?
-- I
am ok to not make it configurable for now. We can always do it via a
follow up KIP, and keep interface changes limited for now.

Does it really make sense to allow a grace period if the table is
non-versioned? You also say: "If table is not materialized it will
materialize it as versioned." -- What history retention time would
we
pick for this case (also asked by Victoria)? Or should we rather not
support this and force the user to materialize the table explicitly,
thus explicitly picking a history retention time? It's a tradeoff
between usability and making users aware that there will be a
significant impact on disk usage. There are also compatibility concerns:
If the table is not explicitly materialized in the old program, we would
already need to materialize it in the old program as well (of course, we
would use a non-versioned store so far). Thus, if somebody adds a grace
period, we cannot just switch the store type, as it would be a breaking
change, potentially requiring an application reset, or following the upgrade
path for versioned state stores, and also changing the program to
explicitly materialize using a versioned store. Also note, that we
might
not materialize the actual join table, but only an upstream table,
and
use `ValueGetter` to access the upstream data.

To this end, as you already mentioned, history retention of the
table
should be at least grace period. You proposed to include this in a
follow up KIP, but I am wondering if it's a fundamental requirement
and
thus we should put a check in place right away and reject an invalid
configuration? (It's always easier to lift restrictions than to
introduce
them later.) This would also imply that a non-versioned table cannot
be
supported, because it does not have a history retention that is
larger
than grace period, and maybe also answer the requirement about
materialization: as we already always materialize something on the
table side as non-versioned store right now, it seems difficult to
migrate the store to a versioned store. Ie, it might be ok to push
the
burden onto the user and say: if you start using grace period, you
also
need to manually switch from non-versioned to versioned KTables.
Doing stuff automatically under the hood is very complex for this case,
so if we push the burden onto the user, it might be ok to not complicate
this KIP significantly.

To summarize the last two paragraphs, I would propose to:
      - don't support non-versioned KTables
      - if grace period is added, users need to explicitly materialize
the table as versioned (either directly, or upstream. Upstream only
works if downstream tables "inherit" versioned semantics -- cf KIP-914)
      - the table's history retention time must be larger than the
grace
period (should be easy to check at runtime, when we build the
topology)
      - because switching from non-versioned to versioned stores is not
backward compatible (cf KIP-914), users need to take care of this
themselves, and this also implies that adding grace period is not a
backward compatible change (even only if via indirect means)
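
The retention check proposed in the list above could look roughly like the following Python sketch. `check_grace_against_retention` is a hypothetical helper, not an existing Kafka Streams method. It accepts a retention equal to the grace period ("at least grace period"); if the stricter "larger than" wording is intended, the comparison would need to change to `<=`.

```python
def check_grace_against_retention(grace_ms, history_retention_ms):
    """Reject a join configuration whose table cannot serve buffered lookups.

    history_retention_ms is None for a non-versioned table (assumption).
    """
    if grace_ms == 0:
        return  # no buffering requested, nothing to validate
    if history_retention_ms is None:
        raise ValueError("a join grace period requires a versioned table")
    if history_retention_ms < grace_ms:
        raise ValueError(
            "table history retention (%d ms) must be at least the join "
            "grace period (%d ms)" % (history_retention_ms, grace_ms)
        )


def is_valid(grace_ms, history_retention_ms):
    """Convenience wrapper for the examples below."""
    try:
        check_grace_against_retention(grace_ms, history_retention_ms)
        return True
    except ValueError:
        return False
```

For example, `is_valid(10_000, 60_000)` passes, while `is_valid(10_000, 5_000)` and `is_valid(10_000, None)` are rejected when the topology is built.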

About dropping late records: wondering if we should never drop a
stream-side record for a left-join, even if it's late? In general,
one
thing I observed over the years is, that it's easier to keep stuff
and
let users filter explicitly downstream (or make it configurable),
instead of dropping pro-actively, because users have no good way to
resurrect records that have already been dropped.

For ordering, it sounds reasonable to me to only start with one
implementation, and maybe make it configurable as a follow up.
However,
I am wondering if starting with offset order might be the better
option
as it seems to align more with what we do so far? So instead of storing
records ordered by timestamp, we can just store them ordered by offset,
and still "poll" from the buffer based on the head record's timestamp.
Or would this complicate the implementation significantly?

I also think it's ok to not "sync" stream-time between the table and
the
stream in this KIP, but we should consider doing this as a follow up
change (not sure if we would need a KIP or not for a change like this).

About increasing/decreasing grace period: what you describe makes
sense
to me. If decreased, the next record would just trigger emitting a
lot
of records, and for increase, the buffer would just need to "fill
up"
again. For reprocessing getting a different result with a different
grace period is expected, so that's ok IMHO. -- There seems to be
one
special corner case: grace period zero. For this case, we actually
don't
need any store, and the stream-side could be stateless. I think it
can
have the same behavior, but if we want to "add / remove" the store
dynamically, we need to add specific code for it. For example, even
if
we start up with a grace period of zero, we would need to check if
there
is a local store, and still emit everything in it, before we can
ditch
the store (not sure if that's even easily done at all). Or: we would
need to have a store for _all_ cases, even if grace period is zero
(the
store would be empty all the time though), to avoid super complex
code?


-Matthias


On 5/25/23 10:53 AM, Lucas Brutschy wrote:
Hi Walker,

thanks for your responses. That makes sense. I guess there is
always
the option to make the implementation more configurable later on,
if
users request it. Also thanks for the clarifications. From my side,
the KIP is good to go.

Cheers,
Lucas

On Wed, May 24, 2023 at 11:54 PM Victoria Xia
<victoria....@confluent.io.invalid> wrote:

Thanks for the updates, Walker! Looks great, though I do have a
couple
questions about the latest updates:

        1. The new example says that without stream-side buffering,
"ex"
and
        "fy" are possible join results. How could those join
results
happen? The
        example versioned table suggests that table record "x" has
timestamp 2, and
        table record "y" has timestamp 3. If stream record "e" has
timestamp 1,
        then it can never be joined against record "x", and
similarly
for
stream
        record "f" with timestamp 2 being joined against "y".
        2. I see in your replies above that "If table is not
materialized it
        will materialize it as versioned" but I don't see this
called
out
in the
        KIP -- seems worth calling out. Also, what will the history
retention for
        the versioned table be? Will it be the same as the join
grace
period, or
        will it be greater?


And some additional thoughts:

Sounds like there are a few things users should watch out for when
enabling
the stream-side buffer:

        - Records will get "stuck" if there are no newer records to
advance
        stream time.
        - If there are large gaps between the timestamps of
stream-side
records,
        then it's possible that versioned store history retention
will
have
expired
        by the time a record is evicted from the join buffer,
leading
to
a
join
        "miss." For example, if the join grace period and table
history
retention
        are both 10, and records come in the order:

        table side t0 with ts=0
        stream side s1 with ts=1 <-- enters buffer
        table side t10 with ts=10
        table side t20 with ts=20
        stream side s21 with ts=21 <-- evicts record s1 from
buffer,
but
        versioned store no longer contains data for ts=1 due to
history
retention
        having elapsed

        This will result in the join result (s1, null) even though
it
should've
        been (s1, t0), due to t0 having been expired from the
versioned
store
        already.
        - Out-of-order records from the stream-side will be
reordered,
and
late
        records will be dropped.

I don't think any of these are reasons to not go forward with this
KIP,
but
it'd be good to call them out in the eventual documentation to
decrease
the
chance users get tripped up.
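
The "join miss" example above can be replayed with a small Python simulation. `VersionedTable` and `replay` are hypothetical names, only a single key is modelled, and the rule that a lookup older than the table's observed time minus history retention returns nothing is an assumption about how retention is enforced.

```python
import bisect


class VersionedTable:
    """Toy single-key versioned table with a history retention."""

    def __init__(self, history_retention):
        self.history_retention = history_retention
        self.observed_time = float("-inf")
        self._timestamps = []
        self._values = []

    def put(self, ts, value):
        i = bisect.bisect_right(self._timestamps, ts)
        self._timestamps.insert(i, ts)
        self._values.insert(i, value)
        self.observed_time = max(self.observed_time, ts)

    def get_as_of(self, ts):
        # Assumed rule: reads older than the retention window return nothing.
        if ts < self.observed_time - self.history_retention:
            return None
        i = bisect.bisect_right(self._timestamps, ts)
        return self._values[i - 1] if i else None


def replay(events, grace, history_retention):
    """Run (side, ts, value) events through a buffered stream-table join."""
    table = VersionedTable(history_retention)
    pending = []  # buffered stream-side records as (ts, value)
    stream_time = float("-inf")
    results = []
    for side, ts, value in events:
        if side == "table":
            table.put(ts, value)
            continue
        stream_time = max(stream_time, ts)  # only stream records advance it
        pending.append((ts, value))
        pending.sort()
        while pending and pending[0][0] <= stream_time - grace:
            rec_ts, rec_value = pending.pop(0)
            results.append((rec_value, table.get_as_of(rec_ts)))
    return results


events = [
    ("table", 0, "t0"),
    ("stream", 1, "s1"),    # enters buffer
    ("table", 10, "t10"),
    ("table", 20, "t20"),
    ("stream", 21, "s21"),  # evicts s1
]
miss = replay(events, grace=10, history_retention=10)
hit = replay(events, grace=10, history_retention=20)
```

With grace period and history retention both 10, `s1` joins against nothing; raising only the history retention to 20 in this model yields the expected `(s1, t0)`.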

We could maybe do an improvement later to advance stream time
from
table
side as well, but that might be debatable as we might get more
late
records.

Yes, the likelihood of late records increases but also the
likelihood
of
"join misses" due to versioned store history retention having
elapsed
decreases, which feels important for certain use cases. Either
way,
agreed
that it can be a discussion for the future as incorporating this
would
substantially complicate the implementation.

Also a couple nits:

        - The KIP currently says "We recently added versioned
tables
which
allow
        the table side of the a join [...] but it is not taken
advantage
of
in
        joins," but this doesn't seem true? If the table of a
stream-table
join is
        versioned, then the DSL's stream-table join processor will
automatically
        perform timestamped lookups into the table, in order to
take
advantage of
        the new timestamp-aware store to provide better join
semantics.
        - The KIP mentions "grace period" for versioned stores in a
number
of
        places but I think you actually mean "history retention"?
The
two
happen to
        be the same today (it is not an option for users to
configure
the
two
        separately) but this need not be true in the future.
"History
retention"
        governs how far back in time reads may occur, which is the
relevant
        parameter for performing lookups as part of the
stream-table
join.
"Grace
        period" in the context of versioned stores refers to how
far
back
in time
        out-of-order writes may occur, which probably isn't
directly
relevant for
        introducing a stream-side buffer, though it's also possible
I've
overlooked
        something. (As a bonus, switching from "table grace
period" in
the
KIP to
        "table history retention" also helps to clarify/distinguish
that
it's a
        different parameter from the "join grace period," which I
could
see
being
        confusing to readers. :) )


Cheers,
Victoria

On Thu, May 18, 2023 at 1:43 PM Walker Carlson
<wcarl...@confluent.io.invalid> wrote:

Hey all,

Thanks for the comments, they gave me a lot to think about. I'll try to
address them all in order. I have made some updates to the kip related to
them, and I mention where below.

Lucas

Good idea about the example. I added a simple one.

1) I have thought about including options for the underlying
buffer
configuration. One of which might be adding an in memory option.
My
biggest
concern is about the semantic guarantees. This isn't like suppress or
windows, where producing incomplete results is relatively harmless. Here
we would possibly be producing incorrect results. I also would like to
keep the interface changes as simple as I can. Making more than this
change to Joined could, I feel, make this more complicated than it needs
to be. If we really want to, I could see adding a grace() option with a
BufferConfig in there or something, but I would rather not.

2) The buffer will be independent of if the table is versioned or
not.
If
table is not materialized it will materialize it as versioned. It
might
make sense to do a follow up kip where we force the retention
period
of
the versioned to be greater than whatever the max of the stream
buffer
is.

Victoria

1) Yes, records will exit in timestamp order not in offset order.
2) Late records will be dropped (Late as out of the grace
period).
    From my
understanding that is the point of a grace period, no? Doesn't
the
same
thing happen with versioned stores?
3) The segment store already has an observed stream time, we
advance
based
on that. That should only advance based on records that enter the
store. So
yes, only stream side records. We could maybe do an improvement
later
to
advance stream time from table side as well, but that might be
debatable as
we might get more late records. Anyways I would rather have that
as a
separate discussion.

in memory option? We can do that, for the buffer I plan to use the
TimeOrderedKeyValueBuffer interface which already has an in-memory
implementation, so it would be simple.

I said more in my answer to Lucas's question. The concern I have with
buffer configs or in memory is complicating the interface. Also semantic
guarantees, but in memory shouldn't affect that.

Matthias

1) fixed out of order vs late terminology in the kip.

2) I was referring to having a stream. So after this kip we can
have
a
buffered stream or a normal one. For the table we can use a
versioned
table
or a normal table.

3) Good call out. I clarified this as "If the table side uses a
materialized
version store, it can store multiple versions of each record
within
its
defined grace period." and modified the rest of the paragraph a
bit.

4) I get the point of preserving offset ordering, but if the stream is
buffered
to join on timestamp instead of offset doesn't it already seem
like
we
care
more about time in this case?

If we end up adding more options it might make sense to do this.
Maybe
offset order processing can be a follow up?

I'll add a section for this in Rejected Alternatives. I think it
makes
sense to do something like this but maybe in a follow up.

5) I hadn't thought about this. I suppose if they changed this in
an
upgrade the next record would either evict a lot of records (if
the
grace
period decreased) or there would be a pause until the new grace period
is reached. Increasing is a bit more problematic, especially if the
table
grace period and retention time stays the same. If the data is
reprocessed
after a change like that then there would be different results,
but I
feel
like that would be expected after such a change.

What do you think should happen?

Hopefully this answers your questions!

Walker

On Mon, May 8, 2023 at 11:32 AM Matthias J. Sax <
mj...@apache.org>
wrote:

Thanks for the KIP! Also some question/comments from my side:

10) Notation: you use the term "late data" but I think you mean
out-of-order. We reserve the term "late" for records that arrive after
the grace period has passed, and thus, "late == out-of-order data that is
dropped".


20) "There is only one option from the stream side and only
recently
is
there a second option on the table side."

What are those options? Victoria already asked about the table
side,
but
I am also not sure what option you mean for the stream side?


30) "If the table side uses a materialized version store the
value
is
the latest by stream time rather than by offset within its
defined
grace
period."

The phrase "the value is the latest by stream time" is confusing -- in
the end, a versioned store stores multiple versions, not just one.


40) I am also wondering about ordering. In general, KS tries to
preserve
offset-order during processing (with some exception, when offset
order
preservation is not clearly defined). Given that the stream-side
buffer
is really just a "linear buffer", we could easily preserve
offset-order.
But I also see a benefit of re-ordering and emitting
out-of-order
data
right away when read (instead of blocking them behind in-order
records
that are not ready yet). -- It might even be a possibility to let users
pick an emit strategy, e.g. "EmitStrategy.preserveOffsets" (name just a
placeholder).

The KIP should explain this in more detail and also discuss
different
options and mention them in "Rejected alternatives" in case we
don't
want to include them.


50) What happens when users change the grace period? Especially,
when
they turn it on/off (but also increasing/decreasing is an
interesting
point)? I think we should try to support this if possible; the
"Compatibility" section needs to cover switching on/off in more
detail.


-Matthias




On 5/2/23 2:06 PM, Victoria Xia wrote:
Cool KIP, Walker! Thanks for sharing this proposal.

A few clarifications:

1. Is the order that records exit the buffer in necessarily the
same
as
the
order that records enter the buffer in, or no? Based on the
description
in
the KIP, it sounds like the answer is no, i.e., records will exit the
buffer in increasing timestamp order, which means that they may be
reordered (even for the same key) compared to the input order.

2. What happens if the join grace period is nonzero, and a
stream-side
record arrives with a timestamp that is older than the current
stream
time
minus the grace period? Will this record trigger a join result,
or
will
it
be dropped? Based on the description for what happens when the
join
grace
period is set to zero, it sounds like the late record will be
dropped,
even
if the join grace period is nonzero. Is that true?

3. What could cause stream time to advance, for purposes of
removing
records from the join buffer? For example, will new records
arriving
on
the
table side of the join cause stream time to advance? From the
KIP
it
sounds
like only stream-side records will advance stream time -- does
that
mean
that the join processor itself will have to track this stream
time?

Also +1 to Lucas's question about what options will be
available
for
configuring the join buffer. Will users have the option to
choose
whether
they want the buffer to be in-memory vs persistent?

- Victoria

On Fri, Apr 28, 2023 at 11:54 AM Lucas Brutschy
<lbruts...@confluent.io.invalid> wrote:

Hi Walker,

thanks for the KIP! We definitely need this. I have two
questions:

       - Have you considered allowing the customization of the
underlying
buffer implementation? As I can see, `StreamJoined` lets you
customize
the underlying store via a `WindowStoreSupplier`. Would it
make
sense
for `Joined` to have this as well? I can imagine one may want
to
limit
the number of records in the buffer, for example. If we hit
the
maximum, the only option would be to drop semantic guarantees,
but
users may still want to do this.
       - With "second option on the table side" you are
referring
to
versioned tables, right? Will the buffer on the stream side behave any
differently depending on whether the table side is versioned or not?

Finally, I think a simple example in the motivation section
could
help
non-experts understand the KIP.

Best,
Lucas

On Tue, Apr 25, 2023 at 9:13 PM Walker Carlson
<wcarl...@confluent.io.invalid> wrote:

Hello everybody,

I have a stream proposal to improve the stream table join by
adding a
grace
period and buffer to the stream side of the join to allow
processing
in
timestamp order matching the recent improvements of the
versioned
tables.

Please take a look here <
https://cwiki.apache.org/confluence/x/lAs0Dw>
and
share your thoughts.

best,
Walker












