This is an automated email from the ASF dual-hosted git repository. kunni pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new 6b8c5f3cf [FLINK-37774][sqlserver] Fix SQL Server CDC connector incorrectly handles special characters in database and table names (#4014) 6b8c5f3cf is described below commit 6b8c5f3cf480ec0f5de1635c80c8e3f2634cdb1b Author: Sergei Morozov <moro...@tut.by> AuthorDate: Mon May 26 07:10:27 2025 +0300 [FLINK-37774][sqlserver] Fix SQL Server CDC connector incorrectly handles special characters in database and table names (#4014) --- .../state/PendingSplitsStateSerializer.java | 2 +- .../state/PendingSplitsStateSerializerTest.java | 30 +++++++++++++++++++++- .../state/PendingSplitsStateSerializer.java | 4 +-- .../mysql/source/split/MySqlSplitSerializer.java | 4 +-- .../state/PendingSplitsStateSerializerTest.java | 4 +-- .../source/utils/SqlServerConnectionUtils.java | 2 +- .../sqlserver/source/utils/SqlServerUtils.java | 10 +++++--- 7 files changed, 44 insertions(+), 12 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java index 6d33fd680..c823c74f2 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java @@ -187,7 +187,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P out.writeBoolean(hasTableIsSplitting); if (hasTableIsSplitting) { ChunkSplitterState chunkSplitterState = state.getChunkSplitterState(); - out.writeUTF(chunkSplitterState.getCurrentSplittingTableId().toString()); + out.writeUTF(chunkSplitterState.getCurrentSplittingTableId().toDoubleQuotedString()); out.writeUTF( SerializerUtils.rowToSerializedString( new Object[] {chunkSplitterState.getNextChunkStart().getValue()})); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java index 016c7386a..9ebcc3693 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.connectors.base.source.assigner.state; +import org.apache.flink.cdc.connectors.base.source.assigner.AssignerStatus; import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset; import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory; import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit; @@ -83,6 +84,29 @@ class PendingSplitsStateSerializerTest { assertThat(ser1).isEqualTo(ser2); } + @Test + void testSerializeSnapshotPendingSplitsState() throws Exception { + PendingSplitsStateSerializer serializer = + new PendingSplitsStateSerializer(constructSourceSplitSerializer()); + PendingSplitsState state = + new SnapshotPendingSplitsState( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyMap(), + constructTableSchema(), + Collections.emptyMap(), + AssignerStatus.INITIAL_ASSIGNING, + Collections.emptyList(), + false, + true, + Collections.emptyMap(), + new ChunkSplitterState( + constructTableId(), ChunkSplitterState.ChunkBound.middleOf(1), 2)); + + assertThat(serializer.deserialize(serializer.getVersion(), serializer.serialize(state))) + .isEqualTo(state); + } + private SourceSplitSerializer constructSourceSplitSerializer() { return new SourceSplitSerializer() { @Override @@ -148,7 +172,7 @@ class PendingSplitsStateSerializerTest { } private HashMap<TableId, TableChanges.TableChange> constructTableSchema() { - TableId tableId = new TableId("cata`log\"", "s\"che`ma", "ta\"ble.1`"); + TableId tableId = constructTableId(); HashMap<TableId, TableChanges.TableChange> tableSchema = new HashMap<>(); Tables tables = new Tables(); Table table = tables.editOrCreateTable(tableId).create(); @@ -158,6 +182,10 @@ class PendingSplitsStateSerializerTest { return tableSchema; } + private TableId constructTableId() { + return new TableId("cata`log\"", "s\"che`ma", "ta\"ble.1`"); + } + /** An implementation for {@link PendingSplitsState} which will cause a serialization error. */ static class UnsupportedPendingSplitsState extends PendingSplitsState {} } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java index ca5f377d0..bb3e2d2ed 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java @@ -169,7 +169,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P out.writeBoolean(hasTableIsSplitting); if (hasTableIsSplitting) { ChunkSplitterState chunkSplitterState = state.getChunkSplitterState(); - out.writeUTF(chunkSplitterState.getCurrentSplittingTableId().toString()); + out.writeUTF(chunkSplitterState.getCurrentSplittingTableId().toDoubleQuotedString()); out.writeUTF( SerializerUtils.rowToSerializedString( new Object[] {chunkSplitterState.getNextChunkStart().getValue()})); @@ -415,7 +415,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P final int size = tableIds.size(); out.writeInt(size); for (TableId tableId : tableIds) { - out.writeUTF(tableId.toString()); + out.writeUTF(tableId.toDoubleQuotedString()); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSplitSerializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSplitSerializer.java index 75f485ae6..6609d26bc 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSplitSerializer.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSplitSerializer.java @@ -192,7 +192,7 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS final int size = tableSchemas.size(); out.writeInt(size); for (Map.Entry<TableId, TableChange> entry : tableSchemas.entrySet()) { - out.writeUTF(entry.getKey().toString()); + out.writeUTF(entry.getKey().toDoubleQuotedString()); final String tableChangeStr = documentWriter.write(jsonSerializer.toDocument(entry.getValue())); final byte[] tableChangeBytes = tableChangeStr.getBytes(StandardCharsets.UTF_8); @@ -237,7 +237,7 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS final int size = finishedSplitsInfo.size(); out.writeInt(size); for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo) { - out.writeUTF(splitInfo.getTableId().toString()); + out.writeUTF(splitInfo.getTableId().toDoubleQuotedString()); out.writeUTF(splitInfo.getSplitId()); out.writeUTF(rowToSerializedString(splitInfo.getSplitStart())); out.writeUTF(rowToSerializedString(splitInfo.getSplitEnd())); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java index 1d3e46b7d..3d813b8a8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java @@ -52,8 +52,8 @@ import static org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSp class PendingSplitsStateSerializerTest { private static final TableId tableId0 = TableId.parse("test_db.test_table"); - private static final TableId tableId1 = TableId.parse("test_db.test_table1"); - private static final TableId tableId2 = TableId.parse("test_db.test_table2"); + private static final TableId tableId1 = TableId.parse("test_db.\"test_table 1\""); + private static final TableId tableId2 = TableId.parse("test_db.\"test_table 2\""); public static Stream<Arguments> params() { return Stream.of( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerConnectionUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerConnectionUtils.java index 44add4d68..400318a41 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerConnectionUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerConnectionUtils.java @@ -92,7 +92,7 @@ public class SqlServerConnectionUtils { try { jdbc.query( "SELECT * FROM " - + dbName + + SqlServerUtils.quote(dbName) + ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE';", rs -> { while (rs.next()) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java index e389a5128..1f1371659 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java @@ -361,12 +361,16 @@ public class SqlServerUtils { return quoted.toString(); } - public static String quote(String dbOrTableName) { - return "[" + dbOrTableName + "]"; + /** + * @link <a + * href="https://learn.microsoft.com/en-us/sql/t-sql/functions/quotename-transact-sql">QUOTENAME</a> + */ + public static String quote(String name) { + return "[" + name + "]"; } public static String quote(TableId tableId) { - return "[" + tableId.schema() + "].[" + tableId.table() + "]"; + return String.format("%s.%s", quote(tableId.schema()), quote(tableId.table())); } private static void addPrimaryKeyColumnsToCondition(