[ 
https://issues.apache.org/activemq/browse/CAMEL-3337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=63334#action_63334
 ] 

Bryan Keller commented on CAMEL-3337:
-------------------------------------

The actual lines of code at issue are:

AggregateProcessor.java, line 387 - job submit to the executor
AggregateDefinition.java, line 156 - creation of the executor with an unbounded 
queue
ExecutorServiceHelper.java, line 142 - call to 
Executors.newSingleThreadExecutor()

(Note that newSingleThreadExecutor() is called when not using parallel 
processing.)

Executors.newSingleThreadExecutor() creates an Executor that uses a single 
worker thread operating off an unbounded queue, which is the problem.
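
For comparison, here is a minimal sketch of a single-threaded executor with a 
bounded queue, built directly on ThreadPoolExecutor. The class name and the 
capacity parameter are made up for illustration and are not part of Camel. 
When the queue is full, CallerRunsPolicy runs the job on the submitting 
thread, which slows the producer down instead of letting the backlog grow.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedSingleThreadExecutor {

    // Hypothetical helper, not a Camel API: one worker thread, but at most
    // `capacity` jobs may wait in the queue.
    public static ThreadPoolExecutor create(int capacity) {
        return new ThreadPoolExecutor(
                1, 1,                       // exactly one worker thread
                0L, TimeUnit.MILLISECONDS,  // keep-alive is irrelevant at a fixed size
                new ArrayBlockingQueue<Runnable>(capacity),
                // Queue full: run the job on the submitting thread instead of
                // queueing it, so pending work never exceeds `capacity`.
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
```

One trade-off of this approach: a job that runs on the caller's thread can 
overtake jobs still waiting in the queue, so strict ordering is not 
guaranteed.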

> Aggregator using an unbounded queue, can use up all heap
> --------------------------------------------------------
>
>                 Key: CAMEL-3337
>                 URL: https://issues.apache.org/activemq/browse/CAMEL-3337
>             Project: Apache Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.5.0
>         Environment: JDK 1.6.0_22 (Windows 7 64-bit and OS X 10.6.5)
>            Reporter: Bryan Keller
>
> My app is having memory issues due to the use of an aggregator. My app is 
> fairly straightforward. It reads from a CSV using streaming, tokenizes it by 
> line, passes the result to a processor, aggregates the result, then puts this 
> on a JMS queue. Here is the route definition (for Spring):
> [code]
> <route>
>   <from uri="file:in" />
>   <split streaming="true">
>     <tokenize token="\n" regex="false" />
>     <unmarshal><csv /></unmarshal>
>     <bean ref="myBean" method="translate" />
>     <aggregate strategyRef="myAggregationStrategy">
>       <correlationExpression><constant>true</constant></correlationExpression>
>       <completionTimeout><simple>1000</simple></completionTimeout>
>       <completionSize><simple>100</simple></completionSize>
>       <to uri="activemq:queue:myQueue"/>
>     </aggregate>
>   </split>
> </route>
> [/code]
> The problem happens when the consumer of "myQueue" is not as fast as the file 
> reading and parsing. With a slow consumer, ActiveMQ will eventually throttle 
> the producer so the message queue doesn't use up all memory and/or disk space.
> As messages are passed to the aggregator, it internally submits jobs to an 
> executor which will then put the message in the queue. This executor uses an 
> unbounded queue. If the message producer has been throttled, then the process 
> jobs cannot queue the messages quickly enough, and the executor's queue will 
> continue to back up until all memory is used.
> As a workaround, I am thinking I could implement my own executor service 
> which is synchronous or at least blocks when the queue size reaches a certain 
> size. I haven't yet figured out how to configure this, however.
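
The blocking variant of the workaround described above could be sketched 
roughly as follows. This is only an illustration under assumptions: the class 
and method names are invented, and wiring the executor into the aggregator is 
not shown. A rejection handler calls put() on the bounded queue, so the 
submitting thread waits until a slot is free.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingSubmitExecutor {

    // Hypothetical helper, not a Camel API: a single worker with a bounded
    // queue whose submitters block while the queue is full.
    public static ThreadPoolExecutor create(int capacity) {
        RejectedExecutionHandler blockUntilSpace = new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable job, ThreadPoolExecutor executor) {
                if (executor.isShutdown()) {
                    throw new RejectedExecutionException("executor is shut down");
                }
                try {
                    // put() waits for a free slot, throttling the submitter.
                    executor.getQueue().put(job);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RejectedExecutionException("interrupted while waiting", e);
                }
            }
        };
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(capacity), blockUntilSpace);
    }

    // Submits `jobs` trivial tasks and returns how many completed.
    public static int runAll(int jobs, int capacity) {
        final AtomicInteger done = new AtomicInteger();
        ThreadPoolExecutor executor = create(capacity);
        for (int i = 0; i < jobs; i++) {
            executor.execute(new Runnable() {
                public void run() {
                    done.incrementAndGet();
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }
}
```

A caveat with this pattern: a job placed in the queue by the handler while 
another thread shuts the executor down may never run, so it is best suited to 
cases where submission and shutdown happen on the same thread.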

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
