This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 6b8c5f3cf [FLINK-37774][sqlserver] Fix SQL Server CDC connector
incorrectly handles special characters in database and table names (#4014)
6b8c5f3cf is described below
commit 6b8c5f3cf480ec0f5de1635c80c8e3f2634cdb1b
Author: Sergei Morozov <[email protected]>
AuthorDate: Mon May 26 07:10:27 2025 +0300
[FLINK-37774][sqlserver] Fix SQL Server CDC connector incorrectly handles
special characters in database and table names (#4014)
---
.../state/PendingSplitsStateSerializer.java | 2 +-
.../state/PendingSplitsStateSerializerTest.java | 30 +++++++++++++++++++++-
.../state/PendingSplitsStateSerializer.java | 4 +--
.../mysql/source/split/MySqlSplitSerializer.java | 4 +--
.../state/PendingSplitsStateSerializerTest.java | 4 +--
.../source/utils/SqlServerConnectionUtils.java | 2 +-
.../sqlserver/source/utils/SqlServerUtils.java | 10 +++++---
7 files changed, 44 insertions(+), 12 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java
index 6d33fd680..c823c74f2 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java
@@ -187,7 +187,7 @@ public class PendingSplitsStateSerializer implements
SimpleVersionedSerializer<P
out.writeBoolean(hasTableIsSplitting);
if (hasTableIsSplitting) {
ChunkSplitterState chunkSplitterState =
state.getChunkSplitterState();
-
out.writeUTF(chunkSplitterState.getCurrentSplittingTableId().toString());
+
out.writeUTF(chunkSplitterState.getCurrentSplittingTableId().toDoubleQuotedString());
out.writeUTF(
SerializerUtils.rowToSerializedString(
new Object[]
{chunkSplitterState.getNextChunkStart().getValue()}));
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java
index 016c7386a..9ebcc3693 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.cdc.connectors.base.source.assigner.state;
+import org.apache.flink.cdc.connectors.base.source.assigner.AssignerStatus;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
@@ -83,6 +84,29 @@ class PendingSplitsStateSerializerTest {
assertThat(ser1).isEqualTo(ser2);
}
+ @Test
+ void testSerializeSnapshotPendingSplitsState() throws Exception {
+ PendingSplitsStateSerializer serializer =
+ new
PendingSplitsStateSerializer(constructSourceSplitSerializer());
+ PendingSplitsState state =
+ new SnapshotPendingSplitsState(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ constructTableSchema(),
+ Collections.emptyMap(),
+ AssignerStatus.INITIAL_ASSIGNING,
+ Collections.emptyList(),
+ false,
+ true,
+ Collections.emptyMap(),
+ new ChunkSplitterState(
+ constructTableId(),
ChunkSplitterState.ChunkBound.middleOf(1), 2));
+
+ assertThat(serializer.deserialize(serializer.getVersion(),
serializer.serialize(state)))
+ .isEqualTo(state);
+ }
+
private SourceSplitSerializer constructSourceSplitSerializer() {
return new SourceSplitSerializer() {
@Override
@@ -148,7 +172,7 @@ class PendingSplitsStateSerializerTest {
}
private HashMap<TableId, TableChanges.TableChange> constructTableSchema() {
- TableId tableId = new TableId("cata`log\"", "s\"che`ma", "ta\"ble.1`");
+ TableId tableId = constructTableId();
HashMap<TableId, TableChanges.TableChange> tableSchema = new
HashMap<>();
Tables tables = new Tables();
Table table = tables.editOrCreateTable(tableId).create();
@@ -158,6 +182,10 @@ class PendingSplitsStateSerializerTest {
return tableSchema;
}
+ private TableId constructTableId() {
+ return new TableId("cata`log\"", "s\"che`ma", "ta\"ble.1`");
+ }
+
/** An implementation for {@link PendingSplitsState} which will cause a
serialization error. */
static class UnsupportedPendingSplitsState extends PendingSplitsState {}
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java
index ca5f377d0..bb3e2d2ed 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java
@@ -169,7 +169,7 @@ public class PendingSplitsStateSerializer implements
SimpleVersionedSerializer<P
out.writeBoolean(hasTableIsSplitting);
if (hasTableIsSplitting) {
ChunkSplitterState chunkSplitterState =
state.getChunkSplitterState();
-
out.writeUTF(chunkSplitterState.getCurrentSplittingTableId().toString());
+
out.writeUTF(chunkSplitterState.getCurrentSplittingTableId().toDoubleQuotedString());
out.writeUTF(
SerializerUtils.rowToSerializedString(
new Object[]
{chunkSplitterState.getNextChunkStart().getValue()}));
@@ -415,7 +415,7 @@ public class PendingSplitsStateSerializer implements
SimpleVersionedSerializer<P
final int size = tableIds.size();
out.writeInt(size);
for (TableId tableId : tableIds) {
- out.writeUTF(tableId.toString());
+ out.writeUTF(tableId.toDoubleQuotedString());
}
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSplitSerializer.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSplitSerializer.java
index 75f485ae6..6609d26bc 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSplitSerializer.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlSplitSerializer.java
@@ -192,7 +192,7 @@ public final class MySqlSplitSerializer implements
SimpleVersionedSerializer<MyS
final int size = tableSchemas.size();
out.writeInt(size);
for (Map.Entry<TableId, TableChange> entry : tableSchemas.entrySet()) {
- out.writeUTF(entry.getKey().toString());
+ out.writeUTF(entry.getKey().toDoubleQuotedString());
final String tableChangeStr =
documentWriter.write(jsonSerializer.toDocument(entry.getValue()));
final byte[] tableChangeBytes =
tableChangeStr.getBytes(StandardCharsets.UTF_8);
@@ -237,7 +237,7 @@ public final class MySqlSplitSerializer implements
SimpleVersionedSerializer<MyS
final int size = finishedSplitsInfo.size();
out.writeInt(size);
for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo) {
- out.writeUTF(splitInfo.getTableId().toString());
+ out.writeUTF(splitInfo.getTableId().toDoubleQuotedString());
out.writeUTF(splitInfo.getSplitId());
out.writeUTF(rowToSerializedString(splitInfo.getSplitStart()));
out.writeUTF(rowToSerializedString(splitInfo.getSplitEnd()));
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java
index 1d3e46b7d..3d813b8a8 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java
@@ -52,8 +52,8 @@ import static
org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSp
class PendingSplitsStateSerializerTest {
private static final TableId tableId0 =
TableId.parse("test_db.test_table");
- private static final TableId tableId1 =
TableId.parse("test_db.test_table1");
- private static final TableId tableId2 =
TableId.parse("test_db.test_table2");
+ private static final TableId tableId1 =
TableId.parse("test_db.\"test_table 1\"");
+ private static final TableId tableId2 =
TableId.parse("test_db.\"test_table 2\"");
public static Stream<Arguments> params() {
return Stream.of(
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerConnectionUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerConnectionUtils.java
index 44add4d68..400318a41 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerConnectionUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerConnectionUtils.java
@@ -92,7 +92,7 @@ public class SqlServerConnectionUtils {
try {
jdbc.query(
"SELECT * FROM "
- + dbName
+ + SqlServerUtils.quote(dbName)
+ ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE
= 'BASE TABLE';",
rs -> {
while (rs.next()) {
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
index e389a5128..1f1371659 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java
@@ -361,12 +361,16 @@ public class SqlServerUtils {
return quoted.toString();
}
- public static String quote(String dbOrTableName) {
- return "[" + dbOrTableName + "]";
+ /**
+ * Brackets a name as QUOTENAME does, doubling any "]" so it cannot end the identifier.
+ * @see <a href="https://learn.microsoft.com/en-us/sql/t-sql/functions/quotename-transact-sql">QUOTENAME</a>
+ */
+ public static String quote(String name) {
+ return "[" + name.replace("]", "]]") + "]";
}
public static String quote(TableId tableId) {
- return "[" + tableId.schema() + "].[" + tableId.table() + "]";
+ return String.format("%s.%s", quote(tableId.schema()),
quote(tableId.table()));
}
private static void addPrimaryKeyColumnsToCondition(