yoheimuta commented on PR #3876:
URL: https://github.com/apache/flink-cdc/pull/3876#issuecomment-2606494441
Here is the current difference in the DataChangeEvent between renaming a
single table and renaming multiple tables in one statement.
The first log comes from renaming a single table in a single statement. The
second log is one of the DataChangeEvents generated when multiple tables are
renamed in a single statement, specifically the event relevant to this case.
Comparing the two logs shows how `source.table` differs between the two
scenarios.
<details>
<summary>diff</summary>
```diff
$ diff rename_single.txt rename_multi.txt
6c6
< ts_sec=1737529154,
---
> ts_sec=1737425209,
8,9c8,9
< pos=1054,
< gtids=5cb040cc-d88e-11ef-aaf7-0242ac110004:1-46,
---
> pos=1798,
> gtids=5891eef6-d79c-11ef-b2d3-0242ac110004:44-48,
17c17
< databaseName=customer_1ttteik
---
> databaseName=customer_1nkreoe
27,29c27,29
< ts_ms=1737529154419,
< db=customer_1ttteik,
< table=customers,
---
> ts_ms=1737425209617,
> db=customer_1nkreoe,
> table=customers_old,customers,
31c31
< gtid=5cb040cc-d88e-11ef-aaf7-0242ac110004:47,
---
> gtid=5891eef6-d79c-11ef-b2d3-0242ac110004:49,
33c33
< pos=898,
---
> pos=1580,
39c39
< "pos": 898,
---
> "pos": 1580,
44c44
< "ts_sec": 1737529154,
---
> "ts_sec": 1737425209,
46,47c46,47
< "pos": 1054,
< "gtids": "5cb040cc-d88e-11ef-aaf7-0242ac110004:1-46",
---
> "pos": 1798,
> "gtids": "5891eef6-d79c-11ef-b2d3-0242ac110004:44-48",
50,51c50,51
< "databaseName": "customer_1ttteik",
< "ddl": "RENAME TABLE customer_1ttteik.customers_copy TO customer_1ttteik.customers",
---
> "databaseName": "customer_1nkreoe",
> "ddl": "RENAME TABLE customer_1nkreoe.customers_copy TO customer_1nkreoe.customers",
55,56c55,56
< "id": "\"customer_1ttteik\".\"customers\"",
< "previousId": "\"customer_1ttteik\".\"customers_copy\"",
---
> "id": "\"customer_1nkreoe\".\"customers\"",
> "previousId": "\"customer_1nkreoe\".\"customers_copy\"",
```
```
$ cat rename_single.txt
DataChangeEvent [
record=SourceRecord {
sourcePartition={server=mysql_binlog_source},
sourceOffset={
transaction_id=null,
ts_sec=1737529154,
file=mysql-bin.000004,
pos=1054,
gtids=5cb040cc-d88e-11ef-aaf7-0242ac110004:1-46,
server_id=223344
}
},
ConnectRecord {
topic='mysql_binlog_source',
kafkaPartition=0,
key=Struct {
databaseName=customer_1ttteik
},
keySchema=Schema {
io.debezium.connector.mysql.SchemaChangeKey:STRUCT
},
value=Struct {
source=Struct {
version=1.9.8.Final,
connector=mysql,
name=mysql_binlog_source,
ts_ms=1737529154419,
db=customer_1ttteik,
table=customers,
server_id=223344,
gtid=5cb040cc-d88e-11ef-aaf7-0242ac110004:47,
file=mysql-bin.000004,
pos=898,
row=0
},
historyRecord={
"source": {
"file": "mysql-bin.000004",
"pos": 898,
"server_id": 223344
},
"position": {
"transaction_id": null,
"ts_sec": 1737529154,
"file": "mysql-bin.000004",
"pos": 1054,
"gtids": "5cb040cc-d88e-11ef-aaf7-0242ac110004:1-46",
"server_id": 223344
},
"databaseName": "customer_1ttteik",
"ddl": "RENAME TABLE customer_1ttteik.customers_copy TO customer_1ttteik.customers",
"tableChanges": [
{
"type": "ALTER",
"id": "\"customer_1ttteik\".\"customers\"",
"previousId": "\"customer_1ttteik\".\"customers_copy\"",
"table": {
"defaultCharsetName": "latin1",
"primaryKeyColumnNames": ["id"],
"columns": [
{
"name": "id",
"jdbcType": 4,
"typeName": "INT",
"typeExpression": "INT",
"charsetName": null,
"length": 11,
"position": 1,
"optional": false,
"autoIncremented": false,
"generated": false,
"comment": null,
"hasDefaultValue": false,
"enumValues": []
},
{
"name": "name",
"jdbcType": 12,
"typeName": "VARCHAR",
"typeExpression": "VARCHAR",
"charsetName": "latin1",
"length": 255,
"position": 2,
"optional": false,
"autoIncremented": false,
"generated": false,
"comment": null,
"hasDefaultValue": true,
"defaultValueExpression": "flink",
"enumValues": []
},
{
"name": "address",
"jdbcType": 12,
"typeName": "VARCHAR",
"typeExpression": "VARCHAR",
"charsetName": "latin1",
"length": 1024,
"position": 3,
"optional": true,
"autoIncremented": false,
"generated": false,
"comment": null,
"hasDefaultValue": true,
"enumValues": []
}
]
},
"comment": null
}
]
}
},
valueSchema=Schema {
io.debezium.connector.mysql.SchemaChangeValue:STRUCT
},
timestamp=null,
headers=ConnectHeaders(headers=)
}
]
```
```
$ cat rename_multi.txt
DataChangeEvent [
record=SourceRecord {
sourcePartition={server=mysql_binlog_source},
sourceOffset={
transaction_id=null,
ts_sec=1737425209,
file=mysql-bin.000004,
pos=1798,
gtids=5891eef6-d79c-11ef-b2d3-0242ac110004:44-48,
server_id=223344
}
},
ConnectRecord {
topic='mysql_binlog_source',
kafkaPartition=0,
key=Struct {
databaseName=customer_1nkreoe
},
keySchema=Schema {
io.debezium.connector.mysql.SchemaChangeKey:STRUCT
},
value=Struct {
source=Struct {
version=1.9.8.Final,
connector=mysql,
name=mysql_binlog_source,
ts_ms=1737425209617,
db=customer_1nkreoe,
table=customers_old,customers,
server_id=223344,
gtid=5891eef6-d79c-11ef-b2d3-0242ac110004:49,
file=mysql-bin.000004,
pos=1580,
row=0
},
historyRecord={
"source": {
"file": "mysql-bin.000004",
"pos": 1580,
"server_id": 223344
},
"position": {
"transaction_id": null,
"ts_sec": 1737425209,
"file": "mysql-bin.000004",
"pos": 1798,
"gtids": "5891eef6-d79c-11ef-b2d3-0242ac110004:44-48",
"server_id": 223344
},
"databaseName": "customer_1nkreoe",
"ddl": "RENAME TABLE customer_1nkreoe.customers_copy TO customer_1nkreoe.customers",
"tableChanges": [
{
"type": "ALTER",
"id": "\"customer_1nkreoe\".\"customers\"",
"previousId": "\"customer_1nkreoe\".\"customers_copy\"",
"table": {
"defaultCharsetName": "latin1",
"primaryKeyColumnNames": ["id"],
"columns": [
{
"name": "id",
"jdbcType": 4,
"typeName": "INT",
"typeExpression": "INT",
"charsetName": null,
"length": 11,
"position": 1,
"optional": false,
"autoIncremented": false,
"generated": false,
"comment": null,
"hasDefaultValue": false,
"enumValues": []
},
{
"name": "name",
"jdbcType": 12,
"typeName": "VARCHAR",
"typeExpression": "VARCHAR",
"charsetName": "latin1",
"length": 255,
"position": 2,
"optional": false,
"autoIncremented": false,
"generated": false,
"comment": null,
"hasDefaultValue": true,
"defaultValueExpression": "flink",
"enumValues": []
},
{
"name": "address",
"jdbcType": 12,
"typeName": "VARCHAR",
"typeExpression": "VARCHAR",
"charsetName": "latin1",
"length": 1024,
"position": 3,
"optional": true,
"autoIncremented": false,
"generated": false,
"comment": null,
"hasDefaultValue": true,
"enumValues": []
}
]
},
"comment": null
}
]
}
},
valueSchema=Schema {
io.debezium.connector.mysql.SchemaChangeValue:STRUCT
},
timestamp=null,
headers=ConnectHeaders(headers=)
}
]
```
</details>
Excluding run-specific values such as timestamps, binlog positions, GTIDs,
and database names, the only meaningful difference is the value of
`source.table`:
```diff
- table=customers,
+ table=customers_old,customers,
```
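To make this kind of comparison repeatable, the run-specific fields can be masked before diffing. The following is only a sketch: `EventLogNormalizer` and its patterns are hypothetical helpers, not part of Flink CDC or Debezium, and the `customer_` database prefix is assumed from the logs above.

```java
import java.util.regex.Pattern;

/** Hypothetical helper: masks run-specific values in one line of the printed event. */
public class EventLogNormalizer {
    // Matches ts_sec / ts_ms / pos in both forms seen in the logs: key=123 and "key": 123.
    private static final Pattern VOLATILE_NUMBER =
            Pattern.compile("(\"?(?:ts_sec|ts_ms|pos)\"?\\s*[=:]\\s*)\\d+");
    // Matches a server UUID followed by a transaction id or range, e.g. <uuid>:1-46.
    private static final Pattern GTID =
            Pattern.compile("[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12}:[0-9-]+");
    // Assumption: database names look like customer_<suffix>, as in the logs above.
    private static final Pattern DATABASE = Pattern.compile("customer_[0-9a-z]+");

    public static String normalize(String line) {
        String out = VOLATILE_NUMBER.matcher(line).replaceAll("$1<n>");
        out = GTID.matcher(out).replaceAll("<gtid>");
        return DATABASE.matcher(out).replaceAll("<db>");
    }
}
```

With both files normalized line by line this way, only the `table=` line should remain in the diff.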
When a single table is renamed, `source.table` contains one table name.
When multiple tables are renamed in a single statement, `source.table`
contains multiple table names separated by commas.
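As an illustration of what this means for code that reads `source.table`, here is a minimal sketch that splits the value into individual names. `SourceTableNames` is a hypothetical helper and not the change proposed in this PR. It assumes table names themselves contain no commas, and that the trailing comma in the log output is the struct's field separator rather than part of the value.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: splits a source.table value into individual table names. */
public class SourceTableNames {
    public static List<String> split(String sourceTable) {
        if (sourceTable == null || sourceTable.isEmpty()) {
            return Collections.emptyList();
        }
        // Assumption: names are comma-separated and never contain a comma themselves.
        return Arrays.stream(sourceTable.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toList());
    }
}
```

For the two events above, `split("customers")` yields one name and `split("customers_old,customers")` yields two, so a caller that expects exactly one table can detect the multi-table case by checking the list size.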
This behavior was observed in logs printed during BinlogSplitReader
execution, produced by the following logging change:
```diff
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
index cd3c697e..49f567a9 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
@@ -165,6 +165,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
if (currentTaskRunning) {
List<DataChangeEvent> batch = queue.poll();
for (DataChangeEvent event : batch) {
+ LOG.info("Read binlog event: {}", event);
if (isParsingOnLineSchemaChanges) {
Optional<SourceRecord> oscRecord =
parseOnLineSchemaChangeEvent(event.getRecord());
```