zhfeng commented on PR #7454:
URL: https://github.com/apache/camel/pull/7454#issuecomment-1102443104
Hi @davsclaus ,
I'm working on combine the kafka transaction with the JTATransactionPolicy.
So the route looks like
```java
from("direct:transaction")
.transactd().policy(new RequiredJTATransactionPolicy())
.to("sql:ds?insertSomething) // use sql componet to insert into database
.transactd().policy(new KafkaTransactionPolicy())
.to("kafka:topic?additional-properties[transactional.id]=1234...");
```
Since kafka transaction does not support JTA spec, I was thinking to
register a ```Synchronization``` to current tx transaction and ```commit``` and
```rollback``` with the JTA TransactionManager decision. like
```java
new Synchronization() {
@Override
void beforeCompletion() {}
@Override
void void afterCompletion(int status) {
if (status == COMMITTED) {
producer.commitTransaction();
} else {
producer.abortTransaction();
}
}
```
so if there is any exception during the processing, the sql operation and
kafka transaction will be rollbacked by the transaction manager.
In JTATransactionPolicy, I'd like to put the current transaction tx into
```exchange``` something like
```java
runnable.getExchange().setProperty("transaction",
transactionManager.getTransaction());
```
And @alexkazan87 proposal
> So I think(being affected by sping-boot approach) that KafkaTransaction
should be encapsulated somehow under JtaTransanction Manager. In the end, when
somebody would like to use Kafka transactions then he can put only the JTA
Manager( ApplicationManager) leaving kafkaTransactionManager functionality
completely agnostic to the application layer. So, if we wire the Jta
ApplicationManager with the KafkaManager in the end then someone with
.transacted(JTA...) and the Kafka property (application. id) will be ok. I
think this is a really useful feature( nice to have). It is not initially easy
but we can.
So I think the route looks like
```java
from("direct:transaction")
.transacted().policy(new KafkaWrapperJTATransactionPolicy()) // I'm not
sure if it is possible to do this
.to("sql:ds")
.to("kafka:topic?additional-properties[transactional.id]=1234...");
```
I'm appreciated if you can have some inputs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]