zhfeng commented on PR #7454:
URL: https://github.com/apache/camel/pull/7454#issuecomment-1102443104

   Hi @davsclaus ,
   
   I'm working on combine the kafka transaction with the JTATransactionPolicy. 
So the route looks like
   ```java
   from("direct:transaction")
       .transactd().policy(new RequiredJTATransactionPolicy())
       .to("sql:ds?insertSomething) // use sql componet to insert into database
           .transactd().policy(new KafkaTransactionPolicy())
           .to("kafka:topic?additional-properties[transactional.id]=1234...");
   ```
   Since kafka transaction does not support JTA spec, I was thinking to 
register a ```Synchronization``` to current tx transaction and ```commit``` and 
```rollback``` with the JTA TransactionManager decision. like
   ```java
   new Synchronization() {
   @Override
   void beforeCompletion() {}
   
   @Override
   void void afterCompletion(int status) {
       if (status == COMMITTED) {
           producer.commitTransaction();
      } else {
           producer.abortTransaction();
      }
   }
   ```
   so if there is any exception during the processing, the sql operation and 
kafka transaction will be rollbacked by the transaction manager.
   
   In JTATransactionPolicy, I'd like to put the current transaction tx into 
```exchange``` something like
   ```java
    runnable.getExchange().setProperty("transaction", 
transactionManager.getTransaction());
   ```
   
   And @alexkazan87 proposal 
   > So I think(being affected by sping-boot approach) that KafkaTransaction 
should be encapsulated somehow under JtaTransanction Manager. In the end, when 
somebody would like to use Kafka transactions then he can put only the JTA 
Manager( ApplicationManager) leaving kafkaTransactionManager functionality 
completely agnostic to the application layer. So, if we wire the Jta 
ApplicationManager with the KafkaManager in the end then someone with 
.transacted(JTA...) and the Kafka property (application. id) will be ok. I 
think this is a really useful feature( nice to have). It is not initially easy 
but we can.
   
   So I think the route looks like
   ```java
   from("direct:transaction")
       .transacted().policy(new KafkaWrapperJTATransactionPolicy()) // I'm not 
sure if it is possible to do this
       .to("sql:ds")
       .to("kafka:topic?additional-properties[transactional.id]=1234...");
   ```
   I'm appreciated if you can have some inputs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to