linyu003 commented on issue #4247:
URL: 
https://github.com/apache/incubator-seatunnel/issues/4247#issuecomment-1492074387

   I found more problems while trying to solve this issue.
   
   # Reproduce the Problems
   SeaTunnel
   branch: dev
   revision: b8c6bbd1e6dea6462d419d6c7adf20f04d7a2430
   
   Add the following dependency to seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml:
   ```xml
           <dependency>
               <groupId>org.apache.seatunnel</groupId>
               <artifactId>connector-kafka</artifactId>
               <version>${project.version}</version>
           </dependency>
   ```
   Create the file seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_kafka.conf with the following content:
   ```txt
   env {
       execution.parallelism = 1
       job.mode = "STREAMING"
       execution.checkpoint.interval = 10000
     }
    
   source {
       FakeSource {
         parallelism = 1
         result_table_name = "fake"
         split.read-interval = 1000
   #       split.num = 60
         row.num = 2
         schema = {
           fields {
             name = "string"
             age = "int"
           }
         }
       }
     }
    
   transform {}
    
    
   sink{
     kafka {
           topic = "test01"
           bootstrap.servers = "localhost:9092"
           semantics = EXACTLY_ONCE
       }
   }
   
   ```
   Build the seatunnel-examples module in IDEA and run seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelApiExample.java with the first argument /examples/fake_to_kafka.conf.
   
   Wait less than one minute and you will see the problem.
   
   A sample running log:
   
[issue4247.log](https://github.com/apache/incubator-seatunnel/files/11122843/issue4247.log)
   
   # Problems
   All of the problems below concern KafkaTransactionSender, not KafkaNoTransactionSender.
   ## Problem 1
   Line 913 in the log:
   ```log
   2023-03-31 20:02:25,834 INFO  org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-SeaTunnel5627-1, transactionalId=SeaTunnel5627-1] Transiting to fatal error state due to org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
   ```
   ### reason
   When a checkpoint is triggered, KafkaSinkWriter#snapshotState is called, in which producer.close() is called before a new producer is created.
   (Note: I found that the producer.close() call is newly added code, in KafkaTransactionSender#getTransactionProducer.)
   
   The producer.close() implementation tries to close gracefully, which aborts the current transaction. When the committer later tries to commit the transaction, an error occurs because it has already been aborted.
   ### solution
   Call kafkaProducer.close(Duration.ZERO) instead of kafkaProducer.close(). The former closes immediately without aborting the current transaction.
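   The difference can be illustrated with a toy model. Note that StubTransactionalProducer below is an illustrative stand-in I wrote for this comment, not the real KafkaProducer API; it only models the behavior relevant here: a graceful close() aborts any in-flight transaction, while close(Duration.ZERO) shuts down immediately and leaves the transaction open for the committer to finish.
   
   ```java
   import java.time.Duration;
   
   public class CloseSemanticsDemo {
   
       // Illustrative stub, NOT the real KafkaProducer.
       static class StubTransactionalProducer {
           boolean transactionOpen = false;
           boolean transactionAborted = false;
   
           void beginTransaction() {
               transactionOpen = true;
           }
   
           // Graceful close: aborts an in-flight transaction as part of
           // shutting down cleanly.
           void close() {
               if (transactionOpen) {
                   transactionAborted = true;
                   transactionOpen = false;
               }
           }
   
           // Close with a timeout: a zero timeout shuts down immediately
           // and does not touch the open transaction.
           void close(Duration timeout) {
               if (!timeout.isZero()) {
                   close();
               }
           }
       }
   
       // Returns true if the transaction survives the close call,
       // i.e. the committer can still commit it later.
       static boolean transactionSurvives(boolean useZeroTimeout) {
           StubTransactionalProducer producer = new StubTransactionalProducer();
           producer.beginTransaction();
           if (useZeroTimeout) {
               producer.close(Duration.ZERO);
           } else {
               producer.close();
           }
           return !producer.transactionAborted;
       }
   
       public static void main(String[] args) {
           System.out.println("close():              survives=" + transactionSurvives(false));
           System.out.println("close(Duration.ZERO): survives=" + transactionSurvives(true));
       }
   }
   ```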
   
   ## Problem 2
   At the end of the log file, you can see
   ```log
   2023-03-31 20:02:57,291 DEBUG 
org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender - 
Abort kafka transaction: SeaTunnel5627-2
   2023-03-31 20:02:57,295 DEBUG 
org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender - 
Abort kafka transaction: SeaTunnel5627-3
   2023-03-31 20:02:57,296 DEBUG 
org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender - 
Abort kafka transaction: SeaTunnel5627-4
   2023-03-31 20:02:57,298 DEBUG 
org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender - 
Abort kafka transaction: SeaTunnel5627-5
   2023-03-31 20:02:57,299 DEBUG 
org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender - 
Abort kafka transaction: SeaTunnel5627-6
   2023-03-31 20:02:57,301 DEBUG 
org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender - 
Abort kafka transaction: SeaTunnel5627-7
   ```
   The writer is stuck in an endless abort loop.
   ### reason
   In KafkaTransactionSender#abortTransaction(long), it tries to abort all transactions whose id is >= checkpointId, and it does not stop until the epoch is 0. However, the epoch in the transaction manager is not updated even when the transactionalId is set to another value, so the loop never terminates when the initial epoch is not 0.
   ### solution
   Since beginning a transaction fences all earlier transactions, aborting every previous transaction is unnecessary. Moreover, KafkaTransactionSender#abortTransaction(long) only calls producer.flush() and never calls producer.abortTransaction(). I removed the for-loop and call producer.abortTransaction() instead of producer.flush().
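   The loop behavior can be sketched with a toy model (illustrative code only, not the real SeaTunnel implementation; the iteration cap exists purely so the demo terminates, the real loop has no cap):
   
   ```java
   public class AbortLoopDemo {
   
       // Buggy variant: the loop keys off an epoch that the transaction
       // manager never updates when only the transactionalId changes, so
       // with a nonzero starting epoch it would spin forever. Returns the
       // number of abort attempts performed before hitting the cap.
       static int buggyAbortLoop(int initialEpoch, int cap) {
           int epoch = initialEpoch;
           int attempts = 0;
           long checkpointId = 1;
           while (epoch != 0 && attempts < cap) {
               // (simulated) abort transaction "SeaTunnelXXXX-" + checkpointId
               checkpointId++;
               attempts++;
               // epoch is never updated here, so the condition never
               // becomes false on its own
           }
           return attempts;
       }
   
       // Fixed variant: beginTransaction() fences all earlier transactions,
       // so a single abortTransaction() on the current producer suffices.
       static int fixedAbort() {
           // producer.abortTransaction();  // one call, no loop
           return 1;
       }
   
       public static void main(String[] args) {
           System.out.println("buggy attempts (epoch=3, cap=1000): " + buggyAbortLoop(3, 1000));
           System.out.println("fixed attempts: " + fixedAbort());
       }
   }
   ```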
   
   ## Problem 3
   After fixing problems 1 and 2 and rerunning the example application, you will get the problem originally reported in this issue by @lightzhao.
   
   As I said before:
   
   > It seems that KafkaSinkCommitter does not work well if the transaction is empty (meaning the transaction has no records to commit).
   > An empty transaction is a special case, so I want to just commit the transaction in KafkaSinkWriter.snapshotState, instead of working hard to make KafkaSinkCommitter compatible with it.
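   A minimal sketch of that idea, with illustrative stand-ins rather than the real SeaTunnel classes (StubProducer and the snapshotState signature below are mine): when the current transaction carries no records, it is committed inside snapshotState itself and nothing is handed to the committer.
   
   ```java
   import java.util.Collections;
   import java.util.List;
   
   public class EmptyTxnSnapshotDemo {
   
       // Illustrative stub, NOT the real KafkaProducer.
       static class StubProducer {
           int pendingRecords = 0;
           boolean committed = false;
   
           void send() {
               pendingRecords++;
           }
   
           void commitTransaction() {
               committed = true;
               pendingRecords = 0;
           }
       }
   
       // Returns the commit infos handed to the committer. An empty
       // transaction is finished right here in the writer, so the
       // committer never sees it.
       static List<String> snapshotState(StubProducer producer, String txnId) {
           if (producer.pendingRecords == 0) {
               producer.commitTransaction();        // finish the empty txn in the writer
               return Collections.emptyList();      // committer gets nothing to do
           }
           return Collections.singletonList(txnId); // normal path: committer commits
       }
   
       public static void main(String[] args) {
           StubProducer empty = new StubProducer();
           System.out.println("empty txn -> committer gets "
                   + snapshotState(empty, "SeaTunnel-1").size() + " txn(s)");
   
           StubProducer nonEmpty = new StubProducer();
           nonEmpty.send();
           System.out.println("non-empty txn -> committer gets "
                   + snapshotState(nonEmpty, "SeaTunnel-2").size() + " txn(s)");
       }
   }
   ```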
   
   I will open my PR later.
   
   
   
   
   
   

