Thank you for the constructive discussions! I'll start a vote in a separate
thread.

On Mon, 12 Jan 2026 at 13:34, Timo Walther <[email protected]> wrote:

> Thanks for the clarification, Dawid.
>
> +1 for voting.
>
> Cheers,
> Timo
> On 12.01.26 03:09, Xuyang wrote:
> > Thank you for the explanation. big +1 for this.
> >
> >
> >
> > --
> >
> >      Best!
> >      Xuyang
> >
> >
> >
> > At 2026-01-09 19:16:52, "Dawid Wysakowicz" <[email protected]>
> wrote:
> >> @Timo
> >>
> >> 1) Will we support ON CONFLICT syntax also for append-only inputs?
> >>
> >>
> >> That's a very good point! I think we should. In my opinion we should
> >> support it so that:
> >> 1. ERROR -> checks if we don't insert multiple times to the same
> PRIMARY KEY
> >> 2. NOTHING -> that emits only the first value that's inserted
> >> 3. DEDUPLICATE -> the current behaviour, we would always overwrite the
> >> value. we don't have a disorder problem though thus we don't need the
> SUM
> >> operator
> >>
> >> 2) Regarding naming, I find the config option is too long.
> >>> "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval"
> >>> could we simplify it to something less internal? Maybe
> >>> table.exec.sink.upserts.compaction-interval?
> >>
> >>
> >> Fine by me. I'll update the FLIP to:
> >> "table.exec.sink.upserts.compaction-interval"
> >>
> >> @David
> >> Thank you for the kind words! I'll think of having it as a blogpost.
> >>
> >> If there are no further objections/comments. I'd like to start a vote on
> >> this some time next week.
> >>
> >> On Thu, 8 Jan 2026 at 13:48, Timo Walther <[email protected]> wrote:
> >>
> >>> Hi Dawid,
> >>>
> >>> thank you very much for working and proposing this FLIP. This is an
> >>> excellent design document that shows the deep research you have
> >>> conducted. Both the linked resources as well as the examples are very
> >>> helpful to get the big picture.
> >>>
> >>> Sink upsert materializer is a long-standing problem for Flink SQL
> >>> pipelines. Only a few people really understand why it exists and why it
> >>> is so expensive. I also know that some users have simply disabled it
> >>> because they are fine with the output results (potentially ignoring
> >>> corner cases intentially or by accident).
> >>>
> >>> In general the FLIP is in a very good shape, and you definitely get my
> >>> +1 on this. Just some last questions:
> >>>
> >>> 1) Will we support ON CONFLICT syntax also for append-only inputs?
> >>>
> >>> 2) Regarding naming, I find the config option is too long.
> >>> "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval"
> >>> could we simplify it to something less internal? Maybe
> >>> table.exec.sink.upserts.compaction-interval?
> >>>
> >>> Cheers,
> >>> Timo
> >>>
> >>>
> >>> On 08.01.26 12:42, Dawid Wysakowicz wrote:
> >>>>>
> >>>>> Today there are still many retract sources, such as some sources in
> the
> >>>>> Flink CDC project (e.g., PG CDC, MySQL CDC), Paimon, Hudi, and some
> >>>>> formats, etc.These can be further divided into two categories.
> >>>>> One is like Debezium: there is only a single UPDATE record in the
> >>> physical
> >>>>> storage, and the corresponding Flink source connector further splits
> it
> >>>>> into UA/UB. The other is where UA and UB are already two separate
> >>> changelog
> >>>>> records in the physical storage.
> >>>>> For the former, we could generate a watermark boundary before the
> source
> >>>>> just like checkpoint barrier, so that UB and UA are guaranteed to
> fall
> >>>>> within the same boundary. This should actually be supportable. It’s
> >>> okay if
> >>>>> we don’t support it in the first version, but it may affect the
> overall
> >>>>> design—for example, how to generate the system watermark boundary.
> >>>>> For the latter, it’s probably more troublesome. I think it’s also
> fine
> >>> not
> >>>>> to support it. What do you think?
> >>>>
> >>>>
> >>>> When designing the FLIP I considered the debezium case and it's
> actually
> >>>> not so much of a problem as you correctly pointed out. The only
> >>> requirement
> >>>> is that the watermark is generated before the message split. I'd start
> >>>> without support for those sources and we can improve on that later on.
> >>>>
> >>>> 3. Oh, what I meant is how about renaming it to something like
> >>>>>
> "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval"?
> >>>>> Because I think it may be not a “watermark”; it’s a compaction
> barrier,
> >>> and
> >>>>> this compaction can be 1) replaced by watermark, or 2) replaced by
> >>>>> checkpoint, or 3) generated by the Flink system internally. What do
> you
> >>>>> think?
> >>>>
> >>>>
> >>>> Fine by me. We can call
> >>>> it
> "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval".
> >>>>
> >>>> 4. I’m also wondering whether we don’t even need the state about “a
> >>> single
> >>>>> result per key for a cross-watermark-boundary handover”?
> >>>>
> >>>>
> >>>> I am pretty sure we do in order to adhere to the ON CONFLICT ERROR
> >>>> behaviour. Bear in mind two "active" keys may come as part of two
> >>> separate
> >>>> barriers.
> >>>>
> >>>> On Thu, 8 Jan 2026 at 05:14, Xuyang <[email protected]> wrote:
> >>>>
> >>>>> 1. I agree it, at least it won’t be worse. Currently, for data that
> >>>>> contains non-deterministic functions without UK, SUM cannot properly
> >>> handle
> >>>>> correcting out-of-order records.
> >>>>>
> >>>>>
> >>>>> 2. It’s fine if we first don’t support it. Let me add some more
> context.
> >>>>> Today there are still many retract sources, such as some sources in
> the
> >>>>> Flink CDC project (e.g., PG CDC, MySQL CDC), Paimon, Hudi, and some
> >>>>> formats, etc.These can be further divided into two categories.
> >>>>> One is like Debezium: there is only a single UPDATE record in the
> >>> physical
> >>>>> storage, and the corresponding Flink source connector further splits
> it
> >>>>> into UA/UB. The other is where UA and UB are already two separate
> >>> changelog
> >>>>> records in the physical storage.
> >>>>> For the former, we could generate a watermark boundary before the
> source
> >>>>> just like checkpoint barrier, so that UB and UA are guaranteed to
> fall
> >>>>> within the same boundary. This should actually be supportable. It’s
> >>> okay if
> >>>>> we don’t support it in the first version, but it may affect the
> overall
> >>>>> design—for example, how to generate the system watermark boundary.
> >>>>> For the latter, it’s probably more troublesome. I think it’s also
> fine
> >>> not
> >>>>> to support it. What do you think?
> >>>>>
> >>>>>
> >>>>> 3. Oh, what I meant is how about renaming it to something like
> >>>>>
> "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval"?
> >>>>> Because I think it may be not a “watermark”; it’s a compaction
> barrier,
> >>> and
> >>>>> this compaction can be 1) replaced by watermark, or 2) replaced by
> >>>>> checkpoint, or 3) generated by the Flink system internally. What do
> you
> >>>>> think?
> >>>>>
> >>>>>
> >>>>> 4. I’m also wondering whether we don’t even need the state about “a
> >>> single
> >>>>> result per key for a cross-watermark-boundary handover”?
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>>
> >>>>>       Best!
> >>>>>       Xuyang
> >>>>>
> >>>>>
> >>>>>
> >>>>> 在 2026-01-07 19:54:47,"Dawid Wysakowicz" <[email protected]>
> 写道:
> >>>>>>> 1. Without upsert key, how do we determine the order of multiple
> >>> records
> >>>>>> within the same watermark boundary when non-deterministic functions
> are
> >>>>>> involved? For example, if we receive data like the “disorder (2)”
> case
> >>>>>> below, and the upsert key is lost after a join, what will the final
> >>> output
> >>>>>> be (without internal consistency issues)?
> >>>>>>
> >>>>>> If you have non-deterministic functions like in your example the
> >>>>> retraction
> >>>>>> does not work anyhow. For a retraction to work if there is no
> primary
> >>> key
> >>>>>> defined we require all columns to be deterministic:
> >>>>>>
> >>>>>
> >>>
> https://github.com/apache/flink/blob/fec9336e7d99500aeb9097e441d8da0e6bde5943/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java#L142
> >>>>>>
> >>>>>>> 2. To address internal consistency issues caused by emitting -U, we
> >>> need
> >>>>>> each watermark boundary to contain paired -U/+U. That means we have
> to
> >>>>>> track what the source sends, and only emit the "watermark boundary"
> >>> after
> >>>>>> the +U has been sent, right? For an upsert source, this is easy
> >>> because we
> >>>>>> have ChangelogNormalize to output the -U/+U pairs. But for a retract
> >>>>>> source, we may need to introduce a new stateful operator. This seems
> >>>>>> unavoidable unless the source to output -U and +U together.
> >>>>>>
> >>>>>> Yes, this assumption does not hold for retracting sources. So far we
> >>> don't
> >>>>>> have any such sources. I'll introduce a check that would fail for a
> >>>>>> combination of a retracting source and DO ERROR/DO NOTHING.
> >>>>>>
> >>>>>>> 3. About
> >>>>>>
> "table.exec.sink.upsert-materialize-barrier-mode.watermark-interval",
> >>> it’s
> >>>>>> not actually called “watermark internal”, because it doesn't have
> >>>>> watermark
> >>>>>> semantics to drop late data. It’s actually a compaction barrier,
> right?
> >>>>>>
> >>>>>> I think I don't fully understand this point. Could you please
> explain
> >>> it
> >>>>>> one more time? Are you suggesting a different name?
> >>>>>>
> >>>>>>> 4. The state in SUM is used to handle rollback under out-of-order
> >>>>>> scenarios. Since we resolve out-of-orderness within a watermark
> >>> boundary,
> >>>>>> does that mean we don’t need state anymore? More clearly, we only
> need
> >>> a
> >>>>>> "temporary" state that lives within each watermark boundary. (Or
> what I
> >>>>> can
> >>>>>> think of is: you’re using this persistent state to support the
> >>> subsequent
> >>>>>> `ON CONFLICT conflict_action`.)
> >>>>>>
> >>>>>> Yes, I think more or less that is correct. We need the "temporary"
> >>> state
> >>>>>> within boundary + a single result per key for a cross watermark
> >>> boundary
> >>>>>> handover. I explain that in the FLIP.
> >>>>>>
> >>>>>> Best,
> >>>>>> Dawid
> >>>>>>
> >>>>>> On Mon, 5 Jan 2026 at 09:39, Xuyang <[email protected]> wrote:
> >>>>>>
> >>>>>>> Thank you for the explanation. I think I understand what you mean
> >>> now. I
> >>>>>>> have a few questions I’d like to confirm:
> >>>>>>> 1. Without upsert key, how do we determine the order of multiple
> >>> records
> >>>>>>> within the same watermark boundary when non-deterministic functions
> >>> are
> >>>>>>> involved? For example, if we receive data like the “disorder (2)”
> case
> >>>>>>> below, and the upsert key is lost after a join, what will the final
> >>>>> output
> >>>>>>> be (without internal consistency issues)?
> >>>>>>>
> >>>>>>>
> >>>>>>> +U(id=1, level=20, attr='b1', rand='1.5')
> >>>>>>> +U(id=1, level=10, attr='a1', rand='1') // originally +I; I
> slightly
> >>>>>>> modified it
> >>>>>>> -U(id=1, level=10, attr='a1', rand='2')
> >>>>>>>
> >>>>>>>
> >>>>>>>> The watermark is an internal consistency boundary. You will always
> >>>>> have
> >>>>>>> a UB for the old value and an UA for the new value.
> >>>>>>> 2. To address internal consistency issues caused by emitting -U, we
> >>> need
> >>>>>>> each watermark boundary to contain paired -U/+U. That means we
> have to
> >>>>>>> track what the source sends, and only emit the "watermark boundary"
> >>>>> after
> >>>>>>> the +U has been sent, right? For an upsert source, this is easy
> >>> because
> >>>>> we
> >>>>>>> have ChangelogNormalize to output the -U/+U pairs. But for a
> retract
> >>>>>>> source, we may need to introduce a new stateful operator. This
> seems
> >>>>>>> unavoidable unless the source to output -U and +U together.
> >>>>>>>
> >>>>>>>
> >>>>>>> 3. About
> >>>>>>>
> "table.exec.sink.upsert-materialize-barrier-mode.watermark-interval",
> >>>>> it’s
> >>>>>>> not actually called “watermark internal”, because it doesn't have
> >>>>> watermark
> >>>>>>> semantics to drop late data. It’s actually a compaction barrier,
> >>> right?
> >>>>>>>
> >>>>>>>
> >>>>>>> 4. The state in SUM is used to handle rollback under out-of-order
> >>>>>>> scenarios. Since we resolve out-of-orderness within a watermark
> >>>>> boundary,
> >>>>>>> does that mean we don’t need state anymore? More clearly, we only
> >>> need a
> >>>>>>> "temporary" state that lives within each watermark boundary. (Or
> what
> >>> I
> >>>>> can
> >>>>>>> think of is: you’re using this persistent state to support the
> >>>>> subsequent
> >>>>>>> `ON CONFLICT conflict_action`.)
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>>
> >>>>>>>       Best!
> >>>>>>>       Xuyang
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> 在 2025-12-29 17:51:40,"Dawid Wysakowicz" <[email protected]>
> 写道:
> >>>>>>>>> But I’d like to add a clarification. Take the “Changelog
> Disorder”
> >>>>>>>> example described in the FLIP (
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=399279158#FLIP558:ImprovementstoSinkUpsertMaterializerandchangelogdisorder-ExampleofChangelogDisorder
> >>>>>>> ).
> >>>>>>>> Let’s look at disorder (2) and disorder (3). Under the default ON
> >>>>> CONFLICT
> >>>>>>>> ERROR, IIUC, the expected behavior is that Flink should fail.
> >>> However,
> >>>>>>>> those inserts and updates actually all come from the same PK on
> table
> >>>>> s1
> >>>>>>>> (id = 1). From a relational-algebra perspective, this does not
> >>> violate
> >>>>> the
> >>>>>>>> PK constraint; it only happens because we shuffle by level and
> end up
> >>>>> with
> >>>>>>>> out-of-order issues under multi parallelisms. In other words, if
> we
> >>> run
> >>>>>>>> this SQL in batch, the pk conflict will not happen and ON CONFLICT
> >>>>> ERROR
> >>>>>>>> should not fail. If the streaming job fails, users will be
> confused.
> >>>>> For
> >>>>>>>> disorder issues introduced by Flink internally, I believe Flink
> >>> should
> >>>>>>>> handle them internally.
> >>>>>>>>
> >>>>>>>> Let's first clarify this point, because I think it's vital for the
> >>>>>>>> understanding of the proposal so we must be on the same page
> before
> >>> we
> >>>>>>> talk
> >>>>>>>> about other points.
> >>>>>>>>
> >>>>>>>> No, the example you pointed out would not throw an error in `ON
> >>>>> CONFLICT
> >>>>>>>> ERROR`. As you pointed out yourself those come from the same PK on
> >>>>> table
> >>>>>>>> S1, therefore you will not have two active records with (id = 1)
> in
> >>> the
> >>>>>>>> sink on the watermark boundary. The watermark is an internal
> >>>>> consistency
> >>>>>>>> boundary. You will always have a UB for the old value and an UA
> for
> >>> the
> >>>>>>> new
> >>>>>>>> value. Therefore you will only ever have a single value after the
> >>>>>>>> compaction. We would throw only if we try to upsert into a single
> row
> >>>>> in
> >>>>>>>> the sink from two rows from s1 with different ids e.g.
> {source_id=1,
> >>>>>>>> sink_id=1}, {source_id=2, sink_id=1}.
> >>>>>>>>
> >>>>>>>>> Before I talk about 3, let me talk about 4 first. If I’m not
> >>>>> mistaken,
> >>>>>>> we
> >>>>>>>> need a deterministic boundary to determine that upstream data
> will no
> >>>>>>>> longer be updated, so that we can output the “final” result.
> >>>>>>>>
> >>>>>>>> No, that's not what I understand as internal consistency. An
> internal
> >>>>>>>> consistency is that a single change in the source produces a
> single
> >>>>> change
> >>>>>>>> in the sink, without incorrect intermediate states caused by
> >>>>> processing UB
> >>>>>>>> separately from UA. I don't want to process multiple source
> changes
> >>> in
> >>>>> a
> >>>>>>>> single batch. I'd rather call it "external" consistency or
> snapshot
> >>>>>>>> processing as you are suggesting, but in my mind this is an
> >>> orthogonal
> >>>>>>>> topic from what I am trying to solve here.
> >>>>>>>>
> >>>>>>>> On Tue, 23 Dec 2025 at 12:28, Xuyang <[email protected]> wrote:
> >>>>>>>>
> >>>>>>>>> Hi, Dawid. Thank you for your detailed explanation and the
> update.
> >>>>> Let
> >>>>>>> me
> >>>>>>>>> share my thoughts.
> >>>>>>>>> 1. In fact, I agree with what you said: for clearly problematic
> >>>>> queries,
> >>>>>>>>> we should fail fast — for example, the case you mentioned
> (writing
> >>>>> data
> >>>>>>>>> from a source table whose PK is id into a sink table whose PK is
> >>>>> name).
> >>>>>>> We
> >>>>>>>>> can fail like a traditional database: PK conflict. That’s totally
> >>>>> fine.
> >>>>>>>>> 2. But I’d like to add a clarification. Take the “Changelog
> >>> Disorder”
> >>>>>>>>> example described in the FLIP (
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=399279158#FLIP558:ImprovementstoSinkUpsertMaterializerandchangelogdisorder-ExampleofChangelogDisorder
> >>>>>>> ).
> >>>>>>>>> Let’s look at disorder (2) and disorder (3). Under the default ON
> >>>>>>> CONFLICT
> >>>>>>>>> ERROR, IIUC, the expected behavior is that Flink should fail.
> >>>>> However,
> >>>>>>>>> those inserts and updates actually all come from the same PK on
> >>>>> table s1
> >>>>>>>>> (id = 1). From a relational-algebra perspective, this does not
> >>>>> violate
> >>>>>>> the
> >>>>>>>>> PK constraint; it only happens because we shuffle by level and
> end
> >>> up
> >>>>>>> with
> >>>>>>>>> out-of-order issues under multi parallelisms. In other words, if
> we
> >>>>> run
> >>>>>>>>> this SQL in batch, the pk conflict will not happen and ON
> CONFLICT
> >>>>> ERROR
> >>>>>>>>> should not fail. If the streaming job fails, users will be
> confused.
> >>>>> For
> >>>>>>>>> disorder issues introduced by Flink internally, I believe Flink
> >>>>> should
> >>>>>>>>> handle them internally.
> >>>>>>>>> 4. Before I talk about 3, let me talk about 4 first. If I’m not
> >>>>>>> mistaken,
> >>>>>>>>> we need a deterministic boundary to determine that upstream data
> >>>>> will no
> >>>>>>>>> longer be updated, so that we can output the “final” result. I
> think
> >>>>> our
> >>>>>>>>> disagreement is about where that “data boundary” is. In this
> FLIP,
> >>>>> the
> >>>>>>>>> boundary is described as: 1) watermark or 2) checkpoint. But I
> think
> >>>>>>> such a
> >>>>>>>>> boundary is tied to the table, for example, “the creation of a
> >>>>> specific
> >>>>>>>>> historical snapshot version of a table.” Since data in the
> snapshot
> >>>>> is
> >>>>>>>>> immutable, we can output results at that point. What do you
> think?
> >>>>>>>>> 3. If we must choose one of the introduced options, I lean toward
> >>>>> Option
> >>>>>>>>> 1, because we already have a clear definition for watermarks
> defined
> >>>>> on
> >>>>>>> a
> >>>>>>>>> table: “allowing for consistent results despite out-of-order or
> late
> >>>>>>>>> events” (
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/#event-time
> >>>>>>> ).
> >>>>>>>>> This “drop late events” semantic does not exist in checkpoint.
> >>>>> However,
> >>>>>>> my
> >>>>>>>>> concern is that in most scenarios, a CDC source may produce
> multiple
> >>>>>>>>> updates for the same PK over a long time span, so the watermark
> >>>>> should
> >>>>>>> be
> >>>>>>>>> defined very large, which will cause the job to produce no output
> >>>>> for a
> >>>>>>>>> long time.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>>
> >>>>>>>>>       Best!
> >>>>>>>>>       Xuyang
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> At 2025-12-17 16:52:26, "Dawid Wysakowicz" <
> [email protected]>
> >>>>>>> wrote:
> >>>>>>>>>> Hey Gustavo, Xuyang
> >>>>>>>>>> I tried incorporating your suggestions into the FLIP. Please
> take
> >>>>>>> another
> >>>>>>>>>> look.
> >>>>>>>>>> Best,
> >>>>>>>>>> Dawid
> >>>>>>>>>>
> >>>>>>>>>> On Fri, 12 Dec 2025 at 16:05, Dawid Wysakowicz <
> >>>>> [email protected]
> >>>>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> 1. The default behavior changes if no ON CONFLICT is defined. I
> >>>>> am a
> >>>>>>>>>>>> little concerned that this may cause errors in a large number
> of
> >>>>>>>>> existing
> >>>>>>>>>>>> cases.
> >>>>>>>>>>>
> >>>>>>>>>>> I can be convinced to leave the default behaviour as it is
> now. I
> >>>>> am
> >>>>>>>>>>> worried though, very rarely the current behaviour of SUM is
> what
> >>>>>>> people
> >>>>>>>>>>> actually want. As mentioned in the FLIP I wholeheartedly
> believe
> >>>>>>> there
> >>>>>>>>> are
> >>>>>>>>>>> very little if any real world scenarios where you need the
> >>>>>>> deduplicate
> >>>>>>>>>>> behaviour. I try to elaborate a bit more in 2)
> >>>>>>>>>>>
> >>>>>>>>>>> 2. Regarding On Conflict Errors, in the context of CDC streams,
> >>>>> it is
> >>>>>>>>>>>> expected that the vast majority of cases cannot generate only
> one
> >>>>>>>>> record
> >>>>>>>>>>>> with one primary key. The only solutions I can think of are
> >>>>>>> append-only
> >>>>>>>>>>>> top1, deduplication, or aggregating the first row.
> >>>>>>>>>>>
> >>>>>>>>>>> I disagree with that statement. I don't think CDC streams
> change
> >>>>>>>>> anything
> >>>>>>>>>>> in that regard. Maybe there is some misunderstanding about
> what a
> >>>>> one
> >>>>>>>>>>> record means in this context.
> >>>>>>>>>>>
> >>>>>>>>>>> I agree almost certainly there will be a sequence of UA, UB
> for a
> >>>>>>> single
> >>>>>>>>>>> sink's primary key.
> >>>>>>>>>>>
> >>>>>>>>>>> My claim is that users almost never want a situation where they
> >>>>> have
> >>>>>>>>> more
> >>>>>>>>>>> than one "active" upsert key/record for one sink's primary
> key. I
> >>>>>>> tried
> >>>>>>>>> to
> >>>>>>>>>>> explain that in the FLIP, but let me try to give one more
> example
> >>>>>>> here.
> >>>>>>>>>>>
> >>>>>>>>>>> Imagine two tables:
> >>>>>>>>>>> CREATE TABLE source (
> >>>>>>>>>>>     id bigint PRIMARY KEY,
> >>>>>>>>>>>     name string,
> >>>>>>>>>>>     value string
> >>>>>>>>>>> )
> >>>>>>>>>>>
> >>>>>>>>>>> CREATE TABLE sink (
> >>>>>>>>>>>     name string PRIMARY KEY,
> >>>>>>>>>>>     value string
> >>>>>>>>>>> )
> >>>>>>>>>>>
> >>>>>>>>>>> INSERT INTO sink SELECT name, value;
> >>>>>>>>>>>
> >>>>>>>>>>> === Input
> >>>>>>>>>>> (1, "Apple", "ABC")
> >>>>>>>>>>> (2, "Apple", "DEF")
> >>>>>>>>>>>
> >>>>>>>>>>> In the scenario above a SUM is inserted which will deduplicate
> the
> >>>>>>> rows
> >>>>>>>>>>> and override the value for "Apple" with "DEF". In my opinion
> it's
> >>>>>>>>> entirely
> >>>>>>>>>>> wrong, instead an exception should be thrown that there is
> >>>>> actually a
> >>>>>>>>>>> constraint validation.
> >>>>>>>>>>>
> >>>>>>>>>>> I am absolutely more than happy to be proved wrong. If you do
> >>>>> have a
> >>>>>>>>> real
> >>>>>>>>>>> world scenario where the deduplication logic is actually
> correct
> >>>>> and
> >>>>>>>>>>> expected please, please do share. So far I have not seen one,
> nor
> >>>>>>> was I
> >>>>>>>>>>> able to come up with one. And yet I am not suggesting to remove
> >>>>> the
> >>>>>>>>>>> deduplication logic entirely, users can still use it with ON
> >>>>> CONFLICT
> >>>>>>>>>>> DEDUPLICATE.
> >>>>>>>>>>>
> >>>>>>>>>>> 3. The special watermark generation interval affects the
> >>>>> visibility
> >>>>>>> of
> >>>>>>>>>>>> results. How can users configure this generation interval?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> That's a fair question I'll try to elaborate on in the FLIP. I
> can
> >>>>>>> see
> >>>>>>>>> two
> >>>>>>>>>>> options:
> >>>>>>>>>>> 1. We piggyback on existing watermarks in the query, if there
> are
> >>>>> no
> >>>>>>>>>>> watermarks (tables don't have a watermark definition) we fail
> >>>>> during
> >>>>>>>>>>> planning
> >>>>>>>>>>> 2. We add a new parameter option for a specialized generalized
> >>>>>>> watermark
> >>>>>>>>>>>
> >>>>>>>>>>> Let me think for some more on that and I'll come back with a
> more
> >>>>>>>>> concrete
> >>>>>>>>>>> proposal.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>> 4. I believe that resolving out-of-order issues and addressing
> >>>>>>> internal
> >>>>>>>>>>>> consistency are two separate problems. As I understand the
> >>>>> current
> >>>>>>>>>>>> solution, it does not  really resolve the internal consistency
> >>>>>>> issue.
> >>>>>>>>> We
> >>>>>>>>>>>> could first resolve the out-of-order problem. For most
> scenarios
> >>>>>>> that
> >>>>>>>>>>>> require real-time response, we can directly output
> intermediate
> >>>>>>> results
> >>>>>>>>>>>> promptly.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Why doesn't it solve it? It does. Given a pair of UB/UA we
> won't
> >>>>> emit
> >>>>>>>>> the
> >>>>>>>>>>> temporary state after processing the UB.
> >>>>>>>>>>>
> >>>>>>>>>>> 5. How can we compact data with the same custom watermark? If
> >>>>>>> detailed
> >>>>>>>>>>>> comparisons are necessary, I think we still need to preserve
> all
> >>>>> key
> >>>>>>>>> data;
> >>>>>>>>>>>> we would just be compressing this data further at time t.
> >>>>>>>>>>>
> >>>>>>>>>>> Yes, we need to preserve all key data, but only between two
> >>>>>>> watermarks.
> >>>>>>>>>>> Assuming frequent watermarks, that's for a very short time.
> >>>>>>>>>>>
> >>>>>>>>>>> 6. If neither this proposed solution nor the reject solution
> can
> >>>>>>> resolve
> >>>>>>>>>>>> internal consistency, we need to reconsider the differences
> >>>>> between
> >>>>>>>>> the two
> >>>>>>>>>>>> approaches.
> >>>>>>>>>>>
> >>>>>>>>>>> I'll copy the explanation why the rejected alternative should
> be
> >>>>>>>>> rejected
> >>>>>>>>>>> from the FLIP:
> >>>>>>>>>>>
> >>>>>>>>>>> The solution can help us to solve the changelog disorder
> problem,
> >>>>>>> but it
> >>>>>>>>>>> does not help with the *internal consistency *issue. If we
> want to
> >>>>>>> fix
> >>>>>>>>>>> that as well, we still need the compaction on watermarks. At
> the
> >>>>> same
> >>>>>>>>> time
> >>>>>>>>>>> it increases the size of all flowing records. Therefore it was
> >>>>>>> rejected
> >>>>>>>>> in
> >>>>>>>>>>> favour of simply compacting all records once on the
> progression of
> >>>>>>>>>>> watermarks.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Dawid
> >>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>>
> >>>
>
>

Reply via email to