[ https://issues.apache.org/jira/browse/CAMEL-3535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12981462#action_12981462 ]
Brian Feaver commented on CAMEL-3535:
-------------------------------------

Even in your output the line I'm expecting is missing. Hope this clears up what I'm seeing.

Here's the output when it works:

{noformat}
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE org.apache.camel.processor.aggregate.AggregateProcessor - Completion interval triggered for correlation key: true
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] DEBUG org.apache.camel.processor.aggregate.AggregateProcessor - Aggregation complete for correlation key true sending aggregated exchange: Exchange[Message: AB]
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] DEBUG org.apache.camel.processor.aggregate.AggregateProcessor - Processing aggregated exchange: Exchange[Message: AB]
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE org.apache.camel.processor.aggregate.AggregateProcessor - Aggregated exchange onComplete: Exchange[Message: AB]
[main] INFO test.AggregationTest - Testing done: test.AggregationTest@77546dbc
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE org.apache.camel.processor.aggregate.AggregateProcessor - Processing aggregated exchange: Exchange[Message: AB] complete.
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE org.apache.camel.processor.aggregate.AggregateProcessor - Completion interval task complete
{noformat}

Here's the output when it doesn't:

{noformat}
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE org.apache.camel.processor.aggregate.AggregateProcessor - Completion interval triggered for correlation key: true
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] DEBUG org.apache.camel.processor.aggregate.AggregateProcessor - Aggregation complete for correlation key true sending aggregated exchange: Exchange[Message: AB]
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] DEBUG org.apache.camel.processor.aggregate.AggregateProcessor - Processing aggregated exchange: Exchange[Message: AB]
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE org.apache.camel.processor.aggregate.AggregateProcessor - Processing aggregated exchange: Exchange[Message: AB] complete.
[main] INFO test.AggregationTest - Testing done: test.AggregationTest@614a75bb
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE org.apache.camel.processor.aggregate.AggregateProcessor - Completion interval task complete
{noformat}

The difference between the two is the following line:

{noformat}
[Camel (camel-1) thread #0 - AggregateTimeoutChecker] TRACE org.apache.camel.processor.aggregate.AggregateProcessor - Aggregated exchange onComplete: Exchange[Message: AB]
{noformat}

This would be written to the log in AggregateProcessor$AggregateOnCompletion.onComplete(), on or around line 544 in AggregateProcessor.

{code}
public void onComplete(Exchange exchange) {
    if (LOG.isTraceEnabled()) {
        LOG.trace("Aggregated exchange onComplete: " + exchange);
    }
    // only confirm if we processed without a problem
    try {
        aggregationRepository.confirm(exchange.getContext(), exchangeId);
        // and remove redelivery state as well
        redeliveryState.remove(exchangeId);
    } finally {
        // must remember to remove in progress when we are complete
        inProgressCompleteExchanges.remove(exchangeId);
    }
}
{code}

Because onComplete() is never called, the inProgressCompleteExchanges.remove(exchangeId) call never happens, and that is precisely what's causing inProgressCompleteExchanges to grow larger and not get cleaned up.

> Aggregation fails to call onComplete for exchanges if the aggregation is after a bean or process.
> -------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-3535
>                 URL: https://issues.apache.org/jira/browse/CAMEL-3535
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.5.0
>            Reporter: Brian Feaver
>            Assignee: Claus Ibsen
>             Fix For: 2.6.0
>
>         Attachments: AggregationTest.java
>
>
> When creating a route that contains an aggregation, if that aggregation is preceded by a bean or process, it will fail to call AggregateOnCompletion.onComplete(). I've attached a unit test that shows the behavior. Trace level logging will need to be enabled to see the difference. With the call to the bean, it won't show the following log entry:
> {noformat}TRACE org.apache.camel.processor.aggregate.AggregateProcessor - Aggregated exchange onComplete: Exchange[Message: ab]{noformat}
> If you remove the bean call, it'll start calling onComplete() again.
> What I've noticed is that if this call is not made, it results in a memory leak, since the inProgressCompleteExchanges HashSet in AggregateProcessor never has any exchange IDs removed.
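
To make the leak mechanism concrete, here is a minimal standalone sketch of the pattern described above. It is not Camel code: the class InProgressLeakSketch and its methods submit() and simulate() are hypothetical names invented for illustration, and the set only mimics the role of inProgressCompleteExchanges. It assumes, as in the snippet above, that the completion callback is the only place where an ID is removed.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the bookkeeping in AggregateProcessor; not Camel code.
class InProgressLeakSketch {

    // Plays the role of inProgressCompleteExchanges: IDs of exchanges sent but not yet confirmed.
    private final Set<String> inProgress = new HashSet<>();

    // Registers an exchange ID as in progress and returns the callback that would clean it up.
    Runnable submit(String exchangeId) {
        inProgress.add(exchangeId);
        return () -> inProgress.remove(exchangeId);
    }

    int inProgressSize() {
        return inProgress.size();
    }

    // Submits `count` exchanges; the completion callback runs only when callbackFires is true.
    // Returns how many IDs are still tracked afterwards.
    static int simulate(int count, boolean callbackFires) {
        InProgressLeakSketch sketch = new InProgressLeakSketch();
        for (int i = 0; i < count; i++) {
            Runnable onComplete = sketch.submit("exchange-" + i);
            if (callbackFires) {
                onComplete.run();
            }
        }
        return sketch.inProgressSize();
    }

    public static void main(String[] args) {
        System.out.println("callback fires:   " + simulate(1000, true));   // prints 0
        System.out.println("callback skipped: " + simulate(1000, false));  // prints 1000
    }
}
```

When the callback is skipped, every submitted ID stays in the set, which matches the growth reported for inProgressCompleteExchanges when onComplete() is not invoked.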