Aiden-Rose opened a new issue, #8394: URL: https://github.com/apache/seatunnel/issues/8394
### Search before asking - [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues. ### What happened In spark engine streaming mode, kafka source can read data, but cannot write to sink. ### SeaTunnel Version 2.3.8 ### SeaTunnel Config ```conf env { parallelism = 1 job.mode = "STREAMING" #job.mode = "BATCH" spark.executor.instances = 2 spark.executor.cores = 1 spark.executor.memory = "1g" } source { Kafka { schema = { fields { name = "string" email = "string" } } format = json content_field = "$.data" topic = "wkg_topic" bootstrap.servers = "xxx:6667,xxx:6667,xxx:6667" start_mode = "earliest" commit_on_checkpoint=false poll.timeout = 10000 consumer.group = "seatunnel_4" kafka.config = { auto.offset.reset = "earliest" enable.auto.commit = "false" client.id = client_1 } result_table_name = "fake1" } } sink { Console { source_table_name = "fake1" } } ``` ### Running Command ```shell sh start-seatunnel-spark-3-connector-v2.sh --master local[4] --deploy-mode client --config kafka_hive_streaming.conf ``` ### Error Exception ```log 24/12/27 15:22:07 INFO KafkaConsumer: [Consumer clientId=seatunnel-consumer-0, groupId=seatunnel_3] Subscribed to partition(s): wkg_topic-0 24/12/27 15:22:07 INFO SubscriptionState: [Consumer clientId=seatunnel-consumer-0, groupId=seatunnel_3] Seeking to EARLIEST offset of partition wkg_topic-0 24/12/27 15:22:08 INFO Metadata: [Consumer clientId=seatunnel-consumer-0, groupId=seatunnel_3] Cluster ID: upfAhoQ2Sr28-fbKtqIEnw 24/12/27 15:22:08 INFO SubscriptionState: [Consumer clientId=seatunnel-consumer-0, groupId=seatunnel_3] Resetting offset for partition wkg_topic-0 to position FetchPosition{offset=20, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[b28-10p41-jiyu.novalocal:6667 (id: 1001 rack: null)], epoch=absent}}. 24/12/27 15:22:08 INFO ConsumerCoordinator: [Consumer clientId=seatunnel-consumer-0, groupId=seatunnel_3] Discovered group coordinator b28-10p41-jiyu.novalocal:6667 (id: 2147482646 rack: null) 24/12/27 15:22:08 INFO KafkaRecordEmitter: ============================ fetch record ============================ 24/12/27 15:22:08 INFO KafkaRecordEmitter: jsonValue: {"data":[{"name":"3","email":"3","phone":"4","job":"3"},{"name":"4","email":"4","phone":"4","job":"4"}]} 24/12/27 15:22:08 INFO KafkaRecordEmitter: data: {"name":"3","email":"3","phone":"4","job":"3"} 24/12/27 15:22:08 INFO KafkaRecordEmitter: data: {"name":"4","email":"4","phone":"4","job":"4"} 24/12/27 15:22:08 INFO KafkaRecordEmitter: ============================ fetch record ============================ 24/12/27 15:22:08 INFO KafkaRecordEmitter: jsonValue: {"data":[{"name":"1","email":"1","phone":"1","job":"1"},{"name":"2","email":"2","phone":"2","job":"2"}]} 24/12/27 15:22:08 INFO KafkaRecordEmitter: data: {"name":"1","email":"1","phone":"1","job":"1"} 24/12/27 15:22:08 INFO KafkaRecordEmitter: data: {"name":"2","email":"2","phone":"2","job":"2"} 24/12/27 15:22:08 INFO KafkaRecordEmitter: ============================ fetch record ============================ 24/12/27 15:22:08 INFO KafkaRecordEmitter: jsonValue: {"data":{"name":"1","email":"1","phone":"1","job":"1"}} 24/12/27 15:22:08 INFO KafkaRecordEmitter: data: {"name":"1","email":"1","phone":"1","job":"1"} 24/12/27 15:22:08 INFO KafkaRecordEmitter: ============================ fetch record ============================ 24/12/27 15:22:08 INFO KafkaRecordEmitter: jsonValue: {"data":{"name":"2","email":"2","phone":"2","job":"2"}} 24/12/27 15:22:08 INFO KafkaRecordEmitter: data: {"name":"2","email":"2","phone":"2","job":"2"} 24/12/27 15:22:08 INFO KafkaRecordEmitter: ============================ fetch record ============================ 24/12/27 15:22:08 INFO KafkaRecordEmitter: jsonValue: {"data":{"name":"2","email":"2","phone":"2","job":"2"}} 24/12/27 15:22:08 INFO KafkaRecordEmitter: data: {"name":"2","email":"2","phone":"2","job":"2"} 24/12/27 15:22:08 INFO KafkaRecordEmitter: ============================ fetch record ============================ 24/12/27 15:22:08 INFO KafkaRecordEmitter: jsonValue: {"data":{"name":"2","email":"2","phone":"2","job":"2"}} 24/12/27 15:22:08 INFO KafkaRecordEmitter: data: {"name":"2","email":"2","phone":"2","job":"2"} 24/12/27 15:22:08 INFO KafkaRecordEmitter: ============================ fetch record ============================ 24/12/27 15:22:08 INFO KafkaRecordEmitter: jsonValue: {"data":[{"name":"+.Colombia [Organization]","email":"[email protected]","phone":"202-662-9292","job":""}]} 24/12/27 15:22:08 INFO KafkaRecordEmitter: data: {"name":"+.Colombia [Organization]","email":"[email protected]","phone":"202-662-9292","job":""} Once the data is read, it will not go downstream and cannot be written to the downstream. ``` ### Zeta or Flink or Spark Version 3.2.2 ### Java or Scala Version jdk 1.8 ### Screenshots _No response_ ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
