[
https://issues.apache.org/jira/browse/CAMEL-20214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Andre Weickel updated CAMEL-20214:
----------------------------------
Description:
After upgrading from Camel 2.x to Camel 3.14.7 we noticed the following issue, which is present in all newer Camel 3 versions:
By default, a parallel splitter uses a ThreadPoolProfile with maxQueueSize = 1000.
If the route is called 1001 times within the configured splitter timeout, one message fails with "java.util.concurrent.RejectedExecutionException: Task rejected due queue size limit reached", which is thrown by the SizedScheduledExecutorService class.
For each call of the route, one "timeout" task is added to the DelayedWorkQueue used by the "Splitter-AggregateTask" thread. Each of these "timeout" tasks waits until its timeout is reached, even though the message processing has already completed. That means the 1000 messages have already been processed successfully, but the 1000 "timeout" tasks are still sitting in the DelayedWorkQueue and block further message processing (until the timeout is reached) because the queue is full.
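The following minimal sketch triggers the same rejection directly against the SizedScheduledExecutorService; it is only an illustration and assumes the Camel 3 package org.apache.camel.support and the public constructor taking the delegate ScheduledThreadPoolExecutor plus the queue size limit (the queue is capped at 1 here instead of the default 1000):
{code:java}
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.camel.support.SizedScheduledExecutorService;

public class TimeoutQueueSketch {

    public static void main(String[] args) {
        // Queue capped at 1 entry (the splitter default described above is 1000)
        SizedScheduledExecutorService executor =
                new SizedScheduledExecutorService(new ScheduledThreadPoolExecutor(1), 1);

        // First "timeout" task: sits in the delayed work queue for the full delay,
        // even if the exchange it guards has long since completed
        executor.schedule(() -> { }, 300, TimeUnit.SECONDS);

        try {
            // Second "timeout" task: rejected immediately with
            // "Task rejected due queue size limit reached"
            executor.schedule(() -> { }, 300, TimeUnit.SECONDS);
        } finally {
            executor.shutdownNow();
        }
    }
}
{code}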
We found a comment in the MulticastProcessor.doStart() method stating that an unbounded thread pool has to be used for the "Splitter-AggregateTask" thread to avoid issues:
{code:java}
/* use unbounded thread pool so we ensure the aggregate on-the-fly task always
   will have assigned a thread and run the tasks when the task is submitted. If
   not then the aggregate task may not be able to run and signal completion during
   processing, which would lead to what would appear as a dead-lock or a slow
   processing */
{code}
Therefore we assume the maxQueueSize should also be unlimited when the thread pool is created in MulticastProcessor.createAggregateExecutorService().
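Until that is changed, a possible interim workaround is to lift the queue bound on the default thread pool profile, assuming the aggregate executor is built from that profile (which the observed limit of 1000 suggests). In Camel, a maxQueueSize of -1 means an unbounded queue, and note that this setting also affects every other pool created from the default profile. The helper class below is only for illustration:
{code:java}
import org.apache.camel.CamelContext;
import org.apache.camel.spi.ThreadPoolProfile;

public final class UnboundedAggregateQueueWorkaround {

    private UnboundedAggregateQueueWorkaround() {
    }

    // Sketch of a workaround: remove the 1000-entry cap on the default profile so the
    // internally created "Splitter-AggregateTask" executor is no longer size-limited.
    public static void apply(CamelContext context) {
        ThreadPoolProfile defaultProfile =
                context.getExecutorServiceManager().getDefaultThreadPoolProfile();
        defaultProfile.setMaxQueueSize(-1); // -1 = unbounded work queue
    }
}
{code}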
A short test to reproduce the described issue (maxQueueSize is set to 1 so that the issue can be reproduced with only two calls):
{code:java}
// imports assume the camel-test-junit5 module
import java.io.IOException;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.Test;

public class SplitterTest extends CamelTestSupport {

    String payload1 = "<items><item><id>1</id><name>one</name></item><item><id>2</id><name>two</name></item></items>";
    String payload2 = "<items><item><id>3</id><name>three</name></item><item><id>4</id><name>four</name></item></items>";

    @Test
    public void testSplitter() throws InterruptedException, IOException {
        MockEndpoint mockEndpoint = getMockEndpoint("mock:split");
        mockEndpoint.expectedMessageCount(4);

        template.sendBody("direct:start", payload1);
        template.sendBody("direct:start", payload2);

        assertMockEndpointsSatisfied();
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // maxQueueSize = 1 so the second call already hits the queue limit
                ThreadPoolProfile myThreadPoolProfile = new ThreadPoolProfile("testProfile");
                myThreadPoolProfile.setMaxPoolSize(20);
                myThreadPoolProfile.setPoolSize(10);
                myThreadPoolProfile.setMaxQueueSize(1);
                getContext().getExecutorServiceManager().setDefaultThreadPoolProfile(myThreadPoolProfile);

                from("direct:start")
                    .split()
                        .xpath("//items/item")
                        .parallelProcessing(true)
                        .streaming(true)
                        .stopOnException(true)
                        .timeout("300000")
                        .executorServiceRef("testProfile")
                        .to("mock:split");
            }
        };
    }
}
{code}
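If the analysis above is correct, the first exchange's "timeout" task occupies the single queue slot of the aggregate executor for up to the configured 300 seconds; scheduling the corresponding task for the second exchange is then rejected with the RejectedExecutionException quoted above, so the mock's expectation of four messages is not satisfied even though all items could have been processed long before the timeout.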
> "Timeout" tasks of parallel splitter block further message processing
> ---------------------------------------------------------------------
>
> Key: CAMEL-20214
> URL: https://issues.apache.org/jira/browse/CAMEL-20214
> Project: Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 3.14.10, 3.21.2
> Reporter: Andre Weickel
> Priority: Minor
--
This message was sent by Atlassian Jira
(v8.20.10#820010)