[ 
https://issues.apache.org/jira/browse/CAMEL-19734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17777306#comment-17777306
 ] 

Rich T commented on CAMEL-19734:
--------------------------------

Apologies for the delay. Here's an update to your test that highlights the 
problem (note the Start SEDA route that passes messages to the multiple 
consumers of ClientInboundQueue). I have added a thread sleep to the 
AttributeEventQueue consumer to ensure the queue backs up, although it might 
not be necessary. In this configuration the AttributeEventQueue fills up and 
pushes back on route three; routes one and two should never be impeded. 
However, in the message history, route one and/or two can appear at the top, 
which seems wrong. Also, as mentioned, the elapsed time makes no sense. 


{code:java}
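// Imports assume camel-test-junit5 (Camel 3.x) and SLF4J.
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SedaTest extends CamelTestSupport {

    private final Logger logger = LoggerFactory.getLogger(SedaTest.class);

    @Test
    public void testMock() throws Exception {
        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
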
        template.sendBody("seda://Start", "Hello World");
        template.sendBody("seda://Start", "Hello World");
        template.sendBody("seda://Start", "Hello World");
        template.sendBody("seda://Start", "Hello World");
        template.sendBody("seda://Start", "Hello World");
        MockEndpoint.assertIsSatisfied(context);
    }

    @Override
    protected RoutesBuilder createRouteBuilder() {
        context.setMessageHistory(true);
        return new RouteBuilder() {
            @Override
            public void configure() {

                from("seda://Start")
                    .id("start")
                    .process(exchange -> logger.info("Executing start"))
                    .to("seda://ClientInboundQueue");

                from("seda://ClientInboundQueue?" +
                        "multipleConsumers=true&" +
                        "waitForTaskToComplete=IfReplyExpected&" +
                        "purgeWhenStopping=true&" +
                        "discardIfNoConsumers=true&" +
                        "limitConcurrentConsumers=false&" +
                        "size=1000")
                        .id("one")
                        .process(exchange -> logger.info("Executing ClientInboundQueue-one"));

                from("seda://ClientInboundQueue?" +
                        "multipleConsumers=true&" +
                        "waitForTaskToComplete=IfReplyExpected&" +
                        "purgeWhenStopping=true&" +
                        "discardIfNoConsumers=true&" +
                        "limitConcurrentConsumers=false&" +
                        "size=1000")
                        .id("two")
                        .process(exchange -> logger.info("Executing ClientInboundQueue-two"));

                from("seda://ClientInboundQueue?" +
                        "multipleConsumers=true&" +
                        "waitForTaskToComplete=IfReplyExpected&" +
                        "purgeWhenStopping=true&" +
                        "discardIfNoConsumers=true&" +
                        "limitConcurrentConsumers=false&" +
                        "size=1000")
                        .id("three")
                        .process(exchange -> logger.info("Executing ClientInboundQueue-three"))
                        .to("seda://AttributeEventQueue");

                from("seda://AttributeEventQueue?" +
                        "waitForTaskToComplete=IfReplyExpected&" +
                        "timeout=10000&" +
                        "purgeWhenStopping=true&" +
                        "discardIfNoConsumers=false&size=2")
                        .process(exchange -> {
                            logger.info("Executing AttributeEventQueue");
                            Thread.sleep(1000);
                        })
                        .to("mock:result");

            }
        };
    }
}
{code}
Here's an example incorrect history output with route one shown at the top:


{code:java}
[14 - seda://ClientInboundQueue] DefaultErrorHandler            ERROR Failed delivery for (MessageId: C231FF4DD699985-0000000000000012 on ExchangeId: C231FF4DD699985-0000000000000012). Exhausted after delivery attempt: 1 caught: java.lang.IllegalStateException: Queue full
Message History (source location is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source                                   ID                             Processor                                          Elapsed (ms)
                                         one/one                        from[seda://ClientInboundQueue?discardIfNoConsumer    540646067
                                         start/process1                 Processor@0x7db534f2                                          0
                                         start/to1                      seda://ClientInboundQueue                                     0
                                         three/process4                 Processor@0x5910de75                                          0
                                         three/to2                      seda://AttributeEventQueue                                    0
{code}
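To put the elapsed values in perspective, they can be converted from milliseconds to days and hours. This is only a minimal sketch using java.time; the class and method names are illustrative and are not part of the reproducer.

```java
import java.time.Duration;

public class ElapsedSketch {

    // Formats a millisecond count as days, hours and minutes.
    static String describe(long elapsedMillis) {
        Duration d = Duration.ofMillis(elapsedMillis);
        return String.format("%dd %dh %dm", d.toDays(), d.toHours() % 24, d.toMinutes() % 60);
    }

    public static void main(String[] args) {
        // Elapsed value reported for route "one" in the history above.
        System.out.println(describe(540646067L));
        // Elapsed value from the original issue description.
        System.out.println(describe(1201857883L));
    }
}
```

These work out to roughly 6 and 14 days respectively, far longer than a test run that lasts a few seconds.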
If we send the messages directly to ClientInboundQueue (rather than Start) then 
the message history is also incorrect:



{code:java}
[11 - seda://ClientInboundQueue] DefaultErrorHandler            ERROR Failed delivery for (MessageId: 74874E358312720-000000000000000E on ExchangeId: 74874E358312720-000000000000000E). Exhausted after delivery attempt: 1 caught: java.lang.IllegalStateException: Queue full
Message History (source location is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source                                   ID                             Processor                                          Elapsed (ms)
                                         two/two                        from[seda://ClientInboundQueue?discardIfNoConsumer    541298686
                                         three/process4                 Processor@0x5910de75                                          1
                                         three/to2                      seda://AttributeEventQueue                                    0
{code}
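For reference, the "Queue full" exception itself can be reproduced outside Camel. The sketch below assumes that the SEDA endpoint is backed by a bounded java.util.concurrent.LinkedBlockingQueue and that the producer calls add() when blockWhenFull is left at its default; neither was verified against the Camel source here, and the class and method names are illustrative.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueFullSketch {

    // Adds `messages` items to a queue bounded at `capacity` while nothing drains it.
    // Returns the message of the first IllegalStateException, or null if all items fit.
    static String fill(int capacity, int messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        try {
            for (int i = 0; i < messages; i++) {
                // add() throws instead of blocking once the queue is at capacity.
                queue.add("Hello World " + i);
            }
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // size=2 as on AttributeEventQueue; the third message does not fit.
        System.out.println(fill(2, 3));
    }
}
```

With a capacity of 2 and a consumer that sleeps for a second per message, five messages sent in quick succession would be expected to overflow the queue in the same way.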
Hope this helps.

> SEDA endpoint with multiple consumers produces strange message history from 
> error handler
> -----------------------------------------------------------------------------------------
>
>                 Key: CAMEL-19734
>                 URL: https://issues.apache.org/jira/browse/CAMEL-19734
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 3.20.6
>            Reporter: Rich T
>            Assignee: Rhuan Rocha
>            Priority: Minor
>         Attachments: camel-seda-reproducer.zip
>
>
> I have a SEDA endpoint with multiple consumers enabled, three consumers 
> configured, and the default error handler in place.
> If one of the consumer queues becomes full, the default error handler 
> outputs the message history, but the first item is always one of the other 
> consumers with a really large elapsed time (the system hasn't even been 
> running this long), followed by the consumer route that actually threw the 
> exception:
>  
>  
> {code:java}
> Message History (source location is disabled)
> ---------------------------------------------------------------------------------------------------------------------------------------
> Source                                   ID                             Processor                                          Elapsed (ms)
>                                          ClientInbound-ReadSimulatorSta from[seda://ClientInboundQueue?concurrentConsumers   1201857883
>                                          ClientInbound-EventProcessor/p Processor@0x713c4d95                                          0
>                                          ClientInbound-EventProcessor/c choice[when[{body instanceof xxx                              0
>                                          ClientInbound-EventProcessor/t seda://AttributeEventQueue?waitForTaskToComplete=I            0
> Stacktrace
> ---------------------------------------------------------------------------------------------------------------------------------------
> java.lang.IllegalStateException: Queue full
> {code}
> In the above, `ClientInbound-ReadSimulatorState` is one of the multiple 
> consumers and doesn't throw an exception.
> `ClientInbound-EventProcessor` is the route that has a full queue, which 
> results in the queue full exception.
> I would expect to see only `ClientInbound-EventProcessor` in the message 
> history here. On successive error logs, the first route in the history seems 
> to alternate between the two consumers that didn't generate the exception, 
> but they always have this strange elapsed time. Here's a log showing the 
> third consumer listed first:
>  
>  
> {code:java}
> Message History (source location is disabled)
> ---------------------------------------------------------------------------------------------------------------------------------------
> Source                                   ID                             Processor                                          Elapsed (ms)
>                                          ClientInbound-Query/ClientInbo from[seda://ClientInboundQueue?concurrentConsumers   1201857882
>                                          ClientInbound-EventProcessor/p Processor@0x713c4d95                                          0
>                                          ClientInbound-EventProcessor/c choice[when[{body instanceof xxx                              0
>                                          ClientInbound-EventProcessor/t seda://AttributeEventQueue?waitForTaskToComplete=I            0
> Stacktrace
> ---------------------------------------------------------------------------------------------------------------------------------------
> java.lang.IllegalStateException: Queue full
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
