[ 
https://issues.apache.org/jira/browse/CAMEL-12906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

michael elbaz updated CAMEL-12906:
----------------------------------
    Description: 
/!\ Its about camel aggregator and error handling during *shutdown phase*

My case is about graceful shutdown and error handling in case of aggregator 
first one:

In this example i will get *RejectedExecutionException* during the shutdown and 
the exception is not catched by the *onException* or *errorHandler* methods as 
expected so i will just lost message.

{code:java}
@Component
public class AmqRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        errors();

        from("timer:foo?period=1000")
                .log("1")
                .transform().body(() -> "DATA " + 
RandomStringUtils.randomAlphanumeric(10))
                .convertBodyTo(String.class)
                .to(amq());

        from(amq()).to("direct:foo");

        from("direct:foo")
                .delay(3000)
                .aggregate(constant(true), new GroupedBodyAggregationStrategy())
                .completionSize(10)
                .forceCompletionOnStop()
                .log("${body}");
    }

    public void errors() {
        onException(Exception.class)
                .useOriginalMessage()
                .to("activemq:recovery")
                .handled(true)
                .onRedelivery(exchange -> System.err.println("push to amq"));

        errorHandler(deadLetterChannel("log:dead?level=ERROR"));
    }

    private static String amq() {
        String amq = "activemq:data";

        amq += "?transacted=true";

        return amq;
    }

}
{code}

Stacktrace:

{code:java}
java.util.concurrent.RejectedExecutionException: null
        at 
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:435)
 ~[camel-core-2.22.1.jar:2.22.1]
        at 
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
 ~[camel-core-2.22.1.jar:2.22.1]
        at 
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
 ~[camel-core-2.22.1.jar:2.22.1]
        at 
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
 ~[camel-core-2.22.1.jar:2.22.1]
        at 
org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:791)
 ~[camel-core-2.22.1.jar:2.22.1]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[na:1.8.0_131]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[na:1.8.0_131]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
[na:1.8.0_131]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
[na:1.8.0_131]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
{code}


In this one (+using doTry before aggregate+) there is no exception throwed, 
that the interesting point because i expect at least catched 
*RejectedExecutionException* and finally i *didn't lost* message in this case !

{code:java}
@Component
public class AmqRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
       // errors();

        from("timer:foo?period=1000")
                .log("1")
                .transform().body(() -> "DATA " + 
RandomStringUtils.randomAlphanumeric(10))
                .convertBodyTo(String.class)
                .to(amq());

        from(amq()).to("direct:foo");

        from("direct:foo")
                .delay(3000)
                .doTry() // Strange /!\
                .aggregate(constant(true), new GroupedBodyAggregationStrategy())
                .completionSize(10)
                .forceCompletionOnStop()
                .log("${body}");
    }

    public void errors() {
        onException(Exception.class)
                .useOriginalMessage()
                .to("activemq:recovery")
                .handled(true)
                .onRedelivery(exchange -> System.err.println("push to amq"));

        errorHandler(deadLetterChannel("log:dead?level=ERROR"));
    }

    private static String amq() {
        String amq = "activemq:data";

        amq += "?transacted=true";

        return amq;
    }

}
{code}


  was:
/!\ Its about camel aggregator and error handling during *shutdown phase*

My case is about graceful shutdown and error handling in case of aggregator 
first one:

In this example i will get *RejectedExecutionException* during the shutdown and 
the exception is not catched by the *onexception* or *errorHandler* methods as 
expected so i will just lost message.

{code:java}
@Component
public class AmqRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        errors();

        from("timer:foo?period=1000")
                .log("1")
                .transform().body(() -> "DATA " + 
RandomStringUtils.randomAlphanumeric(10))
                .convertBodyTo(String.class)
                .to(amq());

        from(amq()).to("direct:foo");

        from("direct:foo")
                .delay(3000)
                .aggregate(constant(true), new GroupedBodyAggregationStrategy())
                .completionSize(10)
                .forceCompletionOnStop()
                .log("${body}");
    }

    public void errors() {
        onException(Exception.class)
                .useOriginalMessage()
                .to("activemq:recovery")
                .handled(true)
                .onRedelivery(exchange -> System.err.println("push to amq"));

        errorHandler(deadLetterChannel("log:dead?level=ERROR"));
    }

    private static String amq() {
        String amq = "activemq:data";

        amq += "?transacted=true";

        return amq;
    }

}
{code}

Stacktrace:

{code:java}
java.util.concurrent.RejectedExecutionException: null
        at 
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:435)
 ~[camel-core-2.22.1.jar:2.22.1]
        at 
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
 ~[camel-core-2.22.1.jar:2.22.1]
        at 
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
 ~[camel-core-2.22.1.jar:2.22.1]
        at 
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
 ~[camel-core-2.22.1.jar:2.22.1]
        at 
org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:791)
 ~[camel-core-2.22.1.jar:2.22.1]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[na:1.8.0_131]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[na:1.8.0_131]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
[na:1.8.0_131]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
[na:1.8.0_131]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
{code}


In this one (+using doTry before aggregate+) there is no exception throwed, 
that the interesting point because i expect at least catched 
*RejectedExecutionException* and finally i *didn't lost* message in this case !

{code:java}
@Component
public class AmqRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
       // errors();

        from("timer:foo?period=1000")
                .log("1")
                .transform().body(() -> "DATA " + 
RandomStringUtils.randomAlphanumeric(10))
                .convertBodyTo(String.class)
                .to(amq());

        from(amq()).to("direct:foo");

        from("direct:foo")
                .delay(3000)
                .doTry() // Strange /!\
                .aggregate(constant(true), new GroupedBodyAggregationStrategy())
                .completionSize(10)
                .forceCompletionOnStop()
                .log("${body}");
    }

    public void errors() {
        onException(Exception.class)
                .useOriginalMessage()
                .to("activemq:recovery")
                .handled(true)
                .onRedelivery(exchange -> System.err.println("push to amq"));

        errorHandler(deadLetterChannel("log:dead?level=ERROR"));
    }

    private static String amq() {
        String amq = "activemq:data";

        amq += "?transacted=true";

        return amq;
    }

}
{code}



> Strange comportement with aggregator
> ------------------------------------
>
>                 Key: CAMEL-12906
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12906
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-activemq
>    Affects Versions: 2.22.1
>            Reporter: michael elbaz
>            Priority: Minor
>
> /!\ Its about camel aggregator and error handling during *shutdown phase*
> My case is about graceful shutdown and error handling in case of aggregator 
> first one:
> In this example i will get *RejectedExecutionException* during the shutdown 
> and the exception is not catched by the *onException* or *errorHandler* 
> methods as expected so i will just lost message.
> {code:java}
> @Component
> public class AmqRoute extends RouteBuilder {
>     @Override
>     public void configure() throws Exception {
>         errors();
>         from("timer:foo?period=1000")
>                 .log("1")
>                 .transform().body(() -> "DATA " + 
> RandomStringUtils.randomAlphanumeric(10))
>                 .convertBodyTo(String.class)
>                 .to(amq());
>         from(amq()).to("direct:foo");
>         from("direct:foo")
>                 .delay(3000)
>                 .aggregate(constant(true), new 
> GroupedBodyAggregationStrategy())
>                 .completionSize(10)
>                 .forceCompletionOnStop()
>                 .log("${body}");
>     }
>     public void errors() {
>         onException(Exception.class)
>                 .useOriginalMessage()
>                 .to("activemq:recovery")
>                 .handled(true)
>                 .onRedelivery(exchange -> System.err.println("push to amq"));
>         errorHandler(deadLetterChannel("log:dead?level=ERROR"));
>     }
>     private static String amq() {
>         String amq = "activemq:data";
>         amq += "?transacted=true";
>         return amq;
>     }
> }
> {code}
> Stacktrace:
> {code:java}
> java.util.concurrent.RejectedExecutionException: null
>       at 
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:435)
>  ~[camel-core-2.22.1.jar:2.22.1]
>       at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
>  ~[camel-core-2.22.1.jar:2.22.1]
>       at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
>  ~[camel-core-2.22.1.jar:2.22.1]
>       at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
>  ~[camel-core-2.22.1.jar:2.22.1]
>       at 
> org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:791)
>  ~[camel-core-2.22.1.jar:2.22.1]
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [na:1.8.0_131]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> [na:1.8.0_131]
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  [na:1.8.0_131]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  [na:1.8.0_131]
>       at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
> {code}
> In this one (+using doTry before aggregate+) there is no exception throwed, 
> that the interesting point because i expect at least catched 
> *RejectedExecutionException* and finally i *didn't lost* message in this case 
> !
> {code:java}
> @Component
> public class AmqRoute extends RouteBuilder {
>     @Override
>     public void configure() throws Exception {
>        // errors();
>         from("timer:foo?period=1000")
>                 .log("1")
>                 .transform().body(() -> "DATA " + 
> RandomStringUtils.randomAlphanumeric(10))
>                 .convertBodyTo(String.class)
>                 .to(amq());
>         from(amq()).to("direct:foo");
>         from("direct:foo")
>                 .delay(3000)
>                 .doTry() // Strange /!\
>                 .aggregate(constant(true), new 
> GroupedBodyAggregationStrategy())
>                 .completionSize(10)
>                 .forceCompletionOnStop()
>                 .log("${body}");
>     }
>     public void errors() {
>         onException(Exception.class)
>                 .useOriginalMessage()
>                 .to("activemq:recovery")
>                 .handled(true)
>                 .onRedelivery(exchange -> System.err.println("push to amq"));
>         errorHandler(deadLetterChannel("log:dead?level=ERROR"));
>     }
>     private static String amq() {
>         String amq = "activemq:data";
>         amq += "?transacted=true";
>         return amq;
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to