li3zhi4 opened a new issue, #10061:
URL: https://github.com/apache/seatunnel/issues/10061

   ### Search before asking
   
   - [x] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   我使用的kafka版本为3.9.1
               if (producer.getEpoch() == 0) {
                   break;
               }
   这段代码没有按预期进入这里,导致机器卡死,合理怀疑为TransactionId没有正确更新。
   我目前的修改为
       @Override
       public void abortTransaction(long checkpointId) {
   
           KafkaInternalProducer<K, V> producer;
   
           for (long i = checkpointId; ; i++) {
               if (this.kafkaProducer != null) {
                   producer = this.kafkaProducer;
               } else {
                   producer = getTransactionProducer(this.kafkaProperties, 
generateTransactionId(this.transactionPrefix, i));
               }
   //            producer.setTransactionalId(transactionId);
               if (log.isDebugEnabled()) {
                   log.debug("Abort kafka transaction: {}", transactionId);
               }
   //            producer.flush();
               if (producer.getEpoch() == 0) {
                   break;
               }
           }
       }
   
   ### SeaTunnel Version
   
   2.3.12
   
   ### SeaTunnel Config
   
   ```conf
   env {
     parallelism = 1
     job.mode = "STREAMING"
     checkpoint.interval = 60000
     checkpoint.timeout = 600000
   }
   source {
     MySQL-CDC {
       plugin_output = "s"
       url = "jdbc:mysql://xxx:3306/testdb"
       username = "xxx"
       password = "xxx"
       database-names = ["testdb"]
       table-names = ["testdb.xxtable"]
       int_type_narrowing = false
       server-id = "25600-25603" 
       startup.mode = "latest"
     }
   }
   transform {
     Metadata {
       plugin_input = ["s"]
       plugin_output = "ss"
       metadata_fields {
         EventTime = __event_time
         Delay = __delay
       }
     }
     TableFilter {
       plugin_input = ["ss"]
       plugin_output = "xxtable"
       database_pattern = "testdb"
       table_pattern = "xxtable"
     }
   }
   sink {
     Kafka {
       plugin_input = ["xxtable"]
       bootstrap.servers = "xxx:9092"
       semantics = EXACTLY_ONCE
       kafka.config = {
         acks = "all"
         enable.idempotence = true
         retries = 3
         sasl.jaas.config = 
"org.apache.kafka.common.security.scram.ScramLoginModule required  
username="xxx"  password="xxx";"
         sasl.mechanism = SCRAM-SHA-512
         security.protocol = SASL_PLAINTEXT
         compression.type = "snappy"
       }
       topic = "xxtable"
     }
   }
   ```
   
   ### Running Command
   
   ```shell
   bin/seatunnel.sh -c task/xxtable.conf -n xxtable --async
   ```
   
   ### Error Exception
   
   ```log
   Abort kafka transaction: SeaTunnel5724-1
   ```
   
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to