This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 48f363615f [INLONG-8125][Sort] Optimizing the speed of transitioning from snapshot to binlog (#8131)
48f363615f is described below

commit 48f363615f467536a25f684d46b2037dcdb8288c
Author: emhui <[email protected]>
AuthorDate: Tue Jun 6 15:51:02 2023 +0800

    [INLONG-8125][Sort] Optimizing the speed of transitioning from snapshot to binlog (#8131)
---
 .../mysql/debezium/reader/BinlogSplitReader.java   |  7 ++++
 .../cdc/mysql/source/assigners/ChunkSplitter.java  |  5 +--
 .../cdc/mysql/source/reader/MySqlSourceReader.java |  5 +++
 .../sort/cdc/mysql/source/utils/ChunkUtils.java    | 16 +++++++++
 .../sort/cdc/mysql/source/utils/RecordUtils.java   | 40 ++++++++++++++++++++++
 5 files changed, 69 insertions(+), 4 deletions(-)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
index 5bd457a794..eee97a0fcb 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/debezium/reader/BinlogSplitReader.java
@@ -52,6 +52,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getBinlogPosition;
+import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getSplitInfoByBinarySearch;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getSplitKey;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.getTableId;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.isDataChangeRecord;
@@ -204,6 +205,12 @@ public class BinlogSplitReader implements 
DebeziumReader<SourceRecord, MySqlSpli
                                 splitKeyType,
                                 sourceRecord,
                                 statefulTaskContext.getSchemaNameAdjuster());
+                // currently, we only support using binary search algorithm 
for a single split key.
+                if (key.length == 1) {
+                    FinishedSnapshotSplitInfo splitInfo = 
getSplitInfoByBinarySearch(
+                            finishedSplitsInfo.get(tableId), key);
+                    return splitInfo != null && 
position.isAfter(splitInfo.getHighWatermark());
+                }
                 for (FinishedSnapshotSplitInfo splitInfo : 
finishedSplitsInfo.get(tableId)) {
                     if (RecordUtils.splitKeyRangeContains(
                             key, splitInfo.getSplitStart(), 
splitInfo.getSplitEnd())
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java
index 88505a75cb..cd7c4b2ad0 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/ChunkSplitter.java
@@ -48,6 +48,7 @@ import java.util.Objects;
 
 import static java.math.BigDecimal.ROUND_CEILING;
 import static 
org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.openJdbcConnection;
+import static org.apache.inlong.sort.cdc.mysql.source.utils.ChunkUtils.splitId;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.ObjectUtils.doubleCompare;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.StatementUtils.queryApproximateRowCnt;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.StatementUtils.queryMin;
@@ -88,10 +89,6 @@ class ChunkSplitter {
     // Utilities
     // 
--------------------------------------------------------------------------------------------
 
-    private static String splitId(TableId tableId, int chunkId) {
-        return tableId.toString() + ":" + chunkId;
-    }
-
     private static void maySleep(int count, TableId tableId) {
         // every 100 queries to sleep 1s
         if (count % 10 == 0) {
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
index 58b5d04613..91a3c96841 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlSourceReader.java
@@ -62,6 +62,7 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -72,6 +73,7 @@ import java.util.stream.Collectors;
 import static 
org.apache.inlong.sort.cdc.mysql.source.events.WakeupReaderEvent.WakeUpTarget.SNAPSHOT_READER;
 import static 
org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplit.toNormalBinlogSplit;
 import static 
org.apache.inlong.sort.cdc.mysql.source.split.MySqlBinlogSplit.toSuspendedBinlogSplit;
+import static org.apache.inlong.sort.cdc.mysql.source.utils.ChunkUtils.chunkId;
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.ChunkUtils.getNextMetaGroupId;
 
 /**
@@ -228,6 +230,8 @@ public class MySqlSourceReader<T>
                     uncompletedBinlogSplits.remove(split.splitId());
                     MySqlBinlogSplit mySqlBinlogSplit =
                             
discoverTableSchemasForBinlogSplit(split.asBinlogSplit());
+                    mySqlBinlogSplit.getFinishedSnapshotSplitInfos()
+                            .sort(Comparator.comparingInt(splitInfo -> 
chunkId(splitInfo.getSplitId())));
                     unfinishedSplits.add(mySqlBinlogSplit);
                 }
             }
@@ -390,4 +394,5 @@ public class MySqlSourceReader<T>
     protected MySqlSplit toSplitType(String splitId, MySqlSplitState 
splitState) {
         return splitState.toMySqlSplit();
     }
+
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/ChunkUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/ChunkUtils.java
index f479ac4b65..6fe222f9cf 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/ChunkUtils.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/ChunkUtils.java
@@ -21,6 +21,7 @@ import org.apache.inlong.sort.cdc.mysql.schema.MySqlTypeUtils;
 
 import io.debezium.relational.Column;
 import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
@@ -80,4 +81,19 @@ public class ChunkUtils {
                 ? (receivedMetaNum / metaGroupSize)
                 : (receivedMetaNum / metaGroupSize) + 1;
     }
+
+    /**
+     * Builds the split id of a snapshot chunk in the form {@code <tableId>:<chunkId>}.
+     *
+     * <p>{@link #chunkId(String)} parses the numeric part back out of such an id.
+     *
+     * @param tableId the table the chunk belongs to
+     * @param chunkId the chunk's id within that table
+     * @return the composed split id
+     */
+    public static String splitId(TableId tableId, int chunkId) {
+        final String tablePart = tableId.toString();
+        return new StringBuilder(tablePart).append(':').append(chunkId).toString();
+    }
+
+    /**
+     * Extracts the chunk id from a split id built by {@link #splitId(TableId, int)}.
+     *
+     * <p>Only the text after the last {@code ':'} is parsed, so a table id that itself contains
+     * colons is handled. If the id contains no {@code ':'} at all, the whole string is parsed.
+     *
+     * @param splitId a split id of the form {@code <tableId>:<chunkId>}
+     * @return the numeric chunk id
+     * @throws NumberFormatException if the text after the last {@code ':'} is not an integer
+     */
+    public static int chunkId(String splitId) {
+        final int separator = splitId.lastIndexOf(":");
+        final String chunkPart = splitId.substring(separator + 1);
+        return Integer.parseInt(chunkPart);
+    }
+
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
index 025a0b50c0..eaddb356d8 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sort.cdc.mysql.source.utils;
 
 import 
org.apache.inlong.sort.cdc.mysql.debezium.dispatcher.SignalEventDispatcher;
+import org.apache.inlong.sort.cdc.mysql.debezium.reader.DebeziumReader;
 import org.apache.inlong.sort.cdc.mysql.source.offset.BinlogOffset;
 import org.apache.inlong.sort.cdc.mysql.source.split.FinishedSnapshotSplitInfo;
 import org.apache.inlong.sort.cdc.mysql.source.split.MySqlSnapshotSplit;
@@ -448,6 +449,45 @@ public class RecordUtils {
         }
     }
 
+    /**
+     * Finds the finished snapshot split whose key range contains {@code target}, using binary search.
+     *
+     * <p>The search is only correct if {@code splitInfos} is ordered by ascending split-key range.
+     * The reader sorts the list by chunk id, which presumably follows key order (TODO confirm). If
+     * the list is unordered, the search can miss a split that does contain the key.
+     *
+     * @param splitInfos finished splits of a single table, ordered by split-key range
+     * @param target the split key to locate
+     * @return the split whose range contains {@code target}, or {@code null} if none does
+     */
+    public static FinishedSnapshotSplitInfo getSplitInfoByBinarySearch(
+            List<FinishedSnapshotSplitInfo> splitInfos, Object[] target) {
+        int low = 0;
+        int high = splitInfos.size() - 1;
+        while (low <= high) {
+            // unsigned shift keeps the midpoint correct even if low + high overflows
+            final int middle = (low + high) >>> 1;
+            final FinishedSnapshotSplitInfo candidate = splitInfos.get(middle);
+            final Object[] lower = candidate.getSplitStart();
+            if (splitKeyRangeContains(target, lower, candidate.getSplitEnd())) {
+                return candidate;
+            }
+            // a null start is treated as unbounded below (presumably the first chunk), so a key it
+            // does not contain can only lie in a later split
+            final boolean searchLeft = lower != null && isLessThanUpperBoundary(target, lower);
+            if (searchLeft) {
+                high = middle - 1;
+            } else {
+                low = middle + 1;
+            }
+        }
+        // no split covers the key
+        return null;
+    }
+
+    /**
+     * Returns whether every component of {@code key} is strictly less than the matching component
+     * of {@code upperBoundary}.
+     *
+     * <p>Despite the parameter name, any boundary can be passed. {@link #getSplitInfoByBinarySearch}
+     * passes a split <em>start</em> to test whether the key lies before that split.
+     *
+     * <p>For composite keys this is a component-wise check, not a lexicographic comparison. The
+     * binary-search path in the binlog reader only uses it for single-column keys.
+     *
+     * @param key the split key to test
+     * @param upperBoundary the boundary to compare against; must have at least {@code key.length}
+     *     components
+     * @return {@code true} if all components of {@code key} compare strictly smaller (also
+     *     {@code true} for an empty key)
+     */
+    public static boolean isLessThanUpperBoundary(Object[] key, Object[] upperBoundary) {
+        // reached once per binary-search step for every filtered binlog record, so avoid allocating
+        // a temporary array and stream per call, and stop at the first component that is not smaller
+        for (int i = 0; i < key.length; i++) {
+            if (compareObjects(key[i], upperBoundary[i]) >= 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     private static int compareObjects(Object o1, Object o2) {
         if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) {
             return ((Comparable) o1).compareTo(o2);

Reply via email to