UFOXD commented on issue #6955:
URL: https://github.com/apache/seatunnel/issues/6955#issuecomment-2151644714
at
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
at
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
at
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
at
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
at
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:807)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:756)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to deserialize consumer record
ConsumerRecord(topic = test1, partition = 0, leaderEpoch = 0, offset = 126,
CreateTime = 1717660426947, serialized key size = -1, serialized value size =
135, headers = RecordHeaders(headers = [], isReadOnly = false), key = null,
value = [B@3168a118).
at
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)
at
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
... 14 more
Caused by: java.io.IOException: Corrupt Canal JSON message
'{"data":{"runoob_id":74,"runoob_title":"学习
PHP","runoob_author":"菜鸟教程4444","submission_date":"2024-06-06"},"type":"INSERT"}'.
at
org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema.deserialize(CanalJsonDeserializationSchema.java:285)
at
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
at
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54)
... 15 more
Caused by:
org.apache.flink.formats.json.JsonToRowDataConverters$JsonParseException: Fail
to deserialize at field: data.
at
org.apache.flink.formats.json.JsonToRowDataConverters.lambda$createRowConverter$ef66fe9a$1(JsonToRowDataConverters.java:354)
at
org.apache.flink.formats.json.JsonToRowDataConverters.lambda$wrapIntoNullableConverter$de0b9253$1(JsonToRowDataConverters.java:380)
at
org.apache.flink.formats.json.JsonRowDataDeserializationSchema.convertToRowData(JsonRowDataDeserializationSchema.java:121)
at
org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema.deserialize(CanalJsonDeserializationSchema.java:228)
... 17 more
Caused by: java.lang.ClassCastException:
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
cannot be cast to
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode
at
org.apache.flink.formats.json.JsonToRowDataConverters.lambda$createArrayConverter$94141d67$1(JsonToRowDataConverters.java:300)
at
org.apache.flink.formats.json.JsonToRowDataConverters.lambda$wrapIntoNullableConverter$de0b9253$1(JsonToRowDataConverters.java:380)
at
org.apache.flink.formats.json.JsonToRowDataConverters.convertField(JsonToRowDataConverters.java:370)
at
org.apache.flink.formats.json.JsonToRowDataConverters.lambda$createRowConverter$ef66fe9a$1(JsonToRowDataConverters.java:350)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]