Aiden-Rose opened a new issue, #8394:
URL: https://github.com/apache/seatunnel/issues/8394

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   In Spark engine streaming mode, the Kafka source reads data successfully, but nothing is ever written to the sink.
   
   ### SeaTunnel Version
   
   2.3.8
   
   ### SeaTunnel Config
   
   ```conf
   env {
     parallelism = 1
     job.mode = "STREAMING"
     #job.mode = "BATCH"
     spark.executor.instances = 2
     spark.executor.cores = 1
     spark.executor.memory = "1g"
   }
   source {
     Kafka {
       schema = {
         fields {
           name = "string"
           email = "string"
         }
       }
       format = json
       content_field = "$.data"
       topic = "wkg_topic"
       bootstrap.servers = "xxx:6667,xxx:6667,xxx:6667"
       start_mode = "earliest"
    commit_on_checkpoint = false
       poll.timeout = 10000
       consumer.group = "seatunnel_4"
       kafka.config = {
         auto.offset.reset = "earliest"
         enable.auto.commit = "false"
         client.id = client_1
       }
       result_table_name = "fake1"
     }
   }
   sink {
     Console {
       source_table_name = "fake1"
     }
   }
   ```
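For reference, the per-element unwrapping configured by `content_field = "$.data"` is what the `data:` lines in the log below show: an array under `data` becomes one record per element, an object becomes a single record. A minimal standalone sketch of that behavior (plain Python for illustration only; this is not SeaTunnel's actual implementation):

```python
import json

def extract_content(message: str, field: str = "data"):
    """Unwrap the configured content field of a Kafka message value.

    Illustrative only: loosely mirrors how the KafkaRecordEmitter log
    prints one "data:" row per array element for each fetched record.
    """
    node = json.loads(message)[field]
    # An array yields one row per element; a single object yields one row.
    return node if isinstance(node, list) else [node]

# Sample message taken verbatim from the log output in this issue.
msg = '{"data":[{"name":"3","email":"3","phone":"4","job":"3"},{"name":"4","email":"4","phone":"4","job":"4"}]}'
rows = extract_content(msg)
# rows now holds two records, matching the two "data:" log lines per fetch.
```

The reads shown in the log are consistent with this extraction working; the failure appears to be downstream of it.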
   
   
   ### Running Command
   
   ```shell
   sh start-seatunnel-spark-3-connector-v2.sh --master local[4] --deploy-mode client --config kafka_hive_streaming.conf
   ```
   
   
   ### Error Exception
   
   ```log
   24/12/27 15:22:07 INFO KafkaConsumer: [Consumer 
clientId=seatunnel-consumer-0, groupId=seatunnel_3] Subscribed to partition(s): 
wkg_topic-0
   24/12/27 15:22:07 INFO SubscriptionState: [Consumer 
clientId=seatunnel-consumer-0, groupId=seatunnel_3] Seeking to EARLIEST offset 
of partition wkg_topic-0
   24/12/27 15:22:08 INFO Metadata: [Consumer clientId=seatunnel-consumer-0, 
groupId=seatunnel_3] Cluster ID: upfAhoQ2Sr28-fbKtqIEnw
   24/12/27 15:22:08 INFO SubscriptionState: [Consumer 
clientId=seatunnel-consumer-0, groupId=seatunnel_3] Resetting offset for 
partition wkg_topic-0 to position FetchPosition{offset=20, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[b28-10p41-jiyu.novalocal:6667 (id: 
1001 rack: null)], epoch=absent}}.
   24/12/27 15:22:08 INFO ConsumerCoordinator: [Consumer 
clientId=seatunnel-consumer-0, groupId=seatunnel_3] Discovered group 
coordinator b28-10p41-jiyu.novalocal:6667 (id: 2147482646 rack: null)
   24/12/27 15:22:08 INFO KafkaRecordEmitter: ============================ 
fetch record ============================
   24/12/27 15:22:08 INFO KafkaRecordEmitter: jsonValue: 
{"data":[{"name":"3","email":"3","phone":"4","job":"3"},{"name":"4","email":"4","phone":"4","job":"4"}]}
   24/12/27 15:22:08 INFO KafkaRecordEmitter: data: 
{"name":"3","email":"3","phone":"4","job":"3"}
   24/12/27 15:22:08 INFO KafkaRecordEmitter: data: 
{"name":"4","email":"4","phone":"4","job":"4"}
   24/12/27 15:22:08 INFO KafkaRecordEmitter: ============================ 
fetch record ============================
   24/12/27 15:22:08 INFO KafkaRecordEmitter: jsonValue: 
{"data":[{"name":"1","email":"1","phone":"1","job":"1"},{"name":"2","email":"2","phone":"2","job":"2"}]}
   24/12/27 15:22:08 INFO KafkaRecordEmitter: data: 
{"name":"1","email":"1","phone":"1","job":"1"}
   24/12/27 15:22:08 INFO KafkaRecordEmitter: data: 
{"name":"2","email":"2","phone":"2","job":"2"}
   24/12/27 15:22:08 INFO KafkaRecordEmitter: ============================ 
fetch record ============================
   24/12/27 15:22:08 INFO KafkaRecordEmitter: jsonValue: 
{"data":{"name":"1","email":"1","phone":"1","job":"1"}}
   24/12/27 15:22:08 INFO KafkaRecordEmitter: data: 
{"name":"1","email":"1","phone":"1","job":"1"}
   24/12/27 15:22:08 INFO KafkaRecordEmitter: ============================ 
fetch record ============================
   24/12/27 15:22:08 INFO KafkaRecordEmitter: jsonValue: 
{"data":{"name":"2","email":"2","phone":"2","job":"2"}}
   24/12/27 15:22:08 INFO KafkaRecordEmitter: data: 
{"name":"2","email":"2","phone":"2","job":"2"}
   24/12/27 15:22:08 INFO KafkaRecordEmitter: ============================ 
fetch record ============================
   24/12/27 15:22:08 INFO KafkaRecordEmitter: jsonValue: 
{"data":{"name":"2","email":"2","phone":"2","job":"2"}}
   24/12/27 15:22:08 INFO KafkaRecordEmitter: data: 
{"name":"2","email":"2","phone":"2","job":"2"}
   24/12/27 15:22:08 INFO KafkaRecordEmitter: ============================ 
fetch record ============================
   24/12/27 15:22:08 INFO KafkaRecordEmitter: jsonValue: 
{"data":{"name":"2","email":"2","phone":"2","job":"2"}}
   24/12/27 15:22:08 INFO KafkaRecordEmitter: data: 
{"name":"2","email":"2","phone":"2","job":"2"}
   24/12/27 15:22:08 INFO KafkaRecordEmitter: ============================ 
fetch record ============================
   24/12/27 15:22:08 INFO KafkaRecordEmitter: jsonValue: 
{"data":[{"name":"+.Colombia 
[Organization]","email":"[email protected]","phone":"202-662-9292","job":""}]}
   24/12/27 15:22:08 INFO KafkaRecordEmitter: data: {"name":"+.Colombia 
[Organization]","email":"[email protected]","phone":"202-662-9292","job":""}
   
   
   
   ```
   
   After the records are read, they never reach the downstream operator, so nothing is written to the sink.
   
   
   ### Zeta or Flink or Spark Version
   
   3.2.2
   
   ### Java or Scala Version
   
   jdk 1.8
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

