This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch fix/CAMEL-23684
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 29c2cbe14acda1b085b74833729d0f1e7360c0f8
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Jun 4 12:49:44 2026 +0200

    CAMEL-23684: BacklogTracer - correlate message history by breadcrumb ID
    
    Capture the breadcrumb ID (CamelBreadcrumbId header) in BacklogTracer
    event messages and use it to correlate message history across broker
    boundaries. When exchanges pass through Kafka, SEDA, JMS, etc., each
    consumer creates a new independent exchange - the breadcrumb ID links
    them together so the history tab shows the full end-to-end flow.
    
    Falls back to exchange ID / correlation ID matching when breadcrumb
    is not set (e.g. when allowUseOriginalMessage is disabled).
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    Signed-off-by: Claus Ibsen <[email protected]>
---
 .../apache/camel/spi/BacklogTracerEventMessage.java    |  8 ++++++++
 .../org/apache/camel/impl/debugger/BacklogTracer.java  | 17 ++++++++++++++---
 .../camel/impl/debugger/DefaultBacklogDebugger.java    | 10 +++++++---
 .../debugger/DefaultBacklogTracerEventMessage.java     | 12 +++++++++++-
 .../camel/impl/engine/CamelInternalProcessor.java      | 18 ++++++++++++------
 5 files changed, 52 insertions(+), 13 deletions(-)

diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
 
b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
index 7956e8587e53..a87c08ec0b6a 100644
--- 
a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
+++ 
b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
@@ -129,6 +129,14 @@ public interface BacklogTracerEventMessage extends 
BacklogEventMessage {
     @Nullable
     String getCorrelationExchangeId();
 
+    /**
+     * The breadcrumb id that links exchanges across broker boundaries (Kafka, SEDA, JMS, etc.)
+     *
+     * @since 4.21
+     */
+    @Nullable
+    String getBreadcrumbId();
+
     /**
      * The name of the thread that is processing the message, when this event was captured.
      */
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
index 2bc0a9543535..d1a8b9559803 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
@@ -164,6 +164,7 @@ public class BacklogTracer extends ServiceSupport 
implements org.apache.camel.sp
         String toNodeLabel = StringHelper.limitLength(node.getLabel(), 50);
         String exchangeId = exchange.getExchangeId();
         String correlationExchangeId = 
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+        String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
         int level = node.getLevel();
         String fromRouteId = exchange.getFromRouteId();
         String source = LoggerHelper.getLineNumberLoggerName(node);
@@ -172,7 +173,7 @@ public class BacklogTracer extends ServiceSupport 
implements org.apache.camel.sp
         DefaultBacklogTracerEventMessage event = new 
DefaultBacklogTracerEventMessage(
                 camelContext, first, last, incrementTraceCounter(), timestamp, 
source, fromRouteId, fromRouteId, toNode,
                 toNodeParentId, null, null, toNodeShortName, toNodeLabel, 
level,
-                exchangeId, correlationExchangeId, false, false, data);
+                exchangeId, correlationExchangeId, breadcrumbId, false, false, 
data);
         if ((first || last) && fromRouteId != null) {
             Route route = camelContext.getRoute(fromRouteId);
             if (route != null && route.getConsumer() != null) {
@@ -195,12 +196,22 @@ public class BacklogTracer extends ServiceSupport 
implements org.apache.camel.sp
 
         // handle capturing events for last full completed exchange (aka replay)
         if (camelContext.isMessageHistory()) {
-            String tid = null;
             var head = provisionalHistoryQueue.peek();
+            String bid = null;
+            String tid = null;
             if (head != null) {
+                bid = head.getBreadcrumbId();
                 tid = head.getExchangeId();
             }
-            if (tid == null || tid.equals(event.getExchangeId()) || 
tid.equals(event.getCorrelationExchangeId())) {
+            // correlate by breadcrumb ID when available (links exchanges across broker boundaries)
+            // fallback to exchange ID / correlation ID matching when breadcrumb is not set
+            boolean match;
+            if (bid != null && event.getBreadcrumbId() != null) {
+                match = bid.equals(event.getBreadcrumbId());
+            } else {
+                match = tid == null || tid.equals(event.getExchangeId()) || 
tid.equals(event.getCorrelationExchangeId());
+            }
+            if (match) {
                 boolean added = provisionalHistoryQueue.offer(event);
                 boolean original = head != null && event.getRouteId() != null 
&& event.getRouteId().equals(head.getRouteId());
                 if (event.isLast() && original) {
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
index 7adf38f6b7c8..09d9aad0e903 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
@@ -896,6 +896,7 @@ public final class DefaultBacklogDebugger extends 
ServiceSupport implements Back
                         message.getToNodeParentWhenLabel(),
                         message.getToNodeShortName(), message.getToNodeLabel(),
                         message.getToNodeLevel(), message.getExchangeId(), 
message.getCorrelationExchangeId(),
+                        message.getBreadcrumbId(),
                         false, false,
                         dumpAsJSonObject(suspendedExchange.getExchange())));
     }
@@ -939,6 +940,7 @@ public final class DefaultBacklogDebugger extends 
ServiceSupport implements Back
             String routeId = CamelContextHelper.getRouteId(definition);
             String exchangeId = exchange.getExchangeId();
             String correlationExchangeId = 
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+            String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
             int level = definition.getLevel();
             long uid = debugCounter.incrementAndGet();
             String source = LoggerHelper.getLineNumberLoggerName(definition);
@@ -948,7 +950,7 @@ public final class DefaultBacklogDebugger extends 
ServiceSupport implements Back
                     = new DefaultBacklogTracerEventMessage(
                             camelContext,
                             first, false, uid, timestamp, source, fromRouteId, 
routeId, toNode, toNodeParentId, null, null,
-                            toNodeShortName, toNodeLabel, level, exchangeId, 
correlationExchangeId,
+                            toNodeShortName, toNodeLabel, level, exchangeId, 
correlationExchangeId, breadcrumbId,
                             false, false, data);
             suspendedBreakpointMessages.put(nodeId, msg);
 
@@ -1030,6 +1032,7 @@ public final class DefaultBacklogDebugger extends 
ServiceSupport implements Back
             String routeId = CamelContextHelper.getRouteId(definition);
             String exchangeId = exchange.getExchangeId();
             String correlationExchangeId = 
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+            String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
             int level = definition.getLevel();
             long uid = debugCounter.incrementAndGet();
             String source = LoggerHelper.getLineNumberLoggerName(definition);
@@ -1039,7 +1042,7 @@ public final class DefaultBacklogDebugger extends 
ServiceSupport implements Back
                             camelContext,
                             false, false, uid, timestamp, source, fromRouteId, 
routeId, toNode, toNodeParentId, null, null,
                             toNodeShortName, toNodeLabel, level,
-                            exchangeId, correlationExchangeId,
+                            exchangeId, correlationExchangeId, breadcrumbId,
                             false, false, data);
             suspendedBreakpointMessages.put(toNode, msg);
 
@@ -1140,6 +1143,7 @@ public final class DefaultBacklogDebugger extends 
ServiceSupport implements Back
             String routeId = route != null ? route.getRouteId() : toNode;
             String exchangeId = exchange.getExchangeId();
             String correlationExchangeId = 
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+            String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
             int level = definition.getLevel();
             long uid = debugCounter.incrementAndGet();
             String source = LoggerHelper.getLineNumberLoggerName(route != null 
? route : definition);
@@ -1148,7 +1152,7 @@ public final class DefaultBacklogDebugger extends 
ServiceSupport implements Back
                     = new DefaultBacklogTracerEventMessage(
                             camelContext,
                             false, true, uid, timestamp, source, fromRouteId, 
routeId, toNode, toNodeParentId,
-                            null, null, null, null, level, exchangeId, 
correlationExchangeId,
+                            null, null, null, null, level, exchangeId, 
correlationExchangeId, breadcrumbId,
                             false, false, data);
             // we want to capture if there was an exception
             if (cause != null) {
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
index 7e8b0edabc38..dc0dd0b8755f 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
@@ -54,6 +54,7 @@ public final class DefaultBacklogTracerEventMessage 
implements BacklogTracerEven
     private final int toNodeLevel;
     private final String exchangeId;
     private final String correlationExchangeId;
+    private final String breadcrumbId;
     private final String threadName;
     private String endpointUri;
     private boolean remoteEndpoint;
@@ -77,7 +78,7 @@ public final class DefaultBacklogTracerEventMessage 
implements BacklogTracerEven
                                             String toNodeParentId,
                                             String toNodeParentWhenId, String 
toNodeParentWhenLabel,
                                             String toNodeShortName, String 
toNodeLabel, int toNodeLevel, String exchangeId,
-                                            String correlationExchangeId,
+                                            String correlationExchangeId, 
String breadcrumbId,
                                             boolean rest, boolean template, 
JsonObject data) {
         this.camelContext = camelContext;
         this.watch = new StopWatch();
@@ -97,6 +98,7 @@ public final class DefaultBacklogTracerEventMessage 
implements BacklogTracerEven
         this.toNodeLevel = toNodeLevel;
         this.exchangeId = exchangeId;
         this.correlationExchangeId = correlationExchangeId;
+        this.breadcrumbId = breadcrumbId;
         this.rest = rest;
         this.template = template;
         this.threadName = Thread.currentThread().getName();
@@ -199,6 +201,11 @@ public final class DefaultBacklogTracerEventMessage 
implements BacklogTracerEven
         return correlationExchangeId;
     }
 
+    @Override
+    public String getBreadcrumbId() {
+        return breadcrumbId;
+    }
+
     @Override
     public String getProcessingThreadName() {
         return threadName;
@@ -587,6 +594,9 @@ public final class DefaultBacklogTracerEventMessage 
implements BacklogTracerEven
         if (correlationExchangeId != null) {
             jo.put("correlationExchangeId", correlationExchangeId);
         }
+        if (breadcrumbId != null) {
+            jo.put("breadcrumbId", breadcrumbId);
+        }
         if (timestamp > 0) {
             jo.put("timestamp", timestamp);
         }
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 66c56142faf0..530b8534c4d6 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -649,6 +649,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                 String source = LoggerHelper.getLineNumberLoggerName(input);
                 String exchangeId = exchange.getExchangeId();
                 String correlationExchangeId = 
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+                String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
                 String routeId = routeDefinition.getRouteId();
                 String fromRouteId = exchange.getFromRouteId();
                 int level = 1;
@@ -666,7 +667,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                         input.getId(),
                         null, null, null,
                         input.getShortName(), input.getLabel(),
-                        level, exchangeId, correlationExchangeId, rest, 
template, data);
+                        level, exchangeId, correlationExchangeId, 
breadcrumbId, rest, template, data);
                 if (exchange.getFromEndpoint() instanceof 
EndpointServiceLocation esl) {
                     first.setEndpointServiceUrl(esl.getServiceUrl());
                     first.setEndpointServiceProtocol(esl.getServiceProtocol());
@@ -687,6 +688,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                 String source = LoggerHelper.getLineNumberLoggerName(input);
                 String exchangeId = exchange.getExchangeId();
                 String correlationExchangeId = 
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+                String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
                 String routeId = routeDefinition.getRouteId();
                 String fromRouteId = exchange.getFromRouteId();
                 int level = 1;
@@ -704,7 +706,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                         input.getId(),
                         null, null, null,
                         input.getShortName(), input.getLabel(),
-                        level, exchangeId, correlationExchangeId, rest, 
template, data);
+                        level, exchangeId, correlationExchangeId, 
breadcrumbId, rest, template, data);
                 if (exchange.getFromEndpoint() instanceof 
EndpointServiceLocation esl) {
                     first.setEndpointServiceUrl(esl.getServiceUrl());
                     first.setEndpointServiceProtocol(esl.getServiceProtocol());
@@ -775,6 +777,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
         public DefaultBacklogTracerEventMessage before(Exchange exchange) 
throws Exception {
             String exchangeId = exchange.getExchangeId();
             String correlationExchangeId = 
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+            String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
             int level = processorDefinition.getLevel();
             String routeId = ExchangeHelper.getAtRouteId(exchange);
             String fromRouteId = exchange.getFromRouteId();
@@ -794,7 +797,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                     processorDefinition.getId(),
                     null, null, null,
                     processorDefinition.getShortName(), 
processorDefinition.getLabel(),
-                    level + 1, exchangeId, correlationExchangeId, false, 
false, data);
+                    level + 1, exchangeId, correlationExchangeId, 
breadcrumbId, false, false, data);
             backlogTracer.traceEvent(event);
             return event;
         }
@@ -806,6 +809,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                 String source = 
LoggerHelper.getLineNumberLoggerName(processorDefinition);
                 String exchangeId = exchange.getExchangeId();
                 String correlationExchangeId = 
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+                String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
                 String routeId = ExchangeHelper.getAtRouteId(exchange);
                 String fromRouteId = exchange.getFromRouteId();
                 int level = 1;
@@ -823,7 +827,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                         processorDefinition.getId(),
                         null, null, null,
                         processorDefinition.getShortName(), 
processorDefinition.getLabel(),
-                        level, exchangeId, correlationExchangeId, false, 
false, data);
+                        level, exchangeId, correlationExchangeId, 
breadcrumbId, false, false, data);
                 if (exchange.getFromEndpoint() instanceof 
EndpointServiceLocation esl) {
                     first.setEndpointServiceUrl(esl.getServiceUrl());
                     first.setEndpointServiceProtocol(esl.getServiceProtocol());
@@ -931,6 +935,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                 String toNodeLabel = 
StringHelper.limitLength(processorDefinition.getLabel(), 50);
                 String exchangeId = exchange.getExchangeId();
                 String correlationExchangeId = 
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+                String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
                 int level = processorDefinition.getLevel();
 
                 boolean includeExchangeProperties = 
backlogTracer.isIncludeExchangeProperties();
@@ -951,7 +956,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                         toNodeParentId,
                         toNodeParentWhenId, toNodeParentWhenLabel,
                         toNodeShortName, toNodeLabel, level,
-                        exchangeId, correlationExchangeId, rest, template, 
data);
+                        exchangeId, correlationExchangeId, breadcrumbId, rest, 
template, data);
                 backlogTracer.traceEvent(event);
                 return event;
             }
@@ -969,6 +974,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                     String fromRouteId = exchange.getFromRouteId();
                     String exchangeId = exchange.getExchangeId();
                     String correlationExchangeId = 
exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
+                    String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
                     boolean includeExchangeProperties = 
backlogTracer.isIncludeExchangeProperties();
                     boolean includeExchangeVariables = 
backlogTracer.isIncludeExchangeVariables();
                     long created = exchange.getClock().getCreated();
@@ -985,7 +991,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                             false, true, 
backlogTracer.incrementTraceCounter(), created, source, fromRouteId, routeId, 
toNode,
                             null, null,
                             null, toNodeShortName, toNodeLabel,
-                            level, exchangeId, correlationExchangeId, rest, 
template, data);
+                            level, exchangeId, correlationExchangeId, 
breadcrumbId, rest, template, data);
                     backlogTracer.traceEvent(pseudoLast);
                     doneProcessing(exchange, pseudoLast);
                     doneProcessing(exchange, pseudoFirst);

Reply via email to