Hi, Dawid. Thank you for your detailed explanation and the update. Let me share
my thoughts.
1. In fact, I agree with what you said: for clearly problematic queries, we
should fail fast, for example in the case you mentioned (writing data from a
source table whose PK is id into a sink table whose PK is name). We can fail
the same way a traditional database does on a PK conflict. That’s totally fine.
2. But I’d like to add a clarification. Take the “Changelog Disorder” example
described in the FLIP
(https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=399279158#FLIP558:ImprovementstoSinkUpsertMaterializerandchangelogdisorder-ExampleofChangelogDisorder).
Let’s look at disorder (2) and disorder (3). Under the default ON CONFLICT
ERROR, IIUC, the expected behavior is that Flink should fail. However, those
inserts and updates all come from the same PK on table s1 (id = 1). From a
relational-algebra perspective, this does not violate the PK constraint; it
only happens because we shuffle by level and end up with out-of-order records
when the job runs with parallelism > 1. In other words, if we ran this SQL in
batch mode, no PK conflict would occur, so ON CONFLICT ERROR should not fail.
If the streaming job fails in this case, users will be confused. For disorder
issues introduced by Flink internally, I believe Flink should handle them
internally. (I’ve put a rough sketch of the kind of query I mean at the end of
this list.)
4. Before I talk about 3, let me talk about 4 first. If I’m not mistaken, we
need a deterministic boundary to determine that upstream data will no longer be
updated, so that we can output the “final” result. I think our disagreement is
about where that “data boundary” is. In this FLIP, the boundary is described
as: 1) a watermark or 2) a checkpoint. But I think such a boundary should be
tied to the table itself, for example “the creation of a specific historical
snapshot version of a table.” Since data in that snapshot is immutable, we can
output final results at that point. What do you think? (See the second sketch
at the end of this list for roughly what I have in mind.)
3. If we must choose one of the introduced options, I lean toward Option 1,
because we already have a clear definition for watermarks defined on a table:
“allowing for consistent results despite out-of-order or late events”
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/#event-time).
This “drop late events” semantic does not exist for checkpoints. However, my
concern is that in most scenarios a CDC source may produce multiple updates for
the same PK over a long time span, so the watermark delay would have to be
defined very large, which will cause the job to produce no output for a long
time (see the last sketch at the end of this list).
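
To make point 2 concrete, here is a rough sketch of the kind of query I mean.
The table and column names are made up for illustration and are not the exact
tables from the FLIP example. Because the join key ("level") differs from the
upstream PK ("id"), the planner shuffles by level, so consecutive -U/+U records
for s1.id = 1 can be processed by different subtasks and reach the sink out of
order when parallelism > 1, even though the source itself never violates its PK.

-- Illustrative only; connector options omitted.
CREATE TABLE s1 (
  id BIGINT,
  `level` INT,
  PRIMARY KEY (id) NOT ENFORCED
);

CREATE TABLE s2 (
  `level` INT,
  category STRING,
  PRIMARY KEY (`level`) NOT ENFORCED
);

CREATE TABLE sink (
  id BIGINT,
  category STRING,
  PRIMARY KEY (id) NOT ENFORCED
);

-- Both inputs are shuffled by "level" (not by s1's PK "id"), so successive
-- updates of s1 (id = 1) that change "level" are routed to different
-- subtasks and may arrive at the sink out of order.
INSERT INTO sink
SELECT s1.id, s2.category
FROM s1 JOIN s2 ON s1.`level` = s2.`level`;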
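
For point 4, what I have in mind by a table-level boundary is roughly a
snapshot read, assuming a catalog/connector that supports time travel (the
table name and timestamp below are only placeholders):

-- Reading a fixed historical snapshot of a table; since the snapshot is
-- immutable, results computed on it are final and can be emitted safely.
SELECT id, name
FROM orders FOR SYSTEM_TIME AS OF TIMESTAMP '2025-12-01 00:00:00';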
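
And for point 3, this is the kind of watermark definition I am worried about
(the table and columns are hypothetical). To tolerate updates of the same PK
arriving days apart, the watermark delay has to be correspondingly large, and
downstream "final" results are held back for that long:

-- Illustrative only; connector options omitted.
CREATE TABLE cdc_source (
  id BIGINT,
  name STRING,
  op_ts TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED,
  -- A 7-day delay means the watermark, and therefore the emission of
  -- compacted results, lags event time by 7 days.
  WATERMARK FOR op_ts AS op_ts - INTERVAL '7' DAY
);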
--
Best!
Xuyang
At 2025-12-17 16:52:26, "Dawid Wysakowicz" <[email protected]> wrote:
>Hey Gustavo, Xuyang
>I tried incorporating your suggestions into the FLIP. Please take another
>look.
>Best,
>Dawid
>
>On Fri, 12 Dec 2025 at 16:05, Dawid Wysakowicz <[email protected]>
>wrote:
>
>>> 1. The default behavior changes if no ON CONFLICT is defined. I am a
>>> little concerned that this may cause errors in a large number of existing
>>> cases.
>>
>> I can be convinced to leave the default behaviour as it is now. I am
>> worried, though, that the current behaviour of SUM is very rarely what
>> people actually want. As mentioned in the FLIP, I wholeheartedly believe
>> there are very few, if any, real-world scenarios where you need the
>> deduplicate behaviour. I try to elaborate a bit more in 2).
>>
>>> 2. Regarding On Conflict Errors, in the context of CDC streams, it is
>>> expected that the vast majority of cases cannot generate only one record
>>> with one primary key. The only solutions I can think of are append-only
>>> top1, deduplication, or aggregating the first row.
>>
>> I disagree with that statement. I don't think CDC streams change anything
>> in that regard. Maybe there is some misunderstanding about what "one
>> record" means in this context.
>>
>> I agree that almost certainly there will be a sequence of UA and UB records
>> for a single sink primary key.
>>
>> My claim is that users almost never want a situation where they have more
>> than one "active" upsert key/record for one sink's primary key. I tried to
>> explain that in the FLIP, but let me try to give one more example here.
>>
>> Imagine two tables:
>> CREATE TABLE source (
>>   id BIGINT,
>>   name STRING,
>>   `value` STRING,
>>   PRIMARY KEY (id) NOT ENFORCED
>> );
>>
>> CREATE TABLE sink (
>>   name STRING,
>>   `value` STRING,
>>   PRIMARY KEY (name) NOT ENFORCED
>> );
>>
>> INSERT INTO sink SELECT name, `value` FROM source;
>>
>> === Input
>> (1, "Apple", "ABC")
>> (2, "Apple", "DEF")
>>
>> In the scenario above a SUM is inserted which will deduplicate the rows
>> and override the value for "Apple" with "DEF". In my opinion that's entirely
>> wrong; instead, an exception should be thrown because there is actually a
>> constraint violation.
>>
>> I am absolutely more than happy to be proved wrong. If you do have a
>> real-world scenario where the deduplication logic is actually correct and
>> expected, please, please do share. So far I have not seen one, nor was I
>> able to come up with one. And yet I am not suggesting removing the
>> deduplication logic entirely; users can still use it with ON CONFLICT
>> DEDUPLICATE.
>>
>>> 3. The special watermark generation interval affects the visibility of
>>> results. How can users configure this generation interval?
>>
>>
>> That's a fair question; I'll try to elaborate on it in the FLIP. I can see
>> two options:
>> 1. We piggyback on existing watermarks in the query; if there are no
>> watermarks (the tables don't have a watermark definition), we fail during
>> planning.
>> 2. We add a new option for a specialized generalized watermark.
>>
>> Let me think some more on that and I'll come back with a more concrete
>> proposal.
>>
>>
>>> 4. I believe that resolving out-of-order issues and addressing internal
>>> consistency are two separate problems. As I understand the current
>>> solution, it does not really resolve the internal consistency issue. We
>>> could first resolve the out-of-order problem. For most scenarios that
>>> require real-time response, we can directly output intermediate results
>>> promptly.
>>
>>
>> Why doesn't it solve it? It does. Given a pair of UB/UA we won't emit the
>> temporary state after processing the UB.
>>
>>> 5. How can we compact data with the same custom watermark? If detailed
>>> comparisons are necessary, I think we still need to preserve all key data;
>>> we would just be compressing this data further at time t.
>>
>> Yes, we need to preserve all key data, but only between two watermarks.
>> Assuming frequent watermarks, that's for a very short time.
>>
>>> 6. If neither this proposed solution nor the reject solution can resolve
>>> internal consistency, we need to reconsider the differences between the two
>>> approaches.
>>
>> I'll copy the explanation why the rejected alternative should be rejected
>> from the FLIP:
>>
>> The solution can help us to solve the changelog disorder problem, but it
>> does not help with the *internal consistency* issue. If we want to fix
>> that as well, we still need the compaction on watermarks. At the same time
>> it increases the size of all flowing records. Therefore it was rejected in
>> favour of simply compacting all records once on the progression of
>> watermarks.
>>
>> Best,
>> Dawid
>>