[
https://issues.apache.org/jira/browse/FLINK-38769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hao Yu updated FLINK-38769:
---------------------------
Description:
Our Flink CDC job caught an exception.
The exception is as follows:
{code:java}
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while
processing event at offset {transaction_id=null, ts_sec=xxx,
file=xxx-bin.005958, pos=550478108,
gtids=02a14707-1da1-11ef-9507-08c0eb82bce8:1-12266088830,3d496217-ca2c-11ea-9cde-e4434bd6a6e4:1-421074,3f0aa7dd-2ac3-11ef-99f0-043f72f048fe:1-184350,54e0c8f3-7a06-11ea-b241-fa163e0b065c:1-117143907,9a85d197-4452-11ec-86d9-b8cef6e1aa34:1-384810,b94d9c8d-bc20-11ea-8cce-0c42a1453028:1-564113948,cded5277-3135-11e9-b497-fa163e967ebc:1-46832,cf6f6734-1460-11eb-834a-0c42a1457a78:1-8142594382,ee0fafda-199a-11e9-89d3-fa163e9b5db4:1-92998908,
row=1, server_id=20743, event=3}
at
io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:248)
at
io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleInsert$4(MySqlStreamingChangeEventSource.java:842)
at
io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleChange(MySqlStreamingChangeEventSource.java:947)
at
io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleInsert(MySqlStreamingChangeEventSource.java:833)
at
io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$16(MySqlStreamingChangeEventSource.java:1058)
at
io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:409)
... 7 more
Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null
used for required field: "datachange_createtime", schema type: STRING
at
org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
at org.apache.kafka.connect.data.Struct.validate(Struct.java:243)
at
org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:250)
at org.apache.kafka.connect.data.Struct.put(Struct.java:226)
at org.apache.kafka.connect.data.Struct.put(Struct.java:213)
at io.debezium.data.Envelope.create(Envelope.java:298)
at
io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:82)
at
io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:54)
at
io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:211)
... 12 more{code}
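For context on the DataException above: in Kafka Connect a field rejects null only when it is declared optional=false and has no default value to fall back on; fields with a default (like f10 with default "") tolerate a missing value. The schema dump below shows f17 and f18 are exactly the failing shape. A minimal plain-Java sketch of that rule (an illustration of the observed behavior, not the actual ConnectSchema code):

```java
// Plain-Java sketch of the validation rule; NOT the real ConnectSchema code.
class RequiredFieldCheck {

    // Mirrors the shape of a Connect field schema: name, type, optional flag,
    // and an optional default value.
    record FieldSchema(String name, String type, boolean optional, Object defaultValue) {

        // Null is rejected only when the field is non-optional AND has no
        // default to fall back on.
        void validateValue(Object value) {
            Object effective = (value == null) ? defaultValue : value;
            if (effective == null && !optional) {
                throw new RuntimeException("Invalid value: null used for required field: \""
                        + name + "\", schema type: " + type);
            }
        }
    }

    public static void main(String[] args) {
        // f17/f18 in the schema dump: optional=false, default=null -> required.
        var createtime = new FieldSchema("datachange_createtime", "STRING", false, null);
        try {
            createtime.validateValue(null); // same failure shape as the report
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
        // f10 in the schema dump: optional=false but default="" -> null tolerated.
        new FieldSchema("f10", "STRING", false, "").validateValue(null);
        System.out.println("f10 accepted null (default exists)");
    }
}
```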
We added logging in RelationalChangeRecordEmitter and found that the emitted records are incomplete: the table has 20 columns, but the record contains only 17 fields (f0–f16), so the last 3 columns (f17–f19) are missing. Note that f17 and f18 are required (optional=false) STRING columns with no default, which matches the "Invalid value: null used for required field" error above.
Schema info
{code:java}
{
"schema": {
"key": {
"name": "mysql_binlog_source.xxxx.Key",
"type": "STRUCT",
"optional": "false",
"default": null,
"fields": [
{
"name": "f0",
"index": "0",
"schema": { "type": "INT64", "optional": "false",
"default": null }
}
]
},
"value": {
"name": "mysql_binlog_source.xxxx",
"type": "STRUCT",
"optional": "true",
"default": null,
"fields": [
{ "name": "f0", "index": "0", "schema": { "type": "INT64",
"optional": "false", "default": null }},
{ "name": "f1", "index": "1", "schema": { "type": "INT64",
"optional": "false", "default": "0" }},
{ "name": "f2", "index": "2", "schema": { "type": "INT32",
"optional": "false", "default": "0" }},
{ "name": "f3", "index": "3", "schema": { "type": "INT64",
"optional": "false", "default": "0" }},
{ "name": "f4", "index": "4", "schema": { "name":
"org.apache.kafka.connect.data.Decimal", "type": "BYTES", "optional": "false",
"default": "0.00", "version": "1" }},
{ "name": "f5", "index": "5", "schema": { "name":
"org.apache.kafka.connect.data.Decimal", "type": "BYTES", "optional": "false",
"default": "0.00", "version": "1" }},
{ "name": "f6", "index": "6", "schema": { "type": "STRING",
"optional": "false", "default": "" }},
{ "name": "f7", "index": "7", "schema": { "name":
"org.apache.kafka.connect.data.Decimal", "type": "BYTES", "optional": "false",
"default": "0.0000", "version": "1" }},
{ "name": "f8", "index": "8", "schema": { "type": "STRING",
"optional": "false", "default": "" }},
{ "name": "f9", "index": "9", "schema": { "type": "STRING",
"optional": "false", "default": "" }},
{ "name": "f10", "index": "10", "schema": { "type": "STRING",
"optional": "false", "default": "" }},
{ "name": "f11", "index": "11", "schema": { "type": "STRING",
"optional": "false", "default": "" }},
{ "name": "f12", "index": "12", "schema": { "type": "STRING",
"optional": "false", "default": "" }},
{ "name": "f13", "index": "13", "schema": { "type": "STRING",
"optional": "false", "default": "" }},
{ "name": "f14", "index": "14", "schema": { "name":
"org.apache.kafka.connect.data.Decimal", "type": "BYTES", "optional": "false",
"default": "0.00", "version": "1" }},
{ "name": "f15", "index": "15", "schema": { "type": "INT64",
"optional": "false", "default": "0" }},
{ "name": "f16", "index": "16", "schema": { "type": "INT16",
"optional": "false", "default": "1" }},
{ "name": "f17", "index": "17", "schema": { "type": "STRING",
"optional": "false", "default": null }},
{ "name": "f18", "index": "18", "schema": { "type": "STRING",
"optional": "false", "default": null }},
{ "name": "f19", "index": "19", "schema": { "type": "STRING",
"optional": "true", "default": "" }}
]
}
}
}
{code}
Data info
{code:java}
record: Struct{
f0=150000,
f1=280000,
f2=4900000,
f3=600,
f4=0.66,
f5=0.66,
f6=XXX,
f7=1.0000,
f8=XXX,
f9=XXX,
f10=,
f11=,
f12=,
f13=,
f14=0.00,
f15=3549000000,
f16=1
}
{code}
I suspect that Debezium did not fully deserialize the row from the binlog event, leaving the trailing columns unset.
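The symptom is consistent with the deserialized row simply being shorter than the column list: values are copied into the outgoing record by column index, so any trailing columns without a parsed value stay null, and the first required one without a default then fails validation in Envelope.create. A hypothetical sketch of that mechanism (our guess at the failure mode, not Debezium's actual code):

```java
import java.util.Arrays;

// Hypothetical sketch of the suspected failure mode; NOT Debezium's actual code.
class ShortRowSketch {

    // An emitter copies the deserialized row into the outgoing record by
    // column index; slots beyond the parsed row length are simply left null.
    static Object[] toRecord(Object[] parsedRow, int columnCount) {
        Object[] record = new Object[columnCount];
        System.arraycopy(parsedRow, 0, record, 0, Math.min(parsedRow.length, columnCount));
        return record;
    }

    public static void main(String[] args) {
        Object[] parsedRow = new Object[17];          // 17 values came out of the binlog event
        Arrays.fill(parsedRow, "v");
        Object[] record = toRecord(parsedRow, 20);    // but the table has 20 columns
        // f16 is populated; f17..f19 were never written:
        System.out.println(record[16] + " " + record[17] + " " + record[18]); // v null null
    }
}
```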
> Flink CDC parsing of MySQL data is incomplete.
> ----------------------------------------------
>
> Key: FLINK-38769
> URL: https://issues.apache.org/jira/browse/FLINK-38769
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.1.0
> Environment: Flink 1.17
> Fink CDC 3.1
> Reporter: Hao Yu
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)