[
https://issues.apache.org/jira/browse/CAMEL-19734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17777306#comment-17777306
]
Rich T commented on CAMEL-19734:
--------------------------------
Apologies for the delay: here's an update to your test that highlights the
problem (notice the Start SEDA route that passes to the multiple consumers of
ClientInboundQueue), I have added a thread sleep to the AttributeEventQueue to
ensure it backs up but might not be necessary. In this configuration the
AttributeEventQueue gets filled and this pushes back on route ID three, route
one and two should never be impeded but you should see in the message history
route one and/or two can appear at the top which seems wrong, also as mentioned
the elapsed time makes no sense.
{code:java}
public class SedaTest extends CamelTestSupport {
private Logger logger = LoggerFactory.getLogger(SedaTest.class);
@Test
public void testMock() throws Exception {
getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(3);
template.sendBody("seda://Start", "Hello World");
template.sendBody("seda://Start", "Hello World");
template.sendBody("seda://Start", "Hello World");
template.sendBody("seda://Start", "Hello World");
template.sendBody("seda://Start", "Hello World");
MockEndpoint.assertIsSatisfied(context);
}
@Override
protected RoutesBuilder createRouteBuilder() {
context.setMessageHistory(true);
return new RouteBuilder() {
@Override
public void configure() {
from("seda://Start")
.id("start")
.process(exchange -> logger.info("Executing start"))
.to("seda://ClientInboundQueue");
from("seda://ClientInboundQueue?" +
"multipleConsumers=true&" +
"waitForTaskToComplete=IfReplyExpected&" +
"purgeWhenStopping=true&" +
"discardIfNoConsumers=true&" +
"limitConcurrentConsumers=false&" +
"size=1000")
.id("one")
.process(exchange -> logger.info("Executing
ClientInboundQueue-one"));
from("seda://ClientInboundQueue?" +
"multipleConsumers=true&" +
"waitForTaskToComplete=IfReplyExpected&" +
"purgeWhenStopping=true&" +
"discardIfNoConsumers=true&" +
"limitConcurrentConsumers=false&" +
"size=1000")
.id("two")
.process(exchange -> logger.info("Executing
ClientInboundQueue-two"));
from("seda://ClientInboundQueue?" +
"multipleConsumers=true&" +
"waitForTaskToComplete=IfReplyExpected&" +
"purgeWhenStopping=true&" +
"discardIfNoConsumers=true&" +
"limitConcurrentConsumers=false&" +
"size=1000")
.id("three")
.process(exchange -> logger.info("Executing
ClientInboundQueue-three"))
.to("seda://AttributeEventQueue");
from("seda://AttributeEventQueue?" +
"waitForTaskToComplete=IfReplyExpected&" +
"timeout=10000&" +
"purgeWhenStopping=true&" +
"discardIfNoConsumers=false&size=2")
.process(exchange -> {
logger.info("Executing AttributeEventQueue");
Thread.sleep(1000);
})
.to("mock:result");
}
};
} {code}
Here's an example incorrect history output with route one shown at the top:
{code:java}
[14 - seda://ClientInboundQueue] DefaultErrorHandler ERROR Failed
delivery for (MessageId: C231FF4DD699985-0000000000000012 on ExchangeId:
C231FF4DD699985-0000000000000012). Exhausted after delivery attempt: 1 caught:
java.lang.IllegalStateException: Queue fullMessage History (source location is
disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source ID
Processor Elapsed (ms)
one/one
from[seda://ClientInboundQueue?discardIfNoConsumer 540646067
start/process1
Processor@0x7db534f2 0
start/to1
seda://ClientInboundQueue 0
three/process4
Processor@0x5910de75 0
three/to2
seda://AttributeEventQueue 0 {code}
If we send the messages directly to ClientInboundQueue (rather than Start) then
the message history is also incorrect:
{code:java}
[11 - seda://ClientInboundQueue] DefaultErrorHandler ERROR Failed
delivery for (MessageId: 74874E358312720-000000000000000E on ExchangeId:
74874E358312720-000000000000000E). Exhausted after delivery attempt: 1 caught:
java.lang.IllegalStateException: Queue fullMessage History (source location is
disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source ID
Processor Elapsed (ms)
two/two
from[seda://ClientInboundQueue?discardIfNoConsumer 541298686
three/process4
Processor@0x5910de75 1
three/to2
seda://AttributeEventQueue 0
{code}
Hope this helps.
> SEDA endpoint with multiple consumers produces strange message history from
> error handler
> -----------------------------------------------------------------------------------------
>
> Key: CAMEL-19734
> URL: https://issues.apache.org/jira/browse/CAMEL-19734
> Project: Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 3.20.6
> Reporter: Rich T
> Assignee: Rhuan Rocha
> Priority: Minor
> Attachments: camel-seda-reproducer.zip
>
>
> I have a SEDA endpoint with multiple consumers enabled and three consumers
> configured and default error handler is in place.
> If one of the consumer queues becomes full then with the default error
> handler the message history is output but the first item is always one of the
> other consumers with a really large elapsed time (the system hasn't even been
> running this long) and then this is followed by the consumer route that
> actually threw the exception:
>
>
> {code:java}
> Message History (source location is disabled)
> ---------------------------------------------------------------------------------------------------------------------------------------
> Source ID
> Processor Elapsed (ms)
> ClientInbound-ReadSimulatorSta
> from[seda://ClientInboundQueue?concurrentConsumers 1201857883
> ClientInbound-EventProcessor/p
> Processor@0x713c4d95 0
> ClientInbound-EventProcessor/c
> choice[when[{body instanceof xxx 0
> ClientInbound-EventProcessor/t
> seda://AttributeEventQueue?waitForTaskToComplete=I 0
> Stacktrace
> ---------------------------------------------------------------------------------------------------------------------------------------
> java.lang.IllegalStateException: Queue full
> {code}
> So from the above `ClientInbound-ReadSimulatorState` is one of the multiple
> consumers that doesn't throw an exception.
> `ClientInbound-EventProcessor` is the route that has a full queue which
> results in the queue full exception.
> I would expect to only see the `ClientInbound-EventProcessor` in the message
> history here; the first route in the history log seems to alternate between
> the two consumers that didn't generate the exception on successive error logs
> but they always have this strange elapsed time, here's a log showing the
> third consumer listed first:
>
>
> {code:java}
> Message History (source location is disabled)
> ---------------------------------------------------------------------------------------------------------------------------------------
> Source ID
> Processor Elapsed (ms)
> ClientInbound-Query/ClientInbo
> from[seda://ClientInboundQueue?concurrentConsumers 1201857882
> ClientInbound-EventProcessor/p
> Processor@0x713c4d95 0
> ClientInbound-EventProcessor/c
> choice[when[{body instanceof xxx 0
> ClientInbound-EventProcessor/t
> seda://AttributeEventQueue?waitForTaskToComplete=I 0
> Stacktrace
> ---------------------------------------------------------------------------------------------------------------------------------------
> java.lang.IllegalStateException: Queue full
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)