This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b8c5f3cf [FLINK-37774][sqlserver] Fix SQL Server CDC connector incorrectly handles special characters in database and table names (#4014)
6b8c5f3cf is described below

commit 6b8c5f3cf480ec0f5de1635c80c8e3f2634cdb1b
Author: Sergei Morozov <moro...@tut.by>
AuthorDate: Mon May 26 07:10:27 2025 +0300

    [FLINK-37774][sqlserver] Fix SQL Server CDC connector incorrectly handles special characters in database and table names (#4014)
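
For background, the underlying issue is that TableId.toString() emits the identifier unquoted, so a name containing the '.' separator or quote characters cannot be recovered unambiguously from serialized split state. Below is a minimal sketch of the difference the fix relies on, assuming Debezium's io.debezium.relational.TableId API; the class name TableIdQuotingSketch is illustrative only and not part of the commit.

    import io.debezium.relational.TableId;

    public class TableIdQuotingSketch {
        public static void main(String[] args) {
            // Identifier parts with quote and separator characters, mirroring
            // the names used in PendingSplitsStateSerializerTest below.
            TableId tableId = new TableId("cata`log\"", "s\"che`ma", "ta\"ble.1`");

            // Unquoted form: the '.' inside the table name is indistinguishable
            // from the catalog/schema/table separators when parsed back.
            System.out.println(tableId.toString());

            // Quoted form used by the fix: each part is delimited with double
            // quotes, so the serialized id can be parsed back unambiguously.
            System.out.println(tableId.toDoubleQuotedString());
        }
    }

The new testSerializeSnapshotPendingSplitsState round-trip test in the diff below exercises exactly this case.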
---
 .../state/PendingSplitsStateSerializer.java        |  2 +-
 .../state/PendingSplitsStateSerializerTest.java    | 30 +++++++++++++++++++++-
 .../state/PendingSplitsStateSerializer.java        |  4 +--
 .../mysql/source/split/MySqlSplitSerializer.java   |  4 +--
 .../state/PendingSplitsStateSerializerTest.java    |  4 +--
 .../source/utils/SqlServerConnectionUtils.java     |  2 +-
 .../sqlserver/source/utils/SqlServerUtils.java     | 10 +++++---
 7 files changed, 44 insertions(+), 12 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java
index 6d33fd680..c823c74f2 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java
@@ -187,7 +187,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
         out.writeBoolean(hasTableIsSplitting);
         if (hasTableIsSplitting) {
             ChunkSplitterState chunkSplitterState = state.getChunkSplitterState();
-            out.writeUTF(chunkSplitterState.getCurrentSplittingTableId().toString());
+            out.writeUTF(chunkSplitterState.getCurrentSplittingTableId().toDoubleQuotedString());
             out.writeUTF(
                     SerializerUtils.rowToSerializedString(
                             new Object[] {chunkSplitterState.getNextChunkStart().getValue()}));
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java
index 016c7386a..9ebcc3693 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.cdc.connectors.base.source.assigner.state;
 
+import org.apache.flink.cdc.connectors.base.source.assigner.AssignerStatus;
 import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
 import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
@@ -83,6 +84,29 @@ class PendingSplitsStateSerializerTest {
         assertThat(ser1).isEqualTo(ser2);
     }
 
+    @Test
+    void testSerializeSnapshotPendingSplitsState() throws Exception {
+        PendingSplitsStateSerializer serializer =
+                new PendingSplitsStateSerializer(constructSourceSplitSerializer());
+        PendingSplitsState state =
+                new SnapshotPendingSplitsState(
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        constructTableSchema(),
+                        Collections.emptyMap(),
+                        AssignerStatus.INITIAL_ASSIGNING,
+                        Collections.emptyList(),
+                        false,
+                        true,
+                        Collections.emptyMap(),
+                        new ChunkSplitterState(
+                                constructTableId(), ChunkSplitterState.ChunkBound.middleOf(1), 2));
+
+        assertThat(serializer.deserialize(serializer.getVersion(), serializer.serialize(state)))
+                .isEqualTo(state);
+    }
+
     private SourceSplitSerializer constructSourceSplitSerializer() {
         return new SourceSplitSerializer() {
             @Override
@@ -148,7 +172,7 @@ class PendingSplitsStateSerializerTest {
     }
 
     private HashMap<TableId, TableChanges.TableChange> constructTableSchema() {
-        TableId tableId = new TableId("cata`log\"", "s\"che`ma", "ta\"ble.1`");
+        TableId tableId = constructTableId();
         HashMap<TableId, TableChanges.TableChange> tableSchema = new HashMap<>();
         Tables tables = new Tables();
         Table table = tables.editOrCreateTable(tableId).create();
@@ -158,6 +182,10 @@ class PendingSplitsStateSerializerTest {
         return tableSchema;
     }
 
+    private TableId constructTableId() {
+        return new TableId("cata`log\"", "s\"che`ma", "ta\"ble.1`");
+    }
+
     /** An implementation for {@link PendingSplitsState} which will cause a serialization error. */
     static class UnsupportedPendingSplitsState extends PendingSplitsState {}
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java
index ca5f377d0..bb3e2d2ed 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java
@@ -169,7 +169,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
         out.writeBoolean(hasTableIsSplitting);
         if (hasTableIsSplitting) {
             ChunkSplitterState chunkSplitterState = state.getChunkSplitterState();
-            out.writeUTF(chunkSplitterState.getCurrentSplittingTableId().toString());
+            out.writeUTF(chunkSplitterState.getCurrentSplittingTableId().toDoubleQuotedString());
             out.writeUTF(
                     SerializerUtils.rowToSerializedString(
                             new Object[] {chunkSplitterState.getNextChunkStart().getValue()}));
@@ -415,7 +415,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
         final int size = tableIds.size();
         out.writeInt(size);
         for (TableId tableId : tableIds) {
-            out.writeUTF(tableId.toString());
+            out.writeUTF(tableId.toDoubleQuotedString());
         }
     }
 
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSplitSerializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSplitSerializer.java
index 75f485ae6..6609d26bc 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSplitSerializer.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSplitSerializer.java
@@ -192,7 +192,7 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
         final int size = tableSchemas.size();
         out.writeInt(size);
         for (Map.Entry<TableId, TableChange> entry : tableSchemas.entrySet()) {
-            out.writeUTF(entry.getKey().toString());
+            out.writeUTF(entry.getKey().toDoubleQuotedString());
             final String tableChangeStr =
                     documentWriter.write(jsonSerializer.toDocument(entry.getValue()));
             final byte[] tableChangeBytes = tableChangeStr.getBytes(StandardCharsets.UTF_8);
@@ -237,7 +237,7 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
         final int size = finishedSplitsInfo.size();
         out.writeInt(size);
         for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo) {
-            out.writeUTF(splitInfo.getTableId().toString());
+            out.writeUTF(splitInfo.getTableId().toDoubleQuotedString());
             out.writeUTF(splitInfo.getSplitId());
             out.writeUTF(rowToSerializedString(splitInfo.getSplitStart()));
             out.writeUTF(rowToSerializedString(splitInfo.getSplitEnd()));
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java
index 1d3e46b7d..3d813b8a8 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java
@@ -52,8 +52,8 @@ import static org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSp
 class PendingSplitsStateSerializerTest {
 
     private static final TableId tableId0 = TableId.parse("test_db.test_table");
-    private static final TableId tableId1 = TableId.parse("test_db.test_table1");
-    private static final TableId tableId2 = TableId.parse("test_db.test_table2");
+    private static final TableId tableId1 = TableId.parse("test_db.\"test_table 1\"");
+    private static final TableId tableId2 = TableId.parse("test_db.\"test_table 2\"");
 
     public static Stream<Arguments> params() {
         return Stream.of(
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerConnectionUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerConnectionUtils.java
index 44add4d68..400318a41 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerConnectionUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerConnectionUtils.java
@@ -92,7 +92,7 @@ public class SqlServerConnectionUtils {
             try {
                 jdbc.query(
                         "SELECT * FROM "
-                                + dbName
+                                + SqlServerUtils.quote(dbName)
                                 + ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE 
= 'BASE TABLE';",
                         rs -> {
                             while (rs.next()) {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
index e389a5128..1f1371659 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
@@ -361,12 +361,16 @@ public class SqlServerUtils {
         return quoted.toString();
     }
 
-    public static String quote(String dbOrTableName) {
-        return "[" + dbOrTableName + "]";
+    /**
+     * @link <a
+     *     href="https://learn.microsoft.com/en-us/sql/t-sql/functions/quotename-transact-sql">QUOTENAME</a>
+     */
+    public static String quote(String name) {
+        return "[" + name + "]";
     }
 
     public static String quote(TableId tableId) {
-        return "[" + tableId.schema() + "].[" + tableId.table() + "]";
+        return String.format("%s.%s", quote(tableId.schema()), quote(tableId.table()));
     }
 
     private static void addPrimaryKeyColumnsToCondition(
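
For reference, here is a small self-contained sketch of the bracket-quoting helpers as patched in SqlServerUtils above. The helper bodies mirror the diff; the wrapper class SqlServerQuotingSketch and sample identifiers such as "my db" and "order details" are made up for illustration.

    import io.debezium.relational.TableId;

    // Standalone copy of the patched quoting helpers, for illustration only.
    public class SqlServerQuotingSketch {

        static String quote(String name) {
            return "[" + name + "]";
        }

        static String quote(TableId tableId) {
            return String.format("%s.%s", quote(tableId.schema()), quote(tableId.table()));
        }

        public static void main(String[] args) {
            // A database name with a space is now bracket-quoted before being
            // spliced into the table-discovery query, e.g.
            // SELECT * FROM [my db].INFORMATION_SCHEMA.TABLES ...
            System.out.println(quote("my db"));

            // Schema and table are quoted separately and joined with a dot.
            System.out.println(quote(new TableId(null, "dbo", "order details")));
        }
    }

The Javadoc added by the commit points to T-SQL QUOTENAME for the general identifier-quoting rules this follows.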
