This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a6bfd85d [FLINK-38568] [mysql-cdc] [cdc-base] Optimize binlog split 
lookup using binary search
7a6bfd85d is described below

commit 7a6bfd85df5fed9b2c0e0dda8e8492a682c429f6
Author: big face cat <[email protected]>
AuthorDate: Thu Nov 13 21:45:14 2025 +0800

    [FLINK-38568] [mysql-cdc] [cdc-base] Optimize binlog split lookup using 
binary search
    
    This closes #4166.
---
 .../base/source/reader/external/FetchTask.java     |  18 ++
 .../external/IncrementalSourceStreamFetcher.java   |  29 ++-
 .../external/JdbcSourceFetchTaskContext.java       |  15 +-
 .../connectors/base/utils/SourceRecordUtils.java   |  78 -------
 .../cdc/connectors/base/utils/SplitKeyUtils.java   | 243 +++++++++++++++++++++
 .../connectors/base/utils/SplitKeyUtilsTest.java   | 132 +++++++++++
 .../mysql/debezium/reader/BinlogSplitReader.java   |  19 +-
 .../connectors/mysql/source/utils/RecordUtils.java |  82 +------
 .../mysql/source/utils/SplitKeyUtils.java          | 243 +++++++++++++++++++++
 .../mysql/source/utils/RecordUtilsTest.java        |   2 +-
 .../mysql/source/utils/SplitKeyUtilsTest.java      | 206 +++++++++++++++++
 .../reader/fetch/OracleSourceFetchTaskContext.java |  13 +-
 12 files changed, 902 insertions(+), 178 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/FetchTask.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/FetchTask.java
index fbf86b598..4dc943095 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/FetchTask.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/FetchTask.java
@@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.annotation.Experimental;
 import org.apache.flink.cdc.connectors.base.config.SourceConfig;
 import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
 import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
+import 
org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
 
 import io.debezium.connector.base.ChangeEventQueue;
@@ -67,6 +68,23 @@ public interface FetchTask<Split> {
 
         boolean isRecordBetween(SourceRecord record, Object[] splitStart, 
Object[] splitEnd);
 
+        /**
+         * Returns true if the implementation can provide a split key that is 
comparable with {@link
+         * FinishedSnapshotSplitInfo} boundaries, enabling binary search 
optimizations.
+         */
+        default boolean supportsSplitKeyOptimization() {
+            return false;
+        }
+
+        /**
+         * Returns the chunk key extracted from the given {@link 
SourceRecord}. Only invoked when
+         * {@link #supportsSplitKeyOptimization()} returns true.
+         */
+        default Object[] getSplitKey(SourceRecord record) {
+            throw new UnsupportedOperationException(
+                    "Split key extraction is not supported by this fetch task 
context.");
+        }
+
         void rewriteOutputBuffer(Map<Struct, SourceRecord> outputBuffer, 
SourceRecord changeRecord);
 
         List<SourceRecord> formatMessageTimestamp(Collection<SourceRecord> 
snapshotRecords);
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 754c8aa1f..72c62d600 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSp
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.cdc.connectors.base.utils.SplitKeyUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import 
org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -67,6 +68,7 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
     // tableId -> the max splitHighWatermark
     private Map<TableId, Offset> maxSplitHighWatermarkMap;
     private final boolean isBackfillSkipped;
+    private final boolean supportsSplitKeyOptimization;
 
     private static final long READER_CLOSE_TIMEOUT_SECONDS = 30L;
 
@@ -78,6 +80,7 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
         this.currentTaskRunning = true;
         this.pureStreamPhaseTables = new HashSet<>();
         this.isBackfillSkipped = 
taskContext.getSourceConfig().isSkipSnapshotBackfill();
+        this.supportsSplitKeyOptimization = 
taskContext.supportsSplitKeyOptimization();
     }
 
     @Override
@@ -195,13 +198,22 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
                 if (isBackfillSkipped) {
                     return true;
                 }
-                for (FinishedSnapshotSplitInfo splitInfo : 
finishedSplitsInfo.get(tableId)) {
-                    if (taskContext.isRecordBetween(
-                                    sourceRecord,
-                                    splitInfo.getSplitStart(),
-                                    splitInfo.getSplitEnd())
-                            && position.isAfter(splitInfo.getHighWatermark())) 
{
-                        return true;
+                List<FinishedSnapshotSplitInfo> tableSplits = 
finishedSplitsInfo.get(tableId);
+                if (supportsSplitKeyOptimization) {
+                    Object[] splitKey = taskContext.getSplitKey(sourceRecord);
+                    FinishedSnapshotSplitInfo matchedSplit =
+                            SplitKeyUtils.findSplitByKeyBinary(tableSplits, 
splitKey);
+                    return matchedSplit != null
+                            && 
position.isAfter(matchedSplit.getHighWatermark());
+                } else {
+                    for (FinishedSnapshotSplitInfo splitInfo : tableSplits) {
+                        if (taskContext.isRecordBetween(
+                                        sourceRecord,
+                                        splitInfo.getSplitStart(),
+                                        splitInfo.getSplitEnd())
+                                && 
position.isAfter(splitInfo.getHighWatermark())) {
+                            return true;
+                        }
                     }
                 }
             }
@@ -261,6 +273,9 @@ public class IncrementalSourceStreamFetcher implements 
Fetcher<SourceRecords, So
                     tableIdOffsetPositionMap.put(tableId, highWatermark);
                 }
             }
+            if (supportsSplitKeyOptimization) {
+                
splitsInfoMap.values().forEach(SplitKeyUtils::sortFinishedSplitInfos);
+            }
         }
         this.finishedSplitsInfo = splitsInfoMap;
         this.maxSplitHighWatermarkMap = tableIdOffsetPositionMap;
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java
index 3892229d4..9975331e4 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
 import org.apache.flink.cdc.connectors.base.config.SourceConfig;
 import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
 import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
+import org.apache.flink.cdc.connectors.base.utils.SplitKeyUtils;
 import org.apache.flink.table.types.logical.RowType;
 
 import io.debezium.config.CommonConnectorConfig;
@@ -73,9 +74,19 @@ public abstract class JdbcSourceFetchTaskContext implements 
FetchTask.Context {
 
     @Override
     public boolean isRecordBetween(SourceRecord record, Object[] splitStart, 
Object[] splitEnd) {
+        Object[] key = getSplitKey(record);
+        return SplitKeyUtils.splitKeyRangeContains(key, splitStart, splitEnd);
+    }
+
+    @Override
+    public boolean supportsSplitKeyOptimization() {
+        return true;
+    }
+
+    @Override
+    public Object[] getSplitKey(SourceRecord record) {
         RowType splitKeyType = 
getSplitType(getDatabaseSchema().tableFor(this.getTableId(record)));
-        Object[] key = SourceRecordUtils.getSplitKey(splitKeyType, record, 
getSchemaNameAdjuster());
-        return SourceRecordUtils.splitKeyRangeContains(key, splitStart, 
splitEnd);
+        return SplitKeyUtils.getSplitKey(splitKeyType, record, 
getSchemaNameAdjuster());
     }
 
     @Override
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SourceRecordUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SourceRecordUtils.java
index 2755e0c37..d6c38bdbf 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SourceRecordUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SourceRecordUtils.java
@@ -17,24 +17,18 @@
 
 package org.apache.flink.cdc.connectors.base.utils;
 
-import org.apache.flink.table.types.logical.RowType;
-
 import io.debezium.data.Envelope;
 import io.debezium.document.DocumentReader;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.HistoryRecord;
-import io.debezium.util.SchemaNameAdjuster;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Arrays;
 
 import static io.debezium.connector.AbstractSourceInfo.DATABASE_NAME_KEY;
 import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
@@ -129,78 +123,6 @@ public class SourceRecordUtils {
         return new TableId(dbName, schemaName, tableName);
     }
 
-    public static Object[] getSplitKey(
-            RowType splitBoundaryType, SourceRecord dataRecord, 
SchemaNameAdjuster nameAdjuster) {
-        // the split key field contains single field now
-        String splitFieldName = 
nameAdjuster.adjust(splitBoundaryType.getFieldNames().get(0));
-        Struct key = (Struct) dataRecord.key();
-        return new Object[] {key.get(splitFieldName)};
-    }
-
-    /** Returns the specific key contains in the split key range or not. */
-    public static boolean splitKeyRangeContains(
-            Object[] key, Object[] splitKeyStart, Object[] splitKeyEnd) {
-        // for all range
-        if (splitKeyStart == null && splitKeyEnd == null) {
-            return true;
-        }
-        // first split
-        if (splitKeyStart == null) {
-            int[] upperBoundRes = new int[key.length];
-            for (int i = 0; i < key.length; i++) {
-                upperBoundRes[i] = compareObjects(key[i], splitKeyEnd[i]);
-            }
-            return Arrays.stream(upperBoundRes).anyMatch(value -> value < 0)
-                    && Arrays.stream(upperBoundRes).allMatch(value -> value <= 
0);
-        }
-        // last split
-        else if (splitKeyEnd == null) {
-            int[] lowerBoundRes = new int[key.length];
-            for (int i = 0; i < key.length; i++) {
-                lowerBoundRes[i] = compareObjects(key[i], splitKeyStart[i]);
-            }
-            return Arrays.stream(lowerBoundRes).allMatch(value -> value >= 0);
-        }
-        // other split
-        else {
-            int[] lowerBoundRes = new int[key.length];
-            int[] upperBoundRes = new int[key.length];
-            for (int i = 0; i < key.length; i++) {
-                lowerBoundRes[i] = compareObjects(key[i], splitKeyStart[i]);
-                upperBoundRes[i] = compareObjects(key[i], splitKeyEnd[i]);
-            }
-            return Arrays.stream(lowerBoundRes).anyMatch(value -> value >= 0)
-                    && (Arrays.stream(upperBoundRes).anyMatch(value -> value < 
0)
-                            && Arrays.stream(upperBoundRes).allMatch(value -> 
value <= 0));
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    private static int compareObjects(Object o1, Object o2) {
-        if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) {
-            return ((Comparable) o1).compareTo(o2);
-        } else if (isNumericObject(o1) && isNumericObject(o2)) {
-            return toBigDecimal(o1).compareTo(toBigDecimal(o2));
-        } else {
-            return o1.toString().compareTo(o2.toString());
-        }
-    }
-
-    private static boolean isNumericObject(Object obj) {
-        return obj instanceof Byte
-                || obj instanceof Short
-                || obj instanceof Integer
-                || obj instanceof Long
-                || obj instanceof Float
-                || obj instanceof Double
-                || obj instanceof BigInteger
-                || obj instanceof BigDecimal;
-    }
-
-    private static BigDecimal toBigDecimal(Object numericObj) {
-        return new BigDecimal(numericObj.toString());
-    }
-
     public static HistoryRecord getHistoryRecord(SourceRecord schemaRecord) 
throws IOException {
         Struct value = (Struct) schemaRecord.value();
         String historyRecordStr = value.getString(HISTORY_RECORD_FIELD);
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtils.java
new file mode 100644
index 000000000..94be44aa6
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtils.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.base.utils;
+
+import 
org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import io.debezium.util.SchemaNameAdjuster;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+
+/** Utility class for working with split keys and split key ranges. */
+public class SplitKeyUtils {
+
+    /** Returns whether the given key falls inside the split key range. */
+    public static boolean splitKeyRangeContains(
+            Object[] key, Object[] splitKeyStart, Object[] splitKeyEnd) {
+        return compareKeyWithRange(key, splitKeyStart, splitKeyEnd) == 
RangePosition.WITHIN;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static int compareObjects(Object o1, Object o2) {
+        if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) {
+            return ((Comparable) o1).compareTo(o2);
+        } else if (isNumericObject(o1) && isNumericObject(o2)) {
+            return toBigDecimal(o1).compareTo(toBigDecimal(o2));
+        } else {
+            return o1.toString().compareTo(o2.toString());
+        }
+    }
+
+    private static BigDecimal toBigDecimal(Object numericObj) {
+        return new BigDecimal(numericObj.toString());
+    }
+
+    private static boolean isNumericObject(Object obj) {
+        return obj instanceof Byte
+                || obj instanceof Short
+                || obj instanceof Integer
+                || obj instanceof Long
+                || obj instanceof Float
+                || obj instanceof Double
+                || obj instanceof BigInteger
+                || obj instanceof BigDecimal;
+    }
+
+    /**
+     * Sorts the list of FinishedSnapshotSplitInfo by splitStart in ascending 
order. This is
+     * required for binary search to work correctly.
+     *
+     * <p>Splits with a null splitStart are treated as MIN and sorted to the front. splitEnd is not
+     * compared, so a null-splitEnd split lands at the back only if its splitStart is the largest.
+     *
+     * <p>NOTE: Current implementation assumes single-field split keys (as 
indicated by
+     * getSplitKey()). If multi-field split keys are supported in the future, 
the comparison logic
+     * should be reviewed to ensure consistency with {@link
+     * #splitKeyRangeContains(Object[],Object[],Object[])}.
+     *
+     * @param splits List of splits to be sorted (sorted in-place)
+     */
+    public static void sortFinishedSplitInfos(List<FinishedSnapshotSplitInfo> 
splits) {
+        if (splits == null || splits.size() <= 1) {
+            return;
+        }
+
+        splits.sort(
+                (leftSplit, rightSplit) -> {
+                    Object[] leftSplitStart = leftSplit.getSplitStart();
+                    Object[] rightSplitStart = rightSplit.getSplitStart();
+
+                    if (leftSplitStart == null && rightSplitStart == null) {
+                        return 0;
+                    }
+                    if (leftSplitStart == null) {
+                        return -1;
+                    }
+                    if (rightSplitStart == null) {
+                        return 1;
+                    }
+
+                    return compareSplit(leftSplitStart, rightSplitStart);
+                });
+    }
+
+    /**
+     * Uses binary search to find the split containing the specified key in a 
sorted split list.
+     *
+     * <p>IMPORTANT: The splits list MUST be sorted by splitStart before 
calling this method. Use
+     * sortFinishedSplitInfos() to sort the list if needed.
+     *
+     * <p>To leverage data locality for append-heavy workloads (e.g. 
auto-increment PKs), this
+     * method checks the first and last splits before applying binary search 
to the remaining
+     * subset.
+     *
+     * @param sortedSplits List of splits sorted by splitStart (MUST be 
sorted!)
+     * @param key The chunk key to search for
+     * @return The split containing the key, or null if not found
+     */
+    public static FinishedSnapshotSplitInfo findSplitByKeyBinary(
+            List<FinishedSnapshotSplitInfo> sortedSplits, Object[] key) {
+
+        if (sortedSplits == null || sortedSplits.isEmpty()) {
+            return null;
+        }
+
+        int size = sortedSplits.size();
+
+        FinishedSnapshotSplitInfo firstSplit = sortedSplits.get(0);
+        RangePosition firstPosition =
+                compareKeyWithRange(key, firstSplit.getSplitStart(), 
firstSplit.getSplitEnd());
+        if (firstPosition == RangePosition.WITHIN) {
+            return firstSplit;
+        }
+        if (firstPosition == RangePosition.BEFORE) {
+            return null;
+        }
+        if (size == 1) {
+            return null;
+        }
+
+        FinishedSnapshotSplitInfo lastSplit = sortedSplits.get(size - 1);
+        RangePosition lastPosition =
+                compareKeyWithRange(key, lastSplit.getSplitStart(), 
lastSplit.getSplitEnd());
+        if (lastPosition == RangePosition.WITHIN) {
+            return lastSplit;
+        }
+        if (lastPosition == RangePosition.AFTER) {
+            return null;
+        }
+        if (size == 2) {
+            return null;
+        }
+
+        int left = 1;
+        int right = size - 2;
+
+        while (left <= right) {
+            int mid = left + (right - left) / 2;
+            FinishedSnapshotSplitInfo split = sortedSplits.get(mid);
+
+            RangePosition position =
+                    compareKeyWithRange(key, split.getSplitStart(), 
split.getSplitEnd());
+
+            if (position == RangePosition.WITHIN) {
+                return split;
+            } else if (position == RangePosition.BEFORE) {
+                right = mid - 1;
+            } else {
+                left = mid + 1;
+            }
+        }
+
+        return null;
+    }
+
+    /** Describes the relative position of a key to a split range. */
+    public enum RangePosition {
+        BEFORE,
+        WITHIN,
+        AFTER
+    }
+
+    /**
+     * Compares {@code key} against the half-open interval {@code [splitStart, 
splitEnd)} and
+     * returns where the key lies relative to that interval.
+     */
+    private static RangePosition compareKeyWithRange(
+            Object[] key, Object[] splitStart, Object[] splitEnd) {
+        if (splitStart == null) {
+            if (splitEnd == null) {
+                return RangePosition.WITHIN; // Full range split
+            }
+            // key < splitEnd ?
+            int cmp = compareSplit(key, splitEnd);
+            return cmp < 0 ? RangePosition.WITHIN : RangePosition.AFTER;
+        }
+
+        if (splitEnd == null) {
+            // key >= splitStart ?
+            int cmp = compareSplit(key, splitStart);
+            return cmp >= 0 ? RangePosition.WITHIN : RangePosition.BEFORE;
+        }
+
+        // Normal case: [splitStart, splitEnd)
+        int cmpStart = compareSplit(key, splitStart);
+        if (cmpStart < 0) {
+            return RangePosition.BEFORE; // key < splitStart
+        }
+
+        int cmpEnd = compareSplit(key, splitEnd);
+        if (cmpEnd >= 0) {
+            return RangePosition.AFTER; // key >= splitEnd
+        }
+
+        return RangePosition.WITHIN; // splitStart <= key < splitEnd
+    }
+
+    private static int compareSplit(Object[] leftSplit, Object[] rightSplit) {
+        // Ensure both splits have the same length
+        if (leftSplit.length != rightSplit.length) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Split key arrays must have the same length. Left: 
%d, Right: %d",
+                            leftSplit.length, rightSplit.length));
+        }
+
+        int compareResult = 0;
+        for (int i = 0; i < leftSplit.length; i++) {
+            compareResult = compareObjects(leftSplit[i], rightSplit[i]);
+            if (compareResult != 0) {
+                break;
+            }
+        }
+        return compareResult;
+    }
+
+    public static Object[] getSplitKey(
+            RowType splitBoundaryType, SourceRecord dataRecord, 
SchemaNameAdjuster nameAdjuster) {
+        // the split key field contains single field now
+        String splitFieldName = 
nameAdjuster.adjust(splitBoundaryType.getFieldNames().get(0));
+        Struct key = (Struct) dataRecord.key();
+        return new Object[] {key.get(splitFieldName)};
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtilsTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtilsTest.java
new file mode 100644
index 000000000..e35dcefcf
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtilsTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.base.utils;
+
+import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import 
org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
+
+import io.debezium.relational.TableId;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.flink.cdc.connectors.base.utils.SplitKeyUtils.findSplitByKeyBinary;
+import static 
org.apache.flink.cdc.connectors.base.utils.SplitKeyUtils.sortFinishedSplitInfos;
+import static 
org.apache.flink.cdc.connectors.base.utils.SplitKeyUtils.splitKeyRangeContains;
+
+/** Tests for {@link SplitKeyUtils}. */
+class SplitKeyUtilsTest {
+
+    @Test
+    void testSortFinishedSplitInfos() {
+        TableId tableId = TableId.parse("test_db.test_table");
+        List<FinishedSnapshotSplitInfo> splits = new ArrayList<>();
+        splits.add(createSplit(tableId, "split-2", new Object[] {200L}, null));
+        splits.add(createSplit(tableId, "split-0", null, new Object[] {100L}));
+        splits.add(createSplit(tableId, "split-1", new Object[] {100L}, new 
Object[] {200L}));
+
+        sortFinishedSplitInfos(splits);
+
+        Assertions.assertThat(splits)
+                .extracting(FinishedSnapshotSplitInfo::getSplitId)
+                .containsExactly("split-0", "split-1", "split-2");
+    }
+
+    @Test
+    void testFindSplitByKeyBinaryAcrossMultipleSplits() {
+        TableId tableId = TableId.parse("test_db.test_table");
+        List<FinishedSnapshotSplitInfo> splits = new ArrayList<>();
+        splits.add(createSplit(tableId, "split-0", null, new Object[] {100L}));
+        splits.add(createSplit(tableId, "split-1", new Object[] {100L}, new 
Object[] {200L}));
+        splits.add(createSplit(tableId, "split-2", new Object[] {200L}, new 
Object[] {300L}));
+        splits.add(createSplit(tableId, "split-3", new Object[] {300L}, null));
+
+        sortFinishedSplitInfos(splits);
+
+        Assertions.assertThat(findSplitByKeyBinary(splits, new Object[] 
{50L}).getSplitId())
+                .isEqualTo("split-0");
+        Assertions.assertThat(findSplitByKeyBinary(splits, new Object[] 
{150L}).getSplitId())
+                .isEqualTo("split-1");
+        Assertions.assertThat(findSplitByKeyBinary(splits, new Object[] 
{250L}).getSplitId())
+                .isEqualTo("split-2");
+        Assertions.assertThat(
+                        findSplitByKeyBinary(splits, new Object[] 
{Long.MAX_VALUE}).getSplitId())
+                .isEqualTo("split-3");
+    }
+
+    @Test
+    void testFindSplitByKeyBinaryEdgeCases() {
+        TableId tableId = TableId.parse("test_db.test_table");
+        List<FinishedSnapshotSplitInfo> splits = new ArrayList<>();
+        splits.add(createSplit(tableId, "split-0", null, new Object[] {100L}));
+
+        sortFinishedSplitInfos(splits);
+
+        Assertions.assertThat(findSplitByKeyBinary(splits, new Object[] 
{50L}).getSplitId())
+                .isEqualTo("split-0");
+        Assertions.assertThat(findSplitByKeyBinary(splits, new Object[] 
{100L})).isNull();
+
+        Assertions.assertThat(findSplitByKeyBinary(new ArrayList<>(), new 
Object[] {1L})).isNull();
+        Assertions.assertThat(findSplitByKeyBinary(null, new Object[] 
{1L})).isNull();
+    }
+
+    @Test
+    void testBinarySearchConsistencyWithLinearSearch() {
+        TableId tableId = TableId.parse("test_db.test_table");
+        List<FinishedSnapshotSplitInfo> splits = new ArrayList<>();
+        splits.add(createSplit(tableId, "split-0", null, new Object[] {100L}));
+        splits.add(createSplit(tableId, "split-1", new Object[] {100L}, new 
Object[] {200L}));
+        splits.add(createSplit(tableId, "split-2", new Object[] {200L}, new 
Object[] {300L}));
+        splits.add(createSplit(tableId, "split-3", new Object[] {300L}, null));
+
+        sortFinishedSplitInfos(splits);
+
+        for (long key = 0; key < 400; key += 25) {
+            Object[] keyArray = new Object[] {key};
+            FinishedSnapshotSplitInfo binaryResult = 
findSplitByKeyBinary(splits, keyArray);
+
+            FinishedSnapshotSplitInfo linearResult = null;
+            for (FinishedSnapshotSplitInfo split : splits) {
+                if (splitKeyRangeContains(keyArray, split.getSplitStart(), 
split.getSplitEnd())) {
+                    linearResult = split;
+                    break;
+                }
+            }
+
+            if (binaryResult == null) {
+                Assertions.assertThat(linearResult).isNull();
+            } else {
+                Assertions.assertThat(linearResult).isNotNull();
+                Assertions.assertThat(binaryResult.getSplitId())
+                        .isEqualTo(linearResult.getSplitId());
+            }
+        }
+    }
+
+    private FinishedSnapshotSplitInfo createSplit(
+            TableId tableId, String splitId, Object[] splitStart, Object[] 
splitEnd) {
+        OffsetFactory offsetFactory = Mockito.mock(OffsetFactory.class);
+        Offset highWatermark = Mockito.mock(Offset.class);
+        return new FinishedSnapshotSplitInfo(
+                tableId, splitId, splitStart, splitEnd, highWatermark, 
offsetFactory);
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
index dfc639e84..77682534f 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
 import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
 import org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils;
 import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
+import org.apache.flink.cdc.connectors.mysql.source.utils.SplitKeyUtils;
 import org.apache.flink.cdc.connectors.mysql.table.StartupMode;
 import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
 import org.apache.flink.table.types.logical.RowType;
@@ -280,15 +281,14 @@ public class BinlogSplitReader implements 
DebeziumReader<SourceRecords, MySqlSpl
 
                 Struct target = 
RecordUtils.getStructContainsChunkKey(sourceRecord);
                 Object[] chunkKey =
-                        RecordUtils.getSplitKey(
+                        SplitKeyUtils.getSplitKey(
                                 splitKeyType, 
statefulTaskContext.getSchemaNameAdjuster(), target);
-                for (FinishedSnapshotSplitInfo splitInfo : 
finishedSplitsInfo.get(tableId)) {
-                    if (RecordUtils.splitKeyRangeContains(
-                                    chunkKey, splitInfo.getSplitStart(), 
splitInfo.getSplitEnd())
-                            && position.isAfter(splitInfo.getHighWatermark())) 
{
-                        return true;
-                    }
-                }
+
+                FinishedSnapshotSplitInfo matchedSplit =
+                        SplitKeyUtils.findSplitByKeyBinary(
+                                finishedSplitsInfo.get(tableId), chunkKey);
+
+                return matchedSplit != null && 
position.isAfter(matchedSplit.getHighWatermark());
             }
             // not in the monitored splits scope, do not emit
             return false;
@@ -349,6 +349,9 @@ public class BinlogSplitReader implements 
DebeziumReader<SourceRecords, MySqlSpl
                     tableIdBinlogPositionMap.put(tableId, highWatermark);
                 }
             }
+            // Sort splits by splitStart for binary search optimization
+            // Binary search requires sorted data to work correctly
+            
splitsInfoMap.values().forEach(SplitKeyUtils::sortFinishedSplitInfos);
         }
         this.finishedSplitsInfo = splitsInfoMap;
         this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
index e6848f1c4..8ae91ad9d 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
@@ -41,12 +41,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.time.Instant;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -111,8 +108,8 @@ public class RecordUtils {
             Struct value = (Struct) binlogRecord.value();
             if (value != null) {
                 Struct chunkKeyStruct = 
getStructContainsChunkKey(binlogRecord);
-                if (splitKeyRangeContains(
-                        getSplitKey(splitBoundaryType, nameAdjuster, 
chunkKeyStruct),
+                if (SplitKeyUtils.splitKeyRangeContains(
+                        SplitKeyUtils.getSplitKey(splitBoundaryType, 
nameAdjuster, chunkKeyStruct),
                         splitStart,
                         splitEnd)) {
                     boolean hasPrimaryKey = binlogRecord.key() != null;
@@ -139,8 +136,8 @@ public class RecordUtils {
                                         binlogRecord,
                                         createReadOpValue(binlogRecord, 
Envelope.FieldName.BEFORE),
                                         true);
-                                if (!splitKeyRangeContains(
-                                        getSplitKey(
+                                if (!SplitKeyUtils.splitKeyRangeContains(
+                                        SplitKeyUtils.getSplitKey(
                                                 splitBoundaryType, 
nameAdjuster, structFromAfter),
                                         splitStart,
                                         splitEnd)) {
@@ -431,13 +428,6 @@ public class RecordUtils {
         return !StringUtils.isNullOrWhitespaceOnly(tableName);
     }
 
-    public static Object[] getSplitKey(
-            RowType splitBoundaryType, SchemaNameAdjuster nameAdjuster, Struct 
target) {
-        // the split key field contains single field now
-        String splitFieldName = 
nameAdjuster.adjust(splitBoundaryType.getFieldNames().get(0));
-        return new Object[] {target.get(splitFieldName)};
-    }
-
     public static BinlogOffset getBinlogPosition(SourceRecord dataRecord) {
         return getBinlogPosition(dataRecord.sourceOffset());
     }
@@ -451,70 +441,6 @@ public class RecordUtils {
         return BinlogOffset.builder().setOffsetMap(offsetStrMap).build();
     }
 
-    /** Returns the specific key contains in the split key range or not. */
-    public static boolean splitKeyRangeContains(
-            Object[] key, Object[] splitKeyStart, Object[] splitKeyEnd) {
-        // for all range
-        if (splitKeyStart == null && splitKeyEnd == null) {
-            return true;
-        }
-        // first split
-        if (splitKeyStart == null) {
-            int[] upperBoundRes = new int[key.length];
-            for (int i = 0; i < key.length; i++) {
-                upperBoundRes[i] = compareObjects(key[i], splitKeyEnd[i]);
-            }
-            return Arrays.stream(upperBoundRes).anyMatch(value -> value < 0)
-                    && Arrays.stream(upperBoundRes).allMatch(value -> value <= 
0);
-        }
-        // last split
-        else if (splitKeyEnd == null) {
-            int[] lowerBoundRes = new int[key.length];
-            for (int i = 0; i < key.length; i++) {
-                lowerBoundRes[i] = compareObjects(key[i], splitKeyStart[i]);
-            }
-            return Arrays.stream(lowerBoundRes).allMatch(value -> value >= 0);
-        }
-        // other split
-        else {
-            int[] lowerBoundRes = new int[key.length];
-            int[] upperBoundRes = new int[key.length];
-            for (int i = 0; i < key.length; i++) {
-                lowerBoundRes[i] = compareObjects(key[i], splitKeyStart[i]);
-                upperBoundRes[i] = compareObjects(key[i], splitKeyEnd[i]);
-            }
-            return Arrays.stream(lowerBoundRes).anyMatch(value -> value >= 0)
-                    && (Arrays.stream(upperBoundRes).anyMatch(value -> value < 
0)
-                            && Arrays.stream(upperBoundRes).allMatch(value -> 
value <= 0));
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    private static int compareObjects(Object o1, Object o2) {
-        if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) {
-            return ((Comparable) o1).compareTo(o2);
-        } else if (isNumericObject(o1) && isNumericObject(o2)) {
-            return toBigDecimal(o1).compareTo(toBigDecimal(o2));
-        } else {
-            return o1.toString().compareTo(o2.toString());
-        }
-    }
-
-    private static boolean isNumericObject(Object obj) {
-        return obj instanceof Byte
-                || obj instanceof Short
-                || obj instanceof Integer
-                || obj instanceof Long
-                || obj instanceof Float
-                || obj instanceof Double
-                || obj instanceof BigInteger
-                || obj instanceof BigDecimal;
-    }
-
-    private static BigDecimal toBigDecimal(Object numericObj) {
-        return new BigDecimal(numericObj.toString());
-    }
-
     public static HistoryRecord getHistoryRecord(SourceRecord schemaRecord) 
throws IOException {
         Struct value = (Struct) schemaRecord.value();
         String historyRecordStr = value.getString(HISTORY_RECORD_FIELD);
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/SplitKeyUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/SplitKeyUtils.java
new file mode 100644
index 000000000..d19de5df5
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/SplitKeyUtils.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.utils;
+
+import 
org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import io.debezium.util.SchemaNameAdjuster;
+import org.apache.kafka.connect.data.Struct;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+
+/** Utility class for dealing with split keys and split key ranges. */
+public class SplitKeyUtils {
+
+    /** Returns whether the given key is contained in the split key range. */
+    public static boolean splitKeyRangeContains(
+            Object[] key, Object[] splitKeyStart, Object[] splitKeyEnd) {
+        return compareKeyWithRange(key, splitKeyStart, splitKeyEnd) == 
RangePosition.WITHIN;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static int compareObjects(Object o1, Object o2) {
+        if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) {
+            return ((Comparable) o1).compareTo(o2);
+        } else if (isNumericObject(o1) && isNumericObject(o2)) {
+            return toBigDecimal(o1).compareTo(toBigDecimal(o2));
+        } else {
+            return o1.toString().compareTo(o2.toString());
+        }
+    }
+
+    private static boolean isNumericObject(Object obj) {
+        return obj instanceof Byte
+                || obj instanceof Short
+                || obj instanceof Integer
+                || obj instanceof Long
+                || obj instanceof Float
+                || obj instanceof Double
+                || obj instanceof BigInteger
+                || obj instanceof BigDecimal;
+    }
+
+    private static BigDecimal toBigDecimal(Object numericObj) {
+        return new BigDecimal(numericObj.toString());
+    }
+
+    public static Object[] getSplitKey(
+            RowType splitBoundaryType, SchemaNameAdjuster nameAdjuster, Struct 
target) {
+        // the split key consists of a single field for now
+        String splitFieldName = 
nameAdjuster.adjust(splitBoundaryType.getFieldNames().get(0));
+        return new Object[] {target.get(splitFieldName)};
+    }
+
+    /**
+     * Sorts the list of FinishedSnapshotSplitInfo by splitStart in ascending 
order. This is
+     * required for binary search to work correctly.
+     *
+     * <p>Special case: splits with a null splitStart sort to the front (MIN).
+     * Only splitStart is compared; splitEnd is not consulted when sorting.
+     *
+     * <p>NOTE: Current implementation assumes single-field split keys (as 
indicated by
+     * getSplitKey()). If multi-field split keys are supported in the future, 
the comparison logic
+     * should be reviewed to ensure consistency with {@link
+     * #splitKeyRangeContains(Object[],Object[],Object[])}.
+     *
+     * @param splits List of splits to be sorted (sorted in-place)
+     */
+    public static void sortFinishedSplitInfos(List<FinishedSnapshotSplitInfo> 
splits) {
+        if (splits == null || splits.size() <= 1) {
+            return;
+        }
+
+        splits.sort(
+                (leftSplit, rightSplit) -> {
+                    Object[] leftSplitStart = leftSplit.getSplitStart();
+                    Object[] rightSplitStart = rightSplit.getSplitStart();
+
+                    // Splits with null splitStart should come first (they are 
the first split)
+                    if (leftSplitStart == null && rightSplitStart == null) {
+                        return 0;
+                    }
+                    if (leftSplitStart == null) {
+                        return -1;
+                    }
+                    if (rightSplitStart == null) {
+                        return 1;
+                    }
+
+                    // Compare split starts
+                    return compareSplit(leftSplitStart, rightSplitStart);
+                });
+    }
+
+    /**
+     * Uses binary search to find the split containing the specified key in a 
sorted split list.
+     *
+     * <p>IMPORTANT: The splits list MUST be sorted by splitStart before 
calling this method. Use
+     * sortFinishedSplitInfos() to sort the list if needed.
+     *
+     * <p>To leverage data locality for append-heavy workloads (e.g. 
auto-increment PKs), this
+     * method checks the first and last splits before applying binary search 
to the remaining
+     * subset.
+     *
+     * @param sortedSplits List of splits sorted by splitStart (MUST be 
sorted!)
+     * @param key The chunk key to search for
+     * @return The split containing the key, or null if not found
+     */
+    public static FinishedSnapshotSplitInfo findSplitByKeyBinary(
+            List<FinishedSnapshotSplitInfo> sortedSplits, Object[] key) {
+
+        if (sortedSplits == null || sortedSplits.isEmpty()) {
+            return null;
+        }
+
+        int size = sortedSplits.size();
+
+        FinishedSnapshotSplitInfo firstSplit = sortedSplits.get(0);
+        RangePosition firstPosition =
+                compareKeyWithRange(key, firstSplit.getSplitStart(), 
firstSplit.getSplitEnd());
+        if (firstPosition == RangePosition.WITHIN) {
+            return firstSplit;
+        }
+        if (firstPosition == RangePosition.BEFORE) {
+            return null;
+        }
+        if (size == 1) {
+            return null;
+        }
+
+        FinishedSnapshotSplitInfo lastSplit = sortedSplits.get(size - 1);
+        RangePosition lastPosition =
+                compareKeyWithRange(key, lastSplit.getSplitStart(), 
lastSplit.getSplitEnd());
+        if (lastPosition == RangePosition.WITHIN) {
+            return lastSplit;
+        }
+        if (lastPosition == RangePosition.AFTER) {
+            return null;
+        }
+        if (size == 2) {
+            return null;
+        }
+
+        int left = 1;
+        int right = size - 2;
+
+        while (left <= right) {
+            int mid = left + (right - left) / 2;
+            FinishedSnapshotSplitInfo split = sortedSplits.get(mid);
+
+            RangePosition position =
+                    compareKeyWithRange(key, split.getSplitStart(), 
split.getSplitEnd());
+
+            if (position == RangePosition.WITHIN) {
+                return split;
+            } else if (position == RangePosition.BEFORE) {
+                right = mid - 1;
+            } else {
+                left = mid + 1;
+            }
+        }
+
+        return null;
+    }
+
+    /** Describes the relative position of a key to a split range. */
+    private enum RangePosition {
+        BEFORE,
+        WITHIN,
+        AFTER
+    }
+
+    /**
+     * Compares {@code key} against the half-open interval {@code [splitStart, 
splitEnd)} and
+     * returns where the key lies relative to that interval.
+     */
+    private static RangePosition compareKeyWithRange(
+            Object[] key, Object[] splitStart, Object[] splitEnd) {
+        if (splitStart == null) {
+            if (splitEnd == null) {
+                return RangePosition.WITHIN; // Full range split
+            }
+            // key < splitEnd ?
+            int cmp = compareSplit(key, splitEnd);
+            return cmp < 0 ? RangePosition.WITHIN : RangePosition.AFTER;
+        }
+
+        if (splitEnd == null) {
+            // key >= splitStart ?
+            int cmp = compareSplit(key, splitStart);
+            return cmp >= 0 ? RangePosition.WITHIN : RangePosition.BEFORE;
+        }
+
+        // Normal case: [splitStart, splitEnd)
+        int cmpStart = compareSplit(key, splitStart);
+        if (cmpStart < 0) {
+            return RangePosition.BEFORE; // key < splitStart
+        }
+
+        int cmpEnd = compareSplit(key, splitEnd);
+        if (cmpEnd >= 0) {
+            return RangePosition.AFTER; // key >= splitEnd
+        }
+
+        return RangePosition.WITHIN; // splitStart <= key < splitEnd
+    }
+
+    private static int compareSplit(Object[] leftSplit, Object[] rightSplit) {
+        // Ensure both splits have the same length
+        if (leftSplit.length != rightSplit.length) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Split key arrays must have the same length. Left: 
%d, Right: %d",
+                            leftSplit.length, rightSplit.length));
+        }
+
+        int compareResult = 0;
+        for (int i = 0; i < leftSplit.length; i++) {
+            compareResult = compareObjects(leftSplit[i], rightSplit[i]);
+            if (compareResult != 0) {
+                break;
+            }
+        }
+        return compareResult;
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtilsTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtilsTest.java
index 510399489..6f248ad5f 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtilsTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtilsTest.java
@@ -23,7 +23,7 @@ import org.junit.jupiter.api.Test;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 
-import static 
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.splitKeyRangeContains;
+import static 
org.apache.flink.cdc.connectors.mysql.source.utils.SplitKeyUtils.splitKeyRangeContains;
 
 /** Tests for {@link 
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils}. */
 class RecordUtilsTest {
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SplitKeyUtilsTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SplitKeyUtilsTest.java
new file mode 100644
index 000000000..2a2b0421e
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SplitKeyUtilsTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.utils;
+
+import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
+import 
org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
+
+import io.debezium.relational.TableId;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Tests for {@link SplitKeyUtils}. */
+class SplitKeyUtilsTest {
+
+    @Test
+    void testSortFinishedSplitInfos() {
+        TableId tableId = new TableId("test_db", null, "test_table");
+
+        List<FinishedSnapshotSplitInfo> emptyList = new ArrayList<>();
+        SplitKeyUtils.sortFinishedSplitInfos(emptyList);
+        Assertions.assertThat(emptyList).isEmpty();
+
+        List<FinishedSnapshotSplitInfo> singleList = new ArrayList<>();
+        singleList.add(createSplit(tableId, "split-1", new Object[] {100L}, 
new Object[] {200L}));
+        SplitKeyUtils.sortFinishedSplitInfos(singleList);
+        Assertions.assertThat(singleList).hasSize(1);
+
+        List<FinishedSnapshotSplitInfo> splits = new ArrayList<>();
+        splits.add(createSplit(tableId, "split-3", new Object[] {200L}, new 
Object[] {300L}));
+        splits.add(createSplit(tableId, "split-1", null, new Object[] {100L}));
+        splits.add(createSplit(tableId, "split-4", new Object[] {300L}, null));
+        splits.add(createSplit(tableId, "split-2", new Object[] {100L}, new 
Object[] {200L}));
+
+        SplitKeyUtils.sortFinishedSplitInfos(splits);
+
+        Assertions.assertThat(splits.get(0).getSplitStart()).isNull();
+        Assertions.assertThat(splits.get(1).getSplitStart()).isEqualTo(new 
Object[] {100L});
+        Assertions.assertThat(splits.get(2).getSplitStart()).isEqualTo(new 
Object[] {200L});
+        Assertions.assertThat(splits.get(3).getSplitStart()).isEqualTo(new 
Object[] {300L});
+    }
+
+    @Test
+    void testFindSplitByKeyBinary() {
+        TableId tableId = new TableId("test_db", null, "test_table");
+
+        List<FinishedSnapshotSplitInfo> sortedSplits = new ArrayList<>();
+        sortedSplits.add(createSplit(tableId, "split-0", null, new Object[] 
{100L}));
+        sortedSplits.add(createSplit(tableId, "split-1", new Object[] {100L}, 
new Object[] {200L}));
+        sortedSplits.add(createSplit(tableId, "split-2", new Object[] {200L}, 
new Object[] {300L}));
+        sortedSplits.add(createSplit(tableId, "split-3", new Object[] {300L}, 
null));
+
+        FinishedSnapshotSplitInfo result =
+                SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] 
{-1L});
+        Assertions.assertThat(result).isNotNull();
+        Assertions.assertThat(result.getSplitId()).isEqualTo("split-0");
+
+        result = SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] 
{100L});
+        Assertions.assertThat(result).isNotNull();
+        Assertions.assertThat(result.getSplitId()).isEqualTo("split-1");
+
+        result = SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] 
{150L});
+        Assertions.assertThat(result).isNotNull();
+        Assertions.assertThat(result.getSplitId()).isEqualTo("split-1");
+
+        result = SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] 
{200L});
+        Assertions.assertThat(result).isNotNull();
+        Assertions.assertThat(result.getSplitId()).isEqualTo("split-2");
+
+        result = SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] 
{250L});
+        Assertions.assertThat(result).isNotNull();
+        Assertions.assertThat(result.getSplitId()).isEqualTo("split-2");
+
+        result = SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] 
{300L});
+        Assertions.assertThat(result).isNotNull();
+        Assertions.assertThat(result.getSplitId()).isEqualTo("split-3");
+
+        result = SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] 
{1000L});
+        Assertions.assertThat(result).isNotNull();
+        Assertions.assertThat(result.getSplitId()).isEqualTo("split-3");
+
+        result = SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] 
{99L});
+        Assertions.assertThat(result).isNotNull();
+        Assertions.assertThat(result.getSplitId()).isEqualTo("split-0");
+    }
+
+    @Test
+    void testFindSplitByKeyBinaryWithOnlyOneSplit() {
+        TableId tableId = new TableId("test_db", null, "test_table");
+
+        List<FinishedSnapshotSplitInfo> sortedSplits = new ArrayList<>();
+        sortedSplits.add(createSplit(tableId, "split-0", null, null));
+
+        FinishedSnapshotSplitInfo result =
+                SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] 
{100L});
+        Assertions.assertThat(result).isNotNull();
+        Assertions.assertThat(result.getSplitId()).isEqualTo("split-0");
+
+        result = SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] 
{Long.MAX_VALUE});
+        Assertions.assertThat(result).isNotNull();
+        Assertions.assertThat(result.getSplitId()).isEqualTo("split-0");
+    }
+
+    @Test
+    void testFindSplitByKeyBinaryWithLargeNumberOfSplits() {
+        TableId tableId = new TableId("test_db", null, "test_table");
+
+        List<FinishedSnapshotSplitInfo> sortedSplits = new ArrayList<>();
+
+        for (int i = 0; i < 1000; i++) {
+            Object[] start = i == 0 ? null : new Object[] {(long) i * 10};
+            Object[] end = i == 999 ? null : new Object[] {(long) (i + 1) * 
10};
+            sortedSplits.add(createSplit(tableId, "split-" + i, start, end));
+        }
+
+        FinishedSnapshotSplitInfo result =
+                SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] 
{5L});
+        Assertions.assertThat(result).isNotNull();
+        Assertions.assertThat(result.getSplitId()).isEqualTo("split-0");
+
+        result = SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] 
{505L});
+        Assertions.assertThat(result).isNotNull();
+        Assertions.assertThat(result.getSplitId()).isEqualTo("split-50");
+
+        result = SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] 
{9995L});
+        Assertions.assertThat(result).isNotNull();
+        Assertions.assertThat(result.getSplitId()).isEqualTo("split-999");
+    }
+
+    @Test
+    void testFindSplitByKeyBinaryEdgeCases() {
+        TableId tableId = new TableId("test_db", null, "test_table");
+
+        List<FinishedSnapshotSplitInfo> emptyList = new ArrayList<>();
+        FinishedSnapshotSplitInfo result =
+                SplitKeyUtils.findSplitByKeyBinary(emptyList, new Object[] 
{100L});
+        Assertions.assertThat(result).isNull();
+
+        result = SplitKeyUtils.findSplitByKeyBinary(null, new Object[] {100L});
+        Assertions.assertThat(result).isNull();
+    }
+
+    @Test
+    void testBinarySearchConsistencyWithLinearSearch() {
+        TableId tableId = new TableId("test_db", null, "test_table");
+
+        List<FinishedSnapshotSplitInfo> sortedSplits = new ArrayList<>();
+        sortedSplits.add(createSplit(tableId, "split-0", null, new Object[] 
{100L}));
+        sortedSplits.add(createSplit(tableId, "split-1", new Object[] {100L}, 
new Object[] {200L}));
+        sortedSplits.add(createSplit(tableId, "split-2", new Object[] {200L}, 
new Object[] {300L}));
+        sortedSplits.add(createSplit(tableId, "split-3", new Object[] {300L}, 
new Object[] {400L}));
+        sortedSplits.add(createSplit(tableId, "split-4", new Object[] {400L}, 
null));
+
+        for (long key = 0; key < 500; key += 10) {
+            Object[] keyArray = new Object[] {key};
+
+            FinishedSnapshotSplitInfo binaryResult =
+                    SplitKeyUtils.findSplitByKeyBinary(sortedSplits, keyArray);
+
+            FinishedSnapshotSplitInfo linearResult = null;
+            for (FinishedSnapshotSplitInfo split : sortedSplits) {
+                if (SplitKeyUtils.splitKeyRangeContains(
+                        keyArray, split.getSplitStart(), split.getSplitEnd())) 
{
+                    linearResult = split;
+                    break;
+                }
+            }
+
+            if (binaryResult == null) {
+                Assertions.assertThat(linearResult)
+                        .as("Key %d should not be in any split", key)
+                        .isNull();
+            } else {
+                Assertions.assertThat(linearResult)
+                        .as("Key %d should be found by both methods", key)
+                        .isNotNull();
+                Assertions.assertThat(binaryResult.getSplitId())
+                        .as("Both methods should find the same split for key 
%d", key)
+                        .isEqualTo(linearResult.getSplitId());
+            }
+        }
+    }
+
+    private FinishedSnapshotSplitInfo createSplit(
+            TableId tableId, String splitId, Object[] splitStart, Object[] 
splitEnd) {
+        return new FinishedSnapshotSplitInfo(
+                tableId, splitId, splitStart, splitEnd, 
BinlogOffset.ofEarliest());
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
index c475ada56..fda792937 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import 
org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
 import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
+import org.apache.flink.cdc.connectors.base.utils.SplitKeyUtils;
 import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceConfig;
 import 
org.apache.flink.cdc.connectors.oracle.source.handler.OracleSchemaChangeEventHandler;
 import org.apache.flink.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
@@ -221,15 +222,19 @@ public class OracleSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext {
                 LOG.error("{} can not convert to RowId", record);
             }
             Object[] rowIds = new ROWID[] {rowId};
-            return SourceRecordUtils.splitKeyRangeContains(rowIds, splitStart, 
splitEnd);
+            return SplitKeyUtils.splitKeyRangeContains(rowIds, splitStart, 
splitEnd);
         } else {
             // config chunk key column compare
-            Object[] key =
-                    SourceRecordUtils.getSplitKey(splitKeyType, record, 
getSchemaNameAdjuster());
-            return SourceRecordUtils.splitKeyRangeContains(key, splitStart, 
splitEnd);
+            Object[] key = SplitKeyUtils.getSplitKey(splitKeyType, record, 
getSchemaNameAdjuster());
+            return SplitKeyUtils.splitKeyRangeContains(key, splitStart, 
splitEnd);
         }
     }
 
+    @Override
+    public boolean supportsSplitKeyOptimization() {
+        return false;
+    }
+
     @Override
     public JdbcSourceEventDispatcher<OraclePartition> getEventDispatcher() {
         return dispatcher;

Reply via email to