This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f2fd1f31 [FLINK-38846][mysql] Optimize LinkedList processing in 
handleChange method to reduce complexity (#4250)
7f2fd1f31 is described below

commit 7f2fd1f312947b15c217dcb3ca1260e584c1bf3a
Author: Jia Fan <[email protected]>
AuthorDate: Tue Feb 3 11:12:15 2026 +0800

    [FLINK-38846][mysql] Optimize LinkedList processing in handleChange method 
to reduce complexity (#4250)
---
 .../mysql/MySqlStreamingChangeEventSource.java       | 20 +++++++++++++++-----
 .../mysql/MySqlStreamingChangeEventSource.java       | 20 +++++++++++++++-----
 2 files changed, 30 insertions(+), 10 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
index 4f1c70899..ceecca942 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
@@ -91,6 +91,9 @@ import static io.debezium.util.Strings.isNullOrEmpty;
  * specifying starting offset on start.
  *
  * <p>Line 1485 : Add more error details for some exceptions.
+ *
+ * <p>Line 947-958 : Use iterator instead of index-based loop to avoid O(n²) 
complexity when
+ * processing LinkedList rows in handleChange method. See FLINK-38846.
  */
 public class MySqlStreamingChangeEventSource
         implements StreamingChangeEventSource<MySqlPartition, 
MySqlOffsetContext> {
@@ -941,11 +944,18 @@ public class MySqlStreamingChangeEventSource
             int count = 0;
             int numRows = rows.size();
             if (startingRowNumber < numRows) {
-                for (int row = startingRowNumber; row != numRows; ++row) {
-                    offsetContext.setRowNumber(row, numRows);
-                    offsetContext.event(tableId, eventTimestamp);
-                    changeEmitter.emit(tableId, rows.get(row));
-                    count++;
+                // Use iterator to avoid O(n²) complexity when rows is a 
LinkedList
+                // (mysql-binlog-connector-java uses LinkedList in 
WriteRowsEventDataDeserializer
+                // and DeleteRowsEventDataDeserializer)
+                int rowIndex = 0;
+                for (U rowData : rows) {
+                    if (rowIndex >= startingRowNumber) {
+                        offsetContext.setRowNumber(rowIndex, numRows);
+                        offsetContext.event(tableId, eventTimestamp);
+                        changeEmitter.emit(tableId, rowData);
+                        count++;
+                    }
+                    rowIndex++;
                 }
                 if (LOGGER.isDebugEnabled()) {
                     if (startingRowNumber != 0) {
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
index 1d0f27106..279d6fb3a 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
@@ -93,6 +93,9 @@ import static io.debezium.util.Strings.isNullOrEmpty;
  * specifying starting offset on start.
  *
  * <p>Line 1485 : Add more error details for some exceptions.
+ *
+ * <p>Line 951-964 : Use iterator instead of index-based loop to avoid O(n²) 
complexity when
+ * processing LinkedList rows in handleChange method. See FLINK-38846.
  */
 public class MySqlStreamingChangeEventSource
         implements StreamingChangeEventSource<MySqlPartition, 
MySqlOffsetContext> {
@@ -946,11 +949,18 @@ public class MySqlStreamingChangeEventSource
             int count = 0;
             int numRows = rows.size();
             if (startingRowNumber < numRows) {
-                for (int row = startingRowNumber; row != numRows; ++row) {
-                    offsetContext.setRowNumber(row, numRows);
-                    offsetContext.event(tableId, eventTimestamp);
-                    changeEmitter.emit(tableId, rows.get(row));
-                    count++;
+                // Use iterator to avoid O(n²) complexity when rows is a 
LinkedList
+                // (mysql-binlog-connector-java uses LinkedList in 
WriteRowsEventDataDeserializer
+                // and DeleteRowsEventDataDeserializer)
+                int rowIndex = 0;
+                for (U rowData : rows) {
+                    if (rowIndex >= startingRowNumber) {
+                        offsetContext.setRowNumber(rowIndex, numRows);
+                        offsetContext.event(tableId, eventTimestamp);
+                        changeEmitter.emit(tableId, rowData);
+                        count++;
+                    }
+                    rowIndex++;
                 }
                 if (LOGGER.isDebugEnabled()) {
                     if (startingRowNumber != 0) {

Reply via email to