> 1. Without an upsert key, how do we determine the order of multiple records within the same watermark boundary when non-deterministic functions are involved? For example, if we receive data like the “disorder (2)” case below, and the upsert key is lost after a join, what will the final output be (without internal consistency issues)?
If you have non-deterministic functions like in your example, the retraction does not work anyhow. For a retraction to work when there is no primary key defined, we require all columns to be deterministic:
https://github.com/apache/flink/blob/fec9336e7d99500aeb9097e441d8da0e6bde5943/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java#L142

> 2. To address internal consistency issues caused by emitting -U, we need each watermark boundary to contain paired -U/+U. That means we have to track what the source sends, and only emit the "watermark boundary" after the +U has been sent, right? For an upsert source, this is easy because we have ChangelogNormalize to output the -U/+U pairs. But for a retract source, we may need to introduce a new stateful operator. This seems unavoidable unless the source outputs -U and +U together.

Yes, this assumption does not hold for retracting sources. So far we don't have any such sources. I'll introduce a check that would fail for a combination of a retracting source and DO ERROR/DO NOTHING.

> 3. About "table.exec.sink.upsert-materialize-barrier-mode.watermark-interval": it shouldn't actually be called a "watermark interval", because it doesn't have watermark semantics to drop late data. It's actually a compaction barrier, right?

I think I don't fully understand this point. Could you please explain it one more time? Are you suggesting a different name?

> 4. The state in SUM is used to handle rollback under out-of-order scenarios. Since we resolve out-of-orderness within a watermark boundary, does that mean we don't need state anymore? More clearly, we only need a "temporary" state that lives within each watermark boundary. (Or what I can think of is: you're using this persistent state to support the subsequent `ON CONFLICT conflict_action`.)

Yes, I think more or less that is correct.
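Dawid's point that a watermark boundary always leaves at most one active row per sink key can be illustrated with a small simulation. The following Python sketch is entirely hypothetical: the function name `compact_on_watermark`, the `prior` argument, and the record shapes are invented for illustration and are not Flink APIs. It buffers the changelog for one sink key between watermarks and collapses it to a single net change at the boundary.

```python
# Hypothetical sketch of per-key compaction at a watermark boundary.
# Not Flink's SinkUpsertMaterializer; names and record shapes are invented.

def compact_on_watermark(buffered, prior=None):
    """Collapse the changelog buffered for one sink key since the last watermark.

    buffered -- list of (kind, row), kind in {'+I', '+U', '-U', '-D'},
                in arrival order (possibly disordered by shuffles).
    prior    -- the single row active for this key before the boundary, if any.
    Returns the one net change to emit downstream, or None if nothing survives.
    """
    active = [prior] if prior is not None else []
    for kind, row in buffered:
        if kind in ('+I', '+U'):
            active.append(row)
        else:  # '-U' or '-D' retracts a matching earlier row.
            # This matching requires deterministic columns: a column like
            # rand in the example would make the -U fail to find its row.
            active.remove(row)
    # Disorder *within* the boundary is tolerated; only the state at the
    # watermark matters. Two active rows at this point is a genuine PK
    # conflict, which ON CONFLICT ERROR would surface.
    if len(active) > 1:
        raise ValueError("primary key conflict: %d active rows" % len(active))
    if active:
        return ('+U' if prior is not None else '+I', active[0])
    return ('-D', None) if prior is not None else None

# The "disorder (2)" arrival order for sink key id=1 (rand column omitted):
events = [
    ('+U', {'level': 20, 'attr': 'b1'}),
    ('+U', {'level': 10, 'attr': 'a1'}),   # originally +I
    ('-U', {'level': 10, 'attr': 'a1'}),
]
```

With these events, compaction at the watermark leaves a single active row (level=20) and emits one net change, while two inserts from different source ids for the same sink key would raise instead, mirroring the ON CONFLICT ERROR behaviour discussed in the thread.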
We need the "temporary" state within a boundary plus a single result per key for the cross-watermark-boundary handover. I explain that in the FLIP.

Best,
Dawid

On Mon, 5 Jan 2026 at 09:39, Xuyang <[email protected]> wrote:

> Thank you for the explanation. I think I understand what you mean now. I have a few questions I’d like to confirm:
>
> 1. Without an upsert key, how do we determine the order of multiple records within the same watermark boundary when non-deterministic functions are involved? For example, if we receive data like the “disorder (2)” case below, and the upsert key is lost after a join, what will the final output be (without internal consistency issues)?
>
> > +U(id=1, level=20, attr='b1', rand='1.5')
> > +U(id=1, level=10, attr='a1', rand='1') // originally +I; I slightly modified it
> > -U(id=1, level=10, attr='a1', rand='2')
>
> > The watermark is an internal consistency boundary. You will always have a UB for the old value and a UA for the new value.
>
> 2. To address internal consistency issues caused by emitting -U, we need each watermark boundary to contain paired -U/+U. That means we have to track what the source sends, and only emit the "watermark boundary" after the +U has been sent, right? For an upsert source, this is easy because we have ChangelogNormalize to output the -U/+U pairs. But for a retract source, we may need to introduce a new stateful operator. This seems unavoidable unless the source outputs -U and +U together.
>
> 3. About "table.exec.sink.upsert-materialize-barrier-mode.watermark-interval": it shouldn't actually be called a "watermark interval", because it doesn't have watermark semantics to drop late data. It's actually a compaction barrier, right?
>
> 4. The state in SUM is used to handle rollback under out-of-order scenarios. Since we resolve out-of-orderness within a watermark boundary, does that mean we don't need state anymore?
> More clearly, we only need a "temporary" state that lives within each watermark boundary. (Or what I can think of is: you're using this persistent state to support the subsequent `ON CONFLICT conflict_action`.)
>
> --
>
> Best!
> Xuyang
>
> At 2025-12-29 17:51:40, "Dawid Wysakowicz" <[email protected]> wrote:
>
> > > But I’d like to add a clarification. Take the “Changelog Disorder” example described in the FLIP (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=399279158#FLIP558:ImprovementstoSinkUpsertMaterializerandchangelogdisorder-ExampleofChangelogDisorder). Let’s look at disorder (2) and disorder (3). Under the default ON CONFLICT ERROR, IIUC, the expected behavior is that Flink should fail. However, those inserts and updates actually all come from the same PK on table s1 (id = 1). From a relational-algebra perspective, this does not violate the PK constraint; it only happens because we shuffle by level and end up with out-of-order issues under multiple parallelisms. In other words, if we run this SQL in batch, the PK conflict will not happen and ON CONFLICT ERROR should not fail. If the streaming job fails, users will be confused. For disorder issues introduced by Flink internally, I believe Flink should handle them internally.
> >
> > Let's first clarify this point, because I think it's vital for the understanding of the proposal, so we must be on the same page before we talk about other points.
> >
> > No, the example you pointed out would not throw an error in `ON CONFLICT ERROR`. As you pointed out yourself, those come from the same PK on table s1, therefore you will not have two active records with (id = 1) in the sink on the watermark boundary. The watermark is an internal consistency boundary. You will always have a UB for the old value and a UA for the new value. Therefore you will only ever have a single value after the compaction.
> > We would throw only if we try to upsert into a single row in the sink from two rows from s1 with different ids, e.g. {source_id=1, sink_id=1}, {source_id=2, sink_id=1}.
> >
> > > Before I talk about 3, let me talk about 4 first. If I’m not mistaken, we need a deterministic boundary to determine that upstream data will no longer be updated, so that we can output the “final” result.
> >
> > No, that's not what I understand as internal consistency. Internal consistency means that a single change in the source produces a single change in the sink, without incorrect intermediate states caused by processing UB separately from UA. I don't want to process multiple source changes in a single batch. I'd rather call that "external" consistency or snapshot processing, as you are suggesting, but in my mind this is an orthogonal topic from what I am trying to solve here.
> >
> > On Tue, 23 Dec 2025 at 12:28, Xuyang <[email protected]> wrote:
> >
> > > Hi, Dawid. Thank you for your detailed explanation and the update. Let me share my thoughts.
> > >
> > > 1. In fact, I agree with what you said: for clearly problematic queries, we should fail fast — for example, the case you mentioned (writing data from a source table whose PK is id into a sink table whose PK is name). We can fail like a traditional database: PK conflict. That’s totally fine.
> > >
> > > 2. But I’d like to add a clarification. Take the “Changelog Disorder” example described in the FLIP (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=399279158#FLIP558:ImprovementstoSinkUpsertMaterializerandchangelogdisorder-ExampleofChangelogDisorder). Let’s look at disorder (2) and disorder (3). Under the default ON CONFLICT ERROR, IIUC, the expected behavior is that Flink should fail. However, those inserts and updates actually all come from the same PK on table s1 (id = 1).
> > > From a relational-algebra perspective, this does not violate the PK constraint; it only happens because we shuffle by level and end up with out-of-order issues under multiple parallelisms. In other words, if we run this SQL in batch, the PK conflict will not happen and ON CONFLICT ERROR should not fail. If the streaming job fails, users will be confused. For disorder issues introduced by Flink internally, I believe Flink should handle them internally.
> > >
> > > 4. Before I talk about 3, let me talk about 4 first. If I’m not mistaken, we need a deterministic boundary to determine that upstream data will no longer be updated, so that we can output the “final” result. I think our disagreement is about where that “data boundary” is. In this FLIP, the boundary is described as: 1) watermark or 2) checkpoint. But I think such a boundary is tied to the table, for example, “the creation of a specific historical snapshot version of a table.” Since data in the snapshot is immutable, we can output results at that point. What do you think?
> > >
> > > 3. If we must choose one of the introduced options, I lean toward Option 1, because we already have a clear definition for watermarks defined on a table: “allowing for consistent results despite out-of-order or late events” (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/#event-time). This “drop late events” semantic does not exist for checkpoints. However, my concern is that in most scenarios, a CDC source may produce multiple updates for the same PK over a long time span, so the watermark would have to be defined very large, which will cause the job to produce no output for a long time.
> > >
> > > --
> > >
> > > Best!
> > > Xuyang
> > >
> > > At 2025-12-17 16:52:26, "Dawid Wysakowicz" <[email protected]> wrote:
> > >
> > > > Hey Gustavo, Xuyang,
> > > > I tried incorporating your suggestions into the FLIP. Please take another look.
> > > > Best,
> > > > Dawid
> > > >
> > > > On Fri, 12 Dec 2025 at 16:05, Dawid Wysakowicz <[email protected]> wrote:
> > > >
> > > > > > 1. The default behavior changes if no ON CONFLICT is defined. I am a little concerned that this may cause errors in a large number of existing cases.
> > > > >
> > > > > I can be convinced to leave the default behaviour as it is now. I am worried, though, because the current behaviour of SUM is very rarely what people actually want. As mentioned in the FLIP, I wholeheartedly believe there are very few, if any, real-world scenarios where you need the deduplicate behaviour. I try to elaborate a bit more in 2).
> > > > >
> > > > > > 2. Regarding ON CONFLICT ERROR, in the context of CDC streams, it is expected that the vast majority of cases cannot generate only one record with one primary key. The only solutions I can think of are append-only top1, deduplication, or aggregating the first row.
> > > > >
> > > > > I disagree with that statement. I don't think CDC streams change anything in that regard. Maybe there is some misunderstanding about what "one record" means in this context.
> > > > >
> > > > > I agree there will almost certainly be a sequence of UA, UB for a single sink's primary key.
> > > > >
> > > > > My claim is that users almost never want a situation where they have more than one "active" upsert key/record for one sink's primary key. I tried to explain that in the FLIP, but let me try to give one more example here.
> > > > > Imagine two tables:
> > > > >
> > > > > CREATE TABLE source (
> > > > >   id bigint PRIMARY KEY,
> > > > >   name string,
> > > > >   value string
> > > > > )
> > > > >
> > > > > CREATE TABLE sink (
> > > > >   name string PRIMARY KEY,
> > > > >   value string
> > > > > )
> > > > >
> > > > > INSERT INTO sink SELECT name, value FROM source;
> > > > >
> > > > > === Input
> > > > > (1, "Apple", "ABC")
> > > > > (2, "Apple", "DEF")
> > > > >
> > > > > In the scenario above a SUM is inserted, which will deduplicate the rows and override the value for "Apple" with "DEF". In my opinion this is entirely wrong; instead, an exception should be thrown, because there is actually a constraint violation.
> > > > >
> > > > > I am absolutely more than happy to be proved wrong. If you do have a real-world scenario where the deduplication logic is actually correct and expected, please do share it. So far I have not seen one, nor was I able to come up with one. And yet I am not suggesting removing the deduplication logic entirely; users can still use it with ON CONFLICT DEDUPLICATE.
> > > > >
> > > > > > 3. The special watermark generation interval affects the visibility of results. How can users configure this generation interval?
> > > > >
> > > > > That's a fair question I'll try to elaborate on in the FLIP. I can see two options:
> > > > > 1. We piggyback on existing watermarks in the query; if there are no watermarks (tables don't have a watermark definition), we fail during planning.
> > > > > 2. We add a new parameter option for a specialized generalized watermark.
> > > > >
> > > > > Let me think on that some more and I'll come back with a more concrete proposal.
> > > > >
> > > > > > 4. I believe that resolving out-of-order issues and addressing internal consistency are two separate problems.
> > > > > > As I understand the current solution, it does not really resolve the internal consistency issue. We could first resolve the out-of-order problem. For most scenarios that require real-time response, we can directly output intermediate results promptly.
> > > > >
> > > > > Why doesn't it solve it? It does. Given a pair of UB/UA, we won't emit the temporary state after processing the UB.
> > > > >
> > > > > > 5. How can we compact data with the same custom watermark? If detailed comparisons are necessary, I think we still need to preserve all key data; we would just be compressing this data further at time t.
> > > > >
> > > > > Yes, we need to preserve all key data, but only between two watermarks. Assuming frequent watermarks, that's for a very short time.
> > > > >
> > > > > > 6. If neither this proposed solution nor the rejected solution can resolve internal consistency, we need to reconsider the differences between the two approaches.
> > > > >
> > > > > I'll copy the explanation of why the rejected alternative was rejected from the FLIP:
> > > > >
> > > > > The solution can help us to solve the changelog disorder problem, but it does not help with the *internal consistency* issue. If we want to fix that as well, we still need the compaction on watermarks. At the same time it increases the size of all flowing records. Therefore it was rejected in favour of simply compacting all records once on the progression of watermarks.
> > > > >
> > > > > Best,
> > > > > Dawid
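To make the deduplicate-versus-error distinction from the "Apple" example in this thread concrete, here is a small Python sketch. It is a hypothetical simulation, not Flink code: `apply_upserts` and the `on_conflict` flag are invented for illustration, loosely modeled on the proposed ON CONFLICT modes.

```python
# Hypothetical simulation of the "Apple" example: two source rows with
# different ids both map to sink PK name="Apple". Not Flink's implementation.

def apply_upserts(rows, on_conflict='ERROR'):
    """rows: iterable of (source_id, name, value); the sink PK is name.

    DEDUPLICATE mimics the current SUM behaviour (last writer wins);
    ERROR raises when two distinct source rows target the same sink PK.
    """
    sink = {}    # name -> value, the materialized sink table
    owners = {}  # name -> source_id currently "active" for that sink PK
    for source_id, name, value in rows:
        if on_conflict == 'ERROR' and owners.get(name, source_id) != source_id:
            raise ValueError(
                "PK conflict on sink key %r: source rows %s and %s both map to it"
                % (name, owners[name], source_id))
        owners[name] = source_id
        sink[name] = value  # DEDUPLICATE: simply override
    return sink

rows = [(1, "Apple", "ABC"), (2, "Apple", "DEF")]
# apply_upserts(rows, on_conflict='DEDUPLICATE') -> {"Apple": "DEF"}
# apply_upserts(rows, on_conflict='ERROR') raises a PK-conflict error
```

Under DEDUPLICATE the second row silently overrides the first, which is the behaviour Dawid argues is almost never what users want; under ERROR the same input surfaces the constraint violation instead.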
