[ https://issues.apache.org/activemq/browse/CAMEL-3337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen updated CAMEL-3337:
-------------------------------

    Description: 
My app is running into memory problems because of its use of an aggregator. 
The app is fairly straightforward: it reads a CSV file using streaming, 
tokenizes it by line, passes each line to a processor, aggregates the results, 
and then puts them on a JMS queue. Here is the route definition (Spring XML):

{code:xml}
<route>
  <from uri="file:in" />
  <split streaming="true">
    <tokenize token="\n" regex="false" />
    <unmarshal><csv /></unmarshal>
    <bean ref="myBean" method="translate" />
    <aggregate strategyRef="myAggregationStrategy">
      <correlationExpression><constant>true</constant></correlationExpression>
      <completionTimeout><simple>1000</simple></completionTimeout>
      <completionSize><simple>100</simple></completionSize>
      <to uri="activemq:queue:myQueue"/>
    </aggregate>
  </split>
</route>
{code}

The problem occurs when the consumer of "myQueue" cannot keep up with the file 
reading and parsing. With a slow consumer, ActiveMQ will eventually throttle 
the producer so the message queue doesn't use up all available memory and/or 
disk space.

As messages are passed to the aggregator, it internally submits jobs to an 
executor, which then puts each message on the JMS queue. This executor uses an 
unbounded task queue. If the JMS producer has been throttled, the jobs cannot 
send the messages quickly enough, and the executor's queue keeps backing up 
until all heap memory is used.
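
For context, the standard JDK thread pools created via the Executors factory 
are backed by an unbounded LinkedBlockingQueue, so pending tasks (and the 
exchanges they carry) simply accumulate on the heap. A minimal standalone 
illustration of that JDK default (not the aggregator's actual internals):

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class UnboundedQueueDemo {
    public static void main(String[] args) {
        // A fixed-size pool from the Executors factory is backed by an
        // unbounded LinkedBlockingQueue, so submitted tasks pile up in
        // memory whenever producers outpace the worker threads.
        ExecutorService pool = Executors.newFixedThreadPool(1);
        System.out.println(((ThreadPoolExecutor) pool).getQueue().getClass());
        // prints: class java.util.concurrent.LinkedBlockingQueue
        pool.shutdown();
    }
}
{code}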

As a workaround, I am thinking I could plug in my own executor service that is 
synchronous, or that at least blocks once its task queue reaches a certain 
size. I haven't yet figured out how to configure this, however.
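
For illustration, a bounded executor along those lines might look like the 
sketch below. This is only a rough, untested sketch: the class and method 
names are made up, and wiring it into the route (for example via an 
executorServiceRef on the aggregator, if the version in use supports that) is 
an assumption, not something verified against 2.5.0.

{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical factory for a bounded executor. The task queue is capped, and
 * the rejection policy makes the submitting thread run the task itself
 * (CallerRunsPolicy), so the splitter is effectively back-pressured instead
 * of queueing exchanges in memory without limit.
 */
public class BoundedExecutorFactory {

    public static ExecutorService createBoundedExecutor(int queueCapacity) {
        return new ThreadPoolExecutor(
                1, 1,                      // single worker thread
                0L, TimeUnit.MILLISECONDS, // no keep-alive for idle threads
                new ArrayBlockingQueue<Runnable>(queueCapacity), // bounded task queue
                new ThreadPoolExecutor.CallerRunsPolicy());      // caller runs when full
    }
}
{code}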



       Priority: Minor  (was: Major)
     Issue Type: Improvement  (was: Bug)

Not a bug, as this is how it is designed. It is, after all, memory-based by default.

But we should look into offering an option to limit the task queue.


> Aggregator using an unbounded queue, can use up all heap
> --------------------------------------------------------
>
>                 Key: CAMEL-3337
>                 URL: https://issues.apache.org/activemq/browse/CAMEL-3337
>             Project: Apache Camel
>          Issue Type: Improvement
>          Components: camel-core
>    Affects Versions: 2.5.0
>         Environment: JDK 1.6.0_22 (Windows 7 64-bit and OS X 10.6.5)
>            Reporter: Bryan Keller
>            Priority: Minor
>

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
