This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new bbbb47efd8f CAMEL-18450: camel-jpa - use transaction strategy in JpaMessageIdRepository (#8264)
bbbb47efd8f is described below
commit bbbb47efd8f35ba300e29c0ce152cd5d97d7762d
Author: Zheng Feng <[email protected]>
AuthorDate: Sun Sep 4 19:33:29 2022 +0800
CAMEL-18450: camel-jpa - use transaction strategy in JpaMessageIdRepository (#8264)
---
.../idempotent/jpa/JpaMessageIdRepository.java | 64 ++++++++++------------
1 file changed, 30 insertions(+), 34 deletions(-)
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java b/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
index c1156dc4b12..bb454f094fa 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
@@ -30,15 +30,12 @@ import org.apache.camel.Exchange;
import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.api.management.ManagedOperation;
import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.component.jpa.DefaultTransactionStrategy;
+import org.apache.camel.component.jpa.TransactionStrategy;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.support.service.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.orm.jpa.JpaTransactionManager;
-import org.springframework.transaction.TransactionDefinition;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.support.TransactionCallback;
-import org.springframework.transaction.support.TransactionTemplate;
import static org.apache.camel.component.jpa.JpaHelper.getTargetEntityManager;
@@ -53,19 +50,19 @@ public class JpaMessageIdRepository extends ServiceSupport
implements Idempotent
private final String processorName;
private final EntityManagerFactory entityManagerFactory;
- private final TransactionTemplate transactionTemplate;
+ private final TransactionStrategy transactionStrategy;
private boolean joinTransaction = true;
private boolean sharedEntityManager;
public JpaMessageIdRepository(EntityManagerFactory entityManagerFactory,
String processorName) {
- this(entityManagerFactory,
createTransactionTemplate(entityManagerFactory), processorName);
+ this(entityManagerFactory,
createDefaultTransactionStrategy(entityManagerFactory), processorName);
}
- public JpaMessageIdRepository(EntityManagerFactory entityManagerFactory,
TransactionTemplate transactionTemplate,
+ public JpaMessageIdRepository(EntityManagerFactory entityManagerFactory,
TransactionStrategy transactionStrategy,
String processorName) {
this.entityManagerFactory = entityManagerFactory;
this.processorName = processorName;
- this.transactionTemplate = transactionTemplate;
+ this.transactionStrategy= transactionStrategy;
}
public static JpaMessageIdRepository jpaMessageIdRepository(String
persistenceUnit, String processorName) {
@@ -77,11 +74,8 @@ public class JpaMessageIdRepository extends ServiceSupport
implements Idempotent
return new JpaMessageIdRepository(entityManagerFactory, processorName);
}
- private static TransactionTemplate
createTransactionTemplate(EntityManagerFactory entityManagerFactory) {
- TransactionTemplate transactionTemplate = new TransactionTemplate();
- transactionTemplate.setTransactionManager(new
JpaTransactionManager(entityManagerFactory));
-
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
- return transactionTemplate;
+ private static TransactionStrategy
createDefaultTransactionStrategy(EntityManagerFactory entityManagerFactory) {
+ return new DefaultTransactionStrategy(null, entityManagerFactory);
}
@Override
@@ -95,8 +89,9 @@ public class JpaMessageIdRepository extends ServiceSupport
implements Idempotent
final EntityManager entityManager
= getTargetEntityManager(exchange, entityManagerFactory, true,
sharedEntityManager, true);
// Run this in single transaction.
- Boolean rc = transactionTemplate.execute(new
TransactionCallback<Boolean>() {
- public Boolean doInTransaction(TransactionStatus status) {
+ final Boolean[] rc = new Boolean[1];
+ transactionStrategy.executeInTransaction(new Runnable() {
+ public void run() {
if (isJoinTransaction()) {
entityManager.joinTransaction();
}
@@ -111,9 +106,9 @@ public class JpaMessageIdRepository extends ServiceSupport
implements Idempotent
entityManager.persist(processed);
entityManager.flush();
entityManager.close();
- return Boolean.TRUE;
+ rc[0] = Boolean.TRUE;
} else {
- return Boolean.FALSE;
+ rc[0] = Boolean.FALSE;
}
} catch (Exception ex) {
LOG.error("Something went wrong trying to add message to
repository {}", ex.getMessage(), ex);
@@ -130,8 +125,8 @@ public class JpaMessageIdRepository extends ServiceSupport
implements Idempotent
}
});
- LOG.debug("add {} -> {}", messageId, rc);
- return rc;
+ LOG.debug("add {} -> {}", messageId, rc[0]);
+ return rc[0];
}
@Override
@@ -146,17 +141,18 @@ public class JpaMessageIdRepository extends
ServiceSupport implements Idempotent
= getTargetEntityManager(exchange, entityManagerFactory, true,
sharedEntityManager, true);
// Run this in single transaction.
- Boolean rc = transactionTemplate.execute(new
TransactionCallback<Boolean>() {
- public Boolean doInTransaction(TransactionStatus status) {
+ final Boolean[] rc = new Boolean[1];
+ transactionStrategy.executeInTransaction(new Runnable() {
+ public void run() {
if (isJoinTransaction()) {
entityManager.joinTransaction();
}
try {
List<?> list = query(entityManager, messageId);
if (list.isEmpty()) {
- return Boolean.FALSE;
+ rc[0] = Boolean.FALSE;
} else {
- return Boolean.TRUE;
+ rc[0] = Boolean.TRUE;
}
} catch (Exception ex) {
LOG.error("Something went wrong trying to check message in
repository {}", ex.getMessage(), ex);
@@ -173,8 +169,8 @@ public class JpaMessageIdRepository extends ServiceSupport
implements Idempotent
}
});
- LOG.debug("contains {} -> {}", messageId, rc);
- return rc;
+ LOG.debug("contains {} -> {}", messageId, rc[0]);
+ return rc[0];
}
@Override
@@ -188,21 +184,22 @@ public class JpaMessageIdRepository extends
ServiceSupport implements Idempotent
final EntityManager entityManager
= getTargetEntityManager(exchange, entityManagerFactory, true,
sharedEntityManager, true);
- Boolean rc = transactionTemplate.execute(new
TransactionCallback<Boolean>() {
- public Boolean doInTransaction(TransactionStatus status) {
+ Boolean rc[] = new Boolean[1];
+ transactionStrategy.executeInTransaction(new Runnable() {
+ public void run() {
if (isJoinTransaction()) {
entityManager.joinTransaction();
}
try {
List<?> list = query(entityManager, messageId);
if (list.isEmpty()) {
- return Boolean.FALSE;
+ rc[0] = Boolean.FALSE;
} else {
MessageProcessed processed = (MessageProcessed)
list.get(0);
entityManager.remove(processed);
entityManager.flush();
entityManager.close();
- return Boolean.TRUE;
+ rc[0] = Boolean.TRUE;
}
} catch (Exception ex) {
LOG.error("Something went wrong trying to remove message
to repository {}", ex.getMessage(), ex);
@@ -220,7 +217,7 @@ public class JpaMessageIdRepository extends ServiceSupport
implements Idempotent
});
LOG.debug("remove {}", messageId);
- return rc;
+ return rc[0];
}
@Override
@@ -239,8 +236,8 @@ public class JpaMessageIdRepository extends ServiceSupport
implements Idempotent
public void clear() {
final EntityManager entityManager = getTargetEntityManager(null,
entityManagerFactory, true, sharedEntityManager, true);
- transactionTemplate.execute(new TransactionCallback<Boolean>() {
- public Boolean doInTransaction(TransactionStatus status) {
+ transactionStrategy.executeInTransaction(new Runnable() {
+ public void run() {
if (isJoinTransaction()) {
entityManager.joinTransaction();
}
@@ -255,7 +252,6 @@ public class JpaMessageIdRepository extends ServiceSupport
implements Idempotent
entityManager.flush();
entityManager.close();
}
- return Boolean.TRUE;
} catch (Exception ex) {
LOG.error("Something went wrong trying to clear the
repository {}", ex.getMessage(), ex);
throw new PersistenceException(ex);