[ 
https://issues.apache.org/jira/browse/FLINK-18758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17238790#comment-17238790
 ] 

Jark Wu commented on FLINK-18758:
---------------------------------

I got the following exception when I using {{debezium-avro-confluent}} format:


{code}
2020-11-25 20:22:33
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:533)
        at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
        at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Can't deserialize Debezium Avro message.
        at 
org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.deserialize(DebeziumAvroDeserializationSchema.java:172)
        at 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177)
        at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
Caused by: java.io.IOException: Failed to deserialize Avro record.
        at 
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:104)
        at 
org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.deserialize(DebeziumAvroDeserializationSchema.java:144)
        ... 7 more
Caused by: org.apache.flink.avro.shaded.org.apache.avro.AvroTypeException: 
Found dbserver1.mydb.orders.Value, expecting union
        at 
org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
        at 
org.apache.flink.avro.shaded.org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
        at 
org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
        at 
org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
        at 
org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at 
org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
        at 
org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
        at 
org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at 
org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at 
org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at 
org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:75)
        at 
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101)
        ... 8 more
{code}

The exception message is misleading. The reason behind this is that I have a 
MySQL table which has BOOLEAN column. However, BOOLEAN is an alias of tinyint 
in MySQL. Therefore, debezium captures this column in INT type. And there is a 
type mismatch when Flink SQL reading this topic using BOOLEAN type. When using 
INT type in Flink SQL, it works well. 

MySQL table:


{code:sql}
CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price BIGINT NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
{code}



{code:java}
CREATE TABLE orders (
  order_id INT NOT NULL,
  order_date TIMESTAMP(0) NOT NULL,
  customer_name STRING NOT NULL,
  price BIGINT NOT NULL,
  product_id INT NOT NULL,
  order_status BOOLEAN NOT NULL,  -- this is wrong, we should use INT here
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'dbserver1.mydb.orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-avro-confluent',
    'debezium-avro-confluent.schema-registry.url' = 'http://localhost:8181',
    'debezium-avro-confluent.schema-registry.subject' = 
'dbserver1.mydb.orders-value'
);
{code}

The Avro schema generated by Debezium: 


{code:json}
{
  "type": "record",
  "name": "Envelope",
  "namespace": "dbserver1.mydb.orders",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "order_id",
              "type": "int"
            },
            {
              "name": "order_date",
              "type": {
                "type": "long",
                "connect.version": 1,
                "connect.name": "io.debezium.time.Timestamp"
              }
            },
            {
              "name": "customer_name",
              "type": "string"
            },
            {
              "name": "price",
              "type": {
                "type": "bytes",
                "scale": 5,
                "precision": 10,
                "connect.version": 1,
                "connect.parameters": {
                  "scale": "5",
                  "connect.decimal.precision": "10"
                },
                "connect.name": "org.apache.kafka.connect.data.Decimal",
                "logicalType": "decimal"
              }
            },
            {
              "name": "product_id",
              "type": "int"
            },
            {
              "name": "order_status",
              "type": {
                "type": "int",
                "connect.type": "int16"
              }
            }
          ],
          "connect.name": "dbserver1.mydb.orders.Value"
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    },
    {
      "name": "source",
      "type": {...}
    },
    {
      "name": "op",
      "type": "string"
    },
    {
      "name": "ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "transaction",
      "type": ...,
      "default": null
    }
  ],
  "connect.name": "dbserver1.mydb.orders.Envelope"
}
{code}





> Many data types do not work in debezium format 
> -----------------------------------------------
>
>                 Key: FLINK-18758
>                 URL: https://issues.apache.org/jira/browse/FLINK-18758
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>            Reporter: Leonard Xu
>            Assignee: Jark Wu
>            Priority: Major
>             Fix For: 1.12.0
>
>
> Currently debezium json format  wrapper the json format to 
> serialize/deserialize the debezuim json format data.
> But debezium json format has its own data type which consists of Literal Type 
> and Semantic Type[1], i.g. Date type in debezium is an integer which 
> represents the number of days since epoch rather than a string with 
> 'yyyy-MM-dd' pattern.  
> {code:java}
> {   "schema":{
>       "fields":[  
>        {            
>                 "fields":[
>                {                 
>                  "type":"int32",      //Literal Type
>                   "optional":false,
>                   "name":"io.debezium.time.Date", //semantic Type
>                   "version":1,
>                   "field":"order_date"
>                },
>                {                  "type":"int32",
>                   "optional":false,
>                   "field":"quantity"
>                }
>             ]
>          }
>       ]
>    },
>    "payload":{ 
>       "before":null,
>       "after":{ 
>          "order_date":16852, // Literal Value, the number of days since epoch
>         "quantity":1
>       },
>       "op":"c",
>       "ts_ms":1596081813474
>    }
> } {code}
>  
> I think we need obtain the debezuim data type from schema information and 
> then serialize/deserialize the data in payload.
> [1][https://debezium.io/documentation/reference/1.2/connectors/mysql.html]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to