[ https://issues.apache.org/jira/browse/FLINK-18758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17238790#comment-17238790 ]
Jark Wu commented on FLINK-18758:
---------------------------------

I got the following exception when using the {{debezium-avro-confluent}} format:
{code}
2020-11-25 20:22:33
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:533)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Can't deserialize Debezium Avro message.
    at org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.deserialize(DebeziumAvroDeserializationSchema.java:172)
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
Caused by: java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:104)
    at org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.deserialize(DebeziumAvroDeserializationSchema.java:144)
    ... 7 more
Caused by: org.apache.flink.avro.shaded.org.apache.avro.AvroTypeException: Found dbserver1.mydb.orders.Value, expecting union
    at org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
    at org.apache.flink.avro.shaded.org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
    at org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:75)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101)
    ... 8 more
{code}

The exception message is misleading. The underlying cause is that my MySQL table has a BOOLEAN column, but BOOLEAN is only an alias for TINYINT(1) in MySQL, so Debezium captures this column as an integer ({{connect.type=int16}} in the Avro schema below). This causes a type mismatch when Flink SQL reads the topic with the column declared as BOOLEAN; declaring the column as INT in Flink SQL works well.
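As a sketch of the workaround (the table name {{orders_fixed}} is hypothetical, and the column list and connector options are trimmed for brevity; the full failing DDL is shown below): declare the column as INT to match the Avro encoding Debezium produces, and derive a boolean in queries where one is needed.

{code:sql}
-- Sketch of the workaround: declare order_status as INT so it matches the
-- Avro int / connect.type=int16 that Debezium emits for MySQL TINYINT(1).
-- Table name and trimmed options are illustrative; see the full DDL below.
CREATE TABLE orders_fixed (
  order_id INT NOT NULL,
  order_status INT NOT NULL,  -- was BOOLEAN in the failing DDL
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'format' = 'debezium-avro-confluent'
  -- remaining options as in the full DDL below
);

-- A boolean can still be derived in queries, using MySQL's 0 = false convention:
SELECT order_id, order_status <> 0 AS order_status_bool FROM orders_fixed;
{code}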
MySQL table:
{code:sql}
CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price BIGINT NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
{code}

Flink SQL table:
{code:sql}
CREATE TABLE orders (
  order_id INT NOT NULL,
  order_date TIMESTAMP(0) NOT NULL,
  customer_name STRING NOT NULL,
  price BIGINT NOT NULL,
  product_id INT NOT NULL,
  order_status BOOLEAN NOT NULL,  -- this is wrong, we should use INT here
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.mydb.orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.schema-registry.url' = 'http://localhost:8181',
  'debezium-avro-confluent.schema-registry.subject' = 'dbserver1.mydb.orders-value'
);
{code}

The Avro schema generated by Debezium:
{code:json}
{
  "type": "record",
  "name": "Envelope",
  "namespace": "dbserver1.mydb.orders",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            { "name": "order_id", "type": "int" },
            {
              "name": "order_date",
              "type": {
                "type": "long",
                "connect.version": 1,
                "connect.name": "io.debezium.time.Timestamp"
              }
            },
            { "name": "customer_name", "type": "string" },
            {
              "name": "price",
              "type": {
                "type": "bytes",
                "scale": 5,
                "precision": 10,
                "connect.version": 1,
                "connect.parameters": { "scale": "5", "connect.decimal.precision": "10" },
                "connect.name": "org.apache.kafka.connect.data.Decimal",
                "logicalType": "decimal"
              }
            },
            { "name": "product_id", "type": "int" },
            {
              "name": "order_status",
              "type": { "type": "int", "connect.type": "int16" }
            }
          ],
          "connect.name": "dbserver1.mydb.orders.Value"
        }
      ],
      "default": null
    },
    { "name": "after", "type": [ "null", "Value" ], "default": null },
    { "name": "source", "type": {...} },
    { "name": "op", "type": "string" },
    { "name": "ts_ms", "type": [ "null", "long" ], "default": null },
    { "name": "transaction", "type": ..., "default": null }
  ],
  "connect.name": "dbserver1.mydb.orders.Envelope"
}
{code}

> Many data types do not work in debezium format
> -----------------------------------------------
>
>                 Key: FLINK-18758
>                 URL: https://issues.apache.org/jira/browse/FLINK-18758
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>            Reporter: Leonard Xu
>            Assignee: Jark Wu
>            Priority: Major
>             Fix For: 1.12.0
>
> Currently the debezium-json format wraps the json format to serialize/deserialize Debezium-format JSON data.
> But the Debezium JSON format has its own data types, each consisting of a Literal Type and a Semantic Type [1]; e.g. the Date type in Debezium is an integer that represents the number of days since epoch, rather than a string with a 'yyyy-MM-dd' pattern.
> {code:java}
> {
>   "schema": {
>     "fields": [
>       {
>         "fields": [
>           {
>             "type": "int32",                 // Literal Type
>             "optional": false,
>             "name": "io.debezium.time.Date", // Semantic Type
>             "version": 1,
>             "field": "order_date"
>           },
>           {
>             "type": "int32",
>             "optional": false,
>             "field": "quantity"
>           }
>         ]
>       }
>     ]
>   },
>   "payload": {
>     "before": null,
>     "after": {
>       "order_date": 16852, // Literal Value: the number of days since epoch
>       "quantity": 1
>     },
>     "op": "c",
>     "ts_ms": 1596081813474
>   }
> }
> {code}
>
> I think we need to obtain the Debezium data type from the schema information and then serialize/deserialize the data in the payload.
> [1] [https://debezium.io/documentation/reference/1.2/connectors/mysql.html]
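To make the quoted Date example concrete, here is a minimal sketch in Flink SQL (illustrative only; the actual fix belongs in the format's deserializer, not in user queries): the Literal Value 16852 is a day count since the epoch, so the corresponding SQL DATE can be computed with interval arithmetic.

{code:sql}
-- Illustrative only: interpreting Debezium's io.debezium.time.Date literal
-- (an int32 counting days since 1970-01-01) as a SQL DATE.
-- 16852 days after 1970-01-01 is 2016-02-21.
SELECT TIMESTAMPADD(DAY, 16852, DATE '1970-01-01') AS order_date;
{code}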