li3zhi4 opened a new issue, #10061: URL: https://github.com/apache/seatunnel/issues/10061
### Search before asking - [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues. ### What happened 我使用的kafka版本为3.9.1 if (producer.getEpoch() == 0) { break; } 这段代码没有按预期进入这里,导致机器卡死,合理怀疑为TransactionId没有正确更新。 我目前的修改为 @Override public void abortTransaction(long checkpointId) { KafkaInternalProducer<K, V> producer; for (long i = checkpointId; ; i++) { if (this.kafkaProducer != null) { producer = this.kafkaProducer; } else { producer = getTransactionProducer(this.kafkaProperties, generateTransactionId(this.transactionPrefix, i)); } // producer.setTransactionalId(transactionId); if (log.isDebugEnabled()) { log.debug("Abort kafka transaction: {}", transactionId); } // producer.flush(); if (producer.getEpoch() == 0) { break; } } } ### SeaTunnel Version 2.3.12 ### SeaTunnel Config ```conf env { parallelism = 1 job.mode = "STREAMING" checkpoint.interval = 60000 checkpoint.timeout = 600000 } source { MySQL-CDC { plugin_output = "s" url = "jdbc:mysql://xxx:3306/testdb" username = "xxx" password = "xxx" database-names = ["testdb"] table-names = ["testdb.xxtable"] int_type_narrowing = false server-id = "25600-25603" startup.mode = "latest" } } transform { Metadata { plugin_input = ["s"] plugin_output = "ss" metadata_fields { EventTime = __event_time Delay = __delay } } TableFilter { plugin_input = ["ss"] plugin_output = "xxtable" database_pattern = "testdb" table_pattern = "xxtable" } } sink { Kafka { plugin_input = ["xxtable"] bootstrap.servers = "xxx:9092" semantics = EXACTLY_ONCE kafka.config = { acks = "all" enable.idempotence = true retries = 3 sasl.jaas.config = "org.apache.kafka.common.security.scram.ScramLoginModule required username="xxx" password="xxx";" sasl.mechanism = SCRAM-SHA-512 security.protocol = SASL_PLAINTEXT compression.type = "snappy" } topic = "xxtable" } } ``` ### Running Command ```shell bin/seatunnel.sh -c task/xxtable.conf -n xxtable --async ``` ### Error Exception ```log Abort kafka transaction: SeaTunnel5724-1 ``` ### Zeta or Flink or Spark Version _No response_ ### Java or Scala Version _No response_ ### Screenshots _No response_ ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
