[ 
https://issues.apache.org/jira/browse/CAMEL-3535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12981462#action_12981462
 ] 

Brian Feaver commented on CAMEL-3535:
-------------------------------------

Even in your output the line I'm expecting is missing. Hope this clears up what 
I'm seeing. Here's the output when it works

{noformat}
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE 
org.apache.camel.processor.aggregate.AggregateProcessor - Completion interval 
triggered for correlation key: true
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] DEBUG 
org.apache.camel.processor.aggregate.AggregateProcessor - Aggregation complete 
for correlation key true sending aggregated exchange: Exchange[Message: AB]
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] DEBUG 
org.apache.camel.processor.aggregate.AggregateProcessor - Processing aggregated 
exchange: Exchange[Message: AB]
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE 
org.apache.camel.processor.aggregate.AggregateProcessor - Aggregated exchange 
onComplete: Exchange[Message: AB]
[main] INFO test.AggregationTest - Testing done: test.AggregationTest@77546dbc
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE 
org.apache.camel.processor.aggregate.AggregateProcessor - Processing aggregated 
exchange: Exchange[Message: AB] complete.
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE 
org.apache.camel.processor.aggregate.AggregateProcessor - Completion interval 
task complete
{noformat}

Here's the output when it doesn't

{noformat}
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE 
org.apache.camel.processor.aggregate.AggregateProcessor - Completion interval 
triggered for correlation key: true
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] DEBUG 
org.apache.camel.processor.aggregate.AggregateProcessor - Aggregation complete 
for correlation key true sending aggregated exchange: Exchange[Message: AB]
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] DEBUG 
org.apache.camel.processor.aggregate.AggregateProcessor - Processing aggregated 
exchange: Exchange[Message: AB]
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE 
org.apache.camel.processor.aggregate.AggregateProcessor - Processing aggregated 
exchange: Exchange[Message: AB] complete.
[main] INFO test.AggregationTest - Testing done: test.AggregationTest@614a75bb
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE 
org.apache.camel.processor.aggregate.AggregateProcessor - Completion interval 
task complete
{noformat}

The difference between the two is the following line:

{noformat}
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE 
org.apache.camel.processor.aggregate.AggregateProcessor - Aggregated exchange 
onComplete: Exchange[Message: AB]
{noformat}

This would be written to the log in 
AggregateProcesor$AggregateOnCompletion.onComplete() on or around line 544 in 
AggregateProcessor.

{code}        public void onComplete(Exchange exchange) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Aggregated exchange onComplete: " + exchange);
            }

            // only confirm if we processed without a problem
            try {
                aggregationRepository.confirm(exchange.getContext(), 
exchangeId);
                // and remove redelivery state as well
                redeliveryState.remove(exchangeId);
            } finally {
                // must remember to remove in progress when we are complete
                inProgressCompleteExchanges.remove(exchangeId);
            }
        }
{code}

The lack of the inProgressCompleteExchanges.remove(exchangeId); call is 
precisely what's causing inProgressCompleteExchanges to grow larger and not get 
cleaned up.

> Aggregation fails to call onComplete for exchanges if the aggregation is 
> after a bean or process.
> -------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-3535
>                 URL: https://issues.apache.org/jira/browse/CAMEL-3535
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.5.0
>            Reporter: Brian Feaver
>            Assignee: Claus Ibsen
>             Fix For: 2.6.0
>
>         Attachments: AggregationTest.java
>
>
> When creating a route that contains an aggregation, if that aggregation is 
> preceded by a bean or process, it will fail to call 
> AggregateOnCompletion.onComplete(). I've attached a unit test that can show 
> you the behavior. Trace level loggging will need to be enabled to see the 
> difference. With the call to the bean, it won't show the following log entry:
> {noformat}TRACE org.apache.camel.processor.aggregate.AggregateProcessor - 
> Aggregated exchange onComplete: Exchange[Message: ab]{noformat}
> If you remove the bean call, it'll start calling onComplete() again.
> What I've noticed is that if this call is not made, it ends up in a memory 
> leak since the inProgressCompleteExchanges HashSet in AggregateProcessor 
> never has any exchange ID's removed.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to