[ https://issues.apache.org/jira/browse/FLINK-35600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Di Wu updated FLINK-35600:
--------------------------
Description:
Assume that the table has been split into 3 chunks.
Timeline:
t1: chunk1 is read.
t2: a record, data A, belonging to chunk2 is inserted into MySQL.
t3: chunk2 is read, and data A is sent downstream.
t4: chunk3 is read.
At this point, startOffset is set to the low watermark.
t5: *BinlogSplitReader.pollSplitRecords* receives data A and uses the method *shouldEmit* to determine whether the record should be sent downstream.
That decision involves the following method:
{code:java}
private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) {
    if (pureBinlogPhaseTables.contains(tableId)) {
        return true;
    }
    // existing tables that have finished snapshot reading
    if (maxSplitHighWatermarkMap.containsKey(tableId)
            && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
        pureBinlogPhaseTables.add(tableId);
        return true;
    }
    // remainder of the method is not shown in this excerpt
}
{code}
*maxSplitHighWatermarkMap.get(tableId)* returns a high watermark that carries no ts_sec value, so its ts_sec defaults to 0.
As a result, *position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))* evaluates to true, even though data A was already emitted when chunk2 was read.
*Data A is therefore sent downstream a second time, and the data is duplicated.*
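The effect of the missing ts_sec can be reproduced with a small standalone model. This is only a sketch and not the Flink CDC source: the class PureBinlogPhaseSketch, the nested Offset type, the setHighWatermark helper, the string table ID and all numeric values are hypothetical, and it assumes that the offset comparison is decided by ts_sec first and by the binlog position only on a tie. The real BinlogOffset comparison considers more fields.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the check described above; NOT the Flink CDC source.
class PureBinlogPhaseSketch {

    // Stand-in for a binlog offset. Assumption: ordering is decided by ts_sec,
    // and by the position within one binlog file only when ts_sec is equal.
    static final class Offset {
        final long tsSec;    // 0 means ts_sec was never recorded
        final long position; // byte position in the binlog file

        Offset(long tsSec, long position) {
            this.tsSec = tsSec;
            this.position = position;
        }

        boolean isAtOrAfter(Offset that) {
            int byTs = Long.compare(this.tsSec, that.tsSec);
            int result = (byTs != 0) ? byTs : Long.compare(this.position, that.position);
            return result >= 0;
        }
    }

    private final Map<String, Offset> maxSplitHighWatermarkMap = new HashMap<>();
    private final Set<String> pureBinlogPhaseTables = new HashSet<>();

    // Hypothetical helper: records the highest snapshot-split watermark of a table.
    void setHighWatermark(String tableId, Offset highWatermark) {
        maxSplitHighWatermarkMap.put(tableId, highWatermark);
    }

    // Mirrors the two branches quoted in the issue; anything else returns false here,
    // which is a simplification of this sketch.
    boolean hasEnterPureBinlogPhase(String tableId, Offset position) {
        if (pureBinlogPhaseTables.contains(tableId)) {
            return true;
        }
        Offset highWatermark = maxSplitHighWatermarkMap.get(tableId);
        if (highWatermark != null && position.isAtOrAfter(highWatermark)) {
            pureBinlogPhaseTables.add(tableId);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // Data A was written at binlog position 500, before the high watermark at 900.
        Offset dataA = new Offset(1718000000L, 500L);

        // High watermark stored without ts_sec: data A looks "at or after" it.
        PureBinlogPhaseSketch withoutTs = new PureBinlogPhaseSketch();
        withoutTs.setHighWatermark("db.t", new Offset(0L, 900L));
        System.out.println(withoutTs.hasEnterPureBinlogPhase("db.t", dataA)); // true

        // High watermark stored with ts_sec: data A is correctly seen as older.
        PureBinlogPhaseSketch withTs = new PureBinlogPhaseSketch();
        withTs.setHighWatermark("db.t", new Offset(1718000100L, 900L));
        System.out.println(withTs.hasEnterPureBinlogPhase("db.t", dataA)); // false
    }
}
```

In this model, the first call also adds the table to pureBinlogPhaseTables, so every later record of that table passes the check without any further comparison against the high watermark.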
> Data read duplication during the full-to-incremental conversion phase
> ---------------------------------------------------------------------
>
> Key: FLINK-35600
> URL: https://issues.apache.org/jira/browse/FLINK-35600
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.1.0
> Reporter: Di Wu
> Priority: Major
> Labels: pull-request-available
--
This message was sent by Atlassian Jira
(v8.20.10#820010)