This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.14.x by this push:
new 20764a2811c1 CAMEL-22534: JPA works with parallel splitter by default (#19538)
20764a2811c1 is described below
commit 20764a2811c11ee587fb23fbced9369cf0cd4d82
Author: JiriOndrusek <[email protected]>
AuthorDate: Tue Oct 14 07:38:17 2025 +0200
CAMEL-22534: JPA works with parallel splitter by default (#19538)
---
.../apache/camel/component/jpa/JpaConstants.java | 3 +-
.../processor/jpa/JpaParallelSplitterTest.java | 88 ++++++++++++++++++++++
.../org/apache/camel/ExchangeConstantProvider.java | 3 +-
.../src/main/java/org/apache/camel/Exchange.java | 2 +
.../java/org/apache/camel/processor/Splitter.java | 6 ++
5 files changed, 100 insertions(+), 2 deletions(-)
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java
index ad0c761777e9..4273b1394f46 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConstants.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.jpa;
+import org.apache.camel.Exchange;
import org.apache.camel.spi.Metadata;
/**
@@ -24,7 +25,7 @@ import org.apache.camel.spi.Metadata;
public final class JpaConstants {
@Metadata(description = "The JPA `EntityManager` object.", javaType =
"jakarta.persistence.EntityManager")
- public static final String ENTITY_MANAGER = "CamelEntityManager";
+ public static final String ENTITY_MANAGER = Exchange.JPA_ENTITY_MANAGER;
@Metadata(label = "producer", description = "Alternative way for passing
query parameters as an Exchange header.",
javaType = "Map<String, Object>")
public static final String JPA_PARAMETERS_HEADER = "CamelJpaParameters";
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaParallelSplitterTest.java b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaParallelSplitterTest.java
new file mode 100644
index 000000000000..ca6ac47119ca
--- /dev/null
+++ b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaParallelSplitterTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.jpa;
+
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.examples.SendEmail;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class JpaParallelSplitterTest extends AbstractJpaTest {
+ protected static final String SELECT_ALL_STRING = "select x from " +
SendEmail.class.getName() + " x";
+
+ @Test
+ public void testParallelSplitter() throws Exception {
+ Future<Object> future = template.asyncRequestBody("direct:splitter",
"[email protected]");
+
+ future.get(10, TimeUnit.SECONDS);
+
+ assertCorrectEmails();
+ }
+
+ private void assertCorrectEmails() {
+ List<?> results = entityManager
+ .createQuery("select e from " + SendEmail.class.getName() + "
e WHERE e.address = '[email protected]'")
+ .getResultList();
+ assertEquals(50, results.size());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+
+ from("direct:splitter?timeout=1000")
+ .loop(50)
+ .setHeader("loopIndex",
exchangeProperty(Exchange.LOOP_INDEX))
+ .process(ex -> {
+ int index = ex.getIn().getHeader("loopIndex",
Integer.class);
+ SendEmail se = new SendEmail(index + "@wrong.org");
+ ex.getIn().setBody(se);
+ })
+ .to("jpa://" + SendEmail.class.getName())
+ .end()
+ //select all
+ .to("jpa://" + SendEmail.class.getName() +
"?query=SELECT e FROM SendEmail e")
+ .split(body())
+ .parallelProcessing()
+ .threads(10, 10)
+ .process(ex -> {
+ SendEmail se = ex.getIn().getBody(SendEmail.class);
+ se.setAddress("[email protected]");
+ })
+ .to("jpa://" + SendEmail.class.getName());
+
+ }
+ };
+ }
+
+ @Override
+ protected String routeXml() {
+ return "org/apache/camel/processor/jpa/springJpaRouteTest.xml";
+ }
+
+ @Override
+ protected String selectAllString() {
+ return SELECT_ALL_STRING;
+ }
+}
diff --git a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
index a9e14a38e2bd..2af6074c9445 100644
--- a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
+++ b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
@@ -13,7 +13,7 @@ public class ExchangeConstantProvider {
private static final Map<String, String> MAP;
static {
- Map<String, String> map = new HashMap<>(163);
+ Map<String, String> map = new HashMap<>(164);
map.put("ACCEPT_CONTENT_TYPE", "CamelAcceptContentType");
map.put("AGGREGATED_COLLECTION_GUARD",
"CamelAggregatedCollectionGuard");
map.put("AGGREGATED_COMPLETED_BY", "CamelAggregatedCompletedBy");
@@ -104,6 +104,7 @@ public class ExchangeConstantProvider {
map.put("INTERCEPTED_ROUTE_ENDPOINT_URI",
"CamelInterceptedParentEndpointUri");
map.put("INTERCEPTED_ROUTE_ID", "CamelInterceptedRouteId");
map.put("INTERRUPTED", "CamelInterrupted");
+ map.put("JPA_ENTITY_MANAGER", "CamelEntityManager");
map.put("LANGUAGE_SCRIPT", "CamelLanguageScript");
map.put("LOG_DEBUG_BODY_MAX_CHARS", "CamelLogDebugBodyMaxChars");
map.put("LOG_DEBUG_BODY_STREAMS", "CamelLogDebugStreams");
diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
index abc96cfc2a09..767b17a50124 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
@@ -218,6 +218,8 @@ public interface Exchange extends VariableAware {
@Deprecated(since = "3.1.0")
String INTERRUPTED = "CamelInterrupted";
+ String JPA_ENTITY_MANAGER = "CamelEntityManager";
+
String LANGUAGE_SCRIPT = "CamelLanguageScript";
String LOG_DEBUG_BODY_MAX_CHARS = "CamelLogDebugBodyMaxChars";
String LOG_DEBUG_BODY_STREAMS = "CamelLogDebugStreams";
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
index c411183f9d4d..04bec6aed8a0 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
@@ -320,6 +320,12 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
// we do not want to copy the message history for split sub-messages
answer.removeProperty(ExchangePropertyKey.MESSAGE_HISTORY);
}
+
+ if (isParallelProcessing()) {
+ //we do not want to copy JPA entityManager (which is not meant for concurrent use) in parallel mode
+ //jpa component takes care of the entityManager if property is removed
+ answer.removeProperty(Exchange.JPA_ENTITY_MANAGER);
+ }
return answer;
}
}