This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 7a6bfd85d [FLINK-38568] [mysql-cdc] [cdc-base] Optimize binlog split lookup using binary search
7a6bfd85d is described below
commit 7a6bfd85df5fed9b2c0e0dda8e8492a682c429f6
Author: big face cat <[email protected]>
AuthorDate: Thu Nov 13 21:45:14 2025 +0800
[FLINK-38568] [mysql-cdc] [cdc-base] Optimize binlog split lookup using binary search
This closes #4166.
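
At a high level, the patch replaces the per-record linear scan over a table's finished snapshot splits with a one-time sort plus a binary search over half-open chunk-key ranges [splitStart, splitEnd). The sketch below is a minimal, self-contained illustration of that strategy only; the Split class is a hypothetical stand-in for FinishedSnapshotSplitInfo with a single Long key, not the committed code:

    import java.util.Arrays;
    import java.util.List;

    /** Hypothetical, simplified model of the new lookup; not the committed code. */
    public class BinarySplitLookupSketch {

        /** Stand-in for FinishedSnapshotSplitInfo with a single Long chunk key. */
        static final class Split {
            final String id;
            final Long start; // null = unbounded lower end (first split)
            final Long end;   // null = unbounded upper end (last split)

            Split(String id, Long start, Long end) {
                this.id = id;
                this.start = start;
                this.end = end;
            }

            /** Half-open containment check: start inclusive, end exclusive. */
            boolean contains(long key) {
                return (start == null || key >= start) && (end == null || key < end);
            }
        }

        /** Binary search over splits sorted by start; a null start sorts first. */
        static Split findSplit(List<Split> sortedSplits, long key) {
            int lo = 0;
            int hi = sortedSplits.size() - 1;
            while (lo <= hi) {
                int mid = lo + (hi - lo) / 2;
                Split s = sortedSplits.get(mid);
                if (s.contains(key)) {
                    return s;
                } else if (s.start != null && key < s.start) {
                    hi = mid - 1; // key lies before this split's range
                } else {
                    lo = mid + 1; // key lies at or beyond this split's end
                }
            }
            return null;
        }

        public static void main(String[] args) {
            List<Split> splits =
                    Arrays.asList(
                            new Split("split-0", null, 100L),
                            new Split("split-1", 100L, 200L),
                            new Split("split-2", 200L, null));
            System.out.println(findSplit(splits, 50L).id);  // split-0
            System.out.println(findSplit(splits, 100L).id); // split-1 (start is inclusive)
            System.out.println(findSplit(splits, 250L).id); // split-2
        }
    }

With n finished splits, each binlog-record lookup drops from O(n) range checks to O(log n).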
---
.../base/source/reader/external/FetchTask.java | 18 ++
.../external/IncrementalSourceStreamFetcher.java | 29 ++-
.../external/JdbcSourceFetchTaskContext.java | 15 +-
.../connectors/base/utils/SourceRecordUtils.java | 78 -------
.../cdc/connectors/base/utils/SplitKeyUtils.java | 243 +++++++++++++++++++++
.../connectors/base/utils/SplitKeyUtilsTest.java | 132 +++++++++++
.../mysql/debezium/reader/BinlogSplitReader.java | 19 +-
.../connectors/mysql/source/utils/RecordUtils.java | 82 +------
.../mysql/source/utils/SplitKeyUtils.java | 243 +++++++++++++++++++++
.../mysql/source/utils/RecordUtilsTest.java | 2 +-
.../mysql/source/utils/SplitKeyUtilsTest.java | 206 +++++++++++++++++
.../reader/fetch/OracleSourceFetchTaskContext.java | 13 +-
12 files changed, 902 insertions(+), 178 deletions(-)
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/FetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/FetchTask.java
index fbf86b598..4dc943095 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/FetchTask.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/FetchTask.java
@@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
+import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import io.debezium.connector.base.ChangeEventQueue;
@@ -67,6 +68,23 @@ public interface FetchTask<Split> {
boolean isRecordBetween(SourceRecord record, Object[] splitStart, Object[] splitEnd);
+ /**
+ * Returns true if the implementation can provide a split key that is comparable with {@link
+ * FinishedSnapshotSplitInfo} boundaries, enabling binary search optimizations.
+ */
+ default boolean supportsSplitKeyOptimization() {
+ return false;
+ }
+
+ /**
+ * Returns the chunk key extracted from the given {@link SourceRecord}. Only invoked when
+ * {@link #supportsSplitKeyOptimization()} returns true.
+ */
+ default Object[] getSplitKey(SourceRecord record) {
+ throw new UnsupportedOperationException(
+ "Split key extraction is not supported by this fetch task context.");
+ }
+
void rewriteOutputBuffer(Map<Struct, SourceRecord> outputBuffer, SourceRecord changeRecord);
List<SourceRecord> formatMessageTimestamp(Collection<SourceRecord> snapshotRecords);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 754c8aa1f..72c62d600 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -22,6 +22,7 @@ import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSp
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.cdc.connectors.base.utils.SplitKeyUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -67,6 +68,7 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
// tableId -> the max splitHighWatermark
private Map<TableId, Offset> maxSplitHighWatermarkMap;
private final boolean isBackfillSkipped;
+ private final boolean supportsSplitKeyOptimization;
private static final long READER_CLOSE_TIMEOUT_SECONDS = 30L;
@@ -78,6 +80,7 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
this.currentTaskRunning = true;
this.pureStreamPhaseTables = new HashSet<>();
this.isBackfillSkipped = taskContext.getSourceConfig().isSkipSnapshotBackfill();
+ this.supportsSplitKeyOptimization = taskContext.supportsSplitKeyOptimization();
}
@Override
@@ -195,13 +198,22 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
if (isBackfillSkipped) {
return true;
}
- for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
- if (taskContext.isRecordBetween(
- sourceRecord,
- splitInfo.getSplitStart(),
- splitInfo.getSplitEnd())
- && position.isAfter(splitInfo.getHighWatermark())) {
- return true;
+ List<FinishedSnapshotSplitInfo> tableSplits = finishedSplitsInfo.get(tableId);
+ if (supportsSplitKeyOptimization) {
+ Object[] splitKey = taskContext.getSplitKey(sourceRecord);
+ FinishedSnapshotSplitInfo matchedSplit =
+ SplitKeyUtils.findSplitByKeyBinary(tableSplits, splitKey);
+ return matchedSplit != null
+ && position.isAfter(matchedSplit.getHighWatermark());
+ } else {
+ for (FinishedSnapshotSplitInfo splitInfo : tableSplits) {
+ if (taskContext.isRecordBetween(
+ sourceRecord,
+ splitInfo.getSplitStart(),
+ splitInfo.getSplitEnd())
+ && position.isAfter(splitInfo.getHighWatermark())) {
+ return true;
+ }
}
}
}
@@ -261,6 +273,9 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
tableIdOffsetPositionMap.put(tableId, highWatermark);
}
}
+ if (supportsSplitKeyOptimization) {
+ splitsInfoMap.values().forEach(SplitKeyUtils::sortFinishedSplitInfos);
+ }
}
this.finishedSplitsInfo = splitsInfoMap;
this.maxSplitHighWatermarkMap = tableIdOffsetPositionMap;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java
index 3892229d4..9975331e4 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java
@@ -23,6 +23,7 @@ import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
+import org.apache.flink.cdc.connectors.base.utils.SplitKeyUtils;
import org.apache.flink.table.types.logical.RowType;
import io.debezium.config.CommonConnectorConfig;
@@ -73,9 +74,19 @@ public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context {
@Override
public boolean isRecordBetween(SourceRecord record, Object[] splitStart, Object[] splitEnd) {
+ Object[] key = getSplitKey(record);
+ return SplitKeyUtils.splitKeyRangeContains(key, splitStart, splitEnd);
+ }
+
+ @Override
+ public boolean supportsSplitKeyOptimization() {
+ return true;
+ }
+
+ @Override
+ public Object[] getSplitKey(SourceRecord record) {
RowType splitKeyType = getSplitType(getDatabaseSchema().tableFor(this.getTableId(record)));
- Object[] key = SourceRecordUtils.getSplitKey(splitKeyType, record, getSchemaNameAdjuster());
- return SourceRecordUtils.splitKeyRangeContains(key, splitStart, splitEnd);
+ return SplitKeyUtils.getSplitKey(splitKeyType, record, getSchemaNameAdjuster());
}
@Override
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SourceRecordUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SourceRecordUtils.java
index 2755e0c37..d6c38bdbf 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SourceRecordUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SourceRecordUtils.java
@@ -17,24 +17,18 @@
package org.apache.flink.cdc.connectors.base.utils;
-import org.apache.flink.table.types.logical.RowType;
-
import io.debezium.data.Envelope;
import io.debezium.document.DocumentReader;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
-import io.debezium.util.SchemaNameAdjuster;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.Arrays;
import static io.debezium.connector.AbstractSourceInfo.DATABASE_NAME_KEY;
import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
@@ -129,78 +123,6 @@ public class SourceRecordUtils {
return new TableId(dbName, schemaName, tableName);
}
- public static Object[] getSplitKey(
- RowType splitBoundaryType, SourceRecord dataRecord, SchemaNameAdjuster nameAdjuster) {
- // the split key field contains single field now
- String splitFieldName = nameAdjuster.adjust(splitBoundaryType.getFieldNames().get(0));
- Struct key = (Struct) dataRecord.key();
- return new Object[] {key.get(splitFieldName)};
- }
-
- /** Returns the specific key contains in the split key range or not. */
- public static boolean splitKeyRangeContains(
- Object[] key, Object[] splitKeyStart, Object[] splitKeyEnd) {
- // for all range
- if (splitKeyStart == null && splitKeyEnd == null) {
- return true;
- }
- // first split
- if (splitKeyStart == null) {
- int[] upperBoundRes = new int[key.length];
- for (int i = 0; i < key.length; i++) {
- upperBoundRes[i] = compareObjects(key[i], splitKeyEnd[i]);
- }
- return Arrays.stream(upperBoundRes).anyMatch(value -> value < 0)
- && Arrays.stream(upperBoundRes).allMatch(value -> value <= 0);
- }
- // last split
- else if (splitKeyEnd == null) {
- int[] lowerBoundRes = new int[key.length];
- for (int i = 0; i < key.length; i++) {
- lowerBoundRes[i] = compareObjects(key[i], splitKeyStart[i]);
- }
- return Arrays.stream(lowerBoundRes).allMatch(value -> value >= 0);
- }
- // other split
- else {
- int[] lowerBoundRes = new int[key.length];
- int[] upperBoundRes = new int[key.length];
- for (int i = 0; i < key.length; i++) {
- lowerBoundRes[i] = compareObjects(key[i], splitKeyStart[i]);
- upperBoundRes[i] = compareObjects(key[i], splitKeyEnd[i]);
- }
- return Arrays.stream(lowerBoundRes).anyMatch(value -> value >= 0)
- && (Arrays.stream(upperBoundRes).anyMatch(value -> value < 0)
- && Arrays.stream(upperBoundRes).allMatch(value -> value <= 0));
- }
-
- @SuppressWarnings("unchecked")
- private static int compareObjects(Object o1, Object o2) {
- if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) {
- return ((Comparable) o1).compareTo(o2);
- } else if (isNumericObject(o1) && isNumericObject(o2)) {
- return toBigDecimal(o1).compareTo(toBigDecimal(o2));
- } else {
- return o1.toString().compareTo(o2.toString());
- }
- }
-
- private static boolean isNumericObject(Object obj) {
- return obj instanceof Byte
- || obj instanceof Short
- || obj instanceof Integer
- || obj instanceof Long
- || obj instanceof Float
- || obj instanceof Double
- || obj instanceof BigInteger
- || obj instanceof BigDecimal;
- }
-
- private static BigDecimal toBigDecimal(Object numericObj) {
- return new BigDecimal(numericObj.toString());
- }
-
public static HistoryRecord getHistoryRecord(SourceRecord schemaRecord) throws IOException {
Struct value = (Struct) schemaRecord.value();
String historyRecordStr = value.getString(HISTORY_RECORD_FIELD);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtils.java
new file mode 100644
index 000000000..94be44aa6
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtils.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.base.utils;
+
+import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import io.debezium.util.SchemaNameAdjuster;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+
+/** Utility class to deal with split keys and split key ranges. */
+public class SplitKeyUtils {
+
+ /** Returns whether the specific key is contained in the split key range. */
+ public static boolean splitKeyRangeContains(
+ Object[] key, Object[] splitKeyStart, Object[] splitKeyEnd) {
+ return compareKeyWithRange(key, splitKeyStart, splitKeyEnd) == RangePosition.WITHIN;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static int compareObjects(Object o1, Object o2) {
+ if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) {
+ return ((Comparable) o1).compareTo(o2);
+ } else if (isNumericObject(o1) && isNumericObject(o2)) {
+ return toBigDecimal(o1).compareTo(toBigDecimal(o2));
+ } else {
+ return o1.toString().compareTo(o2.toString());
+ }
+ }
+
+ private static BigDecimal toBigDecimal(Object numericObj) {
+ return new BigDecimal(numericObj.toString());
+ }
+
+ private static boolean isNumericObject(Object obj) {
+ return obj instanceof Byte
+ || obj instanceof Short
+ || obj instanceof Integer
+ || obj instanceof Long
+ || obj instanceof Float
+ || obj instanceof Double
+ || obj instanceof BigInteger
+ || obj instanceof BigDecimal;
+ }
+
+ /**
+ * Sorts the list of FinishedSnapshotSplitInfo by splitStart in ascending order. This is
+ * required for binary search to work correctly.
+ *
+ * <p>Handles special cases: - Splits with null splitStart are considered as MIN value (sorted
+ * to front) - Splits with null splitEnd are considered as MAX value (sorted to back)
+ *
+ * <p>NOTE: Current implementation assumes single-field split keys (as indicated by
+ * getSplitKey()). If multi-field split keys are supported in the future, the comparison logic
+ * should be reviewed to ensure consistency with {@link
+ * #splitKeyRangeContains(Object[],Object[],Object[])}.
+ *
+ * @param splits List of splits to be sorted (sorted in-place)
+ */
+ public static void sortFinishedSplitInfos(List<FinishedSnapshotSplitInfo> splits) {
+ if (splits == null || splits.size() <= 1) {
+ return;
+ }
+
+ splits.sort(
+ (leftSplit, rightSplit) -> {
+ Object[] leftSplitStart = leftSplit.getSplitStart();
+ Object[] rightSplitStart = rightSplit.getSplitStart();
+
+ if (leftSplitStart == null && rightSplitStart == null) {
+ return 0;
+ }
+ if (leftSplitStart == null) {
+ return -1;
+ }
+ if (rightSplitStart == null) {
+ return 1;
+ }
+
+ return compareSplit(leftSplitStart, rightSplitStart);
+ });
+ }
+
+ /**
+ * Uses binary search to find the split containing the specified key in a sorted split list.
+ *
+ * <p>IMPORTANT: The splits list MUST be sorted by splitStart before calling this method. Use
+ * sortFinishedSplitInfos() to sort the list if needed.
+ *
+ * <p>To leverage data locality for append-heavy workloads (e.g. auto-increment PKs), this
+ * method checks the first and last splits before applying binary search to the remaining
+ * subset.
+ *
+ * @param sortedSplits List of splits sorted by splitStart (MUST be sorted!)
+ * @param key The chunk key to search for
+ * @return The split containing the key, or null if not found
+ */
+ public static FinishedSnapshotSplitInfo findSplitByKeyBinary(
+ List<FinishedSnapshotSplitInfo> sortedSplits, Object[] key) {
+
+ if (sortedSplits == null || sortedSplits.isEmpty()) {
+ return null;
+ }
+
+ int size = sortedSplits.size();
+
+ FinishedSnapshotSplitInfo firstSplit = sortedSplits.get(0);
+ RangePosition firstPosition =
+ compareKeyWithRange(key, firstSplit.getSplitStart(), firstSplit.getSplitEnd());
+ if (firstPosition == RangePosition.WITHIN) {
+ return firstSplit;
+ }
+ if (firstPosition == RangePosition.BEFORE) {
+ return null;
+ }
+ if (size == 1) {
+ return null;
+ }
+
+ FinishedSnapshotSplitInfo lastSplit = sortedSplits.get(size - 1);
+ RangePosition lastPosition =
+ compareKeyWithRange(key, lastSplit.getSplitStart(), lastSplit.getSplitEnd());
+ if (lastPosition == RangePosition.WITHIN) {
+ return lastSplit;
+ }
+ if (lastPosition == RangePosition.AFTER) {
+ return null;
+ }
+ if (size == 2) {
+ return null;
+ }
+
+ int left = 1;
+ int right = size - 2;
+
+ while (left <= right) {
+ int mid = left + (right - left) / 2;
+ FinishedSnapshotSplitInfo split = sortedSplits.get(mid);
+
+ RangePosition position =
+ compareKeyWithRange(key, split.getSplitStart(), split.getSplitEnd());
+
+ if (position == RangePosition.WITHIN) {
+ return split;
+ } else if (position == RangePosition.BEFORE) {
+ right = mid - 1;
+ } else {
+ left = mid + 1;
+ }
+ }
+
+ return null;
+ }
+
+ /** Describes the relative position of a key to a split range. */
+ public enum RangePosition {
+ BEFORE,
+ WITHIN,
+ AFTER
+ }
+
+ /**
+ * Compares {@code key} against the half-open interval {@code [splitStart, splitEnd)} and
+ * returns where the key lies relative to that interval.
+ */
+ private static RangePosition compareKeyWithRange(
+ Object[] key, Object[] splitStart, Object[] splitEnd) {
+ if (splitStart == null) {
+ if (splitEnd == null) {
+ return RangePosition.WITHIN; // Full range split
+ }
+ // key < splitEnd ?
+ int cmp = compareSplit(key, splitEnd);
+ return cmp < 0 ? RangePosition.WITHIN : RangePosition.AFTER;
+ }
+
+ if (splitEnd == null) {
+ // key >= splitStart ?
+ int cmp = compareSplit(key, splitStart);
+ return cmp >= 0 ? RangePosition.WITHIN : RangePosition.BEFORE;
+ }
+
+ // Normal case: [splitStart, splitEnd)
+ int cmpStart = compareSplit(key, splitStart);
+ if (cmpStart < 0) {
+ return RangePosition.BEFORE; // key < splitStart
+ }
+
+ int cmpEnd = compareSplit(key, splitEnd);
+ if (cmpEnd >= 0) {
+ return RangePosition.AFTER; // key >= splitEnd
+ }
+
+ return RangePosition.WITHIN; // splitStart <= key < splitEnd
+ }
+
+ private static int compareSplit(Object[] leftSplit, Object[] rightSplit) {
+ // Ensure both splits have the same length
+ if (leftSplit.length != rightSplit.length) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Split key arrays must have the same length. Left: %d, Right: %d",
+ leftSplit.length, rightSplit.length));
+ }
+
+ int compareResult = 0;
+ for (int i = 0; i < leftSplit.length; i++) {
+ compareResult = compareObjects(leftSplit[i], rightSplit[i]);
+ if (compareResult != 0) {
+ break;
+ }
+ }
+ return compareResult;
+ }
+
+ public static Object[] getSplitKey(
+ RowType splitBoundaryType, SourceRecord dataRecord, SchemaNameAdjuster nameAdjuster) {
+ // the split key field contains a single field now
+ String splitFieldName = nameAdjuster.adjust(splitBoundaryType.getFieldNames().get(0));
+ Struct key = (Struct) dataRecord.key();
+ return new Object[] {key.get(splitFieldName)};
+ }
+}
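
A note on the boundary semantics encoded above: ranges are half-open, so a key equal to splitStart belongs to the split, a key equal to splitEnd belongs to the next one, and a null bound means that side is unbounded. A small illustrative call sequence (the example class is hypothetical; it assumes the flink-cdc-base module is on the classpath):

    import org.apache.flink.cdc.connectors.base.utils.SplitKeyUtils;

    public class RangeSemanticsExample {
        public static void main(String[] args) {
            Object[] start = new Object[] {100L};
            Object[] end = new Object[] {200L};
            // start is inclusive, end is exclusive
            System.out.println(SplitKeyUtils.splitKeyRangeContains(new Object[] {100L}, start, end)); // true
            System.out.println(SplitKeyUtils.splitKeyRangeContains(new Object[] {200L}, start, end)); // false
            // null bounds mean an unbounded side; (null, null) is the full range
            System.out.println(SplitKeyUtils.splitKeyRangeContains(new Object[] {42L}, null, null)); // true
        }
    }
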
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtilsTest.java
new file mode 100644
index 000000000..e35dcefcf
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtilsTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.base.utils;
+
+import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
+
+import io.debezium.relational.TableId;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cdc.connectors.base.utils.SplitKeyUtils.findSplitByKeyBinary;
+import static org.apache.flink.cdc.connectors.base.utils.SplitKeyUtils.sortFinishedSplitInfos;
+import static org.apache.flink.cdc.connectors.base.utils.SplitKeyUtils.splitKeyRangeContains;
+
+/** Tests for {@link SplitKeyUtils}. */
+class SplitKeyUtilsTest {
+
+ @Test
+ void testSortFinishedSplitInfos() {
+ TableId tableId = TableId.parse("test_db.test_table");
+ List<FinishedSnapshotSplitInfo> splits = new ArrayList<>();
+ splits.add(createSplit(tableId, "split-2", new Object[] {200L}, null));
+ splits.add(createSplit(tableId, "split-0", null, new Object[] {100L}));
+ splits.add(createSplit(tableId, "split-1", new Object[] {100L}, new
Object[] {200L}));
+
+ sortFinishedSplitInfos(splits);
+
+ Assertions.assertThat(splits)
+ .extracting(FinishedSnapshotSplitInfo::getSplitId)
+ .containsExactly("split-0", "split-1", "split-2");
+ }
+
+ @Test
+ void testFindSplitByKeyBinaryAcrossMultipleSplits() {
+ TableId tableId = TableId.parse("test_db.test_table");
+ List<FinishedSnapshotSplitInfo> splits = new ArrayList<>();
+ splits.add(createSplit(tableId, "split-0", null, new Object[] {100L}));
+ splits.add(createSplit(tableId, "split-1", new Object[] {100L}, new
Object[] {200L}));
+ splits.add(createSplit(tableId, "split-2", new Object[] {200L}, new
Object[] {300L}));
+ splits.add(createSplit(tableId, "split-3", new Object[] {300L}, null));
+
+ sortFinishedSplitInfos(splits);
+
Assertions.assertThat(findSplitByKeyBinary(splits, new Object[] {50L}).getSplitId())
+ .isEqualTo("split-0");
+ Assertions.assertThat(findSplitByKeyBinary(splits, new Object[] {150L}).getSplitId())
+ .isEqualTo("split-1");
+ Assertions.assertThat(findSplitByKeyBinary(splits, new Object[] {250L}).getSplitId())
+ .isEqualTo("split-2");
+ Assertions.assertThat(
+ findSplitByKeyBinary(splits, new Object[] {Long.MAX_VALUE}).getSplitId())
+ .isEqualTo("split-3");
+ }
+
+ @Test
+ void testFindSplitByKeyBinaryEdgeCases() {
+ TableId tableId = TableId.parse("test_db.test_table");
+ List<FinishedSnapshotSplitInfo> splits = new ArrayList<>();
+ splits.add(createSplit(tableId, "split-0", null, new Object[] {100L}));
+
+ sortFinishedSplitInfos(splits);
+
Assertions.assertThat(findSplitByKeyBinary(splits, new Object[] {50L}).getSplitId())
+ .isEqualTo("split-0");
+ Assertions.assertThat(findSplitByKeyBinary(splits, new Object[] {100L})).isNull();
+
+ Assertions.assertThat(findSplitByKeyBinary(new ArrayList<>(), new Object[] {1L})).isNull();
+ Assertions.assertThat(findSplitByKeyBinary(null, new Object[] {1L})).isNull();
+ }
+
+ @Test
+ void testBinarySearchConsistencyWithLinearSearch() {
+ TableId tableId = TableId.parse("test_db.test_table");
+ List<FinishedSnapshotSplitInfo> splits = new ArrayList<>();
+ splits.add(createSplit(tableId, "split-0", null, new Object[] {100L}));
+ splits.add(createSplit(tableId, "split-1", new Object[] {100L}, new
Object[] {200L}));
+ splits.add(createSplit(tableId, "split-2", new Object[] {200L}, new
Object[] {300L}));
+ splits.add(createSplit(tableId, "split-3", new Object[] {300L}, null));
+
+ sortFinishedSplitInfos(splits);
+
+ for (long key = 0; key < 400; key += 25) {
+ Object[] keyArray = new Object[] {key};
+ FinishedSnapshotSplitInfo binaryResult = findSplitByKeyBinary(splits, keyArray);
+
+ FinishedSnapshotSplitInfo linearResult = null;
+ for (FinishedSnapshotSplitInfo split : splits) {
+ if (splitKeyRangeContains(keyArray, split.getSplitStart(), split.getSplitEnd())) {
+ linearResult = split;
+ break;
+ }
+ }
+
+ if (binaryResult == null) {
+ Assertions.assertThat(linearResult).isNull();
+ } else {
+ Assertions.assertThat(linearResult).isNotNull();
+ Assertions.assertThat(binaryResult.getSplitId())
+ .isEqualTo(linearResult.getSplitId());
+ }
+ }
+ }
+
+ private FinishedSnapshotSplitInfo createSplit(
+ TableId tableId, String splitId, Object[] splitStart, Object[] splitEnd) {
+ OffsetFactory offsetFactory = Mockito.mock(OffsetFactory.class);
+ Offset highWatermark = Mockito.mock(Offset.class);
+ return new FinishedSnapshotSplitInfo(
+ tableId, splitId, splitStart, splitEnd, highWatermark, offsetFactory);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
index dfc639e84..77682534f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
@@ -28,6 +28,7 @@ import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
import org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
+import org.apache.flink.cdc.connectors.mysql.source.utils.SplitKeyUtils;
import org.apache.flink.cdc.connectors.mysql.table.StartupMode;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.table.types.logical.RowType;
@@ -280,15 +281,14 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
Struct target = RecordUtils.getStructContainsChunkKey(sourceRecord);
Object[] chunkKey =
- RecordUtils.getSplitKey(
+ SplitKeyUtils.getSplitKey(
splitKeyType, statefulTaskContext.getSchemaNameAdjuster(), target);
- for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
- if (RecordUtils.splitKeyRangeContains(
- chunkKey, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
- && position.isAfter(splitInfo.getHighWatermark())) {
- return true;
- }
- }
+
+ FinishedSnapshotSplitInfo matchedSplit =
+ SplitKeyUtils.findSplitByKeyBinary(finishedSplitsInfo.get(tableId), chunkKey);
+
+ return matchedSplit != null && position.isAfter(matchedSplit.getHighWatermark());
}
// not in the monitored splits scope, do not emit
return false;
@@ -349,6 +349,9 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
tableIdBinlogPositionMap.put(tableId, highWatermark);
}
}
+ // Sort splits by splitStart for binary search optimization
+ // Binary search requires sorted data to work correctly
+ splitsInfoMap.values().forEach(SplitKeyUtils::sortFinishedSplitInfos);
}
this.finishedSplitsInfo = splitsInfoMap;
this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
index e6848f1c4..8ae91ad9d 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
@@ -41,12 +41,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -111,8 +108,8 @@ public class RecordUtils {
Struct value = (Struct) binlogRecord.value();
if (value != null) {
Struct chunkKeyStruct = getStructContainsChunkKey(binlogRecord);
- if (splitKeyRangeContains(
- getSplitKey(splitBoundaryType, nameAdjuster, chunkKeyStruct),
+ if (SplitKeyUtils.splitKeyRangeContains(
+ SplitKeyUtils.getSplitKey(splitBoundaryType, nameAdjuster, chunkKeyStruct),
splitStart,
splitEnd)) {
boolean hasPrimaryKey = binlogRecord.key() != null;
@@ -139,8 +136,8 @@ public class RecordUtils {
binlogRecord,
createReadOpValue(binlogRecord, Envelope.FieldName.BEFORE),
true);
- if (!splitKeyRangeContains(
- getSplitKey(
+ if (!SplitKeyUtils.splitKeyRangeContains(
+ SplitKeyUtils.getSplitKey(
splitBoundaryType, nameAdjuster, structFromAfter),
splitStart,
splitEnd)) {
@@ -431,13 +428,6 @@ public class RecordUtils {
return !StringUtils.isNullOrWhitespaceOnly(tableName);
}
- public static Object[] getSplitKey(
- RowType splitBoundaryType, SchemaNameAdjuster nameAdjuster, Struct target) {
- // the split key field contains single field now
- String splitFieldName = nameAdjuster.adjust(splitBoundaryType.getFieldNames().get(0));
- return new Object[] {target.get(splitFieldName)};
- }
-
public static BinlogOffset getBinlogPosition(SourceRecord dataRecord) {
return getBinlogPosition(dataRecord.sourceOffset());
}
@@ -451,70 +441,6 @@ public class RecordUtils {
return BinlogOffset.builder().setOffsetMap(offsetStrMap).build();
}
- /** Returns the specific key contains in the split key range or not. */
- public static boolean splitKeyRangeContains(
- Object[] key, Object[] splitKeyStart, Object[] splitKeyEnd) {
- // for all range
- if (splitKeyStart == null && splitKeyEnd == null) {
- return true;
- }
- // first split
- if (splitKeyStart == null) {
- int[] upperBoundRes = new int[key.length];
- for (int i = 0; i < key.length; i++) {
- upperBoundRes[i] = compareObjects(key[i], splitKeyEnd[i]);
- }
- return Arrays.stream(upperBoundRes).anyMatch(value -> value < 0)
- && Arrays.stream(upperBoundRes).allMatch(value -> value <= 0);
- }
- // last split
- else if (splitKeyEnd == null) {
- int[] lowerBoundRes = new int[key.length];
- for (int i = 0; i < key.length; i++) {
- lowerBoundRes[i] = compareObjects(key[i], splitKeyStart[i]);
- }
- return Arrays.stream(lowerBoundRes).allMatch(value -> value >= 0);
- }
- // other split
- else {
- int[] lowerBoundRes = new int[key.length];
- int[] upperBoundRes = new int[key.length];
- for (int i = 0; i < key.length; i++) {
- lowerBoundRes[i] = compareObjects(key[i], splitKeyStart[i]);
- upperBoundRes[i] = compareObjects(key[i], splitKeyEnd[i]);
- }
- return Arrays.stream(lowerBoundRes).anyMatch(value -> value >= 0)
- && (Arrays.stream(upperBoundRes).anyMatch(value -> value < 0)
- && Arrays.stream(upperBoundRes).allMatch(value -> value <= 0));
- }
- }
-
- @SuppressWarnings("unchecked")
- private static int compareObjects(Object o1, Object o2) {
- if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) {
- return ((Comparable) o1).compareTo(o2);
- } else if (isNumericObject(o1) && isNumericObject(o2)) {
- return toBigDecimal(o1).compareTo(toBigDecimal(o2));
- } else {
- return o1.toString().compareTo(o2.toString());
- }
- }
-
- private static boolean isNumericObject(Object obj) {
- return obj instanceof Byte
- || obj instanceof Short
- || obj instanceof Integer
- || obj instanceof Long
- || obj instanceof Float
- || obj instanceof Double
- || obj instanceof BigInteger
- || obj instanceof BigDecimal;
- }
-
- private static BigDecimal toBigDecimal(Object numericObj) {
- return new BigDecimal(numericObj.toString());
- }
-
public static HistoryRecord getHistoryRecord(SourceRecord schemaRecord) throws IOException {
Struct value = (Struct) schemaRecord.value();
String historyRecordStr = value.getString(HISTORY_RECORD_FIELD);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/SplitKeyUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/SplitKeyUtils.java
new file mode 100644
index 000000000..d19de5df5
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/SplitKeyUtils.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.utils;
+
+import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import io.debezium.util.SchemaNameAdjuster;
+import org.apache.kafka.connect.data.Struct;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+
+/** Utility class to deal with split keys and split key ranges. */
+public class SplitKeyUtils {
+
+ /** Returns whether the specific key is contained in the split key range. */
+ public static boolean splitKeyRangeContains(
+ Object[] key, Object[] splitKeyStart, Object[] splitKeyEnd) {
+ return compareKeyWithRange(key, splitKeyStart, splitKeyEnd) == RangePosition.WITHIN;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static int compareObjects(Object o1, Object o2) {
+ if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) {
+ return ((Comparable) o1).compareTo(o2);
+ } else if (isNumericObject(o1) && isNumericObject(o2)) {
+ return toBigDecimal(o1).compareTo(toBigDecimal(o2));
+ } else {
+ return o1.toString().compareTo(o2.toString());
+ }
+ }
+
+ private static boolean isNumericObject(Object obj) {
+ return obj instanceof Byte
+ || obj instanceof Short
+ || obj instanceof Integer
+ || obj instanceof Long
+ || obj instanceof Float
+ || obj instanceof Double
+ || obj instanceof BigInteger
+ || obj instanceof BigDecimal;
+ }
+
+ private static BigDecimal toBigDecimal(Object numericObj) {
+ return new BigDecimal(numericObj.toString());
+ }
+
+ public static Object[] getSplitKey(
+ RowType splitBoundaryType, SchemaNameAdjuster nameAdjuster, Struct target) {
+ // the split key field contains a single field now
+ String splitFieldName = nameAdjuster.adjust(splitBoundaryType.getFieldNames().get(0));
+ return new Object[] {target.get(splitFieldName)};
+ }
+
+ /**
+ * Sorts the list of FinishedSnapshotSplitInfo by splitStart in ascending order. This is
+ * required for binary search to work correctly.
+ *
+ * <p>Handles special cases: - Splits with null splitStart are considered as MIN value (sorted
+ * to front) - Splits with null splitEnd are considered as MAX value (sorted to back)
+ *
+ * <p>NOTE: Current implementation assumes single-field split keys (as indicated by
+ * getSplitKey()). If multi-field split keys are supported in the future, the comparison logic
+ * should be reviewed to ensure consistency with {@link
+ * #splitKeyRangeContains(Object[],Object[],Object[])}.
+ *
+ * @param splits List of splits to be sorted (sorted in-place)
+ */
+ public static void sortFinishedSplitInfos(List<FinishedSnapshotSplitInfo> splits) {
+ if (splits == null || splits.size() <= 1) {
+ return;
+ }
+
+ splits.sort(
+ (leftSplit, rightSplit) -> {
+ Object[] leftSplitStart = leftSplit.getSplitStart();
+ Object[] rightSplitStart = rightSplit.getSplitStart();
+
+ // Splits with null splitStart should come first (they are the first split)
+ if (leftSplitStart == null && rightSplitStart == null) {
+ return 0;
+ }
+ if (leftSplitStart == null) {
+ return -1;
+ }
+ if (rightSplitStart == null) {
+ return 1;
+ }
+
+ // Compare split starts
+ return compareSplit(leftSplitStart, rightSplitStart);
+ });
+ }
+
+ /**
+ * Uses binary search to find the split containing the specified key in a sorted split list.
+ *
+ * <p>IMPORTANT: The splits list MUST be sorted by splitStart before calling this method. Use
+ * sortFinishedSplitInfos() to sort the list if needed.
+ *
+ * <p>To leverage data locality for append-heavy workloads (e.g. auto-increment PKs), this
+ * method checks the first and last splits before applying binary search to the remaining
+ * subset.
+ *
+ * @param sortedSplits List of splits sorted by splitStart (MUST be sorted!)
+ * @param key The chunk key to search for
+ * @return The split containing the key, or null if not found
+ */
+ public static FinishedSnapshotSplitInfo findSplitByKeyBinary(
+ List<FinishedSnapshotSplitInfo> sortedSplits, Object[] key) {
+
+ if (sortedSplits == null || sortedSplits.isEmpty()) {
+ return null;
+ }
+
+ int size = sortedSplits.size();
+
+ FinishedSnapshotSplitInfo firstSplit = sortedSplits.get(0);
+ RangePosition firstPosition =
+ compareKeyWithRange(key, firstSplit.getSplitStart(), firstSplit.getSplitEnd());
+ if (firstPosition == RangePosition.WITHIN) {
+ return firstSplit;
+ }
+ if (firstPosition == RangePosition.BEFORE) {
+ return null;
+ }
+ if (size == 1) {
+ return null;
+ }
+
+ FinishedSnapshotSplitInfo lastSplit = sortedSplits.get(size - 1);
+ RangePosition lastPosition =
+ compareKeyWithRange(key, lastSplit.getSplitStart(), lastSplit.getSplitEnd());
+ if (lastPosition == RangePosition.WITHIN) {
+ return lastSplit;
+ }
+ if (lastPosition == RangePosition.AFTER) {
+ return null;
+ }
+ if (size == 2) {
+ return null;
+ }
+
+ int left = 1;
+ int right = size - 2;
+
+ while (left <= right) {
+ int mid = left + (right - left) / 2;
+ FinishedSnapshotSplitInfo split = sortedSplits.get(mid);
+
+ RangePosition position =
+ compareKeyWithRange(key, split.getSplitStart(), split.getSplitEnd());
+
+ if (position == RangePosition.WITHIN) {
+ return split;
+ } else if (position == RangePosition.BEFORE) {
+ right = mid - 1;
+ } else {
+ left = mid + 1;
+ }
+ }
+
+ return null;
+ }
+
+ /** Describes the relative position of a key to a split range. */
+ private enum RangePosition {
+ BEFORE,
+ WITHIN,
+ AFTER
+ }
+
+ /**
+ * Compares {@code key} against the half-open interval {@code [splitStart, splitEnd)} and
+ * returns where the key lies relative to that interval.
+ */
+ private static RangePosition compareKeyWithRange(
+ Object[] key, Object[] splitStart, Object[] splitEnd) {
+ if (splitStart == null) {
+ if (splitEnd == null) {
+ return RangePosition.WITHIN; // Full range split
+ }
+ // key < splitEnd ?
+ int cmp = compareSplit(key, splitEnd);
+ return cmp < 0 ? RangePosition.WITHIN : RangePosition.AFTER;
+ }
+
+ if (splitEnd == null) {
+ // key >= splitStart ?
+ int cmp = compareSplit(key, splitStart);
+ return cmp >= 0 ? RangePosition.WITHIN : RangePosition.BEFORE;
+ }
+
+ // Normal case: [splitStart, splitEnd)
+ int cmpStart = compareSplit(key, splitStart);
+ if (cmpStart < 0) {
+ return RangePosition.BEFORE; // key < splitStart
+ }
+
+ int cmpEnd = compareSplit(key, splitEnd);
+ if (cmpEnd >= 0) {
+ return RangePosition.AFTER; // key >= splitEnd
+ }
+
+ return RangePosition.WITHIN; // splitStart <= key < splitEnd
+ }
+
+ private static int compareSplit(Object[] leftSplit, Object[] rightSplit) {
+ // Ensure both splits have the same length
+ if (leftSplit.length != rightSplit.length) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Split key arrays must have the same length. Left: %d, Right: %d",
+ leftSplit.length, rightSplit.length));
+ }
+
+ int compareResult = 0;
+ for (int i = 0; i < leftSplit.length; i++) {
+ compareResult = compareObjects(leftSplit[i], rightSplit[i]);
+ if (compareResult != 0) {
+ break;
+ }
+ }
+ return compareResult;
+ }
+}
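
The intended call pattern, mirroring BinlogSplitReader earlier in this patch: sort the finished splits once when they are registered, then run the O(log n) lookup per binlog record. A sketch of that sequence under stated assumptions (SortedSplitIndex is a hypothetical wrapper, not part of the commit; it assumes the flink-connector-mysql-cdc module is on the classpath):

    import java.util.List;

    import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
    import org.apache.flink.cdc.connectors.mysql.source.utils.SplitKeyUtils;

    class SortedSplitIndex {
        private final List<FinishedSnapshotSplitInfo> sortedSplits;

        SortedSplitIndex(List<FinishedSnapshotSplitInfo> splits) {
            // findSplitByKeyBinary requires the list to be sorted by splitStart
            SplitKeyUtils.sortFinishedSplitInfos(splits); // sorts in place, once
            this.sortedSplits = splits;
        }

        /** Returns the split whose [splitStart, splitEnd) range contains the chunk key, or null. */
        FinishedSnapshotSplitInfo find(Object[] chunkKey) {
            return SplitKeyUtils.findSplitByKeyBinary(sortedSplits, chunkKey);
        }
    }

Paying the sort cost once per batch of registered splits keeps the hot per-record path free of any linear work.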
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtilsTest.java
index 510399489..6f248ad5f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtilsTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtilsTest.java
@@ -23,7 +23,7 @@ import org.junit.jupiter.api.Test;
import java.math.BigDecimal;
import java.math.BigInteger;
-import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.splitKeyRangeContains;
+import static org.apache.flink.cdc.connectors.mysql.source.utils.SplitKeyUtils.splitKeyRangeContains;
/** Tests for {@link org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils}. */
class RecordUtilsTest {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SplitKeyUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SplitKeyUtilsTest.java
new file mode 100644
index 000000000..2a2b0421e
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SplitKeyUtilsTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.utils;
+
+import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
+import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
+
+import io.debezium.relational.TableId;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Tests for {@link SplitKeyUtils}. */
+class SplitKeyUtilsTest {
+
+ @Test
+ void testSortFinishedSplitInfos() {
+ TableId tableId = new TableId("test_db", null, "test_table");
+
+ List<FinishedSnapshotSplitInfo> emptyList = new ArrayList<>();
+ SplitKeyUtils.sortFinishedSplitInfos(emptyList);
+ Assertions.assertThat(emptyList).isEmpty();
+
+ List<FinishedSnapshotSplitInfo> singleList = new ArrayList<>();
+ singleList.add(createSplit(tableId, "split-1", new Object[] {100L},
new Object[] {200L}));
+ SplitKeyUtils.sortFinishedSplitInfos(singleList);
+ Assertions.assertThat(singleList).hasSize(1);
+
+ List<FinishedSnapshotSplitInfo> splits = new ArrayList<>();
+ splits.add(createSplit(tableId, "split-3", new Object[] {200L}, new
Object[] {300L}));
+ splits.add(createSplit(tableId, "split-1", null, new Object[] {100L}));
+ splits.add(createSplit(tableId, "split-4", new Object[] {300L}, null));
+ splits.add(createSplit(tableId, "split-2", new Object[] {100L}, new
Object[] {200L}));
+
+ SplitKeyUtils.sortFinishedSplitInfos(splits);
+
+ Assertions.assertThat(splits.get(0).getSplitStart()).isNull();
+ Assertions.assertThat(splits.get(1).getSplitStart()).isEqualTo(new
Object[] {100L});
+ Assertions.assertThat(splits.get(2).getSplitStart()).isEqualTo(new
Object[] {200L});
+ Assertions.assertThat(splits.get(3).getSplitStart()).isEqualTo(new
Object[] {300L});
+ }
+
+ @Test
+ void testFindSplitByKeyBinary() {
+ TableId tableId = new TableId("test_db", null, "test_table");
+
+ List<FinishedSnapshotSplitInfo> sortedSplits = new ArrayList<>();
+ sortedSplits.add(createSplit(tableId, "split-0", null, new Object[]
{100L}));
+ sortedSplits.add(createSplit(tableId, "split-1", new Object[] {100L},
new Object[] {200L}));
+ sortedSplits.add(createSplit(tableId, "split-2", new Object[] {200L},
new Object[] {300L}));
+ sortedSplits.add(createSplit(tableId, "split-3", new Object[] {300L},
null));
+
+ FinishedSnapshotSplitInfo result =
+ SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] {-1L});
+ Assertions.assertThat(result).isNotNull();
+ Assertions.assertThat(result.getSplitId()).isEqualTo("split-0");
+
+ result = SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] {100L});
+ Assertions.assertThat(result).isNotNull();
+ Assertions.assertThat(result.getSplitId()).isEqualTo("split-1");
+
+ result = SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] {150L});
+ Assertions.assertThat(result).isNotNull();
+ Assertions.assertThat(result.getSplitId()).isEqualTo("split-1");
+
+ result = SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] {200L});
+ Assertions.assertThat(result).isNotNull();
+ Assertions.assertThat(result.getSplitId()).isEqualTo("split-2");
+
+ result = SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] {250L});
+ Assertions.assertThat(result).isNotNull();
+ Assertions.assertThat(result.getSplitId()).isEqualTo("split-2");
+
+ result = SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] {300L});
+ Assertions.assertThat(result).isNotNull();
+ Assertions.assertThat(result.getSplitId()).isEqualTo("split-3");
+
+ result = SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] {1000L});
+ Assertions.assertThat(result).isNotNull();
+ Assertions.assertThat(result.getSplitId()).isEqualTo("split-3");
+
+ result = SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] {99L});
+ Assertions.assertThat(result).isNotNull();
+ Assertions.assertThat(result.getSplitId()).isEqualTo("split-0");
+ }
+
+ @Test
+ void testFindSplitByKeyBinaryWithOnlyOneSplit() {
+ TableId tableId = new TableId("test_db", null, "test_table");
+
+ List<FinishedSnapshotSplitInfo> sortedSplits = new ArrayList<>();
+ sortedSplits.add(createSplit(tableId, "split-0", null, null));
+
+ FinishedSnapshotSplitInfo result =
+ SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] {100L});
+ Assertions.assertThat(result).isNotNull();
+ Assertions.assertThat(result.getSplitId()).isEqualTo("split-0");
+
+ result = SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] {Long.MAX_VALUE});
+ Assertions.assertThat(result).isNotNull();
+ Assertions.assertThat(result.getSplitId()).isEqualTo("split-0");
+ }
+
+ @Test
+ void testFindSplitByKeyBinaryWithLargeNumberOfSplits() {
+ TableId tableId = new TableId("test_db", null, "test_table");
+
+ List<FinishedSnapshotSplitInfo> sortedSplits = new ArrayList<>();
+
+ for (int i = 0; i < 1000; i++) {
+ Object[] start = i == 0 ? null : new Object[] {(long) i * 10};
+ Object[] end = i == 999 ? null : new Object[] {(long) (i + 1) * 10};
+ sortedSplits.add(createSplit(tableId, "split-" + i, start, end));
+ }
+
+ FinishedSnapshotSplitInfo result =
+ SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] {5L});
+ Assertions.assertThat(result).isNotNull();
+ Assertions.assertThat(result.getSplitId()).isEqualTo("split-0");
+
+ result = SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] {505L});
+ Assertions.assertThat(result).isNotNull();
+ Assertions.assertThat(result.getSplitId()).isEqualTo("split-50");
+
+ result = SplitKeyUtils.findSplitByKeyBinary(sortedSplits, new Object[] {9995L});
+ Assertions.assertThat(result).isNotNull();
+ Assertions.assertThat(result.getSplitId()).isEqualTo("split-999");
+ }
+
+ @Test
+ void testFindSplitByKeyBinaryEdgeCases() {
+ TableId tableId = new TableId("test_db", null, "test_table");
+
+ List<FinishedSnapshotSplitInfo> emptyList = new ArrayList<>();
+ FinishedSnapshotSplitInfo result =
+ SplitKeyUtils.findSplitByKeyBinary(emptyList, new Object[] {100L});
+ Assertions.assertThat(result).isNull();
+
+ result = SplitKeyUtils.findSplitByKeyBinary(null, new Object[] {100L});
+ Assertions.assertThat(result).isNull();
+ }
+
+ @Test
+ void testBinarySearchConsistencyWithLinearSearch() {
+ TableId tableId = new TableId("test_db", null, "test_table");
+
+ List<FinishedSnapshotSplitInfo> sortedSplits = new ArrayList<>();
+ sortedSplits.add(createSplit(tableId, "split-0", null, new Object[]
{100L}));
+ sortedSplits.add(createSplit(tableId, "split-1", new Object[] {100L},
new Object[] {200L}));
+ sortedSplits.add(createSplit(tableId, "split-2", new Object[] {200L},
new Object[] {300L}));
+ sortedSplits.add(createSplit(tableId, "split-3", new Object[] {300L},
new Object[] {400L}));
+ sortedSplits.add(createSplit(tableId, "split-4", new Object[] {400L},
null));
+
+ for (long key = 0; key < 500; key += 10) {
+ Object[] keyArray = new Object[] {key};
+
+ FinishedSnapshotSplitInfo binaryResult =
+ SplitKeyUtils.findSplitByKeyBinary(sortedSplits, keyArray);
+
+ FinishedSnapshotSplitInfo linearResult = null;
+ for (FinishedSnapshotSplitInfo split : sortedSplits) {
+ if (SplitKeyUtils.splitKeyRangeContains(
+ keyArray, split.getSplitStart(), split.getSplitEnd())) {
+ linearResult = split;
+ break;
+ }
+ }
+
+ if (binaryResult == null) {
+ Assertions.assertThat(linearResult)
+ .as("Key %d should not be in any split", key)
+ .isNull();
+ } else {
+ Assertions.assertThat(linearResult)
+ .as("Key %d should be found by both methods", key)
+ .isNotNull();
+ Assertions.assertThat(binaryResult.getSplitId())
+ .as("Both methods should find the same split for key
%d", key)
+ .isEqualTo(linearResult.getSplitId());
+ }
+ }
+ }
+
+ private FinishedSnapshotSplitInfo createSplit(
+ TableId tableId, String splitId, Object[] splitStart, Object[] splitEnd) {
+ return new FinishedSnapshotSplitInfo(
+ tableId, splitId, splitStart, splitEnd, BinlogOffset.ofEarliest());
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
index c475ada56..fda792937 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
@@ -26,6 +26,7 @@ import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
import org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils;
+import org.apache.flink.cdc.connectors.base.utils.SplitKeyUtils;
import org.apache.flink.cdc.connectors.oracle.source.config.OracleSourceConfig;
import org.apache.flink.cdc.connectors.oracle.source.handler.OracleSchemaChangeEventHandler;
import org.apache.flink.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
@@ -221,15 +222,19 @@ public class OracleSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
LOG.error("{} can not convert to RowId", record);
}
Object[] rowIds = new ROWID[] {rowId};
- return SourceRecordUtils.splitKeyRangeContains(rowIds, splitStart, splitEnd);
+ return SplitKeyUtils.splitKeyRangeContains(rowIds, splitStart, splitEnd);
} else {
// config chunk key column compare
- Object[] key =
- SourceRecordUtils.getSplitKey(splitKeyType, record, getSchemaNameAdjuster());
- return SourceRecordUtils.splitKeyRangeContains(key, splitStart, splitEnd);
+ Object[] key = SplitKeyUtils.getSplitKey(splitKeyType, record, getSchemaNameAdjuster());
+ return SplitKeyUtils.splitKeyRangeContains(key, splitStart, splitEnd);
}
}
+ @Override
+ public boolean supportsSplitKeyOptimization() {
+ return false;
+ }
+
@Override
public JdbcSourceEventDispatcher<OraclePartition> getEventDispatcher() {
return dispatcher;