This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 7f2fd1f31 [FLINK-38846][mysql] Optimize LinkedList processing in
handleChange method to reduce complexity (#4250)
7f2fd1f31 is described below
commit 7f2fd1f312947b15c217dcb3ca1260e584c1bf3a
Author: Jia Fan <[email protected]>
AuthorDate: Tue Feb 3 11:12:15 2026 +0800
[FLINK-38846][mysql] Optimize LinkedList processing in handleChange method
to reduce complexity (#4250)
---
.../mysql/MySqlStreamingChangeEventSource.java | 20 +++++++++++++++-----
.../mysql/MySqlStreamingChangeEventSource.java | 20 +++++++++++++++-----
2 files changed, 30 insertions(+), 10 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
index 4f1c70899..ceecca942 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
@@ -91,6 +91,9 @@ import static io.debezium.util.Strings.isNullOrEmpty;
* specifying starting offset on start.
*
* <p>Line 1485 : Add more error details for some exceptions.
+ *
+ * <p>Line 947-958 : Use iterator instead of index-based loop to avoid O(n²)
complexity when
+ * processing LinkedList rows in handleChange method. See FLINK-38846.
*/
public class MySqlStreamingChangeEventSource
implements StreamingChangeEventSource<MySqlPartition,
MySqlOffsetContext> {
@@ -941,11 +944,18 @@ public class MySqlStreamingChangeEventSource
int count = 0;
int numRows = rows.size();
if (startingRowNumber < numRows) {
- for (int row = startingRowNumber; row != numRows; ++row) {
- offsetContext.setRowNumber(row, numRows);
- offsetContext.event(tableId, eventTimestamp);
- changeEmitter.emit(tableId, rows.get(row));
- count++;
+ // Use iterator to avoid O(n²) complexity when rows is a
LinkedList
+ // (mysql-binlog-connector-java uses LinkedList in
WriteRowsEventDataDeserializer
+ // and DeleteRowsEventDataDeserializer)
+ int rowIndex = 0;
+ for (U rowData : rows) {
+ if (rowIndex >= startingRowNumber) {
+ offsetContext.setRowNumber(rowIndex, numRows);
+ offsetContext.event(tableId, eventTimestamp);
+ changeEmitter.emit(tableId, rowData);
+ count++;
+ }
+ rowIndex++;
}
if (LOGGER.isDebugEnabled()) {
if (startingRowNumber != 0) {
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
index 1d0f27106..279d6fb3a 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
@@ -93,6 +93,9 @@ import static io.debezium.util.Strings.isNullOrEmpty;
* specifying starting offset on start.
*
* <p>Line 1485 : Add more error details for some exceptions.
+ *
+ * <p>Line 951-964 : Use iterator instead of index-based loop to avoid O(n²)
complexity when
+ * processing LinkedList rows in handleChange method. See FLINK-38846.
*/
public class MySqlStreamingChangeEventSource
implements StreamingChangeEventSource<MySqlPartition,
MySqlOffsetContext> {
@@ -946,11 +949,18 @@ public class MySqlStreamingChangeEventSource
int count = 0;
int numRows = rows.size();
if (startingRowNumber < numRows) {
- for (int row = startingRowNumber; row != numRows; ++row) {
- offsetContext.setRowNumber(row, numRows);
- offsetContext.event(tableId, eventTimestamp);
- changeEmitter.emit(tableId, rows.get(row));
- count++;
+ // Use iterator to avoid O(n²) complexity when rows is a
LinkedList
+ // (mysql-binlog-connector-java uses LinkedList in
WriteRowsEventDataDeserializer
+ // and DeleteRowsEventDataDeserializer)
+ int rowIndex = 0;
+ for (U rowData : rows) {
+ if (rowIndex >= startingRowNumber) {
+ offsetContext.setRowNumber(rowIndex, numRows);
+ offsetContext.event(tableId, eventTimestamp);
+ changeEmitter.emit(tableId, rowData);
+ count++;
+ }
+ rowIndex++;
}
if (LOGGER.isDebugEnabled()) {
if (startingRowNumber != 0) {