[ https://issues.apache.org/jira/browse/FLINK-38568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yuanfenghu updated FLINK-38568:
-------------------------------
    Description: 
h2. Background

When using the MySQL CDC connector with large tables split into thousands of chunks
(e.g., 10,000+), the BinlogSplitReader.shouldEmit() method causes severe
performance degradation.

Performance impact (observed in production):
 * CPU usage: 25.12% spent in splitKeyRangeContains()
 * 38,403 comparisons per binlog record
 * Algorithm: O(n) linear search through all finished snapshot splits

Root causes:
 * Linear search: for each binlog record, the code iterates through all
finished splits to find the split that contains the record
 * Unsorted list: the finished-splits list is not sorted, which rules out a
binary search

h2. Solution

1. Sort splits by boundary in BinlogSplitReader.configureFilter():
{code:java}
splitsInfoMap.values().forEach(RecordUtils::sortFinishedSplitInfos); {code}
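For illustration, a minimal sketch of what the sorting helper could look like. The comparator below (ordering by the first chunk-key column of the split start, with the unbounded first chunk sorted to the front) is an assumption for this sketch, not necessarily the actual patch:
{code:java}
// Hypothetical sketch (not the actual patch): order finished splits by their
// start key so that a binary search over the list becomes possible. Assumes a
// single-column chunk key whose values implement Comparable; a null splitStart
// marks the unbounded first chunk and is sorted to the front.
public static void sortFinishedSplitInfos(List<FinishedSnapshotSplitInfo> splits) {
    splits.sort(
            (a, b) -> {
                Object[] startA = a.getSplitStart();
                Object[] startB = b.getSplitStart();
                if (startA == null) {
                    return startB == null ? 0 : -1;
                }
                if (startB == null) {
                    return 1;
                }
                @SuppressWarnings("unchecked")
                Comparable<Object> keyA = (Comparable<Object>) startA[0];
                return keyA.compareTo(startB[0]);
            });
}
{code}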
2. Replace the linear search with a binary search in BinlogSplitReader.shouldEmit():
{code:java}
// Before: O(n) scan over every finished split for each binlog record
for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
    if (RecordUtils.splitKeyRangeContains(...)) {
        return true;
    }
}

// After: O(log n) binary search over the sorted split list
FinishedSnapshotSplitInfo matchedSplit =
        RecordUtils.findSplitByKeyBinary(finishedSplitsInfo.get(tableId), chunkKey);
return matchedSplit != null && position.isAfter(matchedSplit.getHighWatermark());{code}
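A sketch of how findSplitByKeyBinary could be implemented over the sorted list. The body below is an assumption for illustration: it treats each split as covering [splitStart, splitEnd) with null boundaries meaning unbounded, and assumes a single-column Comparable chunk key; the comparator in the actual patch may differ:
{code:java}
// Hypothetical sketch (not the actual patch): binary search over splits
// sorted by start key. Relies on the ranges being contiguous and
// non-overlapping after sortFinishedSplitInfos.
public static FinishedSnapshotSplitInfo findSplitByKeyBinary(
        List<FinishedSnapshotSplitInfo> sortedSplits, Object[] chunkKey) {
    int low = 0;
    int high = sortedSplits.size() - 1;
    while (low <= high) {
        int mid = (low + high) >>> 1;
        FinishedSnapshotSplitInfo split = sortedSplits.get(mid);
        Object[] start = split.getSplitStart();
        Object[] end = split.getSplitEnd();
        if (start != null && compare(chunkKey[0], start[0]) < 0) {
            high = mid - 1; // key lies below this split's range
        } else if (end != null && compare(chunkKey[0], end[0]) >= 0) {
            low = mid + 1; // key lies at or above this split's range
        } else {
            return split; // start <= key < end: match
        }
    }
    return null; // no finished split covers the key
}

@SuppressWarnings("unchecked")
private static int compare(Object key, Object boundary) {
    return ((Comparable<Object>) key).compareTo(boundary);
}
{code}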
h2. Performance Improvement
||Metric||Before||After||Improvement||
|Algorithm|O(n)|O(log n)|-|
|Comparisons|38,403|~16|2,400x|
|CPU usage|25.12%|<0.01%|2,500x|
|Time|34ms|<0.015ms|2,200x|
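(The ~16 figure is the binary-search depth: a search over 38,403 sorted splits needs at most ⌈log₂ 38,403⌉ ≈ 16 comparisons, versus up to 38,403 range checks for the linear scan.)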

 
The production method trace that exposed the hotspot (38,403 range checks inside a single shouldEmit() invocation):
{code}
`---ts=2025-10-24 18:09:07.334;thread_name=Source Data Fetcher for Source: 
MySQL Source -> Parse -> Side Output -> Case-insensitive Convert 
(1/4)#0;id=111;is_daemon=false;priority=5;TCCL=org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader@19a11164
    `---[136.386382ms] 
org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader:shouldEmit()
        +---[0.00% 0.002533ms ] 
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils:isDataChangeRecord()
 #248
        +---[0.00% 0.002138ms ] 
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils:getTableId() #249
        +---[0.01% 0.010575ms ] 
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils:getBinlogPosition()
 #253
        +---[0.04% 0.04841ms ] 
org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader:hasEnterPureBinlogPhase()
 #254
        +---[0.00% 6.7E-4ms ] 
org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext:getDatabaseSchema()
 #262
        +---[0.00% 0.006651ms ] 
io.debezium.connector.mysql.MySqlDatabaseSchema:tableFor() #262
        +---[0.00% 7.69E-4ms ] 
org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext:getSourceConfig()
 #263
        +---[0.00% 5.72E-4ms ] 
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig:getChunkKeyColumns()
 #263
        +---[0.00% 6.17E-4ms ] 
org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext:getSourceConfig()
 #264
        +---[0.00% 4.89E-4ms ] 
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig:isTreatTinyInt1AsBoolean()
 #264
        +---[0.02% 0.02135ms ] 
org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils:getChunkKeyColumnType()
 #261
        +---[0.00% 0.002676ms ] 
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils:getStructContainsChunkKey()
 #266
        +---[0.00% 6.1E-4ms ] 
org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext:getSchemaNameAdjuster()
 #269
        +---[0.00% 0.003123ms ] 
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils:getSplitKey() 
#268
        +---[13.63% min=4.54E-4ms,max=0.007806ms,total=18.587447ms,count=38403] 
org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo:getSplitStart()
 #272
        +---[13.80% min=4.55E-4ms,max=0.095948ms,total=18.823901ms,count=38403] 
org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo:getSplitEnd()
 #272
        +---[25.12% min=7.74E-4ms,max=0.038913ms,total=34.265124ms,count=38403] 
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils:splitKeyRangeContains()
 #271
        +---[0.00% 9.15E-4ms ] 
org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo:getHighWatermark()
 #273
        `---[0.03% 0.042434ms ] 
org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset:isAfter() #273 
{code}
 

 


> Performance bottleneck in BinlogSplitReader with large number of snapshot 
> splits
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-38568
>                 URL: https://issues.apache.org/jira/browse/FLINK-38568
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.5.0
>            Reporter: yuanfenghu
>            Priority: Major
>             Fix For: cdc-3.6.0
>
>


