This is an automated email from the ASF dual-hosted git repository.

gongzhongqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new f6d1d4810 [FLINK-35912][cdc-connector] SqlServer CDC doesn't chunk 
UUID-typed columns correctly (#3497)
f6d1d4810 is described below

commit f6d1d4810af34354da262402c3871c7db3d1a414
Author: lipl <[email protected]>
AuthorDate: Tue Aug 6 17:43:09 2024 +0800

    [FLINK-35912][cdc-connector] SqlServer CDC doesn't chunk UUID-typed columns 
correctly (#3497)
    
    * resolve conflicts
    
    * polish code to trigger ci
    
    ---------
    
    Co-authored-by: Kael <[email protected]>
    Co-authored-by: gongzhongqiang <[email protected]>
---
 .../assigner/splitter/JdbcSourceChunkSplitter.java |  8 +--
 .../assigner/splitter/OracleChunkSplitter.java     |  4 +-
 .../source/dialect/SqlServerChunkSplitter.java     |  9 +++
 .../sqlserver/source/utils/SqlServerTypeUtils.java |  3 +
 .../sqlserver/source/utils/SqlServerUtils.java     | 66 +++++++++++++++----
 .../source/utils/SQLServerUUIDComparatorTest.java  | 75 ++++++++++++++++++++++
 6 files changed, 147 insertions(+), 18 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
index 1d50164cc..508e8cb9b 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
@@ -214,12 +214,12 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
     }
 
-    /** ChunkEnd less than or equal to max. */
-    protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {
+    /** ChunkEnd less than or equal to max; this default ignores {@code splitColumn}. */
+    protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
         return ObjectUtils.compare(chunkEnd, max) <= 0;
     }
 
-    /** ChunkEnd greater than or equal to max. */
-    protected boolean isChunkEndGeMax(Object chunkEnd, Object max) {
+    /** ChunkEnd greater than or equal to max; this default ignores {@code splitColumn}. */
+    protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
         return ObjectUtils.compare(chunkEnd, max) >= 0;
     }
 
@@ -368,7 +368,7 @@ public abstract class JdbcSourceChunkSplitter implements 
ChunkSplitter {
         Object chunkStart = null;
         Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, 
chunkSize);
         int count = 0;
-        while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max)) {
+        while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max, 
splitColumn)) {
             // we start from [null, min + chunk_size) and avoid [null, min)
             splits.add(ChunkRange.of(chunkStart, chunkEnd));
             // may sleep a while to avoid DDOS on PostgreSQL server
@@ -397,7 +397,7 @@ public abstract class JdbcSourceChunkSplitter implements 
ChunkSplitter {
             // should query the next one larger than chunkEnd
             chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
         }
-        if (isChunkEndGeMax(chunkEnd, max)) {
+        if (isChunkEndGeMax(chunkEnd, max, splitColumn)) {
             return null;
         } else {
             return chunkEnd;
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
index adf4c98a9..ffcdb4bb2 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/assigner/splitter/OracleChunkSplitter.java
@@ -102,7 +102,7 @@ public class OracleChunkSplitter extends 
JdbcSourceChunkSplitter {
 
     /** ChunkEnd less than or equal to max. */
     @Override
-    protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {
+    protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column 
splitColumn) {
         boolean chunkEndMaxCompare;
         if (chunkEnd instanceof ROWID && max instanceof ROWID) {
             chunkEndMaxCompare =
@@ -116,7 +116,7 @@ public class OracleChunkSplitter extends 
JdbcSourceChunkSplitter {
 
     /** ChunkEnd greater than or equal to max. */
     @Override
-    protected boolean isChunkEndGeMax(Object chunkEnd, Object max) {
+    protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column 
splitColumn) {
         boolean chunkEndMaxCompare;
         if (chunkEnd instanceof ROWID && max instanceof ROWID) {
             chunkEndMaxCompare =
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
index fb338a0ea..e79d0bcfa 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java
@@ -64,4 +64,15 @@ public class SqlServerChunkSplitter extends JdbcSourceChunkSplitter {
             throws SQLException {
         return SqlServerUtils.queryApproximateRowCnt(jdbc, tableId);
     }
+
+    /** ChunkEnd less than or equal to max, in SQL Server's ordering of the split column. */
+    @Override
+    protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
+        return SqlServerUtils.compare(chunkEnd, max, splitColumn) <= 0;
+    }
+
+    /** ChunkEnd greater than or equal to max, in SQL Server's ordering of the split column. */
+    @Override
+    protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
+        return SqlServerUtils.compare(chunkEnd, max, splitColumn) >= 0;
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java
index 3d7163189..8f1853837 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java
@@ -27,6 +27,9 @@ import java.sql.Types;
 /** Utilities for converting from SqlServer types to Flink types. */
 public class SqlServerTypeUtils {
 
+    /** Type name of SQL Server GUID columns. NOTE(review): name misspells UNIQUEIDENTIFIER. */
+    static final String UNIQUEIDENTIFIRER = "uniqueidentifier";
+
     /** Returns a corresponding Flink data type from a debezium {@link 
Column}. */
     public static DataType fromDbzColumn(Column column) {
         DataType dataType = convertFromColumn(column);
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
index b2292a825..e389a5128 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.connectors.sqlserver.source.utils;
 
 import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
+import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
 import org.apache.flink.cdc.connectors.sqlserver.source.offset.LsnOffset;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.types.logical.RowType;
@@ -40,14 +41,17 @@ import org.apache.kafka.connect.source.SourceRecord;
 
 import javax.annotation.Nullable;
 
+import java.nio.ByteBuffer;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.api.DataTypes.FIELD;
@@ -297,8 +301,7 @@ public class SqlServerUtils {
             return buildSelectWithRowLimits(
                     tableId, limitSize, "*", Optional.ofNullable(condition), 
Optional.empty());
         } else {
-            final String orderBy =
-                    
pkRowType.getFieldNames().stream().collect(Collectors.joining(", "));
+            final String orderBy = String.join(", ", 
pkRowType.getFieldNames());
             return buildSelectWithBoundaryRowLimits(
                     tableId,
                     limitSize,
@@ -322,7 +325,7 @@ public class SqlServerUtils {
         StringBuilder sql = new StringBuilder();
         for (Iterator<String> fieldNamesIt = 
pkRowType.getFieldNames().iterator();
                 fieldNamesIt.hasNext(); ) {
-            sql.append("MAX(" + fieldNamesIt.next() + ")");
+            sql.append("MAX(").append(fieldNamesIt.next()).append(")");
             if (fieldNamesIt.hasNext()) {
                 sql.append(" , ");
             }
@@ -342,12 +345,8 @@ public class SqlServerUtils {
         }
         sql.append(projection).append(" FROM ");
         sql.append(quoteSchemaAndTable(tableId));
-        if (condition.isPresent()) {
-            sql.append(" WHERE ").append(condition.get());
-        }
-        if (orderBy.isPresent()) {
-            sql.append(" ORDER BY ").append(orderBy.get());
-        }
+        condition.ifPresent(s -> sql.append(" WHERE ").append(s));
+        orderBy.ifPresent(s -> sql.append(" ORDER BY ").append(s));
         return sql.toString();
     }
 
@@ -396,11 +395,81 @@ public class SqlServerUtils {
         sql.append(projection);
         sql.append(" FROM ");
         sql.append(quoteSchemaAndTable(tableId));
-        if (condition.isPresent()) {
-            sql.append(" WHERE ").append(condition.get());
-        }
+        condition.ifPresent(s -> sql.append(" WHERE ").append(s));
         sql.append(" ORDER BY ").append(orderBy);
         sql.append(") T");
         return sql.toString();
     }
+
+    /**
+     * Compares two values of {@code splitColumn}, accounting for how SQL Server sorts GUIDs.
+     *
+     * <p>When both values are non-null and the column is a {@code uniqueidentifier}, they are
+     * parsed as UUIDs from their {@code toString()} form and compared with {@link
+     * SQLServerUUIDComparator}, because SQL Server does not sort GUIDs like {@link
+     * UUID#compareTo}. All other values go to {@link ObjectUtils#compare(Object, Object)}.
+     *
+     * @param obj1 the first value
+     * @param obj2 the second value
+     * @param splitColumn the column both values belong to
+     * @return a negative integer, zero, or a positive integer as {@code obj1} is less than, equal
+     *     to, or greater than {@code obj2}
+     */
+    public static int compare(Object obj1, Object obj2, Column splitColumn) {
+        // equalsIgnoreCase on the constant tolerates any casing of, or a null, type name.
+        if (obj1 != null
+                && obj2 != null
+                && SqlServerTypeUtils.UNIQUEIDENTIFIRER.equalsIgnoreCase(splitColumn.typeName())) {
+            return new SQLServerUUIDComparator()
+                    .compare(UUID.fromString(obj1.toString()), UUID.fromString(obj2.toString()));
+        }
+        return ObjectUtils.compare(obj1, obj2);
+    }
+
+    /**
+     * Orders UUIDs the way SQL Server orders {@code uniqueidentifier} values, which differs from
+     * {@link UUID#compareTo}. Ported from <a
+     * href="https://github.com/dotnet/runtime/blob/5535e31a712343a63f5d7d796cd874e563e5ac14/src/libraries/System.Data.Common/src/System/Data/SQLTypes/SQLGuid.cs#L113">SQLGuid.cs::CompareTo</a>;
+     * see also <a
+     * href="https://learn.microsoft.com/en-us/sql/connect/ado-net/sql/compare-guid-uniqueidentifier-values?view=sql-server-ver16">Comparing
+     * GUID and uniqueidentifier values</a>.
+     *
+     * <p>SqlGuid compares the bytes of .NET's {@code Guid.ToByteArray()} in the index order
+     * 10-15, 8, 9, 6, 7, 4, 5, 0-3. That array stores the first three UUID fields little-endian,
+     * while {@code uuidToBytes} produces big-endian bytes, so the indices within those fields are
+     * mirrored in {@code GUID_ORDER}.
+     */
+    static class SQLServerUUIDComparator implements Comparator<UUID> {
+
+        private static final int SIZE_OF_GUID = 16;
+
+        /** Byte indices into {@code uuidToBytes} output, most significant first. */
+        private static final byte[] GUID_ORDER = {
+            10, 11, 12, 13, 14, 15, 8, 9, 7, 6, 5, 4, 3, 2, 1, 0
+        };
+
+        @Override
+        public int compare(UUID uuid1, UUID uuid2) {
+            byte[] bytes1 = uuidToBytes(uuid1);
+            byte[] bytes2 = uuidToBytes(uuid2);
+
+            for (int i = 0; i < SIZE_OF_GUID; i++) {
+                byte b1 = bytes1[GUID_ORDER[i]];
+                byte b2 = bytes2[GUID_ORDER[i]];
+                if (b1 != b2) {
+                    // SQL Server compares the bytes as unsigned values.
+                    return Integer.compare(b1 & 0xFF, b2 & 0xFF);
+                }
+            }
+            return 0;
+        }
+
+        /** Returns the 16 bytes of {@code uuid} in big-endian (textual) order. */
+        private static byte[] uuidToBytes(UUID uuid) {
+            ByteBuffer buffer = ByteBuffer.allocate(SIZE_OF_GUID);
+            buffer.putLong(uuid.getMostSignificantBits());
+            buffer.putLong(uuid.getLeastSignificantBits());
+            return buffer.array();
+        }
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SQLServerUUIDComparatorTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SQLServerUUIDComparatorTest.java
new file mode 100644
index 000000000..89a292777
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SQLServerUUIDComparatorTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.sqlserver.source.utils;
+
+import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Unit test for {@link SqlServerUtils.SQLServerUUIDComparator}. */
+public class SQLServerUUIDComparatorTest {
+
+    @Test
+    public void testComparator() {
+        SqlServerUtils.SQLServerUUIDComparator comparator =
+                new SqlServerUtils.SQLServerUUIDComparator();
+        // Create an ArrayList and fill it with Guid values.
+        List<UUID> guidList = new ArrayList<>();
+        guidList.add(UUID.fromString("3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE"));
+        guidList.add(UUID.fromString("2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE"));
+        guidList.add(UUID.fromString("1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE"));
+
+        // Sort the Guids with the generic ObjectUtils ordering.
+        guidList.sort(ObjectUtils::compare);
+
+        assertEquals(UUID.fromString("1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE"), guidList.get(0));
+        assertEquals(UUID.fromString("2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE"), guidList.get(1));
+        assertEquals(UUID.fromString("3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE"), guidList.get(2));
+
+        // Create an ArrayList of SqlGuids.
+        List<UUID> sqlGuidList = new ArrayList<>();
+        sqlGuidList.add(UUID.fromString("3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE"));
+        sqlGuidList.add(UUID.fromString("2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE"));
+        sqlGuidList.add(UUID.fromString("1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE"));
+
+        // Sort the SqlGuids. The unsorted SqlGuids are in the same order
+        // as the unsorted Guid values.
+        sqlGuidList.sort(comparator);
+
+        // The sorted SqlGuid values are ordered differently than the Guid values,
+        // because SQL Server compares the last UUID group first.
+        assertEquals(UUID.fromString("2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE"), sqlGuidList.get(0));
+        assertEquals(UUID.fromString("3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE"), sqlGuidList.get(1));
+        assertEquals(UUID.fromString("1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE"), sqlGuidList.get(2));
+    }
+}

Reply via email to