This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.x by this push: new f896dd8058f CAMEL-19487: camel-bean - Fix concurrency issue in BeanInfo cache when EIPs are using an existing bean instance. (#10950) (#10953) f896dd8058f is described below commit f896dd8058fe1c821bb2586a0f8761d07ca6aa3c Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Aug 2 17:57:42 2023 +0200 CAMEL-19487: camel-bean - Fix concurrency issue in BeanInfo cache when EIPs are using an existing bean instance. (#10950) (#10953) --- .../org/apache/camel/component/bean/BeanInfo.java | 11 +- .../camel/component/bean/BeanInfoCacheKey.java | 15 ++- .../camel/component/bean/ConstantBeanHolder.java | 2 +- .../bean/DefaultBeanProcessorFactory.java | 3 +- .../DynamicRouterConcurrentEIPManualTest.java | 128 +++++++++++++++++++++ .../DynamicRouterConcurrentPOJOManualTest.java | 127 ++++++++++++++++++++ .../DynamicRouterConcurrentPOJOTest.java | 82 ------------- 7 files changed, 274 insertions(+), 94 deletions(-) diff --git a/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanInfo.java b/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanInfo.java index 24e2e9c6ce5..f53439b6b75 100644 --- a/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanInfo.java +++ b/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanInfo.java @@ -73,6 +73,7 @@ public class BeanInfo { private final CamelContext camelContext; private final BeanComponent component; private final Class<?> type; + private final Object instance; private final ParameterMappingStrategy strategy; private final MethodInfo defaultMethod; // shared state with details of operations introspected from the bean, created during the constructor @@ -92,22 +93,24 @@ public class BeanInfo { public BeanInfo(CamelContext camelContext, Method explicitMethod, ParameterMappingStrategy parameterMappingStrategy, BeanComponent beanComponent) { - this(camelContext, explicitMethod.getDeclaringClass(), explicitMethod, parameterMappingStrategy, beanComponent); + this(camelContext, explicitMethod.getDeclaringClass(), null, explicitMethod, parameterMappingStrategy, beanComponent); } public BeanInfo(CamelContext camelContext, Class<?> type, ParameterMappingStrategy strategy, BeanComponent beanComponent) { - this(camelContext, type, null, strategy, beanComponent); + this(camelContext, type, null, null, strategy, beanComponent); } - public BeanInfo(CamelContext camelContext, Class<?> type, Method explicitMethod, ParameterMappingStrategy strategy, + public BeanInfo(CamelContext camelContext, Class<?> type, Object instance, Method explicitMethod, + ParameterMappingStrategy strategy, BeanComponent beanComponent) { this.camelContext = camelContext; this.type = type; + this.instance = instance; this.strategy = strategy; this.component = beanComponent; - final BeanInfoCacheKey key = new BeanInfoCacheKey(type, explicitMethod); + final BeanInfoCacheKey key = new BeanInfoCacheKey(type, instance, explicitMethod); // lookup if we have a bean info cache BeanInfo beanInfo = component.getBeanInfoFromCache(key); diff --git a/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanInfoCacheKey.java b/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanInfoCacheKey.java index f29438189ff..3edabbdfb05 100644 --- a/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanInfoCacheKey.java +++ b/components/camel-bean/src/main/java/org/apache/camel/component/bean/BeanInfoCacheKey.java @@ -17,6 +17,7 @@ package org.apache.camel.component.bean; import java.lang.reflect.Method; +import java.util.Objects; /** * A key used for caching {@link BeanInfo} by the {@link BeanComponent} @@ -24,10 +25,12 @@ import java.lang.reflect.Method; public final class BeanInfoCacheKey { private final Class<?> type; + private final Object instance; private final Method explicitMethod; - public BeanInfoCacheKey(Class<?> type, Method explicitMethod) { + public BeanInfoCacheKey(Class<?> type, Object instance, Method explicitMethod) { this.type = type; + this.instance = instance; this.explicitMethod = explicitMethod; } @@ -42,19 +45,19 @@ public final class BeanInfoCacheKey { BeanInfoCacheKey that = (BeanInfoCacheKey) o; - if (explicitMethod != null ? !explicitMethod.equals(that.explicitMethod) : that.explicitMethod != null) { + if (!Objects.equals(type, that.type)) { return false; } - if (!type.equals(that.type)) { + if (!Objects.equals(instance, that.instance)) { return false; } - - return true; + return Objects.equals(explicitMethod, that.explicitMethod); } @Override public int hashCode() { - int result = type.hashCode(); + int result = type != null ? type.hashCode() : 0; + result = 31 * result + (instance != null ? instance.hashCode() : 0); result = 31 * result + (explicitMethod != null ? explicitMethod.hashCode() : 0); return result; } diff --git a/components/camel-bean/src/main/java/org/apache/camel/component/bean/ConstantBeanHolder.java b/components/camel-bean/src/main/java/org/apache/camel/component/bean/ConstantBeanHolder.java index 31b00171a17..dd9fd2ec615 100644 --- a/components/camel-bean/src/main/java/org/apache/camel/component/bean/ConstantBeanHolder.java +++ b/components/camel-bean/src/main/java/org/apache/camel/component/bean/ConstantBeanHolder.java @@ -47,7 +47,7 @@ public class ConstantBeanHolder implements BeanHolder { ObjectHelper.notNull(bean, "bean"); this.bean = bean; - this.beanInfo = new BeanInfo(context, bean.getClass(), parameterMappingStrategy, beanComponent); + this.beanInfo = new BeanInfo(context, bean.getClass(), bean, null, parameterMappingStrategy, beanComponent); } @Override diff --git a/components/camel-bean/src/main/java/org/apache/camel/component/bean/DefaultBeanProcessorFactory.java b/components/camel-bean/src/main/java/org/apache/camel/component/bean/DefaultBeanProcessorFactory.java index e3affca6948..9ff2ee574b6 100644 --- a/components/camel-bean/src/main/java/org/apache/camel/component/bean/DefaultBeanProcessorFactory.java +++ b/components/camel-bean/src/main/java/org/apache/camel/component/bean/DefaultBeanProcessorFactory.java @@ -58,7 +58,8 @@ public final class DefaultBeanProcessorFactory extends ServiceSupport @Override public Processor createBeanProcessor(CamelContext camelContext, Object bean, Method method) throws Exception { - BeanInfo info = new BeanInfo(camelContext, method, parameterMappingStrategy, beanComponent); + BeanInfo info + = new BeanInfo(camelContext, method.getDeclaringClass(), bean, method, parameterMappingStrategy, beanComponent); return new BeanProcessor(bean, info); } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentEIPManualTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentEIPManualTest.java new file mode 100644 index 00000000000..653f5612283 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentEIPManualTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.dynamicrouter; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Header; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.assertj.core.api.Assertions; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.RepeatedTest; + +/** + * {@link DynamicRouterConcurrentPOJOManualTest} + */ +@Disabled("Manual test together with DynamicRouterConcurrentPOJOTest") +public class DynamicRouterConcurrentEIPManualTest extends ContextTestSupport { + + private static final int COUNT = 100; + + @RepeatedTest(100) + public void testConcurrentDynamicRouter() throws Exception { + final MockEndpoint mockA = getMockEndpoint("mock:a"); + mockA.expectedMessageCount(COUNT); + final MockEndpoint mockB = getMockEndpoint("mock:b"); + mockB.expectedMessageCount(COUNT); + + Thread sendToSedaA = createSedaSenderThread("seda:a", context.createProducerTemplate()); + Thread sendToSedaB = createSedaSenderThread("seda:b", context.createProducerTemplate()); + + sendToSedaA.start(); + sendToSedaB.start(); + + sendToSedaA.join(10000); + sendToSedaB.join(10000); + + /* + * Awaiting the sum of the two mocks to be 200 makes demonstrating CAMEL-19487 + * a bit faster: the problem is that sometimes messages for mock:a land in mock:b or vice versa + * but the sum is always 200 + */ + Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> mockA.getReceivedCounter() + mockB.getReceivedCounter() == 200); + + /* Now that all messages were delivered, let's make sure that messages for mock:a did not land in mock:b or vice versa */ + Assertions.assertThat(mockA.getReceivedExchanges()) + .map(Exchange::getMessage) + .map(m -> m.getBody(String.class)) + .filteredOn(body -> body.contains("Message from seda:b")) + .as( + "Expected mock:a to contain only messages from seda:a, but there were also messages from seda:b") + .isEmpty(); + + Assertions.assertThat(mockB.getReceivedExchanges()) + .map(Exchange::getMessage) + .map(m -> m.getBody(String.class)) + .filteredOn(body -> body.contains("Message from seda:a")) + .as( + "Expected mock:b to contain only messages from seda:b, but there were also messages from seda:a") + .isEmpty(); + + Assertions.assertThat(mockA.getReceivedCounter()).isEqualTo(100); + Assertions.assertThat(mockB.getReceivedCounter()).isEqualTo(100); + + } + + private Thread createSedaSenderThread(final String seda, final ProducerTemplate perThreadtemplate) { + return new Thread(new Runnable() { + @Override + public void run() { + for (int i = 0; i < COUNT; i++) { + perThreadtemplate.sendBody(seda, "Message from " + seda + " " + i); + } + } + }); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + MyDynamicRouterPojo a = new MyDynamicRouterPojo("mock:a"); + MyDynamicRouterPojo b = new MyDynamicRouterPojo("mock:b"); + + from("seda:a") + .dynamicRouter(method(a, "route")); + + from("seda:b") + .dynamicRouter(method(b, "route")); + } + }; + } + + public static class MyDynamicRouterPojo { + + private final String target; + + public MyDynamicRouterPojo(String target) { + this.target = target; + } + + public String route(@Header(Exchange.SLIP_ENDPOINT) String previous) { + if (previous == null) { + return target; + } else { + return null; + } + } + } +} diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentPOJOManualTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentPOJOManualTest.java new file mode 100644 index 00000000000..a2eeeeb3f29 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentPOJOManualTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.dynamicrouter; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.DynamicRouter; +import org.apache.camel.Exchange; +import org.apache.camel.Header; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.assertj.core.api.Assertions; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.RepeatedTest; + +/** + * {@link DynamicRouterConcurrentEIPManualTest} + */ +@Disabled("Manual test together with DynamicRouterConcurrentEIPTest") +public class DynamicRouterConcurrentPOJOManualTest extends ContextTestSupport { + + private static final int COUNT = 100; + + @RepeatedTest(100) + public void testConcurrentDynamicRouter() throws Exception { + final MockEndpoint mockA = getMockEndpoint("mock:a"); + mockA.expectedMessageCount(COUNT); + final MockEndpoint mockB = getMockEndpoint("mock:b"); + mockB.expectedMessageCount(COUNT); + + Thread sendToSedaA = createSedaSenderThread("seda:a", context.createProducerTemplate()); + Thread sendToSedaB = createSedaSenderThread("seda:b", context.createProducerTemplate()); + + sendToSedaA.start(); + sendToSedaB.start(); + + sendToSedaA.join(10000); + sendToSedaB.join(10000); + + /* + * Awaiting the sum of the two mocks to be 200 makes demonstrating CAMEL-19487 + * a bit faster: the problem is that sometimes messages for mock:a land in mock:b or vice versa + * but the sum is always 200 + */ + Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> mockA.getReceivedCounter() + mockB.getReceivedCounter() == 200); + + /* Now that all messages were delivered, let's make sure that messages for mock:a did not land in mock:b or vice versa */ + Assertions.assertThat(mockA.getReceivedExchanges()) + .map(Exchange::getMessage) + .map(m -> m.getBody(String.class)) + .filteredOn(body -> body.contains("Message from seda:b")) + .as( + "Expected mock:a to contain only messages from seda:a, but there were also messages from seda:b") + .isEmpty(); + + Assertions.assertThat(mockB.getReceivedExchanges()) + .map(Exchange::getMessage) + .map(m -> m.getBody(String.class)) + .filteredOn(body -> body.contains("Message from seda:a")) + .as( + "Expected mock:b to contain only messages from seda:b, but there were also messages from seda:a") + .isEmpty(); + + Assertions.assertThat(mockA.getReceivedCounter()).isEqualTo(100); + Assertions.assertThat(mockB.getReceivedCounter()).isEqualTo(100); + + } + + private Thread createSedaSenderThread(final String seda, final ProducerTemplate perThreadtemplate) { + return new Thread(new Runnable() { + @Override + public void run() { + for (int i = 0; i < COUNT; i++) { + perThreadtemplate.sendBody(seda, "Message from " + seda + " " + i); + } + } + }); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("seda:a") + .bean(new MyDynamicRouterPojo("mock:a")); + + from("seda:b") + .bean(new MyDynamicRouterPojo("mock:b")); + } + }; + } + + public static class MyDynamicRouterPojo { + + private final String target; + + public MyDynamicRouterPojo(String target) { + this.target = target; + } + + @DynamicRouter + public String route(@Header(Exchange.SLIP_ENDPOINT) String previous) { + if (previous == null) { + return target; + } else { + return null; + } + } + } +} diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentPOJOTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentPOJOTest.java deleted file mode 100644 index 3849a08f30c..00000000000 --- a/core/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentPOJOTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.processor.dynamicrouter; - -import org.apache.camel.ContextTestSupport; -import org.apache.camel.DynamicRouter; -import org.apache.camel.Exchange; -import org.apache.camel.Header; -import org.apache.camel.builder.RouteBuilder; -import org.junit.jupiter.api.Test; - -public class DynamicRouterConcurrentPOJOTest extends ContextTestSupport { - - private static final int COUNT = 100; - - @Test - public void testConcurrentDynamicRouter() throws Exception { - getMockEndpoint("mock:a").expectedMessageCount(COUNT); - getMockEndpoint("mock:b").expectedMessageCount(COUNT); - - Thread sendToSedaA = createSedaSenderThread("seda:a"); - Thread sendToSedaB = createSedaSenderThread("seda:b"); - - sendToSedaA.start(); - sendToSedaB.start(); - - assertMockEndpointsSatisfied(); - } - - private Thread createSedaSenderThread(final String seda) { - return new Thread(new Runnable() { - @Override - public void run() { - for (int i = 0; i < COUNT; i++) { - template.sendBody(seda, "Message from " + seda); - } - } - }); - } - - @Override - protected RouteBuilder createRouteBuilder() { - return new RouteBuilder() { - public void configure() { - from("seda:a").bean(new MyDynamicRouterPojo("mock:a")); - from("seda:b").bean(new MyDynamicRouterPojo("mock:b")); - } - }; - } - - public class MyDynamicRouterPojo { - - private final String target; - - public MyDynamicRouterPojo(String target) { - this.target = target; - } - - @DynamicRouter - public String route(@Header(Exchange.SLIP_ENDPOINT) String previous) { - if (previous == null) { - return target; - } else { - return null; - } - } - } -}