[
https://issues.apache.org/jira/browse/FLINK-18758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17238790#comment-17238790
]
Jark Wu edited comment on FLINK-18758 at 11/25/20, 3:26 PM:
------------------------------------------------------------
I got the following exception when using the {{debezium-avro-confluent}} format:
{code}
2020-11-25 20:22:33
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:533)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Can't deserialize Debezium Avro message.
    at org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.deserialize(DebeziumAvroDeserializationSchema.java:172)
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
Caused by: java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:104)
    at org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.deserialize(DebeziumAvroDeserializationSchema.java:144)
    ... 7 more
Caused by: org.apache.flink.avro.shaded.org.apache.avro.AvroTypeException: Found dbserver1.mydb.orders.Value, expecting union
    at org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
    at org.apache.flink.avro.shaded.org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
    at org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:75)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101)
    ... 8 more
{code}
The exception message is misleading. The reason behind it is that my MySQL table has a BOOLEAN column. However, BOOLEAN is only an alias for TINYINT(1) in MySQL, so Debezium captures this column as an INT type, and there is a type mismatch when Flink SQL reads the topic with a BOOLEAN column. When the column is declared as INT in Flink SQL, it works well.
MySQL table:
{code:sql}
CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price BIGINT NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
{code}
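A quick way to see the alias at work: MySQL normalizes BOOLEAN to TINYINT(1) in the stored table definition (output abbreviated to the relevant line):
{code:sql}
-- MySQL rewrites BOOLEAN to TINYINT(1), which Debezium then captures as an integer
SHOW CREATE TABLE orders;
-- ...
--   `order_status` tinyint(1) NOT NULL,
-- ...
{code}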
Flink SQL:
{code:sql}
CREATE TABLE orders (
  order_id INT NOT NULL,
  order_date TIMESTAMP(0) NOT NULL,
  customer_name STRING NOT NULL,
  price BIGINT NOT NULL,
  product_id INT NOT NULL,
  order_status BOOLEAN NOT NULL, -- this is wrong, we should use INT here
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.mydb.orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.schema-registry.url' = 'http://localhost:8181',
  'debezium-avro-confluent.schema-registry.subject' = 'dbserver1.mydb.orders-value'
);
SELECT * FROM orders;
{code}
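For comparison, a corrected version of the DDL (a sketch based on the note above: only the {{order_status}} line changes, the connector options are identical):
{code:sql}
CREATE TABLE orders (
  order_id INT NOT NULL,
  order_date TIMESTAMP(0) NOT NULL,
  customer_name STRING NOT NULL,
  price BIGINT NOT NULL,
  product_id INT NOT NULL,
  order_status INT NOT NULL, -- INT matches the Avro int that Debezium writes for TINYINT(1)
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.mydb.orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.schema-registry.url' = 'http://localhost:8181',
  'debezium-avro-confluent.schema-registry.subject' = 'dbserver1.mydb.orders-value'
);
{code}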
The Avro schema generated by Debezium:
{code:json}
{
  "type": "record",
  "name": "Envelope",
  "namespace": "dbserver1.mydb.orders",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "order_id",
              "type": "int"
            },
            {
              "name": "order_date",
              "type": {
                "type": "long",
                "connect.version": 1,
                "connect.name": "io.debezium.time.Timestamp"
              }
            },
            {
              "name": "customer_name",
              "type": "string"
            },
            {
              "name": "price",
              "type": {
                "type": "bytes",
                "scale": 5,
                "precision": 10,
                "connect.version": 1,
                "connect.parameters": {
                  "scale": "5",
                  "connect.decimal.precision": "10"
                },
                "connect.name": "org.apache.kafka.connect.data.Decimal",
                "logicalType": "decimal"
              }
            },
            {
              "name": "product_id",
              "type": "int"
            },
            {
              "name": "order_status",
              "type": {
                "type": "int",
                "connect.type": "int16"
              }
            }
          ],
          "connect.name": "dbserver1.mydb.orders.Value"
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    },
    {
      "name": "source",
      "type": {...}
    },
    {
      "name": "op",
      "type": "string"
    },
    {
      "name": "ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "transaction",
      "type": ...,
      "default": null
    }
  ],
  "connect.name": "dbserver1.mydb.orders.Envelope"
}
{code}
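Note that {{order_status}} in the schema above is a plain Avro {{int}} (with {{connect.type: int16}}), which is what the BOOLEAN reader type failed to resolve against. With the column declared as INT, the boolean semantics can still be recovered at query time (a sketch; MySQL stores true/false as 1/0):
{code:sql}
-- Recover the boolean flag from the INT column (MySQL stores true/false as 1/0)
SELECT
  order_id,
  order_status <> 0 AS order_status_flag
FROM orders;
{code}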
> Many data types do not work in debezium format
> -----------------------------------------------
>
> Key: FLINK-18758
> URL: https://issues.apache.org/jira/browse/FLINK-18758
> Project: Flink
> Issue Type: Sub-task
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Reporter: Leonard Xu
> Assignee: Jark Wu
> Priority: Major
> Fix For: 1.12.0
>
>
> Currently, the debezium-json format wraps the json format to
> serialize/deserialize Debezium JSON data.
> But the Debezium JSON format has its own data types, each consisting of a
> literal type and a semantic type [1]; e.g. the Date type in Debezium is an
> integer that represents the number of days since epoch, rather than a string
> with the 'yyyy-MM-dd' pattern.
> {code:java}
> { "schema":{
> "fields":[
> {
> "fields":[
> {
> "type":"int32", //Literal Type
> "optional":false,
> "name":"io.debezium.time.Date", //semantic Type
> "version":1,
> "field":"order_date"
> },
> { "type":"int32",
> "optional":false,
> "field":"quantity"
> }
> ]
> }
> ]
> },
> "payload":{
> "before":null,
> "after":{
> "order_date":16852, // Literal Value, the number of days since epoch
> "quantity":1
> },
> "op":"c",
> "ts_ms":1596081813474
> }
> } {code}
>
> I think we need to obtain the Debezium data type from the schema information
> and then serialize/deserialize the data in the payload.
> [1][https://debezium.io/documentation/reference/1.2/connectors/mysql.html]
>