> 1. Without upsert key, how do we determine the order of multiple records
within the same watermark boundary when non-deterministic functions are
involved? For example, if we receive data like the “disorder (2)” case
below, and the upsert key is lost after a join, what will the final output
be (without internal consistency issues)?

If you have non-deterministic functions like in your example the retraction
does not work anyhow. For a retraction to work if there is no primary key
defined we require all columns to be deterministic:
https://github.com/apache/flink/blob/fec9336e7d99500aeb9097e441d8da0e6bde5943/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java#L142

> 2. To address internal consistency issues caused by emitting -U, we need
each watermark boundary to contain paired -U/+U. That means we have to
track what the source sends, and only emit the "watermark boundary" after
the +U has been sent, right? For an upsert source, this is easy because we
have ChangelogNormalize to output the -U/+U pairs. But for a retract
source, we may need to introduce a new stateful operator. This seems
unavoidable unless the source to output -U and +U together.

Yes, this assumption does not hold for retracting sources. So far we don't
have any such sources. I'll introduce a check that would fail for a
combination of a retracting source and DO ERROR/DO NOTHING.

> 3. About
"table.exec.sink.upsert-materialize-barrier-mode.watermark-interval", it’s
not actually called “watermark internal”, because it doesn't have watermark
semantics to drop late data. It’s actually a compaction barrier, right?

I think I don't fully understand this point. Could you please explain it
one more time? Are you suggesting a different name?

> 4. The state in SUM is used to handle rollback under out-of-order
scenarios. Since we resolve out-of-orderness within a watermark boundary,
does that mean we don’t need state anymore? More clearly, we only need a
"temporary" state that lives within each watermark boundary. (Or what I can
think of is: you’re using this persistent state to support the subsequent
`ON CONFLICT conflict_action`.)

Yes, I think more or less that is correct. We need the "temporary" state
within boundary + a single result per key for a cross watermark boundary
handover. I explain that in the FLIP.

Best,
Dawid

On Mon, 5 Jan 2026 at 09:39, Xuyang <[email protected]> wrote:

> Thank you for the explanation. I think I understand what you mean now. I
> have a few questions I’d like to confirm:
> 1. Without upsert key, how do we determine the order of multiple records
> within the same watermark boundary when non-deterministic functions are
> involved? For example, if we receive data like the “disorder (2)” case
> below, and the upsert key is lost after a join, what will the final output
> be (without internal consistency issues)?
>
>
> +U(id=1, level=20, attr='b1', rand='1.5')
> +U(id=1, level=10, attr='a1', rand='1') // originally +I; I slightly
> modified it
> -U(id=1, level=10, attr='a1', rand='2')
>
>
> > The watermark is an internal consistency boundary. You will always have
> a UB for the old value and an UA for the new value.
> 2. To address internal consistency issues caused by emitting -U, we need
> each watermark boundary to contain paired -U/+U. That means we have to
> track what the source sends, and only emit the "watermark boundary" after
> the +U has been sent, right? For an upsert source, this is easy because we
> have ChangelogNormalize to output the -U/+U pairs. But for a retract
> source, we may need to introduce a new stateful operator. This seems
> unavoidable unless the source to output -U and +U together.
>
>
> 3. About
> "table.exec.sink.upsert-materialize-barrier-mode.watermark-interval", it’s
> not actually called “watermark internal”, because it doesn't have watermark
> semantics to drop late data. It’s actually a compaction barrier, right?
>
>
> 4. The state in SUM is used to handle rollback under out-of-order
> scenarios. Since we resolve out-of-orderness within a watermark boundary,
> does that mean we don’t need state anymore? More clearly, we only need a
> "temporary" state that lives within each watermark boundary. (Or what I can
> think of is: you’re using this persistent state to support the subsequent
> `ON CONFLICT conflict_action`.)
>
>
>
> --
>
>     Best!
>     Xuyang
>
>
>
> 在 2025-12-29 17:51:40,"Dawid Wysakowicz" <[email protected]> 写道:
> >> But I’d like to add a clarification. Take the “Changelog Disorder”
> >example described in the FLIP (
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=399279158#FLIP558:ImprovementstoSinkUpsertMaterializerandchangelogdisorder-ExampleofChangelogDisorder
> ).
> >Let’s look at disorder (2) and disorder (3). Under the default ON CONFLICT
> >ERROR, IIUC, the expected behavior is that Flink should fail. However,
> >those inserts and updates actually all come from the same PK on table s1
> >(id = 1). From a relational-algebra perspective, this does not violate the
> >PK constraint; it only happens because we shuffle by level and end up with
> >out-of-order issues under multi parallelisms. In other words, if we run
> >this SQL in batch, the pk conflict will not happen and ON CONFLICT ERROR
> >should not fail. If the streaming job fails, users will be confused. For
> >disorder issues introduced by Flink internally, I believe Flink should
> >handle them internally.
> >
> >Let's first clarify this point, because I think it's vital for the
> >understanding of the proposal so we must be on the same page before we
> talk
> >about other points.
> >
> >No, the example you pointed out would not throw an error in `ON CONFLICT
> >ERROR`. As you pointed out yourself those come from the same PK on table
> >S1, therefore you will not have two active records with (id = 1) in the
> >sink on the watermark boundary. The watermark is an internal consistency
> >boundary. You will always have a UB for the old value and an UA for the
> new
> >value. Therefore you will only ever have a single value after the
> >compaction. We would throw only if we try to upsert into a single row in
> >the sink from two rows from s1 with different ids e.g. {source_id=1,
> >sink_id=1}, {source_id=2, sink_id=1}.
> >
> >> Before I talk about 3, let me talk about 4 first. If I’m not mistaken,
> we
> >need a deterministic boundary to determine that upstream data will no
> >longer be updated, so that we can output the “final” result.
> >
> >No, that's not what I understand as internal consistency. An internal
> >consistency is that a single change in the source produces a single change
> >in the sink, without incorrect intermediate states caused by processing UB
> >separately from UA. I don't want to process multiple source changes in a
> >single batch. I'd rather call it "external" consistency or snapshot
> >processing as you are suggesting, but in my mind this is an orthogonal
> >topic from what I am trying to solve here.
> >
> >On Tue, 23 Dec 2025 at 12:28, Xuyang <[email protected]> wrote:
> >
> >> Hi, Dawid. Thank you for your detailed explanation and the update. Let
> me
> >> share my thoughts.
> >> 1. In fact, I agree with what you said: for clearly problematic queries,
> >> we should fail fast — for example, the case you mentioned (writing data
> >> from a source table whose PK is id into a sink table whose PK is name).
> We
> >> can fail like a traditional database: PK conflict. That’s totally fine.
> >> 2. But I’d like to add a clarification. Take the “Changelog Disorder”
> >> example described in the FLIP (
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=399279158#FLIP558:ImprovementstoSinkUpsertMaterializerandchangelogdisorder-ExampleofChangelogDisorder
> ).
> >> Let’s look at disorder (2) and disorder (3). Under the default ON
> CONFLICT
> >> ERROR, IIUC, the expected behavior is that Flink should fail. However,
> >> those inserts and updates actually all come from the same PK on table s1
> >> (id = 1). From a relational-algebra perspective, this does not violate
> the
> >> PK constraint; it only happens because we shuffle by level and end up
> with
> >> out-of-order issues under multi parallelisms. In other words, if we run
> >> this SQL in batch, the pk conflict will not happen and ON CONFLICT ERROR
> >> should not fail. If the streaming job fails, users will be confused. For
> >> disorder issues introduced by Flink internally, I believe Flink should
> >> handle them internally.
> >> 4. Before I talk about 3, let me talk about 4 first. If I’m not
> mistaken,
> >> we need a deterministic boundary to determine that upstream data will no
> >> longer be updated, so that we can output the “final” result. I think our
> >> disagreement is about where that “data boundary” is. In this FLIP, the
> >> boundary is described as: 1) watermark or 2) checkpoint. But I think
> such a
> >> boundary is tied to the table, for example, “the creation of a specific
> >> historical snapshot version of a table.” Since data in the snapshot is
> >> immutable, we can output results at that point. What do you think?
> >> 3. If we must choose one of the introduced options, I lean toward Option
> >> 1, because we already have a clear definition for watermarks defined on
> a
> >> table: “allowing for consistent results despite out-of-order or late
> >> events” (
> >>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/#event-time
> ).
> >> This “drop late events” semantic does not exist in checkpoint. However,
> my
> >> concern is that in most scenarios, a CDC source may produce multiple
> >> updates for the same PK over a long time span, so the watermark should
> be
> >> defined very large, which will cause the job to produce no output for a
> >> long time.
> >>
> >>
> >>
> >> --
> >>
> >>     Best!
> >>     Xuyang
> >>
> >>
> >>
> >> At 2025-12-17 16:52:26, "Dawid Wysakowicz" <[email protected]>
> wrote:
> >> >Hey Gustavo, Xuyang
> >> >I tried incorporating your suggestions into the FLIP. Please take
> another
> >> >look.
> >> >Best,
> >> >Dawid
> >> >
> >> >On Fri, 12 Dec 2025 at 16:05, Dawid Wysakowicz <[email protected]
> >
> >> >wrote:
> >> >
> >> >> 1. The default behavior changes if no ON CONFLICT is defined. I am a
> >> >>> little concerned that this may cause errors in a large number of
> >> existing
> >> >>> cases.
> >> >>
> >> >> I can be convinced to leave the default behaviour as it is now. I am
> >> >> worried though, very rarely the current behaviour of SUM is what
> people
> >> >> actually want. As mentioned in the FLIP I wholeheartedly believe
> there
> >> are
> >> >> very little if any real world scenarios where you need the
> deduplicate
> >> >> behaviour. I try to elaborate a bit more in 2)
> >> >>
> >> >> 2. Regarding On Conflict Errors, in the context of CDC streams, it is
> >> >>> expected that the vast majority of cases cannot generate only one
> >> record
> >> >>> with one primary key. The only solutions I can think of are
> append-only
> >> >>> top1, deduplication, or aggregating the first row.
> >> >>
> >> >> I disagree with that statement. I don't think CDC streams change
> >> anything
> >> >> in that regard. Maybe there is some misunderstanding about what a one
> >> >> record means in this context.
> >> >>
> >> >> I agree almost certainly there will be a sequence of UA, UB for a
> single
> >> >> sink's primary key.
> >> >>
> >> >> My claim is that users almost never want a situation where they have
> >> more
> >> >> than one "active" upsert key/record for one sink's primary key. I
> tried
> >> to
> >> >> explain that in the FLIP, but let me try to give one more example
> here.
> >> >>
> >> >> Imagine two tables:
> >> >> CREATE TABLE source (
> >> >>   id bigint PRIMARY KEY,
> >> >>   name string,
> >> >>   value string
> >> >> )
> >> >>
> >> >> CREATE TABLE sink (
> >> >>   name string PRIMARY KEY,
> >> >>   value string
> >> >> )
> >> >>
> >> >> INSERT INTO sink SELECT name, value;
> >> >>
> >> >> === Input
> >> >> (1, "Apple", "ABC")
> >> >> (2, "Apple", "DEF")
> >> >>
> >> >> In the scenario above a SUM is inserted which will deduplicate the
> rows
> >> >> and override the value for "Apple" with "DEF". In my opinion it's
> >> entirely
> >> >> wrong, instead an exception should be thrown that there is actually a
> >> >> constraint validation.
> >> >>
> >> >> I am absolutely more than happy to be proved wrong. If you do have a
> >> real
> >> >> world scenario where the deduplication logic is actually correct and
> >> >> expected please, please do share. So far I have not seen one, nor
> was I
> >> >> able to come up with one. And yet I am not suggesting to remove the
> >> >> deduplication logic entirely, users can still use it with ON CONFLICT
> >> >> DEDUPLICATE.
> >> >>
> >> >> 3. The special watermark generation interval affects the visibility
> of
> >> >>> results. How can users configure this generation interval?
> >> >>
> >> >>
> >> >> That's a fair question I'll try to elaborate on in the FLIP. I can
> see
> >> two
> >> >> options:
> >> >> 1. We piggyback on existing watermarks in the query, if there are no
> >> >> watermarks (tables don't have a watermark definition) we fail during
> >> >> planning
> >> >> 2. We add a new parameter option for a specialized generalized
> watermark
> >> >>
> >> >> Let me think for some more on that and I'll come back with a more
> >> concrete
> >> >> proposal.
> >> >>
> >> >>
> >> >>> 4. I believe that resolving out-of-order issues and addressing
> internal
> >> >>> consistency are two separate problems. As I understand the current
> >> >>> solution, it does not  really resolve the internal consistency
> issue.
> >> We
> >> >>> could first resolve the out-of-order problem. For most scenarios
> that
> >> >>> require real-time response, we can directly output intermediate
> results
> >> >>> promptly.
> >> >>
> >> >>
> >> >> Why doesn't it solve it? It does. Given a pair of UB/UA we won't emit
> >> the
> >> >> temporary state after processing the UB.
> >> >>
> >> >> 5. How can we compact data with the same custom watermark? If
> detailed
> >> >>> comparisons are necessary, I think we still need to preserve all key
> >> data;
> >> >>> we would just be compressing this data further at time t.
> >> >>
> >> >> Yes, we need to preserve all key data, but only between two
> watermarks.
> >> >> Assuming frequent watermarks, that's for a very short time.
> >> >>
> >> >> 6. If neither this proposed solution nor the reject solution can
> resolve
> >> >>> internal consistency, we need to reconsider the differences between
> >> the two
> >> >>> approaches.
> >> >>
> >> >> I'll copy the explanation why the rejected alternative should be
> >> rejected
> >> >> from the FLIP:
> >> >>
> >> >> The solution can help us to solve the changelog disorder problem,
> but it
> >> >> does not help with the *internal consistency *issue. If we want to
> fix
> >> >> that as well, we still need the compaction on watermarks. At the same
> >> time
> >> >> it increases the size of all flowing records. Therefore it was
> rejected
> >> in
> >> >> favour of simply compacting all records once on the progression of
> >> >> watermarks.
> >> >>
> >> >> Best,
> >> >> Dawid
> >> >>
> >>
>

Reply via email to