Flink CDC Issue Import created FLINK-34824:
----------------------------------------------

             Summary: MySQL connector CDC SourceRecord lacks data type 
information, but the dbz (Debezium) record has it
                 Key: FLINK-34824
                 URL: https://issues.apache.org/jira/browse/FLINK-34824
             Project: Flink
          Issue Type: Improvement
          Components: Flink CDC
            Reporter: Flink CDC Issue Import


### Search before asking

- [X] I searched in the 
[issues|https://github.com/ververica/flink-cdc-connectors/issues] and found 
nothing similar.


### Motivation

When developing a streaming data platform, we also need column data types to 
handle dynamic table schema changes. However, I found that the MySQL connector's 
CDC SourceRecord carries a lot of information we do not need and no data types. 
I suggest exposing the type information from the dbz (Debezium) record to cover 
more use cases.

### Solution

Change the SourceRecord code to emit the dbz (Debezium) record format, such as 
the example below.

### Alternatives

EmbeddedEngineChangeEvent[key = {
                "schema": {
                        "type": "struct",
                        "fields": [{
                                "type": "int64",
                                "optional": false,
                                "field": "id"
                        }],
                        "optional": false,
                        "name": "mysql_connector.gmall.activity_info.Key"
                },
                "payload": {
                        "id": 3
                }
        }, value = {
                "schema": {
                        "type": "struct",
                        "fields": [{
                                "type": "struct",
                                "fields": [{
                                        "type": "int64",
                                        "optional": false,
                                        "field": "id"
                                }, {
                                        "type": "string",
                                        "optional": true,
                                        "field": "activity_name"
                                }, {
                                        "type": "string",
                                        "optional": true,
                                        "field": "activity_type"
                                }, {
                                        "type": "string",
                                        "optional": true,
                                        "field": "activity_desc"
                                }, {
                                        "type": "int64",
                                        "optional": true,
                                        "name": "io.debezium.time.Timestamp",
                                        "version": 1,
                                        "field": "start_time"
                                }, {
                                        "type": "int64",
                                        "optional": true,
                                        "name": "io.debezium.time.Timestamp",
                                        "version": 1,
                                        "field": "end_time"
                                }, {
                                        "type": "int64",
                                        "optional": true,
                                        "name": "io.debezium.time.Timestamp",
                                        "version": 1,
                                        "field": "create_time"
                                }],
                                "optional": true,
                                "name": "mysql_connector.gmall.activity_info.Value",
                                "field": "before"
                        }, {
                                "type": "struct",
                                "fields": [{
                                        "type": "int64",
                                        "optional": false,
                                        "field": "id"
                                }, {
                                        "type": "string",
                                        "optional": true,
                                        "field": "activity_name"
                                }, {
                                        "type": "string",
                                        "optional": true,
                                        "field": "activity_type"
                                }, {
                                        "type": "string",
                                        "optional": true,
                                        "field": "activity_desc"
                                }, {
                                        "type": "int64",
                                        "optional": true,
                                        "name": "io.debezium.time.Timestamp",
                                        "version": 1,
                                        "field": "start_time"
                                }, {
                                        "type": "int64",
                                        "optional": true,
                                        "name": "io.debezium.time.Timestamp",
                                        "version": 1,
                                        "field": "end_time"
                                }, {
                                        "type": "int64",
                                        "optional": true,
                                        "name": "io.debezium.time.Timestamp",
                                        "version": 1,
                                        "field": "create_time"
                                }],
                                "optional": true,
                                "name": "mysql_connector.gmall.activity_info.Value",
                                "field": "after"
                        }, {
                                "type": "struct",
                                "fields": [{
                                        "type": "string",
                                        "optional": false,
                                        "field": "version"
                                }, {
                                        "type": "string",
                                        "optional": false,
                                        "field": "connector"
                                }, {
                                        "type": "string",
                                        "optional": false,
                                        "field": "name"
                                }, {
                                        "type": "int64",
                                        "optional": false,
                                        "field": "ts_ms"
                                }, {
                                        "type": "string",
                                        "optional": true,
                                        "name": "io.debezium.data.Enum",
                                        "version": 1,
                                        "parameters": {
                                                "allowed": "true,last,false,incremental"
                                        },
                                        "default": "false",
                                        "field": "snapshot"
                                }, {
                                        "type": "string",
                                        "optional": false,
                                        "field": "db"
                                }, {
                                        "type": "string",
                                        "optional": true,
                                        "field": "sequence"
                                }, {
                                        "type": "string",
                                        "optional": true,
                                        "field": "table"
                                }, {
                                        "type": "int64",
                                        "optional": false,
                                        "field": "server_id"
                                }, {
                                        "type": "string",
                                        "optional": true,
                                        "field": "gtid"
                                }, {
                                        "type": "string",
                                        "optional": false,
                                        "field": "file"
                                }, {
                                        "type": "int64",
                                        "optional": false,
                                        "field": "pos"
                                }, {
                                        "type": "int32",
                                        "optional": false,
                                        "field": "row"
                                }, {
                                        "type": "int64",
                                        "optional": true,
                                        "field": "thread"
                                }, {
                                        "type": "string",
                                        "optional": true,
                                        "field": "query"
                                }],
                                "optional": false,
                                "name": "io.debezium.connector.mysql.Source",
                                "field": "source"
                        }, {
                                "type": "string",
                                "optional": false,
                                "field": "op"
                        }, {
                                "type": "int64",
                                "optional": true,
                                "field": "ts_ms"
                        }, {
                                "type": "struct",
                                "fields": [{
                                        "type": "string",
                                        "optional": false,
                                        "field": "id"
                                }, {
                                        "type": "int64",
                                        "optional": false,
                                        "field": "total_order"
                                }, {
                                        "type": "int64",
                                        "optional": false,
                                        "field": "data_collection_order"
                                }],
                                "optional": true,
                                "field": "transaction"
                        }],
                        "optional": false,
                        "name": "mysql_connector.gmall.activity_info.Envelope"
                },
                "payload": {
                        "before": null,
                        "after": {
                                "id": 3,
                                "activity_name": "ccccc",
                                "activity_type": "1003",
                                "activity_desc": "fffff",
                                "start_time": null,
                                "end_time": null,
                                "create_time": null
                        },
                        "source": {
                                "version": "1.9.5.Final",
                                "connector": "mysql",
                                "name": "mysql-connector",
                                "ts_ms": 1694568910248,
                                "snapshot": "true",
                                "db": "gmall",
                                "sequence": null,
                                "table": "activity_info",
                                "server_id": 0,
                                "gtid": null,
                                "file": "mysql-bin.000015",
                                "pos": 154,
                                "row": 0,
                                "thread": null,
                                "query": null
                        },
                        "op": "r",
                        "ts_ms": 1694568910248,
                        "transaction": null
                }
        }, sourceRecord = SourceRecord {
                sourcePartition = {
                        server = mysql-connector
                }, sourceOffset = {
                        ts_sec = 1694568910,
                        file = mysql-bin.000015,
                        pos = 154,
                        snapshot = true
                }
        }
        ConnectRecord {
                topic = 'mysql-connector.gmall.activity_info', kafkaPartition = null, key = Struct {
                        id = 3
                }, keySchema = Schema {
                        mysql_connector.gmall.activity_info.Key: STRUCT
                }, value = Struct {
                        after = Struct {
                                id = 3, activity_name = ccccc, activity_type = 1003, activity_desc = fffff
                        }, source = Struct {
                                version = 1.9.5.Final, connector = mysql, name = mysql-connector, ts_ms = 1694568910248, snapshot = true, db = gmall, table = activity_info, server_id = 0, file = mysql-bin.000015, pos = 154, row = 0
                        }, op = r, ts_ms = 1694568910248
                }, valueSchema = Schema {
                        mysql_connector.gmall.activity_info.Envelope: STRUCT
                }, timestamp = null, headers = ConnectHeaders(headers = )
        }]
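
To illustrate why the schema half of the event above is useful for dynamic schema handling, here is a minimal sketch that reads column types out of a Debezium-style JSON value schema. The `column_types` helper, its return shape, and the trimmed `ENVELOPE_SCHEMA` sample are assumptions made for illustration only; they are not part of Flink CDC or Debezium.

```python
import json

# Trimmed copy of the value schema from the event above: only the "after"
# branch, with three of the seven columns, to keep the example short.
ENVELOPE_SCHEMA = json.loads("""
{
  "type": "struct",
  "name": "mysql_connector.gmall.activity_info.Envelope",
  "fields": [{
    "type": "struct",
    "field": "after",
    "optional": true,
    "name": "mysql_connector.gmall.activity_info.Value",
    "fields": [
      {"type": "int64", "optional": false, "field": "id"},
      {"type": "string", "optional": true, "field": "activity_name"},
      {"type": "int64", "optional": true,
       "name": "io.debezium.time.Timestamp", "version": 1,
       "field": "start_time"}
    ]
  }]
}
""")


def column_types(envelope_schema, branch="after"):
    """Hypothetical helper: map each column in the given branch to
    (physical_type, logical_type_name_or_None, optional)."""
    for field in envelope_schema["fields"]:
        if field.get("field") == branch:
            return {
                col["field"]: (col["type"], col.get("name"), col["optional"])
                for col in field["fields"]
            }
    raise KeyError(f"no '{branch}' struct in schema")


if __name__ == "__main__":
    for column, info in column_types(ENVELOPE_SCHEMA).items():
        print(column, info)
```

The logical type name (for example `io.debezium.time.Timestamp` on an `int64` field) is what lets a consumer tell a timestamp column apart from a plain integer column, which the payload alone does not reveal.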

### Anything else?

_No response_

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!

---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/2614
Created by: [niuhu3|https://github.com/niuhu3]
Labels: enhancement, 
Created at: Fri Nov 03 16:23:04 CST 2023
State: open




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
