Thanks Victoria.

1) I have no concerns about the filter operator's proposed semantics.

2) For aggregations, I have a meta question to discuss first: for
operators that generate a table materialized as versioned, how should
we emit out-of-order data if the operator still needs to? A quick look
at the operations that generate a table today:

a. Read topic as a table source
b. KStream#toTable
c. Aggregated from a stream
d. Aggregated from a table
e. From a table-table join.
f. From a stateless table operator like a `table.filter`.

Per this KIP, cases d) and e) should no longer emit out-of-order
records downstream, so we only need to consider the others. Today,
when we send old/new pairs downstream, the old value is simply the
value read from the materialized store right before it is
overwritten. If the store is versioned, however, this old value has
already been sent as part of an earlier new/old pair, so would it
actually be correct to send the out-of-order record itself as both
halves of the pair? More specifically, say a table source operator
receives the following records for the same key from its topic:

(A, t10), (B, t20), (C, t15)

If the store is not versioned, we would emit, in the form of new/old:

A10/null, B20/A10, C15/B20

Whereas if the records had arrived in timestamp order, the ideal emit order would be:

A10/null, C15/A10, B20/C15

So I'm thinking, if the store is versioned, we should try to emit in a
way that is as coherent with the ideal ordering as possible, for the
downstream operators to handle:

A10/null, B20/A10, C15/null, null/C15 (or to be succinct, just
A10/null, B20/A10, C15/C15)

This is because A10 was already sent as part of B20/A10 before C
arrived, so that downstream operators could negate its effect. When C
arrives, we only need to let the downstream know that "there was a C
at t15, between A at t10 and B at t20, which is already obsolete
because of the later B20 that I sent you before".
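
To make the three orderings concrete, here is a minimal Python sketch
of a single-key table source. It is only a model of the idea, not
Kafka Streams code: `emit_unversioned`, `emit_versioned`, and `fmt` are
hypothetical names, pairs are written new/old to match the examples
above, and "out-of-order" is assumed to mean a timestamp strictly lower
than the latest one seen so far.

```python
from typing import List, Optional, Tuple

Record = Tuple[str, int]                            # (value, timestamp), single key
Change = Tuple[Optional[Record], Optional[Record]]  # (new, old)


def emit_unversioned(records: List[Record]) -> List[Change]:
    """Non-versioned store: 'old' is whatever value is being overwritten."""
    changes: List[Change] = []
    current: Optional[Record] = None
    for rec in records:
        changes.append((rec, current))
        current = rec
    return changes


def emit_versioned(records: List[Record]) -> List[Change]:
    """Sketch of the proposal: an out-of-order record is emitted as new == old."""
    changes: List[Change] = []
    latest: Optional[Record] = None
    for rec in records:
        if latest is not None and rec[1] < latest[1]:
            # Assumption: a strictly older timestamp marks the record as out-of-order.
            changes.append((rec, rec))
        else:
            changes.append((rec, latest))
            latest = rec
    return changes


def fmt(changes: List[Change]) -> str:
    """Render changes in the new/old notation used in this thread."""
    def one(r: Optional[Record]) -> str:
        return "null" if r is None else f"{r[0]}{r[1]}"
    return ", ".join(f"{one(new)}/{one(old)}" for new, old in changes)


records = [("A", 10), ("B", 20), ("C", 15)]
print(fmt(emit_unversioned(records)))                              # as emitted today
print(fmt(emit_unversioned(sorted(records, key=lambda r: r[1]))))  # ideal order
print(fmt(emit_versioned(records)))                                # proposed
```

The second call simply replays the same records sorted by timestamp,
which is what the ideal ordering amounts to for a single key.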

This gives downstream operators the correct information, and each can
handle it accordingly:

* Aggregate operators can simply ignore the C15/C15 from upstream.
* Stateless operators still just apply the filter to C15/C15 and
forward the result downstream.
* Join operators, as this KIP indicates, would apply the join if
necessary and not emit the older join results.
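
As a rough illustration of the aggregate case, the Python sketch below
counts how many keys currently hold each value. It is a toy model under
stated assumptions, not Kafka Streams code: `count_by_value` is a
hypothetical name, changes are (new, old) tuples of (value, timestamp),
and a change whose new and old halves are identical is assumed to be
the out-of-order marker.

```python
from collections import Counter
from typing import Dict, List, Optional, Tuple

Record = Tuple[str, int]                            # (value, timestamp)
Change = Tuple[Optional[Record], Optional[Record]]  # (new, old)


def count_by_value(changes: List[Change]) -> Dict[str, int]:
    """Toy table aggregate: subtract the old value, then add the new one."""
    counts: Counter = Counter()
    for new, old in changes:
        if new is not None and new == old:
            # Assumption: new == old marks an out-of-order record; ignore it.
            continue
        if old is not None:
            counts[old[0]] -= 1
        if new is not None:
            counts[new[0]] += 1
    # Drop values that no key holds any more.
    return {value: n for value, n in counts.items() if n != 0}


versioned = [(("A", 10), None), (("B", 20), ("A", 10)), (("C", 15), ("C", 15))]
unversioned = [(("A", 10), None), (("B", 20), ("A", 10)), (("C", 15), ("B", 20))]

print(count_by_value(versioned))    # B stays in the aggregate
print(count_by_value(unversioned))  # C replaces B despite its older timestamp
```

With the C15/C15 form the aggregate keeps the latest-by-timestamp value
B, whereas the C15/B20 form emitted today swaps in the older C.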

If we can do that, then maybe we do not even need to change the
repartition topic format again?


Guozhang

On Tue, Apr 11, 2023 at 11:17 AM Matthias J. Sax <mj...@apache.org> wrote:
>
> If we send old and new value as two messages, this should work I guess?
> Victoria could confirm. -- But not if we send old/new as a single message
> in case the new-key does not change?
>
> -Matthias
>
> On 4/11/23 5:25 AM, Lucas Brutschy wrote:
> > Hi,
> >
> > No concerns at all, just a clarifying question from my side: for
> > detecting out-of-order records, I need both new and old timestamp, I
> > suppose I get it for the new record via timestamp extractor, can I not
> > get it the same way from the old record that is passed down to the
> > aggregation after KIP-904?
> >
> > Thanks,
> > Lucas
> >
> > On Tue, Apr 11, 2023 at 5:35 AM Matthias J. Sax <mj...@apache.org> wrote:
> >>
> >> Thanks.
> >>
> >> One question: for the repartition topic format change, do we want to
> >> re-use flag=2, or should we introduce flag=3, and determine when
> >> compiling the DSL into the Topology if we want/need to include the
> >> timestamp, and if not, use format version=2 to avoid unnecessary overhead?
> >>
> >>
> >> -Matthias
> >>
> >> On 4/10/23 5:47 PM, Victoria Xia wrote:
> >>> Hi everyone,
> >>>
> >>> While wrapping up the implementation for KIP-914, I have discovered that
> >>> two more DSL processors require semantic updates in the presence of
> >>> versioned tables:
> >>>
> >>>      - The table filter processor has an optimization to drop nulls if the
> >>>      previous filtered value is also null. When the upstream table is
> >>>      versioned, this optimization should be disabled in order to preserve
> >>>      proper version history in the presence of out-of-order data.
> >>>      - When performing an aggregation over a versioned table, only the
> >>>      latest value by timestamp (per key) should be included in the final
> >>>      aggregate value. This is not happening today in the presence of
> >>>      out-of-order data, due to the way that TableSourceNodes call
> >>>      `get(key)` in order to determine the "old value" which is to be
> >>>      removed from the aggregate as part of applying an update. To fix
> >>>      this, aggregations should ignore out-of-order records when
> >>>      aggregating versioned tables.
> >>>         - In order to implement this change, table aggregate processors
> >>>         need a way to determine whether a record is out-of-order or not.
> >>>         This cannot be done by querying the source table value getter as
> >>>         that store belongs to a different subtopology (because a
> >>>         repartition occurs before aggregation). As such, an additional
> >>>         timestamp must be included in the repartition topic. The 3.5
> >>>         release already includes an update to the repartition topic
> >>>         format (with upgrade implications properly handled) via KIP-904
> >>>         <https://cwiki.apache.org/confluence/display/KAFKA/KIP-904%3A+Kafka+Streams+-+Guarantee+subtractor+is+called+before+adder+if+key+has+not+changed>,
> >>>         so making an additional change to the repartition topic format
> >>>         to add a timestamp comes at no additional cost to users.
> >>>
> >>>
> >>> I have updated the KIP
> >>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+DSL+Processor+Semantics+for+Versioned+Stores>
> >>> itself with more detail about each of these changes. Please let me know if
> >>> there are any concerns. In the absence of dissent, I'd like to include
> >>> these changes along with the rest of KIP-914 in the 3.5 release.
> >>>
> >>> Apologies for not noticing these additional semantics implications earlier,
> >>> Victoria
> >>>
> >>> ---------- Forwarded message ---------
> >>> From: Victoria Xia <victoria....@confluent.io>
> >>> Date: Wed, Mar 22, 2023 at 10:08 AM
> >>> Subject: Re: [VOTE] KIP-914 Join Processor Semantics for Versioned Stores
> >>> To: <dev@kafka.apache.org>
> >>>
> >>>
> >>> Thanks for voting, everyone! We have three binding yes votes with no
> >>> objections during four full days of voting. I will close the vote and mark
> >>> the KIP as accepted, right in time for the 3.5 release.
> >>>
> >>> Thanks,
> >>> Victoria
> >>>
> >>> On Wed, Mar 22, 2023 at 7:11 AM Bruno Cadonna <cado...@apache.org> wrote:
> >>>
> >>>> +1 (binding)
> >>>>
> >>>> Thanks Victoria!
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>> On 20.03.23 17:13, Matthias J. Sax wrote:
> >>>>> +1 (binding)
> >>>>>
> >>>>> On 3/20/23 9:05 AM, Guozhang Wang wrote:
> >>>>>> +1, thank you Victoria!
> >>>>>>
> >>>>>> On Sat, Mar 18, 2023 at 8:27 AM Victoria Xia
> >>>>>> <victoria....@confluent.io.invalid> wrote:
> >>>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> I'd like to start a vote on KIP-914 for updating the Kafka Streams
> >>>>>>> join processors to use proper timestamp-based semantics in
> >>>>>>> applications with versioned stores:
> >>>>>>>
> >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores
> >>>>>>>
> >>>>>>> To avoid compatibility concerns, I'd like to include the changes from
> >>>>>>> this KIP together with KIP-889
> >>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores>
> >>>>>>> (for introducing versioned stores) in the upcoming 3.5 release. I will
> >>>>>>> close the vote on the 3.5 KIP deadline, March 22, if there are no
> >>>>>>> objections before then.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Victoria
> >>>>
> >>>
