[ https://issues.apache.org/jira/browse/CAMEL-20214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Claus Ibsen updated CAMEL-20214:
--------------------------------
Fix Version/s: 4.4.0
> "Timeout" tasks of parallel splitter block further message processing
> ---------------------------------------------------------------------
>
> Key: CAMEL-20214
> URL: https://issues.apache.org/jira/browse/CAMEL-20214
> Project: Camel
> Issue Type: Bug
> Components: came-core
> Affects Versions: 3.14.10, 3.21.2
> Reporter: Andre Weickel
> Priority: Minor
> Fix For: 4.4.0
>
>
> After updating from Camel 2.x to Camel 3.14.7 we noticed the following issue
> in all newer Camel 3 versions:
> By default, a parallel splitter uses a ThreadPoolProfile with maxQueueSize =
> 1000.
> If the route is called 1001 times within the configured splitter timeout, one
> message fails with "java.util.concurrent.RejectedExecutionException: Task
> rejected due queue size limit reached", which is thrown by the
> SizedScheduledExecutorService class.
> For each call of the route, one "timeout" task is added to the
> DelayedWorkQueue used by the "Splitter-AggregateTask" thread. Each
> of these "timeout" tasks waits until its timeout is reached, even though the
> message processing has already completed. That means the 1000 messages have
> already been processed successfully, but the 1000 "timeout" tasks are still in
> the DelayedWorkQueue and block further message processing (until the timeout
> is reached) because the queue is full.
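
The mechanism described above can be reproduced with the plain JDK, without Camel. The following is only a minimal sketch under stated assumptions: `BoundedScheduler` and `simulate` are hypothetical names, and the queue-size check is a simplified stand-in for a size-limited scheduled executor, not Camel's actual SizedScheduledExecutorService code. The `cancelOnCompletion` variant is included purely for contrast and is not claimed to be the fix applied in Camel.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TimeoutQueueSketch {

    /** Hypothetical stand-in for a scheduled executor with a queue size limit. */
    static final class BoundedScheduler {
        private final ScheduledThreadPoolExecutor delegate = new ScheduledThreadPoolExecutor(1);
        private final int maxQueueSize;

        BoundedScheduler(int maxQueueSize, boolean removeOnCancel) {
            this.maxQueueSize = maxQueueSize;
            // When true, cancelled tasks are removed from the delay queue immediately.
            delegate.setRemoveOnCancelPolicy(removeOnCancel);
        }

        ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) {
            // Delayed tasks stay in the queue until their delay expires,
            // so they count against the limit even if nothing else is pending.
            if (delegate.getQueue().size() >= maxQueueSize) {
                throw new RejectedExecutionException("Task rejected due queue size limit reached");
            }
            return delegate.schedule(task, delay, unit);
        }

        void shutdown() {
            delegate.shutdownNow();
        }
    }

    /** Simulates route calls and returns how many of them were rejected. */
    static int simulate(int calls, int maxQueueSize, boolean cancelOnCompletion) {
        BoundedScheduler scheduler = new BoundedScheduler(maxQueueSize, cancelOnCompletion);
        int rejected = 0;
        try {
            for (int i = 0; i < calls; i++) {
                try {
                    // One "timeout" task per call, using the 300000 ms timeout from the report.
                    ScheduledFuture<?> timeout =
                            scheduler.schedule(() -> { }, 300_000, TimeUnit.MILLISECONDS);
                    // The message itself is considered fully processed at this point.
                    if (cancelOnCompletion) {
                        timeout.cancel(false);
                    }
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            scheduler.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected, timeout tasks left in queue: " + simulate(2, 1, false));
        System.out.println("rejected, timeout tasks cancelled:     " + simulate(2, 1, true));
    }
}
```

With a limit of 1 and two calls, the first variant rejects the second call because the first call's timeout task still occupies the queue. The second variant rejects nothing, since cancelling with the remove-on-cancel policy frees the slot right away.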
> We found a comment in the MulticastProcessor.doStart() method stating that an
> unbounded thread pool has to be used for the "Splitter-AggregateTask" thread
> to avoid issues:
> {code:java}
> /* use unbounded thread pool so we ensure the aggregate on-the-fly task
> always will have assigned a thread and run the tasks when the task is
> submitted. If not then the aggregate task may not be able to run and signal
> completion during processing, which would lead to what would appear as a
> dead-lock or a slow processing */{code}
> Therefore we assume the maxQueueSize should also be unlimited when the thread
> pool is created in MulticastProcessor.createAggregateExecutorService().
> A short test to reproduce the mentioned issue (maxQueueSize is set to 1 to
> reproduce the issue with only two calls):
>
> {code:java}
> // Imports assume camel-test-junit5; for JUnit 4 use
> // org.apache.camel.test.junit4.CamelTestSupport and org.junit.Test instead.
> import java.io.IOException;
>
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.spi.ThreadPoolProfile;
> import org.apache.camel.test.junit5.CamelTestSupport;
> import org.junit.jupiter.api.Test;
>
> public class SplitterTest extends CamelTestSupport {
>
>     String payload1 =
>         "<items><item><id>1</id><name>one</name></item><item><id>2</id><name>two</name></item></items>";
>     String payload2 =
>         "<items><item><id>3</id><name>three</name></item><item><id>4</id><name>four</name></item></items>";
>
>     @Test
>     public void testSplitter() throws InterruptedException, IOException {
>         MockEndpoint mockEndpoint = getMockEndpoint("mock:split");
>         // Two payloads with two items each should produce four split messages.
>         mockEndpoint.expectedMessageCount(4);
>         template.sendBody("direct:start", payload1);
>         // The second call is rejected while the first "timeout" task is still queued.
>         template.sendBody("direct:start", payload2);
>         assertMockEndpointsSatisfied();
>     }
>
>     @Override
>     protected RouteBuilder createRouteBuilder() throws Exception {
>         return new RouteBuilder() {
>             @Override
>             public void configure() throws Exception {
>                 ThreadPoolProfile myThreadPoolProfile = new ThreadPoolProfile("testProfile");
>                 myThreadPoolProfile.setMaxPoolSize(20);
>                 myThreadPoolProfile.setPoolSize(10);
>                 // Queue size 1 so that two calls are enough to hit the limit.
>                 myThreadPoolProfile.setMaxQueueSize(1);
>
>                 getContext().getExecutorServiceManager().setDefaultThreadPoolProfile(myThreadPoolProfile);
>
>                 from("direct:start")
>                     .split()
>                     .xpath("//items/item")
>                     .parallelProcessing(true)
>                     .streaming(true)
>                     .stopOnException(true)
>                     .timeout("300000")
>                     .executorServiceRef("testProfile")
>                     .to("mock:split");
>             }
>         };
>     }
> }
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)