This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 48f363615f [INLONG-8125][Sort] Optimizing the speed of transitioning
from snapshot to binlog (#8131)
48f363615f is described below
commit 48f363615f467536a25f684d46b2037dcdb8288c
Author: emhui <[email protected]>
AuthorDate: Tue Jun 6 15:51:02 2023 +0800
[INLONG-8125][Sort] Optimizing the speed of transitioning from snapshot to
binlog (#8131)
---
.../mysql/debezium/reader/BinlogSplitReader.java | 7 ++++
.../cdc/mysql/source/assigners/ChunkSplitter.java | 5 +--
.../cdc/mysql/source/reader/MySqlSourceReader.java | 5 +++
.../sort/cdc/mysql/source/utils/ChunkUtils.java | 16 +++++++++
.../sort/cdc/mysql/source/utils/RecordUtils.java | 40 ++++++++++++++++++++++
5 files changed, 69 insertions(+), 4 deletions(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
index 5bd457a794..eee97a0fcb 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
@@ -52,6 +52,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getBinlogPosition;
+import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getSplitInfoByBinarySearch;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getSplitKey;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableId;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.isDataChangeRecord;
@@ -204,6 +205,12 @@ public class BinlogSplitReader implements
DebeziumReader<SourceRecord, MySqlSpli
splitKeyType,
sourceRecord,
statefulTaskContext.getSchemaNameAdjuster());
+ // currently, we only support using binary search algorithm
for a single split key.
+ if (key.length == 1) {
+ FinishedSnapshotSplitInfo splitInfo =
getSplitInfoByBinarySearch(
+ finishedSplitsInfo.get(tableId), key);
+ return splitInfo != null &&
position.isAfter(splitInfo.getHighWatermark());
+ }
for (FinishedSnapshotSplitInfo splitInfo :
finishedSplitsInfo.get(tableId)) {
if (RecordUtils.splitKeyRangeContains(
key, splitInfo.getSplitStart(),
splitInfo.getSplitEnd())
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java
index 88505a75cb..cd7c4b2ad0 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java
@@ -48,6 +48,7 @@ import java.util.Objects;
import static java.math.BigDecimal.ROUND_CEILING;
import static
org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.openJdbcConnection;
+import static org.apache.inlong.sort.cdc.mysql.source.utils.ChunkUtils.splitId;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.ObjectUtils.doubleCompare;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.StatementUtils.queryApproximateRowCnt;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.StatementUtils.queryMin;
@@ -88,10 +89,6 @@ class ChunkSplitter {
// Utilities
//
--------------------------------------------------------------------------------------------
- private static String splitId(TableId tableId, int chunkId) {
- return tableId.toString() + ":" + chunkId;
- }
-
private static void maySleep(int count, TableId tableId) {
// every 100 queries to sleep 1s
if (count % 10 == 0) {
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
index 58b5d04613..91a3c96841 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
@@ -62,6 +62,7 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -72,6 +73,7 @@ import java.util.stream.Collectors;
import static
org.apache.inlong.sort.cdc.mysql.source.events.WakeupReaderEvent.WakeUpTarget.SNAPSHOT_READER;
import static
org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplit.toNormalBinlogSplit;
import static
org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplit.toSuspendedBinlogSplit;
+import static org.apache.inlong.sort.cdc.mysql.source.utils.ChunkUtils.chunkId;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.ChunkUtils.getNextMetaGroupId;
/**
@@ -228,6 +230,8 @@ public class MySqlSourceReader<T>
uncompletedBinlogSplits.remove(split.splitId());
MySqlBinlogSplit mySqlBinlogSplit =
discoverTableSchemasForBinlogSplit(split.asBinlogSplit());
+ mySqlBinlogSplit.getFinishedSnapshotSplitInfos()
+ .sort(Comparator.comparingInt(splitInfo ->
chunkId(splitInfo.getSplitId())));
unfinishedSplits.add(mySqlBinlogSplit);
}
}
@@ -390,4 +394,5 @@ public class MySqlSourceReader<T>
protected MySqlSplit toSplitType(String splitId, MySqlSplitState
splitState) {
return splitState.toMySqlSplit();
}
+
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/ChunkUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/ChunkUtils.java
index f479ac4b65..6fe222f9cf 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/ChunkUtils.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/ChunkUtils.java
@@ -21,6 +21,7 @@ import org.apache.inlong.sort.cdc.mysql.schema.MySqlTypeUtils;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
@@ -80,4 +81,19 @@ public class ChunkUtils {
? (receivedMetaNum / metaGroupSize)
: (receivedMetaNum / metaGroupSize) + 1;
}
+
+    /**
+     * Builds the id of a snapshot split: the string form of {@code tableId}, a {@code ':'},
+     * then the chunk id. {@link #chunkId(String)} reads the chunk id back out of this form.
+     *
+     * @param tableId the table the chunk belongs to
+     * @param chunkId the chunk number within that table
+     * @return the split id
+     */
+    public static String splitId(TableId tableId, int chunkId) {
+        final String tablePart = tableId.toString();
+        return new StringBuilder(tablePart.length() + 12)
+                .append(tablePart)
+                .append(':')
+                .append(chunkId)
+                .toString();
+    }
+
+ /**
+ * get chunk id from split id
+ */
+ public static int chunkId(String splitId) {
+ return Integer.parseInt(splitId.substring(splitId.lastIndexOf(":") +
1));
+ }
+
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
index 025a0b50c0..eaddb356d8 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sort.cdc.mysql.source.utils;
import
org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.SignalEventDispatcher;
+import org.apache.inlong.sort.cdc.mysql.debezium.reader.DebeziumReader;
import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
import org.apache.inlong.sort.cdc.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSnapshotSplit;
@@ -448,6 +449,45 @@ public class RecordUtils {
}
}
+    /**
+     * Returns the finished snapshot split whose key range contains {@code target}.
+     *
+     * <p>For a single-column key this uses binary search. That requires {@code splitInfos} to
+     * be ordered by ascending, non-overlapping key ranges. Composite keys fall back to a linear
+     * scan: {@link #isLessThanUpperBoundary(Object[], Object[])} requires every column to be
+     * smaller, which does not totally order composite keys, so a binary search could step past
+     * the matching split.
+     *
+     * @param splitInfos finished snapshot splits of one table; may be {@code null} or empty
+     * @param target the split-key value of a record
+     * @return the split whose range contains {@code target}, or {@code null} if none does
+     */
+    public static FinishedSnapshotSplitInfo getSplitInfoByBinarySearch(
+            List<FinishedSnapshotSplitInfo> splitInfos, Object[] target) {
+        if (splitInfos == null || splitInfos.isEmpty()) {
+            return null;
+        }
+        if (target.length != 1) {
+            for (FinishedSnapshotSplitInfo splitInfo : splitInfos) {
+                if (splitKeyRangeContains(
+                        target, splitInfo.getSplitStart(), splitInfo.getSplitEnd())) {
+                    return splitInfo;
+                }
+            }
+            return null;
+        }
+
+        int left = 0;
+        int right = splitInfos.size() - 1;
+        while (left <= right) {
+            int mid = (left + right) >>> 1;
+            FinishedSnapshotSplitInfo splitInfo = splitInfos.get(mid);
+            Object[] splitStart = splitInfo.getSplitStart();
+            Object[] splitEnd = splitInfo.getSplitEnd();
+
+            if (splitKeyRangeContains(target, splitStart, splitEnd)) {
+                return splitInfo;
+            }
+            // Move left only when this split has a start boundary and the target lies below it;
+            // in every other case the matching split (if any) is further right.
+            if (splitStart != null && isLessThanUpperBoundary(target, splitStart)) {
+                right = mid - 1;
+            } else {
+                left = mid + 1;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Checks whether every column of {@code key} compares strictly below the corresponding
+     * column of {@code upperBoundary}. An empty key yields {@code true}.
+     *
+     * <p>Despite the parameter name, callers also pass a split's start boundary here (see
+     * {@code getSplitInfoByBinarySearch}). This is a per-column test, not a lexicographic
+     * comparison.
+     *
+     * @param key the key to test
+     * @param upperBoundary the boundary to compare against; must have at least
+     *     {@code key.length} columns
+     * @return {@code true} if every column of {@code key} is smaller than its counterpart
+     */
+    public static boolean isLessThanUpperBoundary(Object[] key, Object[] upperBoundary) {
+        boolean everyColumnLess = true;
+        int column = 0;
+        while (column < key.length) {
+            // No early exit: every column is compared, so any exception raised by the
+            // comparison surfaces for the same inputs as before.
+            if (compareObjects(key[column], upperBoundary[column]) >= 0) {
+                everyColumnLess = false;
+            }
+            column++;
+        }
+        return everyColumnLess;
+    }
+
private static int compareObjects(Object o1, Object o2) {
if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) {
return ((Comparable) o1).compareTo(o2);