[ https://issues.apache.org/jira/browse/CAMEL-12906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16908423#comment-16908423 ]
michael elbaz commented on CAMEL-12906: --------------------------------------- There is someone have any idea about ? because it cause losing of messages > Strange comportement with aggregator > ------------------------------------ > > Key: CAMEL-12906 > URL: https://issues.apache.org/jira/browse/CAMEL-12906 > Project: Camel > Issue Type: Bug > Components: camel-activemq > Affects Versions: 2.22.1 > Reporter: michael elbaz > Priority: Minor > > /!\ Its about camel aggregator and error handling during *shutdown phase* > My case is about graceful shutdown and error handling in case of aggregator > first one: > In this example i will get *RejectedExecutionException* during the shutdown > and the exception is not catched by the *onexception* or *errorHandler* > methods as expected so i will just lost message. > {code:java} > @Component > public class AmqRoute extends RouteBuilder { > @Override > public void configure() throws Exception { > errors(); > from("timer:foo?period=1000") > .log("1") > .transform().body(() -> "DATA " + > RandomStringUtils.randomAlphanumeric(10)) > .convertBodyTo(String.class) > .to(amq()); > from(amq()).to("direct:foo"); > from("direct:foo") > .delay(3000) > .aggregate(constant(true), new > GroupedBodyAggregationStrategy()) > .completionSize(10) > .forceCompletionOnStop() > .log("${body}"); > } > public void errors() { > onException(Exception.class) > .useOriginalMessage() > .to("activemq:recovery") > .handled(true) > .onRedelivery(exchange -> System.err.println("push to amq")); > errorHandler(deadLetterChannel("log:dead?level=ERROR")); > } > private static String amq() { > String amq = "activemq:data"; > amq += "?transacted=true"; > return amq; > } > } > {code} > Stacktrace: > {code:java} > java.util.concurrent.RejectedExecutionException: null > at > org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:435) > ~[camel-core-2.22.1.jar:2.22.1] > at > org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) > ~[camel-core-2.22.1.jar:2.22.1] > at > org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) > ~[camel-core-2.22.1.jar:2.22.1] > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97) > ~[camel-core-2.22.1.jar:2.22.1] > at > org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:791) > ~[camel-core-2.22.1.jar:2.22.1] > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > [na:1.8.0_131] > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > [na:1.8.0_131] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > [na:1.8.0_131] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > [na:1.8.0_131] > at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131] > {code} > In this one (+using doTry before aggregate+) there is no exception throwed, > that the interesting point because i expect at least catched > *RejectedExecutionException* and finally i *didn't lost* message in this case > ! > {code:java} > @Component > public class AmqRoute extends RouteBuilder { > @Override > public void configure() throws Exception { > // errors(); > from("timer:foo?period=1000") > .log("1") > .transform().body(() -> "DATA " + > RandomStringUtils.randomAlphanumeric(10)) > .convertBodyTo(String.class) > .to(amq()); > from(amq()).to("direct:foo"); > from("direct:foo") > .delay(3000) > .doTry() // Strange /!\ > .aggregate(constant(true), new > GroupedBodyAggregationStrategy()) > .completionSize(10) > .forceCompletionOnStop() > .log("${body}"); > } > public void errors() { > onException(Exception.class) > .useOriginalMessage() > .to("activemq:recovery") > .handled(true) > .onRedelivery(exchange -> System.err.println("push to amq")); > errorHandler(deadLetterChannel("log:dead?level=ERROR")); > } > private static String amq() { > String amq = "activemq:data"; > amq += "?transacted=true"; > return amq; > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.14#76016)