This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch fix/CAMEL-23684 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 29c2cbe14acda1b085b74833729d0f1e7360c0f8 Author: Claus Ibsen <[email protected]> AuthorDate: Thu Jun 4 12:49:44 2026 +0200 CAMEL-23684: BacklogTracer - correlate message history by breadcrumb ID Capture the breadcrumb ID (CamelBreadcrumbId header) in BacklogTracer event messages and use it to correlate message history across broker boundaries. When exchanges pass through Kafka, SEDA, JMS, etc., each consumer creates a new independent exchange - the breadcrumb ID links them together so the history tab shows the full end-to-end flow. Falls back to exchange ID / correlation ID matching when breadcrumb is not set (e.g. when allowUseOriginalMessage is disabled). Co-Authored-By: Claude Opus 4.6 <[email protected]> Signed-off-by: Claus Ibsen <[email protected]> --- .../apache/camel/spi/BacklogTracerEventMessage.java | 8 ++++++++ .../org/apache/camel/impl/debugger/BacklogTracer.java | 17 ++++++++++++++--- .../camel/impl/debugger/DefaultBacklogDebugger.java | 10 +++++++--- .../debugger/DefaultBacklogTracerEventMessage.java | 12 +++++++++++- .../camel/impl/engine/CamelInternalProcessor.java | 18 ++++++++++++------ 5 files changed, 52 insertions(+), 13 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java index 7956e8587e53..a87c08ec0b6a 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java @@ -129,6 +129,14 @@ public interface BacklogTracerEventMessage extends BacklogEventMessage { @Nullable String getCorrelationExchangeId(); + /** + * The breadcrumb id that links exchanges across broker boundaries (Kafka, SEDA, JMS, etc.) + * + * @since 4.21 + */ + @Nullable + String getBreadcrumbId(); + /** * The name of the thread that is processing the message, when this event was captured. */ diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java index 2bc0a9543535..d1a8b9559803 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java @@ -164,6 +164,7 @@ public class BacklogTracer extends ServiceSupport implements org.apache.camel.sp String toNodeLabel = StringHelper.limitLength(node.getLabel(), 50); String exchangeId = exchange.getExchangeId(); String correlationExchangeId = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class); + String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class); int level = node.getLevel(); String fromRouteId = exchange.getFromRouteId(); String source = LoggerHelper.getLineNumberLoggerName(node); @@ -172,7 +173,7 @@ public class BacklogTracer extends ServiceSupport implements org.apache.camel.sp DefaultBacklogTracerEventMessage event = new DefaultBacklogTracerEventMessage( camelContext, first, last, incrementTraceCounter(), timestamp, source, fromRouteId, fromRouteId, toNode, toNodeParentId, null, null, toNodeShortName, toNodeLabel, level, - exchangeId, correlationExchangeId, false, false, data); + exchangeId, correlationExchangeId, breadcrumbId, false, false, data); if ((first || last) && fromRouteId != null) { Route route = camelContext.getRoute(fromRouteId); if (route != null && route.getConsumer() != null) { @@ -195,12 +196,22 @@ public class BacklogTracer extends ServiceSupport implements org.apache.camel.sp // handle capturing events for last full completed exchange (aka replay) if (camelContext.isMessageHistory()) { - String tid = null; var head = provisionalHistoryQueue.peek(); + String bid = null; + String tid = null; if (head != null) { + bid = head.getBreadcrumbId(); tid = head.getExchangeId(); } - if (tid == null || tid.equals(event.getExchangeId()) || tid.equals(event.getCorrelationExchangeId())) { + // correlate by breadcrumb ID when available (links exchanges across broker boundaries) + // fallback to exchange ID / correlation ID matching when breadcrumb is not set + boolean match; + if (bid != null && event.getBreadcrumbId() != null) { + match = bid.equals(event.getBreadcrumbId()); + } else { + match = tid == null || tid.equals(event.getExchangeId()) || tid.equals(event.getCorrelationExchangeId()); + } + if (match) { boolean added = provisionalHistoryQueue.offer(event); boolean original = head != null && event.getRouteId() != null && event.getRouteId().equals(head.getRouteId()); if (event.isLast() && original) { diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java index 7adf38f6b7c8..09d9aad0e903 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java @@ -896,6 +896,7 @@ public final class DefaultBacklogDebugger extends ServiceSupport implements Back message.getToNodeParentWhenLabel(), message.getToNodeShortName(), message.getToNodeLabel(), message.getToNodeLevel(), message.getExchangeId(), message.getCorrelationExchangeId(), + message.getBreadcrumbId(), false, false, dumpAsJSonObject(suspendedExchange.getExchange()))); } @@ -939,6 +940,7 @@ public final class DefaultBacklogDebugger extends ServiceSupport implements Back String routeId = CamelContextHelper.getRouteId(definition); String exchangeId = exchange.getExchangeId(); String correlationExchangeId = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class); + String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class); int level = definition.getLevel(); long uid = debugCounter.incrementAndGet(); String source = LoggerHelper.getLineNumberLoggerName(definition); @@ -948,7 +950,7 @@ public final class DefaultBacklogDebugger extends ServiceSupport implements Back = new DefaultBacklogTracerEventMessage( camelContext, first, false, uid, timestamp, source, fromRouteId, routeId, toNode, toNodeParentId, null, null, - toNodeShortName, toNodeLabel, level, exchangeId, correlationExchangeId, + toNodeShortName, toNodeLabel, level, exchangeId, correlationExchangeId, breadcrumbId, false, false, data); suspendedBreakpointMessages.put(nodeId, msg); @@ -1030,6 +1032,7 @@ public final class DefaultBacklogDebugger extends ServiceSupport implements Back String routeId = CamelContextHelper.getRouteId(definition); String exchangeId = exchange.getExchangeId(); String correlationExchangeId = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class); + String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class); int level = definition.getLevel(); long uid = debugCounter.incrementAndGet(); String source = LoggerHelper.getLineNumberLoggerName(definition); @@ -1039,7 +1042,7 @@ public final class DefaultBacklogDebugger extends ServiceSupport implements Back camelContext, false, false, uid, timestamp, source, fromRouteId, routeId, toNode, toNodeParentId, null, null, toNodeShortName, toNodeLabel, level, - exchangeId, correlationExchangeId, + exchangeId, correlationExchangeId, breadcrumbId, false, false, data); suspendedBreakpointMessages.put(toNode, msg); @@ -1140,6 +1143,7 @@ public final class DefaultBacklogDebugger extends ServiceSupport implements Back String routeId = route != null ? route.getRouteId() : toNode; String exchangeId = exchange.getExchangeId(); String correlationExchangeId = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class); + String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class); int level = definition.getLevel(); long uid = debugCounter.incrementAndGet(); String source = LoggerHelper.getLineNumberLoggerName(route != null ? route : definition); @@ -1148,7 +1152,7 @@ public final class DefaultBacklogDebugger extends ServiceSupport implements Back = new DefaultBacklogTracerEventMessage( camelContext, false, true, uid, timestamp, source, fromRouteId, routeId, toNode, toNodeParentId, - null, null, null, null, level, exchangeId, correlationExchangeId, + null, null, null, null, level, exchangeId, correlationExchangeId, breadcrumbId, false, false, data); // we want to capture if there was an exception if (cause != null) { diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java index 7e8b0edabc38..dc0dd0b8755f 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java @@ -54,6 +54,7 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven private final int toNodeLevel; private final String exchangeId; private final String correlationExchangeId; + private final String breadcrumbId; private final String threadName; private String endpointUri; private boolean remoteEndpoint; @@ -77,7 +78,7 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven String toNodeParentId, String toNodeParentWhenId, String toNodeParentWhenLabel, String toNodeShortName, String toNodeLabel, int toNodeLevel, String exchangeId, - String correlationExchangeId, + String correlationExchangeId, String breadcrumbId, boolean rest, boolean template, JsonObject data) { this.camelContext = camelContext; this.watch = new StopWatch(); @@ -97,6 +98,7 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven this.toNodeLevel = toNodeLevel; this.exchangeId = exchangeId; this.correlationExchangeId = correlationExchangeId; + this.breadcrumbId = breadcrumbId; this.rest = rest; this.template = template; this.threadName = Thread.currentThread().getName(); @@ -199,6 +201,11 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven return correlationExchangeId; } + @Override + public String getBreadcrumbId() { + return breadcrumbId; + } + @Override public String getProcessingThreadName() { return threadName; @@ -587,6 +594,9 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven if (correlationExchangeId != null) { jo.put("correlationExchangeId", correlationExchangeId); } + if (breadcrumbId != null) { + jo.put("breadcrumbId", breadcrumbId); + } if (timestamp > 0) { jo.put("timestamp", timestamp); } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java index 66c56142faf0..530b8534c4d6 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java @@ -649,6 +649,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In String source = LoggerHelper.getLineNumberLoggerName(input); String exchangeId = exchange.getExchangeId(); String correlationExchangeId = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class); + String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class); String routeId = routeDefinition.getRouteId(); String fromRouteId = exchange.getFromRouteId(); int level = 1; @@ -666,7 +667,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In input.getId(), null, null, null, input.getShortName(), input.getLabel(), - level, exchangeId, correlationExchangeId, rest, template, data); + level, exchangeId, correlationExchangeId, breadcrumbId, rest, template, data); if (exchange.getFromEndpoint() instanceof EndpointServiceLocation esl) { first.setEndpointServiceUrl(esl.getServiceUrl()); first.setEndpointServiceProtocol(esl.getServiceProtocol()); @@ -687,6 +688,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In String source = LoggerHelper.getLineNumberLoggerName(input); String exchangeId = exchange.getExchangeId(); String correlationExchangeId = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class); + String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class); String routeId = routeDefinition.getRouteId(); String fromRouteId = exchange.getFromRouteId(); int level = 1; @@ -704,7 +706,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In input.getId(), null, null, null, input.getShortName(), input.getLabel(), - level, exchangeId, correlationExchangeId, rest, template, data); + level, exchangeId, correlationExchangeId, breadcrumbId, rest, template, data); if (exchange.getFromEndpoint() instanceof EndpointServiceLocation esl) { first.setEndpointServiceUrl(esl.getServiceUrl()); first.setEndpointServiceProtocol(esl.getServiceProtocol()); @@ -775,6 +777,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In public DefaultBacklogTracerEventMessage before(Exchange exchange) throws Exception { String exchangeId = exchange.getExchangeId(); String correlationExchangeId = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class); + String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class); int level = processorDefinition.getLevel(); String routeId = ExchangeHelper.getAtRouteId(exchange); String fromRouteId = exchange.getFromRouteId(); @@ -794,7 +797,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In processorDefinition.getId(), null, null, null, processorDefinition.getShortName(), processorDefinition.getLabel(), - level + 1, exchangeId, correlationExchangeId, false, false, data); + level + 1, exchangeId, correlationExchangeId, breadcrumbId, false, false, data); backlogTracer.traceEvent(event); return event; } @@ -806,6 +809,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In String source = LoggerHelper.getLineNumberLoggerName(processorDefinition); String exchangeId = exchange.getExchangeId(); String correlationExchangeId = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class); + String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class); String routeId = ExchangeHelper.getAtRouteId(exchange); String fromRouteId = exchange.getFromRouteId(); int level = 1; @@ -823,7 +827,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In processorDefinition.getId(), null, null, null, processorDefinition.getShortName(), processorDefinition.getLabel(), - level, exchangeId, correlationExchangeId, false, false, data); + level, exchangeId, correlationExchangeId, breadcrumbId, false, false, data); if (exchange.getFromEndpoint() instanceof EndpointServiceLocation esl) { first.setEndpointServiceUrl(esl.getServiceUrl()); first.setEndpointServiceProtocol(esl.getServiceProtocol()); @@ -931,6 +935,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In String toNodeLabel = StringHelper.limitLength(processorDefinition.getLabel(), 50); String exchangeId = exchange.getExchangeId(); String correlationExchangeId = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class); + String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class); int level = processorDefinition.getLevel(); boolean includeExchangeProperties = backlogTracer.isIncludeExchangeProperties(); @@ -951,7 +956,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In toNodeParentId, toNodeParentWhenId, toNodeParentWhenLabel, toNodeShortName, toNodeLabel, level, - exchangeId, correlationExchangeId, rest, template, data); + exchangeId, correlationExchangeId, breadcrumbId, rest, template, data); backlogTracer.traceEvent(event); return event; } @@ -969,6 +974,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In String fromRouteId = exchange.getFromRouteId(); String exchangeId = exchange.getExchangeId(); String correlationExchangeId = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class); + String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class); boolean includeExchangeProperties = backlogTracer.isIncludeExchangeProperties(); boolean includeExchangeVariables = backlogTracer.isIncludeExchangeVariables(); long created = exchange.getClock().getCreated(); @@ -985,7 +991,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In false, true, backlogTracer.incrementTraceCounter(), created, source, fromRouteId, routeId, toNode, null, null, null, toNodeShortName, toNodeLabel, - level, exchangeId, correlationExchangeId, rest, template, data); + level, exchangeId, correlationExchangeId, breadcrumbId, rest, template, data); backlogTracer.traceEvent(pseudoLast); doneProcessing(exchange, pseudoLast); doneProcessing(exchange, pseudoFirst);
