This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new f440ff69cee4 CAMEL-22687: camel-jbang - get history to show latest
completed exchange message history. (#19937)
f440ff69cee4 is described below
commit f440ff69cee4e0cd9c5353857720aa204bb3b87a
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Nov 17 13:21:17 2025 +0100
CAMEL-22687: camel-jbang - get history to show latest completed exchange
message history. (#19937)
---
.../apache/camel/catalog/dev-consoles.properties | 1 +
.../catalog/dev-consoles/message-history.json | 15 ++
.../java/org/apache/camel/spi/BacklogTracer.java | 26 ++
.../camel/spi/BacklogTracerEventMessage.java | 10 +
.../apache/camel/impl/debugger/BacklogTracer.java | 80 +++++-
.../impl/debugger/DefaultBacklogDebugger.java | 17 +-
.../debugger/DefaultBacklogTracerEventMessage.java | 25 +-
.../camel/impl/engine/CamelInternalProcessor.java | 14 +-
.../MessageHistoryDevConsoleConfigurer.java | 63 +++++
.../apache/camel/dev-console/message-history.json | 15 ++
...che.camel.impl.console.MessageHistoryDevConsole | 2 +
.../org/apache/camel/dev-console/message-history | 2 +
.../org/apache/camel/dev-consoles.properties | 2 +-
.../impl/console/MessageHistoryDevConsole.java | 72 ++++++
.../mbean/ManagedBacklogTracerMBean.java | 9 +
.../management/mbean/ManagedBacklogTracer.java | 15 ++
.../BacklogTracerMessageHistoryTest.java | 138 ++++++++++
.../camel/cli/connector/LocalCliConnector.java | 20 ++
.../dsl/jbang/core/commands/CamelCommand.java | 4 +
.../dsl/jbang/core/commands/CamelJBangMain.java | 1 +
.../core/commands/action/CamelHistoryAction.java | 281 +++++++++++++++++++++
21 files changed, 794 insertions(+), 18 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties
index 9b787ecb27df..482964cf14ff 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties
@@ -28,6 +28,7 @@ log
main-configuration
main-http-server
memory
+message-history
micrometer
platform-http
processor
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/message-history.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/message-history.json
new file mode 100644
index 000000000000..f3c6d76d2c3b
--- /dev/null
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/message-history.json
@@ -0,0 +1,15 @@
+{
+ "console": {
+ "kind": "console",
+ "group": "camel",
+ "name": "message-history",
+ "title": "Message History",
+ "description": "History of latest completed exchange",
+ "deprecated": false,
+ "javaType": "org.apache.camel.impl.console.MessageHistoryDevConsole",
+ "groupId": "org.apache.camel",
+ "artifactId": "camel-console",
+ "version": "4.17.0-SNAPSHOT"
+ }
+}
+
diff --git
a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracer.java
b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracer.java
index 633c98fbb25b..95f46b0679b4 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracer.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracer.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.spi;
+import java.util.Collection;
import java.util.List;
/**
@@ -184,11 +185,26 @@ public interface BacklogTracer {
*/
void resetTraceCounter();
+ /**
+ * Get all tracing data (without removing)
+ */
+ Collection<BacklogTracerEventMessage> getAllTracedMessages();
+
+ /**
+ * Get latest completed exchange message history (without removing)
+ */
+ Collection<BacklogTracerEventMessage> getLatestMessageHistory();
+
/**
* Dumps all tracing data
*/
List<BacklogTracerEventMessage> dumpAllTracedMessages();
+ /**
+ * Dumps latest completed exchange message history
+ */
+ List<BacklogTracerEventMessage> dumpLatestMessageHistory();
+
/**
* Dumps tracing data for the given route id / node id
*/
@@ -204,6 +220,11 @@ public interface BacklogTracer {
*/
String dumpTracedMessagesAsXml(String nodeId);
+ /**
+ * Dumps latest completed exchange message history as XML
+ */
+ String dumpLatestMessageHistoryAsXml();
+
/**
* Dumps all tracing data as JSon
*/
@@ -214,6 +235,11 @@ public interface BacklogTracer {
*/
String dumpTracedMessagesAsJSon(String nodeId);
+ /**
+ * Dumps latest completed exchange message history as JSon
+ */
+ String dumpLatestMessageHistoryAsJSon();
+
/**
* Clears the backlog of traced messages.
*/
diff --git
a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
index 7248f736aa00..6a632d43efb0 100644
---
a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
+++
b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java
@@ -73,6 +73,16 @@ public interface BacklogTracerEventMessage {
*/
String getToNode();
+ /**
+ * Node short name where the message is being routed to
+ */
+ String getToNodeShortName();
+
+ /**
+ * Node label where the message is being routed to
+ */
+ String getToNodeLabel();
+
/**
* The exchange id
*/
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
index 011df4b71cf7..04815bfe3f4d 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java
@@ -17,6 +17,8 @@
package org.apache.camel.impl.debugger;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
@@ -43,7 +45,7 @@ import org.apache.camel.util.json.Jsoner;
* This tracer allows to store message tracers per node in the Camel routes.
The tracers is stored in a backlog queue
* (FIFO based) which allows to pull the traced messages on demand.
*/
-public final class BacklogTracer extends ServiceSupport implements
org.apache.camel.spi.BacklogTracer {
+public class BacklogTracer extends ServiceSupport implements
org.apache.camel.spi.BacklogTracer {
// limit the tracer to a thousand messages in total
public static final int MAX_BACKLOG_SIZE = 1000;
@@ -56,6 +58,9 @@ public final class BacklogTracer extends ServiceSupport
implements org.apache.ca
private final Queue<BacklogTracerEventMessage> queue = new
LinkedBlockingQueue<>(MAX_BACKLOG_SIZE);
// how many of the last messages to keep in the backlog at total
private int backlogSize = 100;
+ // the tracer also captures additional information to provide the latest
completed exchange message-history
+ private final Queue<BacklogTracerEventMessage> provisionalHistoryQueue =
new LinkedBlockingQueue<>(MAX_BACKLOG_SIZE);
+ private final Queue<BacklogTracerEventMessage> completeHistoryQueue = new
LinkedBlockingQueue<>(MAX_BACKLOG_SIZE);
private boolean removeOnDump = true;
private int bodyMaxChars = 32 * 1024;
private boolean bodyIncludeStreams;
@@ -71,7 +76,7 @@ public final class BacklogTracer extends ServiceSupport
implements org.apache.ca
private String traceFilter;
private Predicate predicate;
- private BacklogTracer(CamelContext camelContext) {
+ BacklogTracer(CamelContext camelContext) {
this.camelContext = camelContext;
this.simple = camelContext.resolveLanguage("simple");
}
@@ -94,7 +99,10 @@ public final class BacklogTracer extends ServiceSupport
implements org.apache.ca
* @return <tt>true</tt> to trace, <tt>false</tt> to skip
tracing
*/
public boolean shouldTrace(NamedNode definition, Exchange exchange) {
- if (!enabled) {
+ // special case: in standby mode we allow using the tracer to capture the latest
tracing data for
+ // enriched message history
+ boolean history = (enabled || standby) &&
camelContext.isMessageHistory();
+ if (!history && !enabled) {
return false;
}
@@ -131,6 +139,29 @@ public final class BacklogTracer extends ServiceSupport
implements org.apache.ca
}
public void traceEvent(DefaultBacklogTracerEventMessage event) {
+ // special case: in standby mode we allow using the tracer to capture the latest
tracing data for
+ // enriched message history
+ boolean history = (enabled || standby) &&
camelContext.isMessageHistory();
+ if (!history && !enabled) {
+ return;
+ }
+
+ // handle capturing events for last full completed exchange (aka
replay)
+ if (camelContext.isMessageHistory()) {
+ String tid = null;
+ var head = provisionalHistoryQueue.peek();
+ if (head != null) {
+ tid = head.getExchangeId();
+ }
+ if (tid == null || tid.equals(event.getExchangeId())) {
+ provisionalHistoryQueue.add(event);
+ if (event.isLast()) {
+ completeHistoryQueue.clear();
+ completeHistoryQueue.addAll(provisionalHistoryQueue);
+ provisionalHistoryQueue.clear();
+ }
+ }
+ }
if (!enabled) {
return;
}
@@ -340,6 +371,15 @@ public final class BacklogTracer extends ServiceSupport
implements org.apache.ca
traceCounter.set(0);
}
+ @Override
+ public Collection<BacklogTracerEventMessage> getAllTracedMessages() {
+ return Collections.unmodifiableCollection(queue);
+ }
+
+ public Collection<BacklogTracerEventMessage> getLatestMessageHistory() {
+ return Collections.unmodifiableCollection(completeHistoryQueue);
+ }
+
public List<BacklogTracerEventMessage> dumpTracedMessages(String nodeId) {
List<BacklogTracerEventMessage> answer = new ArrayList<>();
if (nodeId != null) {
@@ -350,7 +390,7 @@ public final class BacklogTracer extends ServiceSupport
implements org.apache.ca
}
}
- if (removeOnDump) {
+ if (isRemoveOnDump()) {
queue.removeAll(answer);
}
@@ -360,7 +400,12 @@ public final class BacklogTracer extends ServiceSupport
implements org.apache.ca
@Override
public String dumpTracedMessagesAsXml(String nodeId) {
List<BacklogTracerEventMessage> events = dumpTracedMessages(nodeId);
+ return wrapAroundRootTag(events);
+ }
+ @Override
+ public String dumpLatestMessageHistoryAsXml() {
+ List<BacklogTracerEventMessage> events = dumpLatestMessageHistory();
return wrapAroundRootTag(events);
}
@@ -377,6 +422,19 @@ public final class BacklogTracer extends ServiceSupport
implements org.apache.ca
return Jsoner.prettyPrint(root.toJson());
}
+ @Override
+ public String dumpLatestMessageHistoryAsJSon() {
+ List<BacklogTracerEventMessage> events = dumpLatestMessageHistory();
+
+ JsonObject root = new JsonObject();
+ JsonArray arr = new JsonArray();
+ root.put("traces", arr);
+ for (BacklogTracerEventMessage event : events) {
+ arr.add(event.asJSon());
+ }
+ return Jsoner.prettyPrint(root.toJson());
+ }
+
@Override
public List<BacklogTracerEventMessage> dumpAllTracedMessages() {
List<BacklogTracerEventMessage> answer = new ArrayList<>(queue);
@@ -386,10 +444,18 @@ public final class BacklogTracer extends ServiceSupport
implements org.apache.ca
return answer;
}
+ @Override
+ public List<BacklogTracerEventMessage> dumpLatestMessageHistory() {
+ List<BacklogTracerEventMessage> answer = new
ArrayList<>(completeHistoryQueue);
+ if (isRemoveOnDump()) {
+ completeHistoryQueue.clear();
+ }
+ return answer;
+ }
+
@Override
public String dumpAllTracedMessagesAsXml() {
List<BacklogTracerEventMessage> events = dumpAllTracedMessages();
-
return wrapAroundRootTag(events);
}
@@ -419,6 +485,8 @@ public final class BacklogTracer extends ServiceSupport
implements org.apache.ca
@Override
public void clear() {
queue.clear();
+ completeHistoryQueue.clear();
+ provisionalHistoryQueue.clear();
}
public long incrementTraceCounter() {
@@ -427,7 +495,7 @@ public final class BacklogTracer extends ServiceSupport
implements org.apache.ca
@Override
protected void doStop() throws Exception {
- queue.clear();
+ clear();
}
}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
index e3ec7c9109c3..090040ed9137 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java
@@ -58,6 +58,7 @@ import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.StopWatch;
+import org.apache.camel.util.StringHelper;
import org.apache.camel.util.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -884,7 +885,7 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
(nId, message) -> new DefaultBacklogTracerEventMessage(
camelContext,
false, false, message.getUid(),
message.getTimestamp(), message.getLocation(), message.getRouteId(),
- message.getToNode(),
+ message.getToNode(), message.getToNodeShortName(),
message.getToNodeLabel(),
message.getExchangeId(),
false, false,
dumpAsJSonObject(suspendedExchange.getExchange())));
@@ -921,6 +922,9 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
// store a copy of the message so we can see that from the debugger
long timestamp = System.currentTimeMillis();
String toNode = definition.getId();
+ String toNodeShortName = definition.getShortName();
+ // limit the label length to avoid it being too large
+ String toNodeLabel =
StringHelper.limitLength(definition.getLabel(), 50);
String routeId = CamelContextHelper.getRouteId(definition);
String exchangeId = exchange.getExchangeId();
long uid = debugCounter.incrementAndGet();
@@ -930,7 +934,8 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
BacklogTracerEventMessage msg
= new DefaultBacklogTracerEventMessage(
camelContext,
- first, false, uid, timestamp, source, routeId,
toNode, exchangeId, false, false, data);
+ first, false, uid, timestamp, source, routeId,
toNode, toNodeShortName, toNodeLabel, exchangeId,
+ false, false, data);
suspendedBreakpointMessages.put(nodeId, msg);
// suspend at this breakpoint
@@ -1003,6 +1008,9 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
// store a copy of the message so we can see that from the debugger
long timestamp = System.currentTimeMillis();
String toNode = definition.getId();
+ String toNodeShortName = definition.getShortName();
+ // limit the label length to avoid it being too large
+ String toNodeLabel =
StringHelper.limitLength(definition.getLabel(), 50);
String routeId = CamelContextHelper.getRouteId(definition);
String exchangeId = exchange.getExchangeId();
long uid = debugCounter.incrementAndGet();
@@ -1011,7 +1019,8 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
BacklogTracerEventMessage msg
= new DefaultBacklogTracerEventMessage(
camelContext,
- false, false, uid, timestamp, source, routeId,
toNode, exchangeId, false, false, data);
+ false, false, uid, timestamp, source, routeId,
toNode, toNodeShortName, toNodeLabel, exchangeId,
+ false, false, data);
suspendedBreakpointMessages.put(toNode, msg);
// suspend at this breakpoint
@@ -1114,7 +1123,7 @@ public final class DefaultBacklogDebugger extends
ServiceSupport implements Back
BacklogTracerEventMessage msg
= new DefaultBacklogTracerEventMessage(
camelContext,
- false, true, uid, timestamp, source, routeId,
toNode, exchangeId, false, false, data);
+ false, true, uid, timestamp, source, routeId,
toNode, null, null, exchangeId, false, false, data);
// we want to capture if there was an exception
if (cause != null) {
msg.setException(cause);
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
index 0288b68f1a7c..01caaeedb11d 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java
@@ -45,6 +45,8 @@ public final class DefaultBacklogTracerEventMessage
implements BacklogTracerEven
private final String location;
private final String routeId;
private final String toNode;
+ private final String toNodeShortName;
+ private final String toNodeLabel;
private final String exchangeId;
private final String threadName;
private String endpointUri;
@@ -65,8 +67,9 @@ public final class DefaultBacklogTracerEventMessage
implements BacklogTracerEven
private boolean done;
public DefaultBacklogTracerEventMessage(CamelContext camelContext, boolean
first, boolean last, long uid, long timestamp,
- String location, String routeId,
String toNode, String exchangeId,
- boolean rest, boolean template,
JsonObject data) {
+ String location, String routeId,
String toNode, String toNodeShortName,
+ String toNodeLabel,
+ String exchangeId, boolean rest,
boolean template, JsonObject data) {
this.camelContext = camelContext;
this.watch = new StopWatch();
this.first = first;
@@ -76,6 +79,8 @@ public final class DefaultBacklogTracerEventMessage
implements BacklogTracerEven
this.location = location;
this.routeId = routeId;
this.toNode = toNode;
+ this.toNodeShortName = toNodeShortName;
+ this.toNodeLabel = toNodeLabel;
this.exchangeId = exchangeId;
this.rest = rest;
this.template = template;
@@ -136,6 +141,16 @@ public final class DefaultBacklogTracerEventMessage
implements BacklogTracerEven
return toNode;
}
+ @Override
+ public String getToNodeShortName() {
+ return toNodeShortName;
+ }
+
+ @Override
+ public String getToNodeLabel() {
+ return toNodeLabel;
+ }
+
@Override
public String getExchangeId() {
return exchangeId;
@@ -503,6 +518,12 @@ public final class DefaultBacklogTracerEventMessage
implements BacklogTracerEven
if (toNode != null) {
jo.put("nodeId", toNode);
}
+ if (toNodeShortName != null) {
+ jo.put("nodeShortName", toNodeShortName);
+ }
+ if (toNodeLabel != null) {
+ jo.put("nodeLabel", toNodeLabel);
+ }
if (exchangeId != null) {
jo.put("exchangeId", exchangeId);
}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index a05df7e59b68..0dede3489adc 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -77,6 +77,7 @@ import org.apache.camel.support.UnitOfWorkHelper;
import org.apache.camel.support.processor.DelegateAsyncProcessor;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.StopWatch;
+import org.apache.camel.util.StringHelper;
import org.apache.camel.util.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -650,6 +651,8 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
long timestamp = System.currentTimeMillis();
String toNode = processorDefinition.getId();
+ String toNodeShortName = processorDefinition.getShortName();
+ String toNodeLabel =
StringHelper.limitLength(processorDefinition.getLabel(), 50);
String exchangeId = exchange.getExchangeId();
boolean includeExchangeProperties =
backlogTracer.isIncludeExchangeProperties();
boolean includeExchangeVariables =
backlogTracer.isIncludeExchangeVariables();
@@ -666,8 +669,8 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
final long created = exchange.getClock().getCreated();
DefaultBacklogTracerEventMessage pseudoFirst = new
DefaultBacklogTracerEventMessage(
camelContext,
- true, false,
backlogTracer.incrementTraceCounter(), created, source, routeId, null,
exchangeId,
- rest, template, data);
+ true, false,
backlogTracer.incrementTraceCounter(), created, source, routeId, null, null,
null,
+ exchangeId, rest, template, data);
if (exchange.getFromEndpoint() instanceof
EndpointServiceLocation esl) {
pseudoFirst.setEndpointServiceUrl(esl.getServiceUrl());
pseudoFirst.setEndpointServiceProtocol(esl.getServiceProtocol());
@@ -679,8 +682,9 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
String source =
LoggerHelper.getLineNumberLoggerName(processorDefinition);
DefaultBacklogTracerEventMessage event = new
DefaultBacklogTracerEventMessage(
camelContext,
- false, false, backlogTracer.incrementTraceCounter(),
timestamp, source, routeId, toNode, exchangeId,
- rest, template, data);
+ false, false, backlogTracer.incrementTraceCounter(),
timestamp, source, routeId, toNode,
+ toNodeShortName, toNodeLabel,
+ exchangeId, rest, template, data);
backlogTracer.traceEvent(event);
return event;
@@ -705,7 +709,7 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
backlogTracer.getBodyMaxChars());
DefaultBacklogTracerEventMessage pseudoLast = new
DefaultBacklogTracerEventMessage(
camelContext,
- false, true,
backlogTracer.incrementTraceCounter(), created, source, routeId, null,
+ false, true,
backlogTracer.incrementTraceCounter(), created, source, routeId, null, null,
null,
exchangeId, rest, template, data);
backlogTracer.traceEvent(pseudoLast);
doneProcessing(exchange, pseudoLast);
diff --git
a/core/camel-console/src/generated/java/org/apache/camel/impl/console/MessageHistoryDevConsoleConfigurer.java
b/core/camel-console/src/generated/java/org/apache/camel/impl/console/MessageHistoryDevConsoleConfigurer.java
new file mode 100644
index 000000000000..ba2897aeebd0
--- /dev/null
+++
b/core/camel-console/src/generated/java/org/apache/camel/impl/console/MessageHistoryDevConsoleConfigurer.java
@@ -0,0 +1,63 @@
+/* Generated by camel build tools - do NOT edit this file! */
+package org.apache.camel.impl.console;
+
+import javax.annotation.processing.Generated;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.ExtendedPropertyConfigurerGetter;
+import org.apache.camel.spi.PropertyConfigurerGetter;
+import org.apache.camel.spi.ConfigurerStrategy;
+import org.apache.camel.spi.GeneratedPropertyConfigurer;
+import org.apache.camel.util.CaseInsensitiveMap;
+import org.apache.camel.impl.console.MessageHistoryDevConsole;
+
+/**
+ * Generated by camel build tools - do NOT edit this file!
+ */
+@Generated("org.apache.camel.maven.packaging.GenerateConfigurerMojo")
+@SuppressWarnings("unchecked")
+public class MessageHistoryDevConsoleConfigurer extends
org.apache.camel.support.component.PropertyConfigurerSupport implements
GeneratedPropertyConfigurer, ExtendedPropertyConfigurerGetter {
+
+ private static final Map<String, Object> ALL_OPTIONS;
+ static {
+ Map<String, Object> map = new CaseInsensitiveMap();
+ map.put("CamelContext", org.apache.camel.CamelContext.class);
+ ALL_OPTIONS = map;
+ }
+
+ @Override
+ public boolean configure(CamelContext camelContext, Object obj, String
name, Object value, boolean ignoreCase) {
+ org.apache.camel.impl.console.MessageHistoryDevConsole target =
(org.apache.camel.impl.console.MessageHistoryDevConsole) obj;
+ switch (ignoreCase ? name.toLowerCase() : name) {
+ case "camelcontext":
+ case "camelContext": target.setCamelContext(property(camelContext,
org.apache.camel.CamelContext.class, value)); return true;
+ default: return false;
+ }
+ }
+
+ @Override
+ public Map<String, Object> getAllOptions(Object target) {
+ return ALL_OPTIONS;
+ }
+
+ @Override
+ public Class<?> getOptionType(String name, boolean ignoreCase) {
+ switch (ignoreCase ? name.toLowerCase() : name) {
+ case "camelcontext":
+ case "camelContext": return org.apache.camel.CamelContext.class;
+ default: return null;
+ }
+ }
+
+ @Override
+ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
+ org.apache.camel.impl.console.MessageHistoryDevConsole target =
(org.apache.camel.impl.console.MessageHistoryDevConsole) obj;
+ switch (ignoreCase ? name.toLowerCase() : name) {
+ case "camelcontext":
+ case "camelContext": return target.getCamelContext();
+ default: return null;
+ }
+ }
+}
+
diff --git
a/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/message-history.json
b/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/message-history.json
new file mode 100644
index 000000000000..f3c6d76d2c3b
--- /dev/null
+++
b/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/message-history.json
@@ -0,0 +1,15 @@
+{
+ "console": {
+ "kind": "console",
+ "group": "camel",
+ "name": "message-history",
+ "title": "Message History",
+ "description": "History of latest completed exchange",
+ "deprecated": false,
+ "javaType": "org.apache.camel.impl.console.MessageHistoryDevConsole",
+ "groupId": "org.apache.camel",
+ "artifactId": "camel-console",
+ "version": "4.17.0-SNAPSHOT"
+ }
+}
+
diff --git
a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.impl.console.MessageHistoryDevConsole
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.impl.console.MessageHistoryDevConsole
new file mode 100644
index 000000000000..14a46fe27c2e
--- /dev/null
+++
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.impl.console.MessageHistoryDevConsole
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.impl.console.MessageHistoryDevConsoleConfigurer
diff --git
a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/message-history
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/message-history
new file mode 100644
index 000000000000..9aa3bbe57b77
--- /dev/null
+++
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/message-history
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.impl.console.MessageHistoryDevConsole
diff --git
a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
index 9c04f6ddf671..4465a6069c9d 100644
---
a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
+++
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
@@ -1,5 +1,5 @@
# Generated by camel build tools - do NOT edit this file!
-dev-consoles=bean blocked browse circuit-breaker consumer context debug
endpoint event gc health inflight internal-tasks java-security jvm log memory
processor producer properties receive reload rest route route-controller
route-dump route-group route-structure send service source startup-recorder
system-properties thread top trace transformers type-converters variables
+dev-consoles=bean blocked browse circuit-breaker consumer context debug
endpoint event gc health inflight internal-tasks java-security jvm log memory
message-history processor producer properties receive reload rest route
route-controller route-dump route-group route-structure send service source
startup-recorder system-properties thread top trace transformers
type-converters variables
groupId=org.apache.camel
artifactId=camel-console
version=4.17.0-SNAPSHOT
diff --git
a/core/camel-console/src/main/java/org/apache/camel/impl/console/MessageHistoryDevConsole.java
b/core/camel-console/src/main/java/org/apache/camel/impl/console/MessageHistoryDevConsole.java
new file mode 100644
index 000000000000..f2d0c6656190
--- /dev/null
+++
b/core/camel-console/src/main/java/org/apache/camel/impl/console/MessageHistoryDevConsole.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.console;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.camel.spi.BacklogTracer;
+import org.apache.camel.spi.BacklogTracerEventMessage;
+import org.apache.camel.spi.Configurer;
+import org.apache.camel.spi.annotations.DevConsole;
+import org.apache.camel.support.console.AbstractDevConsole;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+
+@DevConsole(name = "message-history", displayName = "Message History",
description = "History of latest completed exchange")
+@Configurer(extended = true)
+public class MessageHistoryDevConsole extends AbstractDevConsole {
+
+ public MessageHistoryDevConsole() {
+ super("camel", "message-history", "Message History", "History of
latest completed exchange");
+ }
+
+ protected String doCallText(Map<String, Object> options) {
+ StringBuilder sb = new StringBuilder();
+
+ BacklogTracer tracer =
getCamelContext().getCamelContextExtension().getContextPlugin(BacklogTracer.class);
+ if (tracer != null) {
+ Collection<BacklogTracerEventMessage> queue =
tracer.getLatestMessageHistory();
+ for (BacklogTracerEventMessage t : queue) {
+ String json = t.toJSon(0);
+ sb.append(json).append("\n");
+ }
+ }
+
+ return sb.toString();
+ }
+
+ protected JsonObject doCallJson(Map<String, Object> options) {
+ JsonObject root = new JsonObject();
+
+ BacklogTracer tracer =
getCamelContext().getCamelContextExtension().getContextPlugin(BacklogTracer.class);
+ if (tracer != null) {
+ JsonArray arr = new JsonArray();
+
+ Collection<BacklogTracerEventMessage> queue =
tracer.getLatestMessageHistory();
+ for (BacklogTracerEventMessage t : queue) {
+ JsonObject jo = (JsonObject) t.asJSon();
+ arr.add(jo);
+ }
+ root.put("name", getCamelContext().getName());
+ root.put("traces", arr);
+ }
+
+ return root;
+ }
+
+}
diff --git
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogTracerMBean.java
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogTracerMBean.java
index 3ea2086f0ff1..d254c9a27c6f 100644
---
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogTracerMBean.java
+++
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogTracerMBean.java
@@ -120,12 +120,21 @@ public interface ManagedBacklogTracerMBean {
@ManagedOperation(description = "Dumps all the traced messages")
List<BacklogTracerEventMessage> dumpAllTracedMessages();
+ @ManagedOperation(description = "Dumps latest completed exchange message
history")
+ List<BacklogTracerEventMessage> dumpLatestMessageHistory();
+
@ManagedOperation(description = "Dumps all the traced messages in XML
format")
String dumpAllTracedMessagesAsXml();
@ManagedOperation(description = "Dumps all the traced messages in JSon
format")
String dumpAllTracedMessagesAsJSon();
+ @ManagedOperation(description = "Dumps latest completed exchange message
history in XML format")
+ String dumpLatestMessageHistoryAsXml();
+
+ @ManagedOperation(description = "Dumps latest completed exchange message
history in JSon format")
+ String dumpLatestMessageHistoryAsJSon();
+
@ManagedOperation(description = "Clears the backlog")
void clear();
diff --git
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogTracer.java
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogTracer.java
index 5c31cc3c0cbc..098d954ec52c 100644
---
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogTracer.java
+++
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogTracer.java
@@ -201,6 +201,11 @@ public class ManagedBacklogTracer implements
ManagedBacklogTracerMBean {
return backlogTracer.dumpAllTracedMessages();
}
+    /**
+     * Dumps the latest message history by delegating to the wrapped backlog tracer.
+     */
+    @Override
+    public List<BacklogTracerEventMessage> dumpLatestMessageHistory() {
+        final List<BacklogTracerEventMessage> history = backlogTracer.dumpLatestMessageHistory();
+        return history;
+    }
+
@Override
public String dumpTracedMessagesAsXml(String nodeOrRouteId) {
return backlogTracer.dumpTracedMessagesAsXml(nodeOrRouteId);
@@ -221,6 +226,16 @@ public class ManagedBacklogTracer implements
ManagedBacklogTracerMBean {
return backlogTracer.dumpAllTracedMessagesAsJSon();
}
+    /**
+     * Dumps the latest message history in XML format by delegating to the wrapped backlog tracer.
+     */
+    @Override
+    public String dumpLatestMessageHistoryAsXml() {
+        final String xml = backlogTracer.dumpLatestMessageHistoryAsXml();
+        return xml;
+    }
+
+    /**
+     * Dumps the latest message history in JSon format by delegating to the wrapped backlog tracer.
+     */
+    @Override
+    public String dumpLatestMessageHistoryAsJSon() {
+        final String json = backlogTracer.dumpLatestMessageHistoryAsJSon();
+        return json;
+    }
+
@Override
public void clear() {
backlogTracer.clear();
diff --git
a/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryTest.java
b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryTest.java
new file mode 100644
index 000000000000..ea80f0620e59
--- /dev/null
+++
b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.util.List;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.BacklogTracerEventMessage;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DisabledOnOs(OS.AIX)
+public class BacklogTracerMessageHistoryTest extends ManagementTestSupport {
+
+ @SuppressWarnings("unchecked") // MBeanServer.invoke returns Object, hence the unchecked List casts below
+ @Test
+ public void testBacklogTracerReplay() throws Exception { // checks the per-node trace dump and the latest message history dump
+ MBeanServer mbeanServer = getMBeanServer();
+ ObjectName on
+ = new ObjectName("org.apache.camel:context=" +
context.getManagementName() + ",type=tracer,name=BacklogTracer");
+ assertNotNull(on);
+ assertTrue(mbeanServer.isRegistered(on));
+
+ Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+ assertEquals(Boolean.TRUE, enabled, "Should be enabled");
+
+ Integer size = (Integer) mbeanServer.getAttribute(on, "BacklogSize");
+ assertEquals(100, size.intValue(), "Should be 100");
+
+ Boolean removeOnDump = (Boolean) mbeanServer.getAttribute(on,
"RemoveOnDump");
+ assertEquals(Boolean.TRUE, removeOnDump);
+
+ getMockEndpoint("mock:foo").expectedMessageCount(2);
+ getMockEndpoint("mock:bar").expectedMessageCount(2);
+
+ template.sendBody("direct:start", "Hello World");
+ template.sendBody("direct:start", "Bye World"); // second exchange; the assertions below treat it as the latest completed one
+
+ assertMockEndpointsSatisfied();
+
+ List<Exchange> exchanges =
getMockEndpoint("mock:foo").getReceivedExchanges();
+
+ List<BacklogTracerEventMessage> events =
(List<BacklogTracerEventMessage>) mbeanServer.invoke(on, "dumpTracedMessages",
+ new Object[] { "foo" }, new String[] { "java.lang.String" }); // restrict the dump to node "foo"
+
+ assertNotNull(events);
+ assertEquals(2, events.size()); // one traced event at "foo" per exchange sent
+
+ BacklogTracerEventMessage event1 = events.get(0);
+ assertEquals("foo", event1.getToNode());
+ assertEquals(" <message exchangeId=\"" +
exchanges.get(0).getExchangeId()
+ + "\" exchangePattern=\"InOnly\"
exchangeType=\"org.apache.camel.support.DefaultExchange\"
messageType=\"org.apache.camel.support.DefaultMessage\">\n"
+ + " <exchangeProperties>\n"
+ + " <exchangeProperty key=\"CamelToEndpoint\"
type=\"java.lang.String\">direct://start</exchangeProperty>\n"
+ + " </exchangeProperties>\n"
+ + " <body type=\"java.lang.String\">Hello
World</body>\n"
+ + " </message>",
+ event1.getMessageAsXml());
+
+ BacklogTracerEventMessage event2 = events.get(1);
+ assertEquals("foo", event2.getToNode());
+ assertEquals(" <message exchangeId=\"" +
exchanges.get(1).getExchangeId()
+ + "\" exchangePattern=\"InOnly\"
exchangeType=\"org.apache.camel.support.DefaultExchange\"
messageType=\"org.apache.camel.support.DefaultMessage\">\n"
+ + " <exchangeProperties>\n"
+ + " <exchangeProperty key=\"CamelToEndpoint\"
type=\"java.lang.String\">direct://start</exchangeProperty>\n"
+ + " </exchangeProperties>\n"
+ + " <body type=\"java.lang.String\">Bye
World</body>\n"
+ + " </message>",
+ event2.getMessageAsXml());
+
+ events = (List<BacklogTracerEventMessage>) mbeanServer.invoke(on,
"dumpLatestMessageHistory", null, null); // no-arg MBean operation
+ assertNotNull(events);
+ assertEquals(4, events.size()); // first event, node "foo", node "bar", last event
+
+ assertTrue(events.get(0).isFirst()); // first event carries the endpoint uri and no node
+ assertEquals("direct://start", events.get(0).getEndpointUri());
+ assertNull(events.get(0).getToNode());
+
+ assertFalse(events.get(1).isFirst());
+ assertFalse(events.get(1).isLast());
+ assertEquals("foo", events.get(1).getToNode());
+ assertEquals("to", events.get(1).getToNodeShortName());
+ assertEquals("to[mock:foo]", events.get(1).getToNodeLabel());
+
+ assertFalse(events.get(2).isFirst());
+ assertFalse(events.get(2).isLast());
+ assertEquals("bar", events.get(2).getToNode());
+ assertEquals("to", events.get(2).getToNodeShortName());
+ assertEquals("to[mock:bar]", events.get(2).getToNodeLabel());
+
+ assertTrue(events.get(3).isLast()); // last event mirrors the first: endpoint uri set, no node
+ assertEquals("direct://start", events.get(3).getEndpointUri());
+ assertNull(events.get(3).getToNode());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ context.setUseBreadcrumb(false);
+ context.setBacklogTracing(true);
+ context.setMessageHistory(true);
+
+ from("direct:start")
+ .to("mock:foo").id("foo")
+ .to("mock:bar").id("bar");
+
+ }
+ };
+ }
+
+}
diff --git
a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
index d4ff037ac3bb..0e43279af66d 100644
---
a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
+++
b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
@@ -117,6 +117,7 @@ public class LocalCliConnector extends ServiceSupport
implements CliConnector, C
private File outputFile;
private File traceFile;
private long traceFilePos; // keep track of trace offset
+ private File messageHistoryFile;
private File debugFile;
private File receiveFile;
private long receiveFilePos; // keep track of receive offset
@@ -188,6 +189,7 @@ public class LocalCliConnector extends ServiceSupport
implements CliConnector, C
actionFile = createLockFile(lockFile.getName() + "-action.json");
outputFile = createLockFile(lockFile.getName() + "-output.json");
traceFile = createLockFile(lockFile.getName() + "-trace.json");
+ messageHistoryFile = createLockFile(lockFile.getName() +
"-history.json");
debugFile = createLockFile(lockFile.getName() + "-debug.json");
receiveFile = createLockFile(lockFile.getName() + "-receive.json");
executor.scheduleWithFixedDelay(this::task, 0, delay,
TimeUnit.MILLISECONDS);
@@ -1285,6 +1287,21 @@ public class LocalCliConnector extends ServiceSupport
implements CliConnector, C
LOG.trace("Error updating debug file: {} due to: {}. This
exception is ignored.",
debugFile, e.getMessage(), e);
}
+ try {
+ DevConsole dc13b =
camelContext.getCamelContextExtension().getContextPlugin(DevConsoleRegistry.class)
+ .resolveById("message-history");
+ if (dc13b != null) {
+ JsonObject json = (JsonObject)
dc13b.call(DevConsole.MediaType.JSON);
+ // store message history in a special file
+ LOG.trace("Updating message-history file: {}",
messageHistoryFile);
+ String data = json.toJson() + System.lineSeparator();
+ IOHelper.writeText(data, messageHistoryFile);
+ }
+ } catch (Exception e) {
+ // ignore
+ LOG.trace("Error updating message-history file: {} due to: {}.
This exception is ignored.",
+ messageHistoryFile, e.getMessage(), e);
+ }
try {
DevConsole dc14 =
camelContext.getCamelContextExtension().getContextPlugin(DevConsoleRegistry.class)
.resolveById("receive");
@@ -1444,6 +1461,9 @@ public class LocalCliConnector extends ServiceSupport
implements CliConnector, C
if (traceFile != null) {
FileUtil.deleteFile(traceFile);
}
+ if (messageHistoryFile != null) {
+ FileUtil.deleteFile(messageHistoryFile);
+ }
if (debugFile != null) {
FileUtil.deleteFile(debugFile);
}
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java
index 03b2aa69ad60..583658b41383 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java
@@ -117,6 +117,10 @@ public abstract class CamelCommand implements
Callable<Integer> {
return CommandLineHelper.getCamelDir().resolve(pid + "-debug.json");
}
+    /**
+     * Resolves the {@code <pid>-history.json} file under the directory returned by
+     * {@code CommandLineHelper.getCamelDir()}.
+     *
+     * @param  pid the process id used as the file name prefix
+     * @return     the path of the message history file for that pid
+     */
+    public Path getMessageHistoryFile(String pid) {
+        final Path camelDir = CommandLineHelper.getCamelDir();
+        final String fileName = pid + "-history.json";
+        return camelDir.resolve(fileName);
+    }
+
public Path getRunBackgroundLogFile(String uuid) {
return CommandLineHelper.getCamelDir().resolve(uuid + "-run.log");
}
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
index ed5147326db2..addbabb952dc 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
@@ -108,6 +108,7 @@ public class CamelJBangMain implements Callable<Integer> {
.addSubcommand("producer", new CommandLine(new
ListProducer(main)))
.addSubcommand("endpoint", new CommandLine(new
ListEndpoint(main)))
.addSubcommand("event", new CommandLine(new
ListEvent(main)))
+ .addSubcommand("history", new CommandLine(new
CamelHistoryAction(main)))
.addSubcommand("inflight", new CommandLine(new
ListInflight(main)))
.addSubcommand("blocked", new CommandLine(new
ListBlocked(main)))
.addSubcommand("internal-task", new CommandLine(new
ListInternalTask(main)))
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelHistoryAction.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelHistoryAction.java
new file mode 100644
index 000000000000..aabede65d651
--- /dev/null
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelHistoryAction.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dsl.jbang.core.commands.action;
+
+import java.io.LineNumberReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import com.github.freva.asciitable.AsciiTable;
+import com.github.freva.asciitable.Column;
+import com.github.freva.asciitable.HorizontalAlign;
+import com.github.freva.asciitable.OverflowBehaviour;
+import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.TimeUtils;
+import org.apache.camel.util.URISupport;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+import org.apache.camel.util.json.Jsoner;
+import org.fusesource.jansi.Ansi;
+import picocli.CommandLine;
+
+@CommandLine.Command(name = "history",
+ description = "History of latest completed exchange",
sortOptions = false, showDefaultValues = true)
+public class CamelHistoryAction extends ActionWatchCommand {
+
+ @CommandLine.Parameters(description = "Name or pid of running Camel
integration", arity = "0..1")
+ String name = "*";
+
+ @CommandLine.Option(names = { "--source" },
+ description = "Prefer to display source filename/code
instead of IDs")
+ boolean source;
+
+ @CommandLine.Option(names = { "--mask" },
+ description = "Whether to mask endpoint URIs to avoid
printing sensitive information such as password or access keys")
+ boolean mask;
+
+ public CamelHistoryAction(CamelJBangMain main) {
+ super(main);
+ }
+
+    /**
+     * Prints the message history of the latest completed exchange for each matching integration: a summary line, a
+     * table with one row per traced event, and the stacktrace when the last event carries one.
+     *
+     * @return always 0
+     */
+    @Override
+    public Integer doWatchCall() throws Exception {
+        if (name == null) {
+            name = "*";
+        }
+
+        List<List<Row>> histories = loadRows();
+        if (histories.isEmpty()) {
+            return 0;
+        }
+
+        if (watch) {
+            clearScreen();
+        }
+        for (List<Row> history : histories) {
+            Row head = history.get(0);
+            String since = TimeUtils.printSince(head.timestamp);
+            Row tail = history.get(history.size() - 1);
+            String outcome;
+            if (tail.failed) {
+                outcome = "failed";
+            } else {
+                outcome = "success";
+            }
+            printer().println(String.format(
+                    "Message History of last completed (id:%s status:%s ago:%s pid:%d name:%s)",
+                    head.exchangeId, outcome, since, head.pid, head.name));
+
+            String table = AsciiTable.getTable(AsciiTable.NO_BORDERS, history, Arrays.asList(
+                    new Column().header("").dataAlign(HorizontalAlign.LEFT)
+                            .minWidth(6).maxWidth(6)
+                            .with(this::getDirection),
+                    new Column().header("ID").dataAlign(HorizontalAlign.LEFT)
+                            .minWidth(10).maxWidth(20, OverflowBehaviour.ELLIPSIS_RIGHT)
+                            .with(this::getId),
+                    new Column().header("PROCESSOR").dataAlign(HorizontalAlign.LEFT)
+                            .minWidth(40).maxWidth(55, OverflowBehaviour.ELLIPSIS_RIGHT)
+                            .with(this::getProcessor),
+                    new Column().header("ELAPSED").dataAlign(HorizontalAlign.RIGHT)
+                            .maxWidth(10, OverflowBehaviour.ELLIPSIS_RIGHT)
+                            .with(row -> String.valueOf(row.elapsed)),
+                    new Column().header("").dataAlign(HorizontalAlign.LEFT)
+                            .maxWidth(60, OverflowBehaviour.NEWLINE)
+                            .with(this::getMessage)));
+            printer().println(table);
+
+            // the stacktrace, when present, is read from the exception of the last event
+            JsonObject failure = tail.exception;
+            String stackTrace = failure == null ? null : failure.getString("stackTrace");
+            if (stackTrace != null) {
+                stackTrace = Jsoner.unescape(stackTrace);
+            }
+            if (stackTrace != null) {
+                printer().println();
+                printer().println("Stacktrace");
+                printer().println("-".repeat(80));
+                printer().println(Ansi.ansi().fgRed().a(stackTrace).reset().toString());
+                printer().println();
+            }
+            printer().println();
+        }
+
+        return 0;
+    }
+
+    /**
+     * Returns the value for the ID column: the source location when --source is set and a location is known,
+     * otherwise the node id, falling back to the route id when the row has no node id.
+     */
+    private String getId(Row r) {
+        if (source && r.location != null) {
+            return r.location;
+        }
+        return r.nodeId != null ? r.nodeId : r.routeId;
+    }
+
+    /**
+     * Returns the value for the PROCESSOR column.
+     * <p>
+     * First and last rows are shown as {@code from[<endpoint uri>]}. The endpoint is only set when the trace
+     * carried an "endpointUri", so a first/last row without one falls back to the node label or node id instead of
+     * failing with a NullPointerException.
+     */
+    private String getProcessor(Row r) {
+        if ((r.first || r.last) && r.endpoint != null) {
+            return "from[" + r.endpoint.getString("endpoint") + "]";
+        }
+        if (r.nodeLabel != null) {
+            return r.nodeLabel;
+        }
+        return r.nodeId;
+    }
+
+    /**
+     * Returns the marker for the first column: an arrow for the first and for the last row, empty for all others.
+     */
+    private String getDirection(Row r) {
+        if (r.first) {
+            return "*-->";
+        }
+        return r.last ? "*<--" : "";
+    }
+
+    /**
+     * Returns the text for the trailing column: the exception message for a failed intermediate row, the overall
+     * outcome ("failed" or "success") for the last row, and {@code null} otherwise.
+     * <p>
+     * A row can be flagged as failed without exception details (the "exception" entry is optional in the trace),
+     * in which case "failed" is shown instead of dereferencing a null exception.
+     */
+    private String getMessage(Row r) {
+        if (r.failed && !r.last) {
+            if (r.exception == null) {
+                return "failed";
+            }
+            return "Exception: " + r.exception.getString("message");
+        }
+        if (r.last) {
+            return r.failed ? "failed" : "success";
+        }
+        return null;
+    }
+
+    /**
+     * Loads the message history rows for every integration matching {@code name}.
+     * <p>
+     * Each existing history file is read line by line and every line is parsed as one JSON document. Integrations
+     * without a history file, or whose file yields no rows, are left out of the result.
+     *
+     * @return one non-empty list of rows per integration that has history
+     */
+    protected List<List<Row>> loadRows() throws Exception {
+        List<List<Row>> answer = new ArrayList<>();
+        List<Long> pids = findPids(name);
+        for (long pid : pids) {
+            Path p = getMessageHistoryFile(Long.toString(pid));
+            if (!Files.exists(p)) {
+                continue;
+            }
+            List<Row> rows = new ArrayList<>();
+            // try-with-resources closes the reader on every exit path
+            try (LineNumberReader reader = new LineNumberReader(Files.newBufferedReader(p))) {
+                String line;
+                // stop at end-of-file instead of handing a null line to the parser
+                while ((line = reader.readLine()) != null) {
+                    List<Row> load = parseTraceLine(pid, line);
+                    if (load != null) {
+                        rows.addAll(load);
+                    }
+                }
+            } catch (Exception e) {
+                // best effort: a file that cannot be read (presumably because the integration is updating or
+                // removing it) is skipped rather than failing the whole command
+                continue;
+            }
+            if (!rows.isEmpty()) {
+                answer.add(rows);
+            }
+        }
+        return answer;
+    }
+
+    /**
+     * Parses one line of a history file into rows, one per entry of the "traces" array.
+     * <p>
+     * Absent boolean and numeric entries default to {@code false} / {@code 0}, and an absent "message" leaves the
+     * exchange id and pattern unset, so an incomplete trace no longer fails with a NullPointerException.
+     *
+     * @param  pid  the process id the line belongs to
+     * @param  line the raw line, expected to hold a JSON object
+     * @return      the parsed rows (possibly empty), or {@code null} when the line is not a JSON object
+     */
+    private List<Row> parseTraceLine(long pid, String line) {
+        if (line == null || line.isBlank()) {
+            return null;
+        }
+        JsonObject root = null;
+        try {
+            root = (JsonObject) Jsoner.deserialize(line);
+        } catch (Exception e) {
+            // ignore: the line is not a JSON object
+        }
+        if (root == null) {
+            return null;
+        }
+
+        List<Row> rows = new ArrayList<>();
+        String name = root.getString("name");
+        JsonArray arr = root.getCollection("traces");
+        if (arr == null) {
+            return rows;
+        }
+        for (Object o : arr) {
+            Row row = new Row();
+            row.pid = pid;
+            row.name = name;
+            JsonObject jo = (JsonObject) o;
+            Long uid = jo.getLong("uid");
+            if (uid != null) {
+                row.uid = uid;
+            }
+            row.first = jo.getBooleanOrDefault("first", false);
+            row.last = jo.getBooleanOrDefault("last", false);
+            row.location = jo.getString("location");
+            row.routeId = jo.getString("routeId");
+            row.nodeId = jo.getString("nodeId");
+            row.nodeShortName = jo.getString("nodeShortName");
+            row.nodeLabel = jo.getString("nodeLabel");
+            if (mask) {
+                row.nodeLabel = URISupport.sanitizeUri(row.nodeLabel);
+            }
+            String uri = jo.getString("endpointUri");
+            if (uri != null) {
+                row.endpoint = new JsonObject();
+                if (mask) {
+                    uri = URISupport.sanitizeUri(uri);
+                }
+                row.endpoint.put("endpoint", uri);
+                row.endpoint.put("remote", jo.getBooleanOrDefault("remoteEndpoint", true));
+            }
+            JsonObject es = jo.getMap("endpointService");
+            if (es != null) {
+                row.endpointService = es;
+            }
+            Long ts = jo.getLong("timestamp");
+            if (ts != null) {
+                row.timestamp = ts;
+            }
+            Long elapsed = jo.getLong("elapsed");
+            if (elapsed != null) {
+                row.elapsed = elapsed;
+            }
+            row.failed = jo.getBooleanOrDefault("failed", false);
+            row.done = jo.getBooleanOrDefault("done", false);
+            row.threadName = jo.getString("threadName");
+            row.message = jo.getMap("message");
+            row.exception = jo.getMap("exception");
+            if (row.message != null) {
+                row.exchangeId = row.message.getString("exchangeId");
+                row.exchangePattern = row.message.getString("exchangePattern");
+                // exchangeId/pattern are stored on the row itself, so drop them from the message
+                row.message.remove("exchangeId");
+                row.message.remove("exchangePattern");
+            }
+            rows.add(row);
+        }
+        return rows;
+    }
+
+    /**
+     * One traced event of a message history, as read from a history file.
+     */
+    private static class Row {
+        // owning integration
+        long pid;
+        String name;
+
+        // position of the event within the history
+        long uid;
+        boolean first;
+        boolean last;
+
+        // exchange details (exchangeId and exchangePattern are read from the message)
+        String exchangeId;
+        String exchangePattern;
+        String threadName;
+
+        // where the event happened
+        String location;
+        String routeId;
+        String nodeId;
+        String nodeShortName;
+        String nodeLabel;
+
+        // timing and outcome
+        long timestamp;
+        long elapsed;
+        boolean done;
+        boolean failed;
+
+        // nested JSON payloads, each may be null when absent from the trace
+        JsonObject endpoint;
+        JsonObject endpointService;
+        JsonObject message;
+        JsonObject exception;
+    }
+
+}