moody1117 commented on issue #10405:
URL: https://github.com/apache/seatunnel/issues/10405#issuecomment-3809196040

   Thank you for your reply.
   1. The data may contain null values.
   2. I modified the configuration following the recommended approach in the schema documentation and the Avro format documentation, but the data still fails to parse correctly. When no schema is defined and the data from this Avro topic is read into another Avro topic, the resulting records can no longer be parsed by our Flink program either, which indicates that the content has changed.
   3. Currently only this WARN log exists. If `format_error_handle_way = "skip"` is commented out, an ArrayIndexOutOfBoundsException occurs instead; a standalone decoding check is sketched right after this list.
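
   To rule SeaTunnel in or out, the following minimal sketch decodes a single raw Kafka payload directly with plain Avro, using the original writer schema. It is a sketch under assumptions: `event.avsc` stands for our writer schema file and `one_message.bin` for one message value dumped from the topic; neither name comes from the job below. If this decodes cleanly while SeaTunnel fails, the reader schema SeaTunnel derives from the `schema` block does not match the writer schema.

   ```java
   import java.nio.file.Files;
   import java.nio.file.Paths;

   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericDatumReader;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.io.BinaryDecoder;
   import org.apache.avro.io.DecoderFactory;

   public class RawAvroCheck {
       public static void main(String[] args) throws Exception {
           // Writer schema: the schema the producer actually used (placeholder path).
           Schema writer = new Schema.Parser()
                   .parse(Files.newInputStream(Paths.get("event.avsc")));
           // One raw message value dumped from the t_event topic (placeholder path).
           byte[] payload = Files.readAllBytes(Paths.get("one_message.bin"));

           // Decode with the writer schema only, so no schema resolution is involved.
           BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
           GenericRecord record =
                   new GenericDatumReader<GenericRecord>(writer).read(null, decoder);
           System.out.println(record);
       }
   }
   ```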
   
   ```hocon
   env {
     parallelism = 1
     job.mode = "BATCH"
   }
   source {
     Kafka {
       topic = "t_event"
       bootstrap.servers = "xxx:9092"
       consumer.group = "test_avro_kafka"
       start_mode = "timestamp"
       start_mode.timestamp = "1769504419000"
       start_mode.end_timestamp = "1769504479000"
       format = "avro"
       format_error_handle_way = "skip"
       
       plugin_output = "kafka_table"
       kafka.config = {
         enable.auto.commit = "true"
         max.poll.records = "10000"
         
       }
       schema {
         table = "simple_example"
         columns = [
           { name = "dev_id",        type = "string",  nullable = true },
           { name = "dev_ip",        type = "string",  nullable = true },
           { name = "event_id",      type = "string",  nullable = true },
           { name = "timestamp",     type = "bigint",  nullable = true },
           { name = "s_ipv4",        type = "string",  nullable = true },
           { name = "s_ipv6",        type = "string",  nullable = true },
           { name = "s_port",        type = "int",     nullable = true },
           { name = "d_ipv4",        type = "string",  nullable = true },
           { name = "d_ipv6",        type = "string",  nullable = true },
           { name = "d_port",        type = "int",     nullable = true },
           { name = "sid",           type = "int",     nullable = true },
           { name = "severity",      type = "int",     nullable = true },
           { name = "alarm_name",    type = "string",  nullable = true },
           { name = "alarm_type",    type = "string",  nullable = true },
           { name = "classification",type = "string",  nullable = true },
           { name = "protocol",      type = "string",  nullable = true },
           { name = "domain_name",   type = "string",  nullable = true },
           { name = "request_method",type = "string",  nullable = true },
           { name = "protocol_version", type = "string", nullable = true },
           { name = "request_uri",   type = "string",  nullable = true },
           { name = "request_url",   type = "string",  nullable = true },
           { name = "referer",       type = "string",  nullable = true },
           { name = "user_agent",    type = "string",  nullable = true },
           { name = "request_header",type = "string",  nullable = true },
           { name = "request_body",  type = "string",  nullable = true },
           { name = "response_code", type = "string",  nullable = true },
           { name = "response_header",type = "string", nullable = true },
           { name = "response_body", type = "string",  nullable = true },
           { name = "XFF",           type = "string",  nullable = true },
           { name = "extra_info",    type = "string",  nullable = true },
           { name = "s_country",     type = "string",  nullable = true },
           { name = "s_province",    type = "string",  nullable = true },
           { name = "s_city",        type = "string",  nullable = true },
           { name = "s_operator",    type = "string",  nullable = true },
           { name = "d_country",     type = "string",  nullable = true },
           { name = "d_province",    type = "string",  nullable = true },
           { name = "d_city",        type = "string",  nullable = true },
           { name = "d_operator",    type = "string",  nullable = true },
           { name = "payload",       type = "string",  nullable = true },
           { name = "attack_result", type = "int",     nullable = true },
           { name = "attack_chain",  type = "int",     nullable = true },
           { name = "description",   type = "string",  nullable = true },
           { name = "solution",      type = "string",  nullable = true },
           { name = "original_info", type = "string",  nullable = true },
           { name = "pcap_info",     type = "string",  nullable = true }
         ]
       }
     }
   }
   sink {
     Console {
       plugin_input = "kafka_table"
     }
     LocalFile {
       plugin_input = "kafka_table"
       path = "/data/seatunnel/file"
       file_format_type = "json"
       custom_filename = true
       filename_time_format = "yyyy.MM.dd"
       is_enable_transaction = true
       file_name_expression = "${transactionId}_${now}"
      }
   }
   ```
   
   ```text
   2026-01-27 18:01:20,957 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] [hz.main.seaTunnel.task.thread-5] - Job (1068470671115288577), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Kafka]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=1068470671115288577, pipelineId=1, taskGroupId=3}] state process is stopped
   2026-01-27 18:01:20,957 ERROR [o.a.s.e.s.d.p.PhysicalVertex  ] [hz.main.seaTunnel.task.thread-5] - Job (1068470671115288577), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Kafka]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=1068470671115288577, pipelineId=1, taskGroupId=3}] end with state FAILED and Exception: java.lang.ArrayIndexOutOfBoundsException: 12
           at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
           at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
           at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
           at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
           at org.apache.seatunnel.format.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:51)
           at org.apache.seatunnel.format.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:34)
           at org.apache.seatunnel.api.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:38)
           at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:72)
           at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:37)
           at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:110)
           at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
           at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
           at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
           at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
           at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:679)
           at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1008)
           at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:750)
   ```
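
   For what it is worth, `ResolvingDecoder.readIndex` reads a union branch index from the stream, so `ArrayIndexOutOfBoundsException: 12` in `Symbol$Alternative.getSymbol` suggests the bytes are being resolved against a schema that is not the one they were written with. One cheap thing to rule out, sketched below with the same `one_message.bin` placeholder as above, is a Confluent Schema Registry wire-format prefix (magic byte 0x00 plus a 4-byte schema id), which a plain Avro decoder cannot skip.

   ```java
   import java.nio.ByteBuffer;
   import java.nio.file.Files;
   import java.nio.file.Paths;

   public class WireFormatCheck {
       public static void main(String[] args) throws Exception {
           // One raw message value dumped from the topic (placeholder path).
           byte[] payload = Files.readAllBytes(Paths.get("one_message.bin"));
           if (payload.length > 5 && payload[0] == 0) {
               // Confluent wire format: byte 0 is the magic byte, bytes 1-4 the schema id.
               int schemaId = ByteBuffer.wrap(payload, 1, 4).getInt();
               System.out.println("Confluent wire-format header, schema id = " + schemaId);
           } else {
               System.out.println("No Confluent header; looks like plain Avro binary.");
           }
       }
   }
   ```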
   
   The Flink tasks compile the schema from the event.avro files into Java classes and then parse the data with the `AvroDeserializationSchema` deserializer.
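
   For context, the consumer is roughly the following sketch (not the exact job; `Event` stands for the SpecificRecord class generated from event.avro, and the topic and bootstrap values mirror the SeaTunnel config above):

   ```java
   import org.apache.flink.api.common.eventtime.WatermarkStrategy;
   import org.apache.flink.connector.kafka.source.KafkaSource;
   import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
   import org.apache.flink.formats.avro.AvroDeserializationSchema;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

   public class EventConsumer {
       public static void main(String[] args) throws Exception {
           StreamExecutionEnvironment env =
                   StreamExecutionEnvironment.getExecutionEnvironment();

           KafkaSource<Event> source = KafkaSource.<Event>builder()
                   .setBootstrapServers("xxx:9092")
                   .setTopics("t_event")
                   .setGroupId("flink_avro_check")
                   .setStartingOffsets(OffsetsInitializer.earliest())
                   // Deserialize each value with the schema compiled into Event.
                   .setValueOnlyDeserializer(AvroDeserializationSchema.forSpecific(Event.class))
                   .build();

           env.fromSource(source, WatermarkStrategy.noWatermarks(), "t_event").print();
           env.execute("avro-read-check");
       }
   }
   ```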
   
   <img width="1616" height="990" alt="Image" 
src="https://github.com/user-attachments/assets/afe84eba-fd6f-4141-8ca2-832850c1c3cd";
 />
   
   <img width="1246" height="903" alt="Image" 
src="https://github.com/user-attachments/assets/30848d09-15da-4929-b2ec-256b0ca6be48";
 />

