zhaoli2333 opened a new issue, #5836: URL: https://github.com/apache/seatunnel/issues/5836
### Search before asking - [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues. ### What happened I started a streaming task syncing data from tidb to kafka, and set sink config semantics = EXACTLY_ONCE While the datas have bean written into kafka, the task keep restarting with the error: `org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.` ### SeaTunnel Version 2.3.3 ### SeaTunnel Config ```conf env { # You can set SeaTunnel environment configuration here execution.parallelism = 1 job.mode = "STREAMING" checkpoint.interval = 5000 execution.checkpoint.interval = 5000 execution.checkpoint.data-uri = "hdfs://localhost:8020/seatunnel/checkpoint" } source { Jdbc { url = "jdbc:mysql://localhost:3306/bigdata" driver = "com.mysql.cj.jdbc.Driver" user = "user" password = "password" query = "select * from table_name" partition_column= "id" partition_num = 10 } } sink { kafka { topic = "topic_name" bootstrap.servers = "localhost:9092" format = json semantics = EXACTLY_ONCE kafka.request.timeout.ms = 60000 kafka.config = { "security.protocol": "SASL_PLAINTEXT", "sasl.mechanism":"SCRAM-SHA-256", "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxx\";" acks = "all" request.timeout.ms = 60000 buffer.memory = 33554432 } } } ``` ### Running Command ```shell ./bin/start-seatunnel-flink-15-connector-v2.sh --config example/v2.batch.config.tidb2kafka ``` ### Error Exception ```log org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state. ``` ### Zeta or Flink or Spark Version flink 1.16 ### Java or Scala Version 1.8 ### Screenshots _No response_ ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
