[
https://issues.apache.org/jira/browse/FLINK-21172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jark Wu closed FLINK-21172.
---------------------------
Fix Version/s: 1.13.0
Resolution: Fixed
Fixed in master: 195056b11f5266599260832e3e216330b58c5eaf
> canal-json format include es field
> ----------------------------------
>
> Key: FLINK-21172
> URL: https://issues.apache.org/jira/browse/FLINK-21172
> Project: Flink
> Issue Type: Improvement
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table
> SQL / Ecosystem
> Affects Versions: 1.12.0, 1.12.1
> Reporter: jiabao sun
> Assignee: Nicholas Jiang
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.13.0
>
>
> Canal flat message json format has an 'es' field extracted from mysql binlog
> which means the row data real change time in mysql. It expressed the event
> time naturally but was ignored during deserialization.
> {code:json}
> {
> "data": [
> {
> "id": "111",
> "name": "scooter",
> "description": "Big 2-wheel scooter",
> "weight": "5.18"
> }
> ],
> "database": "inventory",
> "es": 1589373560000,
> "id": 9,
> "isDdl": false,
> "mysqlType": {
> "id": "INTEGER",
> "name": "VARCHAR(255)",
> "description": "VARCHAR(512)",
> "weight": "FLOAT"
> },
> "old": [
> {
> "weight": "5.15"
> }
> ],
> "pkNames": [
> "id"
> ],
> "sql": "",
> "sqlType": {
> "id": 4,
> "name": 12,
> "description": 12,
> "weight": 7
> },
> "table": "products",
> "ts": 1589373560798,
> "type": "UPDATE"
> }
> {code}
> org.apache.flink.formats.json.canal. CanalJsonDeserializationSchema
> {code:java}
> private static RowType createJsonRowType(DataType databaseSchema) {
> // Canal JSON contains other information, e.g. "ts", "sql", but we
> don't need them
> return (RowType)
> DataTypes.ROW(
> DataTypes.FIELD("data",
> DataTypes.ARRAY(databaseSchema)),
> DataTypes.FIELD("old",
> DataTypes.ARRAY(databaseSchema)),
> DataTypes.FIELD("type", DataTypes.STRING()),
> DataTypes.FIELD("database",
> DataTypes.STRING()),
> DataTypes.FIELD("table", DataTypes.STRING()))
> .getLogicalType();
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)