moody1117 commented on issue #10405:
URL: https://github.com/apache/seatunnel/issues/10405#issuecomment-3809196040
Thank you for your reply.

1. The data may contain null values.
2. I modified the configuration following the recommended approach in the schema documentation and the Avro format documentation, but the data still fails to parse correctly. Moreover, when no schema is defined and the data from this Avro topic is copied into another Avro topic, the result becomes unparsable by our Flink program as well, which indicates the content is being altered along the way.
3. Currently there is only this WARN log. If `format_error_handle_way = "skip"` is commented out, an `ArrayIndexOutOfBoundsException` is thrown instead (full stack trace below).
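As a sanity check on the raw bytes, a minimal byte-array consumer like the sketch below could show whether the messages carry any framing prefix that a plain Avro decoder would trip over. This is only a diagnostic sketch: the group id is a throwaway, and the check for a Confluent wire-format prefix (a `0x00` magic byte plus a 4-byte schema id) is just one hypothesis, not something confirmed.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PeekRawBytes {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "xxx:9092"); // same broker as the job config
        props.put("group.id", "peek_raw_bytes");    // throwaway group id
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("t_event"));
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(10))) {
                byte[] value = record.value();
                StringBuilder hex = new StringBuilder();
                for (int i = 0; i < Math.min(16, value.length); i++) {
                    hex.append(String.format("%02x ", value[i]));
                }
                // A leading 00 followed by a 4-byte schema id would suggest the
                // Confluent wire format, which a plain Avro decoder cannot skip.
                System.out.println("first bytes: " + hex);
                return;
            }
        }
    }
}
```

The full job config: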
```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Kafka {
    topic = "t_event"
    bootstrap.servers = "xxx:9092"
    consumer.group = "test_avro_kafka"
    start_mode = "timestamp"
    start_mode.timestamp = "1769504419000"
    start_mode.end_timestamp = "1769504479000"
    format = "avro"
    format_error_handle_way = "skip"
    plugin_output = "kafka_table"
    kafka.config = {
      enable.auto.commit = "true"
      max.poll.records = "10000"
    }
    schema {
      table = "simple_example"
      columns = [
        { name = "dev_id", type = "string", nullable = true },
        { name = "dev_ip", type = "string", nullable = true },
        { name = "event_id", type = "string", nullable = true },
        { name = "timestamp", type = "bigint", nullable = true },
        { name = "s_ipv4", type = "string", nullable = true },
        { name = "s_ipv6", type = "string", nullable = true },
        { name = "s_port", type = "int", nullable = true },
        { name = "d_ipv4", type = "string", nullable = true },
        { name = "d_ipv6", type = "string", nullable = true },
        { name = "d_port", type = "int", nullable = true },
        { name = "sid", type = "int", nullable = true },
        { name = "severity", type = "int", nullable = true },
        { name = "alarm_name", type = "string", nullable = true },
        { name = "alarm_type", type = "string", nullable = true },
        { name = "classification", type = "string", nullable = true },
        { name = "protocol", type = "string", nullable = true },
        { name = "domain_name", type = "string", nullable = true },
        { name = "request_method", type = "string", nullable = true },
        { name = "protocol_version", type = "string", nullable = true },
        { name = "request_uri", type = "string", nullable = true },
        { name = "request_url", type = "string", nullable = true },
        { name = "referer", type = "string", nullable = true },
        { name = "user_agent", type = "string", nullable = true },
        { name = "request_header", type = "string", nullable = true },
        { name = "request_body", type = "string", nullable = true },
        { name = "response_code", type = "string", nullable = true },
        { name = "response_header", type = "string", nullable = true },
        { name = "response_body", type = "string", nullable = true },
        { name = "XFF", type = "string", nullable = true },
        { name = "extra_info", type = "string", nullable = true },
        { name = "s_country", type = "string", nullable = true },
        { name = "s_province", type = "string", nullable = true },
        { name = "s_city", type = "string", nullable = true },
        { name = "s_operator", type = "string", nullable = true },
        { name = "d_country", type = "string", nullable = true },
        { name = "d_province", type = "string", nullable = true },
        { name = "d_city", type = "string", nullable = true },
        { name = "d_operator", type = "string", nullable = true },
        { name = "payload", type = "string", nullable = true },
        { name = "attack_result", type = "int", nullable = true },
        { name = "attack_chain", type = "int", nullable = true },
        { name = "description", type = "string", nullable = true },
        { name = "solution", type = "string", nullable = true },
        { name = "original_info", type = "string", nullable = true },
        { name = "pcap_info", type = "string", nullable = true }
      ]
    }
  }
}

sink {
  Console {
    plugin_input = "kafka_table"
  }
  LocalFile {
    plugin_input = "kafka_table"
    path = "/data/seatunnel/file"
    file_format_type = "json"
    custom_filename = true
    filename_time_format = "yyyy.MM.dd"
    is_enable_transaction = true
    file_name_expression = "${transactionId}_${now}"
  }
}
```
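One thing I would like to confirm: since every column is declared `nullable = true`, my understanding is that on the Avro side each such field has to be a `["null", type]` union, and decoding fails if the unions the connector derives do not line up with the writer's. An illustrative snippet of what I mean, for the first two columns only (the schema text here is made up, not taken from the topic):

```java
import org.apache.avro.Schema;

public class NullableUnionIllustration {
    public static void main(String[] args) {
        // Illustrative only: nullable columns expressed as ["null", type]
        // unions in an Avro record schema, with null defaults.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"simple_example\",\"fields\":["
                + "{\"name\":\"dev_id\",\"type\":[\"null\",\"string\"],\"default\":null},"
                + "{\"name\":\"dev_ip\",\"type\":[\"null\",\"string\"],\"default\":null}]}");
        System.out.println(schema.toString(true));
    }
}
```

The full log from the failing run (with `format_error_handle_way = "skip"` commented out):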
```
2026-01-27 18:01:20,957 INFO  [o.a.s.e.s.d.p.PhysicalVertex] [hz.main.seaTunnel.task.thread-5] - Job (1068470671115288577), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Kafka]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=1068470671115288577, pipelineId=1, taskGroupId=3}] state process is stopped
2026-01-27 18:01:20,957 ERROR [o.a.s.e.s.d.p.PhysicalVertex] [hz.main.seaTunnel.task.thread-5] - Job (1068470671115288577), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Kafka]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=1068470671115288577, pipelineId=1, taskGroupId=3}] end with state FAILED and Exception:
java.lang.ArrayIndexOutOfBoundsException: 12
	at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
	at org.apache.seatunnel.format.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:51)
	at org.apache.seatunnel.format.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:34)
	at org.apache.seatunnel.api.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:38)
	at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:72)
	at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:37)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:110)
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:679)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1008)
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
```
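For context, this exact failure mode is what you get when the decoder walks the bytes with a schema that does not match what actually wrote them: a stray varint gets interpreted as a union index and overruns the union's branch list. A self-contained sketch with made-up schemas (not the real ones) that reproduces the same `ArrayIndexOutOfBoundsException: 12` at `Symbol$Alternative.getSymbol`:

```java
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class UnionIndexMismatchDemo {
    public static void main(String[] args) throws Exception {
        // Schema the bytes were actually written with: a plain long field.
        Schema writerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                + "{\"name\":\"ts\",\"type\":\"long\"}]}");
        // Schema the reader believes the bytes follow: a two-branch union.
        Schema assumedSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                + "{\"name\":\"ts\",\"type\":[\"null\",\"long\"],\"default\":null}]}");

        // Encode {ts: 12} with the writer schema; the long 12 becomes one varint byte.
        GenericRecord record = new GenericData.Record(writerSchema);
        record.put("ts", 12L);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
        encoder.flush();

        // Decode with the assumed schema: the varint is misread as a union index
        // of 12 against a union with only two branches, and the read throws
        // java.lang.ArrayIndexOutOfBoundsException: 12 from Symbol$Alternative.getSymbol.
        Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        new GenericDatumReader<GenericRecord>(assumedSchema).read(null, decoder);
    }
}
```

I am not claiming this is necessarily what happens here, only that the stack trace is consistent with the reader walking mismatched or misaligned bytes.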
Our Flink jobs compile the schema from the event.avro files into Java classes and then parse the data with the AvroDeserializationSchema deserializer.
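Roughly, the consuming side is wired like the sketch below (`Event` stands for the class generated from the schema, and the group id is made up):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EventConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Event is the SpecificRecord class generated from the Avro schema.
        KafkaSource<Event> source = KafkaSource.<Event>builder()
            .setBootstrapServers("xxx:9092")
            .setTopics("t_event")
            .setGroupId("flink_avro_check") // made-up group id
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(AvroDeserializationSchema.forSpecific(Event.class))
            .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "t_event")
           .print();
        env.execute("avro-read-check");
    }
}
```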
<img width="1616" height="990" alt="Image"
src="https://github.com/user-attachments/assets/afe84eba-fd6f-4141-8ca2-832850c1c3cd"
/>
<img width="1246" height="903" alt="Image"
src="https://github.com/user-attachments/assets/30848d09-15da-4929-b2ec-256b0ca6be48"
/>