This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch his in repository https://gitbox.apache.org/repos/asf/camel.git
commit 29532027e6e2e6dc381c176250331e8875b16411 Author: Claus Ibsen <[email protected]> AuthorDate: Thu Jan 15 16:54:15 2026 +0100 CAMEL-22858: camel-jbang - get history command may only show last step --- .../camel/spi/BacklogTracerEventMessage.java | 13 +- .../org/apache/camel/spi/InternalProcessor.java | 4 +- .../apache/camel/spi/InternalProcessorFactory.java | 3 + .../apache/camel/impl/debugger/BacklogTracer.java | 4 +- .../impl/debugger/DefaultBacklogDebugger.java | 12 +- .../debugger/DefaultBacklogTracerEventMessage.java | 14 +- .../camel/impl/engine/CamelInternalProcessor.java | 324 +++++++++++++++++---- .../apache/camel/impl/engine/DefaultChannel.java | 4 +- .../impl/engine/DefaultInflightRepository.java | 4 +- .../processor/DefaultInternalProcessorFactory.java | 7 + .../org/apache/camel/reifier/AggregateReifier.java | 9 +- .../org/apache/camel/reifier/RouteReifier.java | 2 +- .../management/BacklogTracerAggregateTest.java | 1 - .../BacklogTracerMessageHistoryTest.java | 57 +++- .../apache/camel/management/BacklogTracerTest.java | 29 +- .../core/commands/action/CamelHistoryAction.java | 20 +- 16 files changed, 403 insertions(+), 104 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java index 20aacafdd3d5..8ef5c1647970 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java @@ -34,12 +34,14 @@ public interface BacklogTracerEventMessage { long getUid(); /** - * Whether this is a new incoming message and this is the first trace. + * Whether this is first message for a given route When a message is routed via multiple routes, then each route + * will have a first/last pair. */ boolean isFirst(); /** - * Whether this is the last trace of the message (its complete). + * Whether this is last message for a given route When a message is routed via multiple routes, then each route will + * have a first/last pair. */ boolean isLast(); @@ -54,10 +56,15 @@ public interface BacklogTracerEventMessage { String getLocation(); /** - * Route id + * Current route id */ String getRouteId(); + /** + * The original incoming route id + */ + String getFromRouteId(); + /** * Whether this event was from a route that is created from Rest DSL. */ diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java index 8b4136a0efb1..3e338080e7da 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessor.java @@ -19,6 +19,8 @@ package org.apache.camel.spi; import java.util.List; import org.apache.camel.AsyncProcessor; +import org.apache.camel.CamelContext; +import org.apache.camel.NamedRoute; import org.apache.camel.Processor; import org.apache.camel.Route; @@ -74,7 +76,7 @@ public interface InternalProcessor extends AsyncProcessor { /** * Add advice for setting up {@link UnitOfWork} with the lifecycle of the route. */ - void addRouteLifecycleAdvice(); + void addRouteLifecycleAdvice(CamelContext camelContext, Route route, NamedRoute node); /** * Add advice for JMX management for the route diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java index 012f998c380e..7344c3a6962a 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java @@ -21,6 +21,7 @@ import org.apache.camel.AsyncProducer; import org.apache.camel.CamelContext; import org.apache.camel.Channel; import org.apache.camel.Endpoint; +import org.apache.camel.NamedNode; import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.Producer; @@ -43,6 +44,8 @@ public interface InternalProcessorFactory { InternalProcessor addUnitOfWorkProcessorAdvice(CamelContext camelContext, Processor processor, Route route); + CamelInternalProcessorAdvice<?> createAggregateBacklogTracerAdvice(CamelContext camelContext, NamedNode definition); + SharedInternalProcessor createSharedCamelInternalProcessor(CamelContext camelContext); Channel createChannel(CamelContext camelContext); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java index 28c08f8f6d21..3e70af452a7b 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java @@ -155,7 +155,9 @@ public class BacklogTracer extends ServiceSupport implements org.apache.camel.sp } if (tid == null || tid.equals(event.getExchangeId()) || tid.equals(event.getCorrelationExchangeId())) { provisionalHistoryQueue.add(event); - if (event.isLast()) { + boolean original = head != null && event.getRouteId() != null && event.getRouteId().equals(head.getRouteId()); + if (event.isLast() && original) { + // only trigger completion when it's the original last completeHistoryQueue.clear(); completeHistoryQueue.addAll(provisionalHistoryQueue); provisionalHistoryQueue.clear(); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java index 92daba25a46d..7adf38f6b7c8 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java @@ -890,7 +890,8 @@ public final class DefaultBacklogDebugger extends ServiceSupport implements Back nodeId, (nId, message) -> new DefaultBacklogTracerEventMessage( camelContext, - false, false, message.getUid(), message.getTimestamp(), message.getLocation(), message.getRouteId(), + false, false, message.getUid(), message.getTimestamp(), message.getLocation(), message.getFromRouteId(), + message.getRouteId(), message.getToNode(), message.getToNodeParentId(), message.getToNodeParentWhenId(), message.getToNodeParentWhenLabel(), message.getToNodeShortName(), message.getToNodeLabel(), @@ -934,6 +935,7 @@ public final class DefaultBacklogDebugger extends ServiceSupport implements Back String toNodeShortName = definition.getShortName(); // avoid label is too large String toNodeLabel = StringHelper.limitLength(definition.getLabel(), 50); + String fromRouteId = exchange.getFromRouteId(); String routeId = CamelContextHelper.getRouteId(definition); String exchangeId = exchange.getExchangeId(); String correlationExchangeId = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class); @@ -945,7 +947,7 @@ public final class DefaultBacklogDebugger extends ServiceSupport implements Back BacklogTracerEventMessage msg = new DefaultBacklogTracerEventMessage( camelContext, - first, false, uid, timestamp, source, routeId, toNode, toNodeParentId, null, null, + first, false, uid, timestamp, source, fromRouteId, routeId, toNode, toNodeParentId, null, null, toNodeShortName, toNodeLabel, level, exchangeId, correlationExchangeId, false, false, data); suspendedBreakpointMessages.put(nodeId, msg); @@ -1024,6 +1026,7 @@ public final class DefaultBacklogDebugger extends ServiceSupport implements Back String toNodeShortName = definition.getShortName(); // avoid label is too large String toNodeLabel = StringHelper.limitLength(definition.getLabel(), 50); + String fromRouteId = exchange.getFromRouteId(); String routeId = CamelContextHelper.getRouteId(definition); String exchangeId = exchange.getExchangeId(); String correlationExchangeId = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class); @@ -1034,7 +1037,7 @@ public final class DefaultBacklogDebugger extends ServiceSupport implements Back BacklogTracerEventMessage msg = new DefaultBacklogTracerEventMessage( camelContext, - false, false, uid, timestamp, source, routeId, toNode, toNodeParentId, null, null, + false, false, uid, timestamp, source, fromRouteId, routeId, toNode, toNodeParentId, null, null, toNodeShortName, toNodeLabel, level, exchangeId, correlationExchangeId, false, false, data); @@ -1133,6 +1136,7 @@ public final class DefaultBacklogDebugger extends ServiceSupport implements Back long timestamp = System.currentTimeMillis(); String toNode = CamelContextHelper.getRouteId(definition); String toNodeParentId = definition.getParentId(); + String fromRouteId = exchange.getFromRouteId(); String routeId = route != null ? route.getRouteId() : toNode; String exchangeId = exchange.getExchangeId(); String correlationExchangeId = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class); @@ -1143,7 +1147,7 @@ public final class DefaultBacklogDebugger extends ServiceSupport implements Back BacklogTracerEventMessage msg = new DefaultBacklogTracerEventMessage( camelContext, - false, true, uid, timestamp, source, routeId, toNode, toNodeParentId, + false, true, uid, timestamp, source, fromRouteId, routeId, toNode, toNodeParentId, null, null, null, null, level, exchangeId, correlationExchangeId, false, false, data); // we want to capture if there was an exception diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java index a299be7b7918..7e8b0edabc38 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java @@ -44,6 +44,7 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven private final long timestamp; private final String location; private final String routeId; + private final String fromRouteId; private final String toNode; private final String toNodeParentId; private final String toNodeParentWhenId; @@ -72,7 +73,8 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven private boolean done; public DefaultBacklogTracerEventMessage(CamelContext camelContext, boolean first, boolean last, long uid, long timestamp, - String location, String routeId, String toNode, String toNodeParentId, + String location, String fromRouteId, String routeId, String toNode, + String toNodeParentId, String toNodeParentWhenId, String toNodeParentWhenLabel, String toNodeShortName, String toNodeLabel, int toNodeLevel, String exchangeId, String correlationExchangeId, @@ -84,6 +86,7 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven this.uid = uid; this.timestamp = timestamp; this.location = location; + this.fromRouteId = fromRouteId; this.routeId = routeId; this.toNode = toNode; this.toNodeParentId = toNodeParentId; @@ -138,6 +141,11 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven return routeId; } + @Override + public String getFromRouteId() { + return fromRouteId; + } + @Override public boolean isRest() { return rest; @@ -351,6 +359,7 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven } // route id is optional and we then use an empty value for no route id sb.append(prefix).append(" <routeId>").append(routeId != null ? routeId : "").append("</routeId>\n"); + sb.append(prefix).append(" <fromRouteId>").append(fromRouteId != null ? fromRouteId : "").append("</fromRouteId>\n"); if (endpointUri != null) { sb.append(prefix).append(" <endpointUri>").append(endpointUri).append("</endpointUri>\n"); sb.append(prefix).append(" <remoteEndpoint>").append(remoteEndpoint).append("</remoteEndpoint>\n"); @@ -550,6 +559,9 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven if (routeId != null) { jo.put("routeId", routeId); } + if (fromRouteId != null) { + jo.put("fromRouteId", fromRouteId); + } if (toNode != null) { jo.put("nodeId", toNode); } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java index d86ef0f57c62..368a9c09bd7e 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java @@ -82,6 +82,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static java.util.Objects.requireNonNull; +import static org.apache.camel.impl.engine.DefaultChannel.getOrCreateBacklogTracer; /** * Internal {@link Processor} that Camel routing engine used during routing for cross cutting functionality such as: @@ -197,8 +198,11 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In } @Override - public void addRouteLifecycleAdvice() { + public void addRouteLifecycleAdvice(CamelContext camelContext, Route route, NamedRoute node) { addAdvice(new CamelInternalProcessor.RouteLifecycleAdvice()); + if (camelContext.isBacklogTracingStandby() || route.isBacklogTracing()) { + addAdvice(new BacklogTracerRouteAdvice(camelContext, node)); + } } @Override @@ -216,6 +220,10 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In if (task2 != null) { task2.setRoute(route); } + BacklogTracerRouteAdvice task3 = getAdvice(BacklogTracerRouteAdvice.class); + if (task3 != null) { + task3.setRoute(route); + } } /** @@ -594,6 +602,254 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In } } + /** + * Advice to invoke callbacks for before and after routing for {@link org.apache.camel.spi.BacklogTracer}. + */ + public static class BacklogTracerRouteAdvice implements CamelInternalProcessorAdvice<DefaultBacklogTracerEventMessage> { + + private final CamelContext camelContext; + private final NamedRoute routeDefinition; + private final BacklogTracer backlogTracer; + private final boolean rest; + private final boolean template; + private final boolean skip; + private Route route; + + public BacklogTracerRouteAdvice(CamelContext camelContext, NamedRoute definition) { + this.camelContext = camelContext; + this.routeDefinition = definition; + if (routeDefinition != null) { + this.rest = routeDefinition.isCreatedFromRest(); + this.template = routeDefinition.isCreatedFromTemplate(); + } else { + this.rest = false; + this.template = false; + } + this.backlogTracer = getOrCreateBacklogTracer(camelContext); + // optimize whether to skip this route or not + if (this.rest && !backlogTracer.isTraceRests()) { + this.skip = true; + } else if (this.template && !backlogTracer.isTraceTemplates()) { + this.skip = true; + } else { + this.skip = false; + } + } + + public void setRoute(Route route) { + this.route = route; + } + + @Override + public DefaultBacklogTracerEventMessage before(Exchange exchange) throws Exception { + if (!skip && backlogTracer.shouldTrace(routeDefinition.getInput(), exchange)) { + final long created = exchange.getClock().getCreated(); + NamedNode input = routeDefinition.getInput(); + String source = LoggerHelper.getLineNumberLoggerName(input); + String exchangeId = exchange.getExchangeId(); + String correlationExchangeId = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class); + String routeId = routeDefinition.getRouteId(); + String fromRouteId = exchange.getFromRouteId(); + int level = 1; + + boolean includeExchangeProperties = backlogTracer.isIncludeExchangeProperties(); + boolean includeExchangeVariables = backlogTracer.isIncludeExchangeVariables(); + JsonObject data = MessageHelper.dumpAsJSonObject(exchange.getIn(), includeExchangeProperties, + includeExchangeVariables, true, + true, backlogTracer.isBodyIncludeStreams(), backlogTracer.isBodyIncludeFiles(), + backlogTracer.getBodyMaxChars()); + + DefaultBacklogTracerEventMessage first = new DefaultBacklogTracerEventMessage( + camelContext, + true, false, backlogTracer.incrementTraceCounter(), created, source, fromRouteId, routeId, + input.getId(), + null, null, null, + input.getShortName(), input.getLabel(), + level, exchangeId, correlationExchangeId, rest, template, data); + if (exchange.getFromEndpoint() instanceof EndpointServiceLocation esl) { + first.setEndpointServiceUrl(esl.getServiceUrl()); + first.setEndpointServiceProtocol(esl.getServiceProtocol()); + first.setEndpointServiceMetadata(esl.getServiceMetadata()); + } + backlogTracer.traceEvent(first); + return first; + } + return null; + } + + @Override + public void after(Exchange exchange, DefaultBacklogTracerEventMessage first) throws Exception { + if (first != null) { + + final long created = exchange.getClock().getCreated(); + NamedNode input = routeDefinition.getInput(); + String source = LoggerHelper.getLineNumberLoggerName(input); + String exchangeId = exchange.getExchangeId(); + String correlationExchangeId = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class); + String routeId = routeDefinition.getRouteId(); + String fromRouteId = exchange.getFromRouteId(); + int level = 1; + + boolean includeExchangeProperties = backlogTracer.isIncludeExchangeProperties(); + boolean includeExchangeVariables = backlogTracer.isIncludeExchangeVariables(); + JsonObject data = MessageHelper.dumpAsJSonObject(exchange.getIn(), includeExchangeProperties, + includeExchangeVariables, true, + true, backlogTracer.isBodyIncludeStreams(), backlogTracer.isBodyIncludeFiles(), + backlogTracer.getBodyMaxChars()); + + DefaultBacklogTracerEventMessage last = new DefaultBacklogTracerEventMessage( + camelContext, + false, true, backlogTracer.incrementTraceCounter(), created, source, fromRouteId, routeId, + input.getId(), + null, null, null, + input.getShortName(), input.getLabel(), + level, exchangeId, correlationExchangeId, rest, template, data); + if (exchange.getFromEndpoint() instanceof EndpointServiceLocation esl) { + first.setEndpointServiceUrl(esl.getServiceUrl()); + first.setEndpointServiceProtocol(esl.getServiceProtocol()); + first.setEndpointServiceMetadata(esl.getServiceMetadata()); + } + backlogTracer.traceEvent(last); + doneProcessing(exchange, last); + doneProcessing(exchange, first); + // to not be confused then lets store duration on first/last as (first = 0, last = total time to process) + last.setElapsed(first.getElapsed()); + first.setElapsed(0); + } + } + + private void doneProcessing(Exchange exchange, DefaultBacklogTracerEventMessage data) { + data.doneProcessing(); + + String uri = null; + boolean remote = true; + Endpoint endpoint = null; + if ((data.isFirst() || data.isLast())) { + if (route.getConsumer() != null) { + // get the actual resolved uri + uri = route.getConsumer().getEndpoint().getEndpointUri(); + remote = route.getConsumer().getEndpoint().isRemote(); + endpoint = route.getEndpoint(); + } else { + uri = routeDefinition.getEndpointUrl(); + } + } + if (uri != null) { + data.setEndpointUri(uri); + } + data.setRemoteEndpoint(remote); + if (endpoint instanceof EndpointServiceLocation esl) { + data.setEndpointServiceUrl(esl.getServiceUrl()); + data.setEndpointServiceProtocol(esl.getServiceProtocol()); + data.setEndpointServiceMetadata(esl.getServiceMetadata()); + } + + if (!data.isFirst() && backlogTracer.isIncludeException()) { + // we want to capture if there was an exception + Throwable e = exchange.getException(); + if (e != null) { + data.setException(e); + } + } + } + } + + /** + * Special advice for handling aggregate EIP for the {@link org.apache.camel.spi.BacklogTracer}. + */ + public static final class BacklogTracerAggregateAdvice + implements CamelInternalProcessorAdvice<DefaultBacklogTracerEventMessage> { + + private final CamelContext camelContext; + private final NamedNode processorDefinition; + private final BacklogTracer backlogTracer; + + public BacklogTracerAggregateAdvice(CamelContext camelContext, NamedNode definition) { + this.camelContext = camelContext; + this.processorDefinition = definition; + this.backlogTracer = getOrCreateBacklogTracer(camelContext); + } + + @Override + public DefaultBacklogTracerEventMessage before(Exchange exchange) throws Exception { + String exchangeId = exchange.getExchangeId(); + String correlationExchangeId = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class); + int level = processorDefinition.getLevel(); + String routeId = ExchangeHelper.getAtRouteId(exchange); + String fromRouteId = exchange.getFromRouteId(); + String source = LoggerHelper.getLineNumberLoggerName(processorDefinition); + + boolean includeExchangeProperties = backlogTracer.isIncludeExchangeProperties(); + boolean includeExchangeVariables = backlogTracer.isIncludeExchangeVariables(); + JsonObject data = MessageHelper.dumpAsJSonObject(exchange.getIn(), includeExchangeProperties, + includeExchangeVariables, true, + true, backlogTracer.isBodyIncludeStreams(), backlogTracer.isBodyIncludeFiles(), + backlogTracer.getBodyMaxChars()); + + DefaultBacklogTracerEventMessage event = new DefaultBacklogTracerEventMessage( + camelContext, + true, false, backlogTracer.incrementTraceCounter(), exchange.getClock().getCreated(), source, + fromRouteId, routeId, + processorDefinition.getId(), + null, null, null, + processorDefinition.getShortName(), processorDefinition.getLabel(), + level + 1, exchangeId, correlationExchangeId, false, false, data); + backlogTracer.traceEvent(event); + return event; + } + + @Override + public void after(Exchange exchange, DefaultBacklogTracerEventMessage first) throws Exception { + if (first != null) { + final long created = exchange.getClock().getCreated(); + String source = LoggerHelper.getLineNumberLoggerName(processorDefinition); + String exchangeId = exchange.getExchangeId(); + String correlationExchangeId = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class); + String routeId = ExchangeHelper.getAtRouteId(exchange); + String fromRouteId = exchange.getFromRouteId(); + int level = 1; + + boolean includeExchangeProperties = backlogTracer.isIncludeExchangeProperties(); + boolean includeExchangeVariables = backlogTracer.isIncludeExchangeVariables(); + JsonObject data = MessageHelper.dumpAsJSonObject(exchange.getIn(), includeExchangeProperties, + includeExchangeVariables, true, + true, backlogTracer.isBodyIncludeStreams(), backlogTracer.isBodyIncludeFiles(), + backlogTracer.getBodyMaxChars()); + + DefaultBacklogTracerEventMessage last = new DefaultBacklogTracerEventMessage( + camelContext, + false, true, backlogTracer.incrementTraceCounter(), created, source, fromRouteId, routeId, + processorDefinition.getId(), + null, null, null, + processorDefinition.getShortName(), processorDefinition.getLabel(), + level, exchangeId, correlationExchangeId, false, false, data); + if (exchange.getFromEndpoint() instanceof EndpointServiceLocation esl) { + first.setEndpointServiceUrl(esl.getServiceUrl()); + first.setEndpointServiceProtocol(esl.getServiceProtocol()); + first.setEndpointServiceMetadata(esl.getServiceMetadata()); + } + backlogTracer.traceEvent(last); + doneProcessing(exchange, last); + doneProcessing(exchange, first); + // to not be confused then lets store duration on first/last as (first = 0, last = total time to process) + last.setElapsed(first.getElapsed()); + first.setElapsed(0); + } + } + + private void doneProcessing(Exchange exchange, DefaultBacklogTracerEventMessage data) { + data.doneProcessing(); + + if (!data.isFirst() && backlogTracer.isIncludeException()) { + // we want to capture if there was an exception + Throwable e = exchange.getException(); + if (e != null) { + data.setException(e); + } + } + } + } + /** * Advice to execute the {@link BacklogTracer} if enabled. */ @@ -605,18 +861,16 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In private final BacklogTracer backlogTracer; private final NamedNode processorDefinition; private final NamedRoute routeDefinition; - private final boolean first; private final boolean rest; private final boolean template; private final boolean skip; public BacklogTracerAdvice(CamelContext camelContext, BacklogTracer backlogTracer, NamedNode processorDefinition, - NamedRoute routeDefinition, boolean first) { + NamedRoute routeDefinition) { this.camelContext = camelContext; this.backlogTracer = backlogTracer; this.processorDefinition = processorDefinition; this.routeDefinition = routeDefinition; - this.first = first; if (routeDefinition != null) { this.rest = routeDefinition.isCreatedFromRest(); @@ -687,83 +941,48 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In // if first we should add a pseudo trace message as well, so we have a starting message (eg from the route) String routeId = routeDefinition != null ? routeDefinition.getRouteId() : null; - if (first) { - // use route as pseudo source when first - final long created = exchange.getClock().getCreated(); - - // special for aggregate which output are regarded as a new first - boolean aggregate = false; - NamedNode input = routeDefinition != null ? routeDefinition.getInput() : null; - if (processorDefinition.getParent() != null - && "aggregate".equals(processorDefinition.getParent().getShortName())) { - aggregate = true; - input = processorDefinition.getParent(); - } - String source = LoggerHelper.getLineNumberLoggerName(input); - - DefaultBacklogTracerEventMessage pseudoFirst; - if (aggregate) { - pseudoFirst = new DefaultBacklogTracerEventMessage( - camelContext, - true, false, backlogTracer.incrementTraceCounter(), created, source, routeId, input.getId(), - null, null, null, - input.getShortName(), input.getLabel(), - level - 1, exchangeId, correlationExchangeId, rest, template, data); - } else { - pseudoFirst = new DefaultBacklogTracerEventMessage( - camelContext, - true, false, backlogTracer.incrementTraceCounter(), created, source, routeId, input.getId(), - null, null, null, - input.getShortName(), input.getLabel(), - level, exchangeId, correlationExchangeId, rest, template, data); - if (exchange.getFromEndpoint() instanceof EndpointServiceLocation esl) { - pseudoFirst.setEndpointServiceUrl(esl.getServiceUrl()); - pseudoFirst.setEndpointServiceProtocol(esl.getServiceProtocol()); - pseudoFirst.setEndpointServiceMetadata(esl.getServiceMetadata()); - } - } - backlogTracer.traceEvent(pseudoFirst); - exchange.getExchangeExtension().addOnCompletion(createOnCompletion(source, aggregate, pseudoFirst)); - } + String fromRouteId = exchange.getFromRouteId(); String source = LoggerHelper.getLineNumberLoggerName(processorDefinition); + DefaultBacklogTracerEventMessage event = new DefaultBacklogTracerEventMessage( camelContext, - false, false, backlogTracer.incrementTraceCounter(), timestamp, source, routeId, toNode, toNodeParentId, + false, false, backlogTracer.incrementTraceCounter(), timestamp, source, fromRouteId, routeId, toNode, + toNodeParentId, toNodeParentWhenId, toNodeParentWhenLabel, toNodeShortName, toNodeLabel, level, exchangeId, correlationExchangeId, rest, template, data); backlogTracer.traceEvent(event); - return event; } return null; } - private SynchronizationAdapter createOnCompletion( - String source, boolean aggregate, DefaultBacklogTracerEventMessage pseudoFirst) { + private SynchronizationAdapter createAggregateOnCompletion( + String source, DefaultBacklogTracerEventMessage pseudoFirst) { return new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { - // create pseudo last + // create pseudo last for the aggregate String routeId = routeDefinition != null ? routeDefinition.getRouteId() : null; + String fromRouteId = exchange.getFromRouteId(); String exchangeId = exchange.getExchangeId(); String correlationExchangeId = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class); boolean includeExchangeProperties = backlogTracer.isIncludeExchangeProperties(); boolean includeExchangeVariables = backlogTracer.isIncludeExchangeVariables(); long created = exchange.getClock().getCreated(); int level = pseudoFirst.getToNodeLevel(); - // aggregate is special - String toNode = aggregate ? pseudoFirst.getToNode() : null; - String toNodeShortName = aggregate ? pseudoFirst.getToNodeShortName() : null; - String toNodeLabel = aggregate ? pseudoFirst.getToNodeLabel() : null; + String toNode = pseudoFirst.getToNode(); + String toNodeShortName = pseudoFirst.getToNodeShortName(); + String toNodeLabel = pseudoFirst.getToNodeLabel(); JsonObject data = MessageHelper.dumpAsJSonObject(exchange.getIn(), includeExchangeProperties, includeExchangeVariables, true, true, backlogTracer.isBodyIncludeStreams(), backlogTracer.isBodyIncludeFiles(), backlogTracer.getBodyMaxChars()); DefaultBacklogTracerEventMessage pseudoLast = new DefaultBacklogTracerEventMessage( camelContext, - false, true, backlogTracer.incrementTraceCounter(), created, source, routeId, toNode, null, null, + false, true, backlogTracer.incrementTraceCounter(), created, source, fromRouteId, routeId, toNode, + null, null, null, toNodeShortName, toNodeLabel, level, exchangeId, correlationExchangeId, rest, template, data); backlogTracer.traceEvent(pseudoLast); @@ -822,7 +1041,6 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In } } } - } /** diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java index 2e2e7c77aceb..46c06f9ef098 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java @@ -210,7 +210,7 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel { if (camelContext.isBacklogTracingStandby() || route.isBacklogTracing()) { // add jmx backlog tracer BacklogTracer backlogTracer = getOrCreateBacklogTracer(camelContext); - addAdvice(new BacklogTracerAdvice(camelContext, backlogTracer, targetOutputDef, routeDefinition, first)); + addAdvice(new BacklogTracerAdvice(camelContext, backlogTracer, targetOutputDef, routeDefinition)); } if (route.isTracing() || camelContext.isTracingStandby()) { // add logger tracer @@ -293,7 +293,7 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel { } } - private static BacklogTracer getOrCreateBacklogTracer(CamelContext camelContext) { + static BacklogTracer getOrCreateBacklogTracer(CamelContext camelContext) { BacklogTracer tracer = null; if (camelContext.getRegistry() != null) { // lookup in registry diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java index c606fb47730a..b13186abbd15 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java @@ -231,7 +231,7 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh @Override @SuppressWarnings("unchecked") public long getElapsed() { - // this can only be calculate if message history is enabled + // this can only be calculated if message history is enabled List<MessageHistory> list = exchange.getProperty(ExchangePropertyKey.MESSAGE_HISTORY, List.class); if (list == null || list.isEmpty()) { return 0; @@ -242,7 +242,7 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh if (history != null) { long elapsed = history.getElapsed(); if (elapsed == 0) { - // still in progress, so lets compute it via the start time + // still in progress, so let's compute it via the start time elapsed = history.getElapsedSinceCreated(); } return elapsed; diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java index caa7a5058206..f7ee326de31e 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultInternalProcessorFactory.java @@ -22,6 +22,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.Channel; import org.apache.camel.Endpoint; +import org.apache.camel.NamedNode; import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.Producer; @@ -29,6 +30,7 @@ import org.apache.camel.Route; import org.apache.camel.impl.engine.CamelInternalProcessor; import org.apache.camel.impl.engine.DefaultChannel; import org.apache.camel.impl.engine.SharedCamelInternalProcessor; +import org.apache.camel.spi.CamelInternalProcessorAdvice; import org.apache.camel.spi.InterceptSendToEndpoint; import org.apache.camel.spi.InternalProcessor; import org.apache.camel.spi.InternalProcessorFactory; @@ -45,6 +47,11 @@ public class DefaultInternalProcessorFactory implements InternalProcessorFactory return internal; } + @Override + public CamelInternalProcessorAdvice<?> createAggregateBacklogTracerAdvice(CamelContext camelContext, NamedNode definition) { + return new CamelInternalProcessor.BacklogTracerAggregateAdvice(camelContext, definition); + } + @Override public SharedInternalProcessor createSharedCamelInternalProcessor(CamelContext camelContext) { return new SharedCamelInternalProcessor( diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java index d84389b45343..55f2a12c0534 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/AggregateReifier.java @@ -20,7 +20,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import org.apache.camel.AggregationStrategy; -import org.apache.camel.AsyncProcessor; import org.apache.camel.Expression; import org.apache.camel.Predicate; import org.apache.camel.Processor; @@ -33,6 +32,7 @@ import org.apache.camel.processor.aggregate.AggregateProcessor; import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy; import org.apache.camel.spi.AggregationRepository; import org.apache.camel.spi.ExecutorServiceManager; +import org.apache.camel.spi.InternalProcessor; import org.apache.camel.support.PluginHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,8 +54,13 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> { Processor childProcessor = this.createChildProcessor(true); // wrap the aggregate route in a unit of work processor - AsyncProcessor target = PluginHelper.getInternalProcessorFactory(camelContext) + InternalProcessor target = PluginHelper.getInternalProcessorFactory(camelContext) .addUnitOfWorkProcessorAdvice(camelContext, childProcessor, route); + // if backlog tracing then add special advice to handle this + if (camelContext.isBacklogTracingStandby() || route.isBacklogTracing()) { + target.addAdvice(PluginHelper.getInternalProcessorFactory(camelContext) + .createAggregateBacklogTracerAdvice(camelContext, definition)); + } // correlation expression is required if (definition.getExpression() == null) { diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java index 66560579a4f2..82b7298077d5 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/RouteReifier.java @@ -324,7 +324,7 @@ public class RouteReifier extends ProcessorReifier<RouteDefinition> { } // wrap in route lifecycle - internal.addRouteLifecycleAdvice(); + internal.addRouteLifecycleAdvice(camelContext, route, definition); // add advices if (definition.getRestBindingDefinition() != null) { diff --git a/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerAggregateTest.java b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerAggregateTest.java index 4ce46df10b67..2d08cfc9e058 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerAggregateTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerAggregateTest.java @@ -60,7 +60,6 @@ public class BacklogTracerAggregateTest extends ManagementTestSupport { = (List<BacklogTracerEventMessage>) mbeanServer.invoke(on, "dumpAllTracedMessages", null, null); assertNotNull(events); - assertEquals(19, events.size()); // should be 4 first and 4 last assertEquals(4, events.stream().filter(BacklogTracerEventMessage::isFirst).count()); diff --git a/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryTest.java b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryTest.java index 167b3fbbf324..99e6b7a89fe7 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryTest.java @@ -31,7 +31,6 @@ import org.junit.jupiter.api.condition.OS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @DisabledOnOs(OS.AIX) @@ -95,27 +94,54 @@ public class BacklogTracerMessageHistoryTest extends ManagementTestSupport { events = (List<BacklogTracerEventMessage>) mbeanServer.invoke(on, "dumpLatestMessageHistory", null, null); assertNotNull(events); - assertEquals(4, events.size()); + assertEquals(8, events.size()); assertTrue(events.get(0).isFirst()); assertEquals("direct://start", events.get(0).getEndpointUri()); assertEquals("from", events.get(0).getToNodeShortName()); - + assertEquals("myRoute", events.get(0).getRouteId()); + assertEquals("myRoute", events.get(0).getFromRouteId()); assertFalse(events.get(1).isFirst()); assertFalse(events.get(1).isLast()); assertEquals("foo", events.get(1).getToNode()); assertEquals("to", events.get(1).getToNodeShortName()); assertEquals("to[mock:foo]", events.get(1).getToNodeLabel()); - - assertFalse(events.get(2).isFirst()); - assertFalse(events.get(2).isLast()); - assertEquals("bar", events.get(2).getToNode()); - assertEquals("to", events.get(2).getToNodeShortName()); - assertEquals("to[mock:bar]", events.get(2).getToNodeLabel()); - - assertTrue(events.get(3).isLast()); - assertEquals("direct://start", events.get(3).getEndpointUri()); - assertNull(events.get(3).getToNode()); + assertEquals("myRoute", events.get(1).getRouteId()); + assertEquals("myRoute", events.get(1).getFromRouteId()); + + // sub-route + assertTrue(events.get(3).isFirst()); + assertFalse(events.get(3).isLast()); + assertEquals("direct://sub", events.get(3).getEndpointUri()); + assertEquals("from", events.get(3).getToNodeShortName()); + assertEquals("mySub", events.get(3).getRouteId()); + assertEquals("myRoute", events.get(3).getFromRouteId()); + assertFalse(events.get(4).isFirst()); + assertFalse(events.get(4).isLast()); + assertEquals("sub", events.get(4).getToNode()); + assertEquals("to", events.get(4).getToNodeShortName()); + assertEquals("to[mock:sub]", events.get(4).getToNodeLabel()); + assertEquals("mySub", events.get(4).getRouteId()); + assertEquals("myRoute", events.get(4).getFromRouteId()); + assertFalse(events.get(5).isFirst()); + assertTrue(events.get(5).isLast()); + assertEquals("direct://sub", events.get(5).getEndpointUri()); + assertEquals("from", events.get(5).getToNodeShortName()); + assertEquals("mySub", events.get(5).getRouteId()); + assertEquals("myRoute", events.get(5).getFromRouteId()); + + assertFalse(events.get(6).isFirst()); + assertFalse(events.get(6).isLast()); + assertEquals("bar", events.get(6).getToNode()); + assertEquals("to", events.get(6).getToNodeShortName()); + assertEquals("to[mock:bar]", events.get(6).getToNodeLabel()); + assertEquals("myRoute", events.get(6).getRouteId()); + assertEquals("myRoute", events.get(6).getFromRouteId()); + assertTrue(events.get(7).isLast()); + assertEquals("direct://start", events.get(7).getEndpointUri()); + assertEquals("from1", events.get(7).getToNode()); + assertEquals("myRoute", events.get(7).getRouteId()); + assertEquals("myRoute", events.get(7).getFromRouteId()); } @Override @@ -127,10 +153,13 @@ public class BacklogTracerMessageHistoryTest extends ManagementTestSupport { context.setBacklogTracing(true); context.setMessageHistory(true); - from("direct:start") + from("direct:start").routeId("myRoute") .to("mock:foo").id("foo") + .to("direct:sub") .to("mock:bar").id("bar"); + from("direct:sub").routeId("mySub") + .to("mock:sub").id("sub"); } }; } diff --git a/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerTest.java b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerTest.java index 4f3f07857520..3b4d2a6edace 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerTest.java @@ -395,18 +395,19 @@ public class BacklogTracerTest extends ManagementTestSupport { Integer size = (Integer) mbeanServer.getAttribute(on, "BacklogSize"); assertEquals(100, size.intValue(), "Should be 100"); - // change size to 2 x 10 (as we need for first as well) - mbeanServer.setAttribute(on, new Attribute("BacklogSize", 20)); + // change size to 10 + mbeanServer.setAttribute(on, new Attribute("BacklogSize", 10)); // set the pattern to match only foo mbeanServer.setAttribute(on, new Attribute("TracePattern", "foo")); Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled"); assertEquals(Boolean.TRUE, enabled, "Should not be enabled"); - getMockEndpoint("mock:foo").expectedMessageCount(10); - getMockEndpoint("mock:bar").expectedMessageCount(10); + getMockEndpoint("mock:foo").expectedMessageCount(13); + getMockEndpoint("mock:bar").expectedMessageCount(13); - for (int i = 0; i < 10; i++) { + // send 10 + 3 extra + for (int i = 0; i < 13; i++) { template.sendBody("direct:start", "###" + i + "###"); } @@ -414,20 +415,22 @@ public class BacklogTracerTest extends ManagementTestSupport { List<BacklogTracerEventMessage> events = (List<BacklogTracerEventMessage>) mbeanServer.invoke(on, "dumpTracedMessages", new Object[] { "foo" }, new String[] { "java.lang.String" }); - assertEquals(7, events.size()); + assertEquals(10, events.size()); // the first should be 3 and the last 9 String xml = events.get(0).getMessageAsXml(); assertTrue(xml.contains("###3###")); xml = events.get(6).getMessageAsXml(); assertTrue(xml.contains("###9###")); + xml = events.get(9).getMessageAsXml(); + assertTrue(xml.contains("###12###")); // send in another message template.sendBody("direct:start", "###" + 10 + "###"); events = (List<BacklogTracerEventMessage>) mbeanServer.invoke(on, "dumpTracedMessages", new Object[] { "foo" }, new String[] { "java.lang.String" }); - assertEquals(7, events.size()); + assertEquals(10, events.size()); // and we are shifted one now xml = events.get(0).getMessageAsXml(); @@ -436,20 +439,20 @@ public class BacklogTracerTest extends ManagementTestSupport { assertTrue(xml.contains("###10###")); // send in 4 messages - template.sendBody("direct:start", "###" + 11 + "###"); - template.sendBody("direct:start", "###" + 12 + "###"); - template.sendBody("direct:start", "###" + 13 + "###"); template.sendBody("direct:start", "###" + 14 + "###"); + template.sendBody("direct:start", "###" + 15 + "###"); + template.sendBody("direct:start", "###" + 16 + "###"); + template.sendBody("direct:start", "###" + 17 + "###"); events = (List<BacklogTracerEventMessage>) mbeanServer.invoke(on, "dumpTracedMessages", new Object[] { "foo" }, new String[] { "java.lang.String" }); - assertEquals(7, events.size()); + assertEquals(10, events.size()); // and we are shifted +4 now xml = events.get(0).getMessageAsXml(); assertTrue(xml.contains("###8###")); - xml = events.get(6).getMessageAsXml(); - assertTrue(xml.contains("###14###")); + xml = events.get(9).getMessageAsXml(); + assertTrue(xml.contains("###17###")); } @Override diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelHistoryAction.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelHistoryAction.java index ff2b6d0ec075..ddf488b2cc67 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelHistoryAction.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelHistoryAction.java @@ -533,16 +533,18 @@ public class CamelHistoryAction extends ActionWatchCommand { private String getStatus(Row r) { boolean remote = r.endpoint != null && r.endpoint.getBooleanOrDefault("remote", false); + boolean original = r.fromRouteId != null && r.fromRouteId.equals(r.routeId); if (r.first) { - String s = "Created"; + String s = original ? "Created" : remote ? "Sent" : "Processed"; if (loggingColor) { return Ansi.ansi().fg(Ansi.Color.GREEN).a(s).reset().toString(); } else { return s; } } else if (r.last) { - String done = r.exception != null ? "Completed (exception)" : "Completed (success)"; + String s = original ? "Completed" : remote ? "Sent" : "Processed"; + String done = r.exception != null ? s + " (exception)" : s + " (success)"; if (loggingColor) { return Ansi.ansi().fg(r.failed ? Ansi.Color.RED : Ansi.Color.GREEN).a(done).reset().toString(); } else { @@ -610,7 +612,9 @@ public class CamelHistoryAction extends ActionWatchCommand { if (source && r.location != null) { answer = r.location; } else { - if (r.nodeId == null) { + if (r.routeId != null && r.nodeId != null) { + answer = r.routeId + "/" + r.nodeId; + } else if (r.nodeId == null) { answer = r.routeId; } else { answer = r.nodeId; @@ -630,10 +634,11 @@ public class CamelHistoryAction extends ActionWatchCommand { } private String getDirection(Row r) { + boolean original = r.routeId != null && r.routeId.equals(r.fromRouteId); if (r.first) { - return "*-->"; + return original ? "*-->" : " -->"; } else if (r.last) { - return "*<--"; + return original ? "*<--" : " <--"; } else { return null; } @@ -654,10 +659,11 @@ public class CamelHistoryAction extends ActionWatchCommand { } private String getMessage(Row r) { + boolean original = r.routeId != null && r.routeId.equals(r.fromRouteId); if (r.failed && !r.last) { return "Exception: " + r.exception.getString("message"); } - if (r.last) { + if (r.last && original) { return r.failed ? "Failed" : "Success"; } return r.summary; @@ -714,6 +720,7 @@ public class CamelHistoryAction extends ActionWatchCommand { row.first = jo.getBoolean("first"); row.last = jo.getBoolean("last"); row.location = jo.getString("location"); + row.fromRouteId = jo.getString("fromRouteId"); row.routeId = jo.getString("routeId"); row.nodeId = jo.getString("nodeId"); row.nodeParentId = jo.getString("nodeParentId"); @@ -919,6 +926,7 @@ public class CamelHistoryAction extends ActionWatchCommand { String exchangePattern; String threadName; String location; + String fromRouteId; String routeId; String nodeId; String nodeParentId;
