> > Today there are still many retract sources, such as some sources in the > Flink CDC project (e.g., PG CDC, MySQL CDC), Paimon, Hudi, and some > formats, etc.These can be further divided into two categories. > One is like Debezium: there is only a single UPDATE record in the physical > storage, and the corresponding Flink source connector further splits it > into UA/UB. The other is where UA and UB are already two separate changelog > records in the physical storage. > For the former, we could generate a watermark boundary before the source > just like checkpoint barrier, so that UB and UA are guaranteed to fall > within the same boundary. This should actually be supportable. It’s okay if > we don’t support it in the first version, but it may affect the overall > design—for example, how to generate the system watermark boundary. > For the latter, it’s probably more troublesome. I think it’s also fine not > to support it. What do you think?
When designing the FLIP I considered the debezium case and it's actually not so much of a problem as you correctly pointed out. The only requirement is that the watermark is generated before the message split. I'd start without support for those sources and we can improve on that later on. 3. Oh, what I meant is how about renaming it to something like > "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval"? > Because I think it may be not a “watermark”; it’s a compaction barrier, and > this compaction can be 1) replaced by watermark, or 2) replaced by > checkpoint, or 3) generated by the Flink system internally. What do you > think? Fine by me. We can call it "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval". 4. I’m also wondering whether we don’t even need the state about “a single > result per key for a cross-watermark-boundary handover”? I am pretty sure we do in order to adhere to the ON CONFLICT ERROR behaviour. Bear in mind two "active" keys may come as part of two separate barriers. On Thu, 8 Jan 2026 at 05:14, Xuyang <[email protected]> wrote: > 1. I agree it, at least it won’t be worse. Currently, for data that > contains non-deterministic functions without UK, SUM cannot properly handle > correcting out-of-order records. > > > 2. It’s fine if we first don’t support it. Let me add some more context. > Today there are still many retract sources, such as some sources in the > Flink CDC project (e.g., PG CDC, MySQL CDC), Paimon, Hudi, and some > formats, etc.These can be further divided into two categories. > One is like Debezium: there is only a single UPDATE record in the physical > storage, and the corresponding Flink source connector further splits it > into UA/UB. The other is where UA and UB are already two separate changelog > records in the physical storage. > For the former, we could generate a watermark boundary before the source > just like checkpoint barrier, so that UB and UA are guaranteed to fall > within the same boundary. This should actually be supportable. It’s okay if > we don’t support it in the first version, but it may affect the overall > design—for example, how to generate the system watermark boundary. > For the latter, it’s probably more troublesome. I think it’s also fine not > to support it. What do you think? > > > 3. Oh, what I meant is how about renaming it to something like > "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval"? > Because I think it may be not a “watermark”; it’s a compaction barrier, and > this compaction can be 1) replaced by watermark, or 2) replaced by > checkpoint, or 3) generated by the Flink system internally. What do you > think? > > > 4. I’m also wondering whether we don’t even need the state about “a single > result per key for a cross-watermark-boundary handover”? > > > > > > -- > > Best! > Xuyang > > > > 在 2026-01-07 19:54:47,"Dawid Wysakowicz" <[email protected]> 写道: > >> 1. Without upsert key, how do we determine the order of multiple records > >within the same watermark boundary when non-deterministic functions are > >involved? For example, if we receive data like the “disorder (2)” case > >below, and the upsert key is lost after a join, what will the final output > >be (without internal consistency issues)? > > > >If you have non-deterministic functions like in your example the > retraction > >does not work anyhow. For a retraction to work if there is no primary key > >defined we require all columns to be deterministic: > > > https://github.com/apache/flink/blob/fec9336e7d99500aeb9097e441d8da0e6bde5943/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java#L142 > > > >> 2. To address internal consistency issues caused by emitting -U, we need > >each watermark boundary to contain paired -U/+U. That means we have to > >track what the source sends, and only emit the "watermark boundary" after > >the +U has been sent, right? For an upsert source, this is easy because we > >have ChangelogNormalize to output the -U/+U pairs. But for a retract > >source, we may need to introduce a new stateful operator. This seems > >unavoidable unless the source to output -U and +U together. > > > >Yes, this assumption does not hold for retracting sources. So far we don't > >have any such sources. I'll introduce a check that would fail for a > >combination of a retracting source and DO ERROR/DO NOTHING. > > > >> 3. About > >"table.exec.sink.upsert-materialize-barrier-mode.watermark-interval", it’s > >not actually called “watermark internal”, because it doesn't have > watermark > >semantics to drop late data. It’s actually a compaction barrier, right? > > > >I think I don't fully understand this point. Could you please explain it > >one more time? Are you suggesting a different name? > > > >> 4. The state in SUM is used to handle rollback under out-of-order > >scenarios. Since we resolve out-of-orderness within a watermark boundary, > >does that mean we don’t need state anymore? More clearly, we only need a > >"temporary" state that lives within each watermark boundary. (Or what I > can > >think of is: you’re using this persistent state to support the subsequent > >`ON CONFLICT conflict_action`.) > > > >Yes, I think more or less that is correct. We need the "temporary" state > >within boundary + a single result per key for a cross watermark boundary > >handover. I explain that in the FLIP. > > > >Best, > >Dawid > > > >On Mon, 5 Jan 2026 at 09:39, Xuyang <[email protected]> wrote: > > > >> Thank you for the explanation. I think I understand what you mean now. I > >> have a few questions I’d like to confirm: > >> 1. Without upsert key, how do we determine the order of multiple records > >> within the same watermark boundary when non-deterministic functions are > >> involved? For example, if we receive data like the “disorder (2)” case > >> below, and the upsert key is lost after a join, what will the final > output > >> be (without internal consistency issues)? > >> > >> > >> +U(id=1, level=20, attr='b1', rand='1.5') > >> +U(id=1, level=10, attr='a1', rand='1') // originally +I; I slightly > >> modified it > >> -U(id=1, level=10, attr='a1', rand='2') > >> > >> > >> > The watermark is an internal consistency boundary. You will always > have > >> a UB for the old value and an UA for the new value. > >> 2. To address internal consistency issues caused by emitting -U, we need > >> each watermark boundary to contain paired -U/+U. That means we have to > >> track what the source sends, and only emit the "watermark boundary" > after > >> the +U has been sent, right? For an upsert source, this is easy because > we > >> have ChangelogNormalize to output the -U/+U pairs. But for a retract > >> source, we may need to introduce a new stateful operator. This seems > >> unavoidable unless the source to output -U and +U together. > >> > >> > >> 3. About > >> "table.exec.sink.upsert-materialize-barrier-mode.watermark-interval", > it’s > >> not actually called “watermark internal”, because it doesn't have > watermark > >> semantics to drop late data. It’s actually a compaction barrier, right? > >> > >> > >> 4. The state in SUM is used to handle rollback under out-of-order > >> scenarios. Since we resolve out-of-orderness within a watermark > boundary, > >> does that mean we don’t need state anymore? More clearly, we only need a > >> "temporary" state that lives within each watermark boundary. (Or what I > can > >> think of is: you’re using this persistent state to support the > subsequent > >> `ON CONFLICT conflict_action`.) > >> > >> > >> > >> -- > >> > >> Best! > >> Xuyang > >> > >> > >> > >> 在 2025-12-29 17:51:40,"Dawid Wysakowicz" <[email protected]> 写道: > >> >> But I’d like to add a clarification. Take the “Changelog Disorder” > >> >example described in the FLIP ( > >> > > >> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=399279158#FLIP558:ImprovementstoSinkUpsertMaterializerandchangelogdisorder-ExampleofChangelogDisorder > >> ). > >> >Let’s look at disorder (2) and disorder (3). Under the default ON > CONFLICT > >> >ERROR, IIUC, the expected behavior is that Flink should fail. However, > >> >those inserts and updates actually all come from the same PK on table > s1 > >> >(id = 1). From a relational-algebra perspective, this does not violate > the > >> >PK constraint; it only happens because we shuffle by level and end up > with > >> >out-of-order issues under multi parallelisms. In other words, if we run > >> >this SQL in batch, the pk conflict will not happen and ON CONFLICT > ERROR > >> >should not fail. If the streaming job fails, users will be confused. > For > >> >disorder issues introduced by Flink internally, I believe Flink should > >> >handle them internally. > >> > > >> >Let's first clarify this point, because I think it's vital for the > >> >understanding of the proposal so we must be on the same page before we > >> talk > >> >about other points. > >> > > >> >No, the example you pointed out would not throw an error in `ON > CONFLICT > >> >ERROR`. As you pointed out yourself those come from the same PK on > table > >> >S1, therefore you will not have two active records with (id = 1) in the > >> >sink on the watermark boundary. The watermark is an internal > consistency > >> >boundary. You will always have a UB for the old value and an UA for the > >> new > >> >value. Therefore you will only ever have a single value after the > >> >compaction. We would throw only if we try to upsert into a single row > in > >> >the sink from two rows from s1 with different ids e.g. {source_id=1, > >> >sink_id=1}, {source_id=2, sink_id=1}. > >> > > >> >> Before I talk about 3, let me talk about 4 first. If I’m not > mistaken, > >> we > >> >need a deterministic boundary to determine that upstream data will no > >> >longer be updated, so that we can output the “final” result. > >> > > >> >No, that's not what I understand as internal consistency. An internal > >> >consistency is that a single change in the source produces a single > change > >> >in the sink, without incorrect intermediate states caused by > processing UB > >> >separately from UA. I don't want to process multiple source changes in > a > >> >single batch. I'd rather call it "external" consistency or snapshot > >> >processing as you are suggesting, but in my mind this is an orthogonal > >> >topic from what I am trying to solve here. > >> > > >> >On Tue, 23 Dec 2025 at 12:28, Xuyang <[email protected]> wrote: > >> > > >> >> Hi, Dawid. Thank you for your detailed explanation and the update. > Let > >> me > >> >> share my thoughts. > >> >> 1. In fact, I agree with what you said: for clearly problematic > queries, > >> >> we should fail fast — for example, the case you mentioned (writing > data > >> >> from a source table whose PK is id into a sink table whose PK is > name). > >> We > >> >> can fail like a traditional database: PK conflict. That’s totally > fine. > >> >> 2. But I’d like to add a clarification. Take the “Changelog Disorder” > >> >> example described in the FLIP ( > >> >> > >> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=399279158#FLIP558:ImprovementstoSinkUpsertMaterializerandchangelogdisorder-ExampleofChangelogDisorder > >> ). > >> >> Let’s look at disorder (2) and disorder (3). Under the default ON > >> CONFLICT > >> >> ERROR, IIUC, the expected behavior is that Flink should fail. > However, > >> >> those inserts and updates actually all come from the same PK on > table s1 > >> >> (id = 1). From a relational-algebra perspective, this does not > violate > >> the > >> >> PK constraint; it only happens because we shuffle by level and end up > >> with > >> >> out-of-order issues under multi parallelisms. In other words, if we > run > >> >> this SQL in batch, the pk conflict will not happen and ON CONFLICT > ERROR > >> >> should not fail. If the streaming job fails, users will be confused. > For > >> >> disorder issues introduced by Flink internally, I believe Flink > should > >> >> handle them internally. > >> >> 4. Before I talk about 3, let me talk about 4 first. If I’m not > >> mistaken, > >> >> we need a deterministic boundary to determine that upstream data > will no > >> >> longer be updated, so that we can output the “final” result. I think > our > >> >> disagreement is about where that “data boundary” is. In this FLIP, > the > >> >> boundary is described as: 1) watermark or 2) checkpoint. But I think > >> such a > >> >> boundary is tied to the table, for example, “the creation of a > specific > >> >> historical snapshot version of a table.” Since data in the snapshot > is > >> >> immutable, we can output results at that point. What do you think? > >> >> 3. If we must choose one of the introduced options, I lean toward > Option > >> >> 1, because we already have a clear definition for watermarks defined > on > >> a > >> >> table: “allowing for consistent results despite out-of-order or late > >> >> events” ( > >> >> > >> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/#event-time > >> ). > >> >> This “drop late events” semantic does not exist in checkpoint. > However, > >> my > >> >> concern is that in most scenarios, a CDC source may produce multiple > >> >> updates for the same PK over a long time span, so the watermark > should > >> be > >> >> defined very large, which will cause the job to produce no output > for a > >> >> long time. > >> >> > >> >> > >> >> > >> >> -- > >> >> > >> >> Best! > >> >> Xuyang > >> >> > >> >> > >> >> > >> >> At 2025-12-17 16:52:26, "Dawid Wysakowicz" <[email protected]> > >> wrote: > >> >> >Hey Gustavo, Xuyang > >> >> >I tried incorporating your suggestions into the FLIP. Please take > >> another > >> >> >look. > >> >> >Best, > >> >> >Dawid > >> >> > > >> >> >On Fri, 12 Dec 2025 at 16:05, Dawid Wysakowicz < > [email protected] > >> > > >> >> >wrote: > >> >> > > >> >> >> 1. The default behavior changes if no ON CONFLICT is defined. I > am a > >> >> >>> little concerned that this may cause errors in a large number of > >> >> existing > >> >> >>> cases. > >> >> >> > >> >> >> I can be convinced to leave the default behaviour as it is now. I > am > >> >> >> worried though, very rarely the current behaviour of SUM is what > >> people > >> >> >> actually want. As mentioned in the FLIP I wholeheartedly believe > >> there > >> >> are > >> >> >> very little if any real world scenarios where you need the > >> deduplicate > >> >> >> behaviour. I try to elaborate a bit more in 2) > >> >> >> > >> >> >> 2. Regarding On Conflict Errors, in the context of CDC streams, > it is > >> >> >>> expected that the vast majority of cases cannot generate only one > >> >> record > >> >> >>> with one primary key. The only solutions I can think of are > >> append-only > >> >> >>> top1, deduplication, or aggregating the first row. > >> >> >> > >> >> >> I disagree with that statement. I don't think CDC streams change > >> >> anything > >> >> >> in that regard. Maybe there is some misunderstanding about what a > one > >> >> >> record means in this context. > >> >> >> > >> >> >> I agree almost certainly there will be a sequence of UA, UB for a > >> single > >> >> >> sink's primary key. > >> >> >> > >> >> >> My claim is that users almost never want a situation where they > have > >> >> more > >> >> >> than one "active" upsert key/record for one sink's primary key. I > >> tried > >> >> to > >> >> >> explain that in the FLIP, but let me try to give one more example > >> here. > >> >> >> > >> >> >> Imagine two tables: > >> >> >> CREATE TABLE source ( > >> >> >> id bigint PRIMARY KEY, > >> >> >> name string, > >> >> >> value string > >> >> >> ) > >> >> >> > >> >> >> CREATE TABLE sink ( > >> >> >> name string PRIMARY KEY, > >> >> >> value string > >> >> >> ) > >> >> >> > >> >> >> INSERT INTO sink SELECT name, value; > >> >> >> > >> >> >> === Input > >> >> >> (1, "Apple", "ABC") > >> >> >> (2, "Apple", "DEF") > >> >> >> > >> >> >> In the scenario above a SUM is inserted which will deduplicate the > >> rows > >> >> >> and override the value for "Apple" with "DEF". In my opinion it's > >> >> entirely > >> >> >> wrong, instead an exception should be thrown that there is > actually a > >> >> >> constraint validation. > >> >> >> > >> >> >> I am absolutely more than happy to be proved wrong. If you do > have a > >> >> real > >> >> >> world scenario where the deduplication logic is actually correct > and > >> >> >> expected please, please do share. So far I have not seen one, nor > >> was I > >> >> >> able to come up with one. And yet I am not suggesting to remove > the > >> >> >> deduplication logic entirely, users can still use it with ON > CONFLICT > >> >> >> DEDUPLICATE. > >> >> >> > >> >> >> 3. The special watermark generation interval affects the > visibility > >> of > >> >> >>> results. How can users configure this generation interval? > >> >> >> > >> >> >> > >> >> >> That's a fair question I'll try to elaborate on in the FLIP. I can > >> see > >> >> two > >> >> >> options: > >> >> >> 1. We piggyback on existing watermarks in the query, if there are > no > >> >> >> watermarks (tables don't have a watermark definition) we fail > during > >> >> >> planning > >> >> >> 2. We add a new parameter option for a specialized generalized > >> watermark > >> >> >> > >> >> >> Let me think for some more on that and I'll come back with a more > >> >> concrete > >> >> >> proposal. > >> >> >> > >> >> >> > >> >> >>> 4. I believe that resolving out-of-order issues and addressing > >> internal > >> >> >>> consistency are two separate problems. As I understand the > current > >> >> >>> solution, it does not really resolve the internal consistency > >> issue. > >> >> We > >> >> >>> could first resolve the out-of-order problem. For most scenarios > >> that > >> >> >>> require real-time response, we can directly output intermediate > >> results > >> >> >>> promptly. > >> >> >> > >> >> >> > >> >> >> Why doesn't it solve it? It does. Given a pair of UB/UA we won't > emit > >> >> the > >> >> >> temporary state after processing the UB. > >> >> >> > >> >> >> 5. How can we compact data with the same custom watermark? If > >> detailed > >> >> >>> comparisons are necessary, I think we still need to preserve all > key > >> >> data; > >> >> >>> we would just be compressing this data further at time t. > >> >> >> > >> >> >> Yes, we need to preserve all key data, but only between two > >> watermarks. > >> >> >> Assuming frequent watermarks, that's for a very short time. > >> >> >> > >> >> >> 6. If neither this proposed solution nor the reject solution can > >> resolve > >> >> >>> internal consistency, we need to reconsider the differences > between > >> >> the two > >> >> >>> approaches. > >> >> >> > >> >> >> I'll copy the explanation why the rejected alternative should be > >> >> rejected > >> >> >> from the FLIP: > >> >> >> > >> >> >> The solution can help us to solve the changelog disorder problem, > >> but it > >> >> >> does not help with the *internal consistency *issue. If we want to > >> fix > >> >> >> that as well, we still need the compaction on watermarks. At the > same > >> >> time > >> >> >> it increases the size of all flowing records. Therefore it was > >> rejected > >> >> in > >> >> >> favour of simply compacting all records once on the progression of > >> >> >> watermarks. > >> >> >> > >> >> >> Best, > >> >> >> Dawid > >> >> >> > >> >> > >> >
