This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch sm in repository https://gitbox.apache.org/repos/asf/camel.git
commit c694b3db8baa8a5cdb99d5fef9d9ba2adea68182 Author: Claus Ibsen <[email protected]> AuthorDate: Mon Nov 17 12:54:26 2025 +0100 CAMEL-22687: camel-jbang - get history to show latest completed exchange message history. --- .../apache/camel/catalog/dev-consoles.properties | 1 + .../catalog/dev-consoles/message-history.json | 15 ++ .../java/org/apache/camel/spi/BacklogTracer.java | 26 ++ .../camel/spi/BacklogTracerEventMessage.java | 10 + .../apache/camel/impl/debugger/BacklogTracer.java | 80 +++++- .../impl/debugger/DefaultBacklogDebugger.java | 17 +- .../debugger/DefaultBacklogTracerEventMessage.java | 25 +- .../camel/impl/engine/CamelInternalProcessor.java | 14 +- .../MessageHistoryDevConsoleConfigurer.java | 63 +++++ .../apache/camel/dev-console/message-history.json | 15 ++ ...che.camel.impl.console.MessageHistoryDevConsole | 2 + .../org/apache/camel/dev-console/message-history | 2 + .../org/apache/camel/dev-consoles.properties | 2 +- .../impl/console/MessageHistoryDevConsole.java | 72 ++++++ .../mbean/ManagedBacklogTracerMBean.java | 9 + .../management/mbean/ManagedBacklogTracer.java | 15 ++ .../BacklogTracerMessageHistoryTest.java | 138 ++++++++++ .../camel/cli/connector/LocalCliConnector.java | 20 ++ .../dsl/jbang/core/commands/CamelCommand.java | 4 + .../dsl/jbang/core/commands/CamelJBangMain.java | 1 + .../core/commands/action/CamelHistoryAction.java | 281 +++++++++++++++++++++ 21 files changed, 794 insertions(+), 18 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties index 9b787ecb27df..482964cf14ff 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties @@ -28,6 +28,7 @@ log main-configuration main-http-server memory +message-history micrometer platform-http processor diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/message-history.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/message-history.json new file mode 100644 index 000000000000..f3c6d76d2c3b --- /dev/null +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/message-history.json @@ -0,0 +1,15 @@ +{ + "console": { + "kind": "console", + "group": "camel", + "name": "message-history", + "title": "Message History", + "description": "History of latest completed exchange", + "deprecated": false, + "javaType": "org.apache.camel.impl.console.MessageHistoryDevConsole", + "groupId": "org.apache.camel", + "artifactId": "camel-console", + "version": "4.17.0-SNAPSHOT" + } +} + diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracer.java b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracer.java index 633c98fbb25b..95f46b0679b4 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracer.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracer.java @@ -16,6 +16,7 @@ */ package org.apache.camel.spi; +import java.util.Collection; import java.util.List; /** @@ -184,11 +185,26 @@ public interface BacklogTracer { */ void resetTraceCounter(); + /** + * Get all tracing data (without removing) + */ + Collection<BacklogTracerEventMessage> getAllTracedMessages(); + + /** + * Get latest completed exchange message history (without removing) + */ + Collection<BacklogTracerEventMessage> getLatestMessageHistory(); + /** * Dumps all tracing data */ List<BacklogTracerEventMessage> dumpAllTracedMessages(); + /** + * Dumps latest completed exchange message history + */ + List<BacklogTracerEventMessage> dumpLatestMessageHistory(); + /** * Dumps tracing data for the given route id / node id */ @@ -204,6 +220,11 @@ public interface BacklogTracer { */ String dumpTracedMessagesAsXml(String nodeId); + /** + * Dumps latest completed exchange message history as XML + */ + String dumpLatestMessageHistoryAsXml(); + /** * Dumps all tracing data as JSon */ @@ -214,6 +235,11 @@ public interface BacklogTracer { */ String dumpTracedMessagesAsJSon(String nodeId); + /** + * Dumps latest completed exchange message history as JSon + */ + String dumpLatestMessageHistoryAsJSon(); + /** * Clears the backlog of traced messages. */ diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java index 7248f736aa00..6a632d43efb0 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java @@ -73,6 +73,16 @@ public interface BacklogTracerEventMessage { */ String getToNode(); + /** + * Node short name where the message is being routed to + */ + String getToNodeShortName(); + + /** + * Node label where the message is being routed to + */ + String getToNodeLabel(); + /** * The exchange id */ diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java index 011df4b71cf7..04815bfe3f4d 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java @@ -17,6 +17,8 @@ package org.apache.camel.impl.debugger; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Queue; @@ -43,7 +45,7 @@ import org.apache.camel.util.json.Jsoner; * This tracer allows to store message tracers per node in the Camel routes. The tracers is stored in a backlog queue * (FIFO based) which allows to pull the traced messages on demand. */ -public final class BacklogTracer extends ServiceSupport implements org.apache.camel.spi.BacklogTracer { +public class BacklogTracer extends ServiceSupport implements org.apache.camel.spi.BacklogTracer { // limit the tracer to a thousand messages in total public static final int MAX_BACKLOG_SIZE = 1000; @@ -56,6 +58,9 @@ public final class BacklogTracer extends ServiceSupport implements org.apache.ca private final Queue<BacklogTracerEventMessage> queue = new LinkedBlockingQueue<>(MAX_BACKLOG_SIZE); // how many of the last messages to keep in the backlog at total private int backlogSize = 100; + // use tracer to capture additional information for capturing latest completed exchange message-history + private final Queue<BacklogTracerEventMessage> provisionalHistoryQueue = new LinkedBlockingQueue<>(MAX_BACKLOG_SIZE); + private final Queue<BacklogTracerEventMessage> completeHistoryQueue = new LinkedBlockingQueue<>(MAX_BACKLOG_SIZE); private boolean removeOnDump = true; private int bodyMaxChars = 32 * 1024; private boolean bodyIncludeStreams; @@ -71,7 +76,7 @@ public final class BacklogTracer extends ServiceSupport implements org.apache.ca private String traceFilter; private Predicate predicate; - private BacklogTracer(CamelContext camelContext) { + BacklogTracer(CamelContext camelContext) { this.camelContext = camelContext; this.simple = camelContext.resolveLanguage("simple"); } @@ -94,7 +99,10 @@ public final class BacklogTracer extends ServiceSupport implements org.apache.ca * @return <tt>true</tt> to trace, <tt>false</tt> to skip tracing */ public boolean shouldTrace(NamedNode definition, Exchange exchange) { - if (!enabled) { + // special in standby mode we allow using tracer to capture latest tracing data for + // enriched message history + boolean history = (enabled || standby) && camelContext.isMessageHistory(); + if (!history && !enabled) { return false; } @@ -131,6 +139,29 @@ public final class BacklogTracer extends ServiceSupport implements org.apache.ca } public void traceEvent(DefaultBacklogTracerEventMessage event) { + // special in standby mode we allow using tracer to capture latest tracing data for + // enriched message history + boolean history = (enabled || standby) && camelContext.isMessageHistory(); + if (!history && !enabled) { + return; + } + + // handle capturing events for last full completed exchange (aka replay) + if (camelContext.isMessageHistory()) { + String tid = null; + var head = provisionalHistoryQueue.peek(); + if (head != null) { + tid = head.getExchangeId(); + } + if (tid == null || tid.equals(event.getExchangeId())) { + provisionalHistoryQueue.add(event); + if (event.isLast()) { + completeHistoryQueue.clear(); + completeHistoryQueue.addAll(provisionalHistoryQueue); + provisionalHistoryQueue.clear(); + } + } + } if (!enabled) { return; } @@ -340,6 +371,15 @@ public final class BacklogTracer extends ServiceSupport implements org.apache.ca traceCounter.set(0); } + @Override + public Collection<BacklogTracerEventMessage> getAllTracedMessages() { + return Collections.unmodifiableCollection(queue); + } + + public Collection<BacklogTracerEventMessage> getLatestMessageHistory() { + return Collections.unmodifiableCollection(completeHistoryQueue); + } + public List<BacklogTracerEventMessage> dumpTracedMessages(String nodeId) { List<BacklogTracerEventMessage> answer = new ArrayList<>(); if (nodeId != null) { @@ -350,7 +390,7 @@ public final class BacklogTracer extends ServiceSupport implements org.apache.ca } } - if (removeOnDump) { + if (isRemoveOnDump()) { queue.removeAll(answer); } @@ -360,7 +400,12 @@ public final class BacklogTracer extends ServiceSupport implements org.apache.ca @Override public String dumpTracedMessagesAsXml(String nodeId) { List<BacklogTracerEventMessage> events = dumpTracedMessages(nodeId); + return wrapAroundRootTag(events); + } + @Override + public String dumpLatestMessageHistoryAsXml() { + List<BacklogTracerEventMessage> events = dumpLatestMessageHistory(); return wrapAroundRootTag(events); } @@ -377,6 +422,19 @@ public final class BacklogTracer extends ServiceSupport implements org.apache.ca return Jsoner.prettyPrint(root.toJson()); } + @Override + public String dumpLatestMessageHistoryAsJSon() { + List<BacklogTracerEventMessage> events = dumpLatestMessageHistory(); + + JsonObject root = new JsonObject(); + JsonArray arr = new JsonArray(); + root.put("traces", arr); + for (BacklogTracerEventMessage event : events) { + arr.add(event.asJSon()); + } + return Jsoner.prettyPrint(root.toJson()); + } + @Override public List<BacklogTracerEventMessage> dumpAllTracedMessages() { List<BacklogTracerEventMessage> answer = new ArrayList<>(queue); @@ -386,10 +444,18 @@ public final class BacklogTracer extends ServiceSupport implements org.apache.ca return answer; } + @Override + public List<BacklogTracerEventMessage> dumpLatestMessageHistory() { + List<BacklogTracerEventMessage> answer = new ArrayList<>(completeHistoryQueue); + if (isRemoveOnDump()) { + completeHistoryQueue.clear(); + } + return answer; + } + @Override public String dumpAllTracedMessagesAsXml() { List<BacklogTracerEventMessage> events = dumpAllTracedMessages(); - return wrapAroundRootTag(events); } @@ -419,6 +485,8 @@ public final class BacklogTracer extends ServiceSupport implements org.apache.ca @Override public void clear() { queue.clear(); + completeHistoryQueue.clear(); + provisionalHistoryQueue.clear(); } public long incrementTraceCounter() { @@ -427,7 +495,7 @@ public final class BacklogTracer extends ServiceSupport implements org.apache.ca @Override protected void doStop() throws Exception { - queue.clear(); + clear(); } } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java index e3ec7c9109c3..090040ed9137 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java @@ -58,6 +58,7 @@ import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.support.service.ServiceSupport; import org.apache.camel.util.IOHelper; import org.apache.camel.util.StopWatch; +import org.apache.camel.util.StringHelper; import org.apache.camel.util.json.JsonObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -884,7 +885,7 @@ public final class DefaultBacklogDebugger extends ServiceSupport implements Back (nId, message) -> new DefaultBacklogTracerEventMessage( camelContext, false, false, message.getUid(), message.getTimestamp(), message.getLocation(), message.getRouteId(), - message.getToNode(), + message.getToNode(), message.getToNodeShortName(), message.getToNodeLabel(), message.getExchangeId(), false, false, dumpAsJSonObject(suspendedExchange.getExchange()))); @@ -921,6 +922,9 @@ public final class DefaultBacklogDebugger extends ServiceSupport implements Back // store a copy of the message so we can see that from the debugger long timestamp = System.currentTimeMillis(); String toNode = definition.getId(); + String toNodeShortName = definition.getShortName(); + // avoid label is too large + String toNodeLabel = StringHelper.limitLength(definition.getLabel(), 50); String routeId = CamelContextHelper.getRouteId(definition); String exchangeId = exchange.getExchangeId(); long uid = debugCounter.incrementAndGet(); @@ -930,7 +934,8 @@ public final class DefaultBacklogDebugger extends ServiceSupport implements Back BacklogTracerEventMessage msg = new DefaultBacklogTracerEventMessage( camelContext, - first, false, uid, timestamp, source, routeId, toNode, exchangeId, false, false, data); + first, false, uid, timestamp, source, routeId, toNode, toNodeShortName, toNodeLabel, exchangeId, + false, false, data); suspendedBreakpointMessages.put(nodeId, msg); // suspend at this breakpoint @@ -1003,6 +1008,9 @@ public final class DefaultBacklogDebugger extends ServiceSupport implements Back // store a copy of the message so we can see that from the debugger long timestamp = System.currentTimeMillis(); String toNode = definition.getId(); + String toNodeShortName = definition.getShortName(); + // avoid label is too large + String toNodeLabel = StringHelper.limitLength(definition.getLabel(), 50); String routeId = CamelContextHelper.getRouteId(definition); String exchangeId = exchange.getExchangeId(); long uid = debugCounter.incrementAndGet(); @@ -1011,7 +1019,8 @@ public final class DefaultBacklogDebugger extends ServiceSupport implements Back BacklogTracerEventMessage msg = new DefaultBacklogTracerEventMessage( camelContext, - false, false, uid, timestamp, source, routeId, toNode, exchangeId, false, false, data); + false, false, uid, timestamp, source, routeId, toNode, toNodeShortName, toNodeLabel, exchangeId, + false, false, data); suspendedBreakpointMessages.put(toNode, msg); // suspend at this breakpoint @@ -1114,7 +1123,7 @@ public final class DefaultBacklogDebugger extends ServiceSupport implements Back BacklogTracerEventMessage msg = new DefaultBacklogTracerEventMessage( camelContext, - false, true, uid, timestamp, source, routeId, toNode, exchangeId, false, false, data); + false, true, uid, timestamp, source, routeId, toNode, null, null, exchangeId, false, false, data); // we want to capture if there was an exception if (cause != null) { msg.setException(cause); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java index 0288b68f1a7c..01caaeedb11d 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java @@ -45,6 +45,8 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven private final String location; private final String routeId; private final String toNode; + private final String toNodeShortName; + private final String toNodeLabel; private final String exchangeId; private final String threadName; private String endpointUri; @@ -65,8 +67,9 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven private boolean done; public DefaultBacklogTracerEventMessage(CamelContext camelContext, boolean first, boolean last, long uid, long timestamp, - String location, String routeId, String toNode, String exchangeId, - boolean rest, boolean template, JsonObject data) { + String location, String routeId, String toNode, String toNodeShortName, + String toNodeLabel, + String exchangeId, boolean rest, boolean template, JsonObject data) { this.camelContext = camelContext; this.watch = new StopWatch(); this.first = first; @@ -76,6 +79,8 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven this.location = location; this.routeId = routeId; this.toNode = toNode; + this.toNodeShortName = toNodeShortName; + this.toNodeLabel = toNodeLabel; this.exchangeId = exchangeId; this.rest = rest; this.template = template; @@ -136,6 +141,16 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven return toNode; } + @Override + public String getToNodeShortName() { + return toNodeShortName; + } + + @Override + public String getToNodeLabel() { + return toNodeLabel; + } + @Override public String getExchangeId() { return exchangeId; @@ -503,6 +518,12 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven if (toNode != null) { jo.put("nodeId", toNode); } + if (toNodeShortName != null) { + jo.put("nodeShortName", toNodeShortName); + } + if (toNodeLabel != null) { + jo.put("nodeLabel", toNodeLabel); + } if (exchangeId != null) { jo.put("exchangeId", exchangeId); } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java index a05df7e59b68..0dede3489adc 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java @@ -77,6 +77,7 @@ import org.apache.camel.support.UnitOfWorkHelper; import org.apache.camel.support.processor.DelegateAsyncProcessor; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.StopWatch; +import org.apache.camel.util.StringHelper; import org.apache.camel.util.json.JsonObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -650,6 +651,8 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In long timestamp = System.currentTimeMillis(); String toNode = processorDefinition.getId(); + String toNodeShortName = processorDefinition.getShortName(); + String toNodeLabel = StringHelper.limitLength(processorDefinition.getLabel(), 50); String exchangeId = exchange.getExchangeId(); boolean includeExchangeProperties = backlogTracer.isIncludeExchangeProperties(); boolean includeExchangeVariables = backlogTracer.isIncludeExchangeVariables(); @@ -666,8 +669,8 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In final long created = exchange.getClock().getCreated(); DefaultBacklogTracerEventMessage pseudoFirst = new DefaultBacklogTracerEventMessage( camelContext, - true, false, backlogTracer.incrementTraceCounter(), created, source, routeId, null, exchangeId, - rest, template, data); + true, false, backlogTracer.incrementTraceCounter(), created, source, routeId, null, null, null, + exchangeId, rest, template, data); if (exchange.getFromEndpoint() instanceof EndpointServiceLocation esl) { pseudoFirst.setEndpointServiceUrl(esl.getServiceUrl()); pseudoFirst.setEndpointServiceProtocol(esl.getServiceProtocol()); @@ -679,8 +682,9 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In String source = LoggerHelper.getLineNumberLoggerName(processorDefinition); DefaultBacklogTracerEventMessage event = new DefaultBacklogTracerEventMessage( camelContext, - false, false, backlogTracer.incrementTraceCounter(), timestamp, source, routeId, toNode, exchangeId, - rest, template, data); + false, false, backlogTracer.incrementTraceCounter(), timestamp, source, routeId, toNode, + toNodeShortName, toNodeLabel, + exchangeId, rest, template, data); backlogTracer.traceEvent(event); return event; @@ -705,7 +709,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In backlogTracer.getBodyMaxChars()); DefaultBacklogTracerEventMessage pseudoLast = new DefaultBacklogTracerEventMessage( camelContext, - false, true, backlogTracer.incrementTraceCounter(), created, source, routeId, null, + false, true, backlogTracer.incrementTraceCounter(), created, source, routeId, null, null, null, exchangeId, rest, template, data); backlogTracer.traceEvent(pseudoLast); doneProcessing(exchange, pseudoLast); diff --git a/core/camel-console/src/generated/java/org/apache/camel/impl/console/MessageHistoryDevConsoleConfigurer.java b/core/camel-console/src/generated/java/org/apache/camel/impl/console/MessageHistoryDevConsoleConfigurer.java new file mode 100644 index 000000000000..ba2897aeebd0 --- /dev/null +++ b/core/camel-console/src/generated/java/org/apache/camel/impl/console/MessageHistoryDevConsoleConfigurer.java @@ -0,0 +1,63 @@ +/* Generated by camel build tools - do NOT edit this file! */ +package org.apache.camel.impl.console; + +import javax.annotation.processing.Generated; +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.spi.ExtendedPropertyConfigurerGetter; +import org.apache.camel.spi.PropertyConfigurerGetter; +import org.apache.camel.spi.ConfigurerStrategy; +import org.apache.camel.spi.GeneratedPropertyConfigurer; +import org.apache.camel.util.CaseInsensitiveMap; +import org.apache.camel.impl.console.MessageHistoryDevConsole; + +/** + * Generated by camel build tools - do NOT edit this file! + */ +@Generated("org.apache.camel.maven.packaging.GenerateConfigurerMojo") +@SuppressWarnings("unchecked") +public class MessageHistoryDevConsoleConfigurer extends org.apache.camel.support.component.PropertyConfigurerSupport implements GeneratedPropertyConfigurer, ExtendedPropertyConfigurerGetter { + + private static final Map<String, Object> ALL_OPTIONS; + static { + Map<String, Object> map = new CaseInsensitiveMap(); + map.put("CamelContext", org.apache.camel.CamelContext.class); + ALL_OPTIONS = map; + } + + @Override + public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) { + org.apache.camel.impl.console.MessageHistoryDevConsole target = (org.apache.camel.impl.console.MessageHistoryDevConsole) obj; + switch (ignoreCase ? name.toLowerCase() : name) { + case "camelcontext": + case "camelContext": target.setCamelContext(property(camelContext, org.apache.camel.CamelContext.class, value)); return true; + default: return false; + } + } + + @Override + public Map<String, Object> getAllOptions(Object target) { + return ALL_OPTIONS; + } + + @Override + public Class<?> getOptionType(String name, boolean ignoreCase) { + switch (ignoreCase ? name.toLowerCase() : name) { + case "camelcontext": + case "camelContext": return org.apache.camel.CamelContext.class; + default: return null; + } + } + + @Override + public Object getOptionValue(Object obj, String name, boolean ignoreCase) { + org.apache.camel.impl.console.MessageHistoryDevConsole target = (org.apache.camel.impl.console.MessageHistoryDevConsole) obj; + switch (ignoreCase ? name.toLowerCase() : name) { + case "camelcontext": + case "camelContext": return target.getCamelContext(); + default: return null; + } + } +} + diff --git a/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/message-history.json b/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/message-history.json new file mode 100644 index 000000000000..f3c6d76d2c3b --- /dev/null +++ b/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/message-history.json @@ -0,0 +1,15 @@ +{ + "console": { + "kind": "console", + "group": "camel", + "name": "message-history", + "title": "Message History", + "description": "History of latest completed exchange", + "deprecated": false, + "javaType": "org.apache.camel.impl.console.MessageHistoryDevConsole", + "groupId": "org.apache.camel", + "artifactId": "camel-console", + "version": "4.17.0-SNAPSHOT" + } +} + diff --git a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.impl.console.MessageHistoryDevConsole b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.impl.console.MessageHistoryDevConsole new file mode 100644 index 000000000000..14a46fe27c2e --- /dev/null +++ b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.impl.console.MessageHistoryDevConsole @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.impl.console.MessageHistoryDevConsoleConfigurer diff --git a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/message-history b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/message-history new file mode 100644 index 000000000000..9aa3bbe57b77 --- /dev/null +++ b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/message-history @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.impl.console.MessageHistoryDevConsole diff --git a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties index 9c04f6ddf671..4465a6069c9d 100644 --- a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties +++ b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties @@ -1,5 +1,5 @@ # Generated by camel build tools - do NOT edit this file! -dev-consoles=bean blocked browse circuit-breaker consumer context debug endpoint event gc health inflight internal-tasks java-security jvm log memory processor producer properties receive reload rest route route-controller route-dump route-group route-structure send service source startup-recorder system-properties thread top trace transformers type-converters variables +dev-consoles=bean blocked browse circuit-breaker consumer context debug endpoint event gc health inflight internal-tasks java-security jvm log memory message-history processor producer properties receive reload rest route route-controller route-dump route-group route-structure send service source startup-recorder system-properties thread top trace transformers type-converters variables groupId=org.apache.camel artifactId=camel-console version=4.17.0-SNAPSHOT diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/MessageHistoryDevConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/MessageHistoryDevConsole.java new file mode 100644 index 000000000000..f2d0c6656190 --- /dev/null +++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/MessageHistoryDevConsole.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.console; + +import java.util.Collection; +import java.util.Map; + +import org.apache.camel.spi.BacklogTracer; +import org.apache.camel.spi.BacklogTracerEventMessage; +import org.apache.camel.spi.Configurer; +import org.apache.camel.spi.annotations.DevConsole; +import org.apache.camel.support.console.AbstractDevConsole; +import org.apache.camel.util.json.JsonArray; +import org.apache.camel.util.json.JsonObject; + +@DevConsole(name = "message-history", displayName = "Message History", description = "History of latest completed exchange") +@Configurer(extended = true) +public class MessageHistoryDevConsole extends AbstractDevConsole { + + public MessageHistoryDevConsole() { + super("camel", "message-history", "Message History", "History of latest completed exchange"); + } + + protected String doCallText(Map<String, Object> options) { + StringBuilder sb = new StringBuilder(); + + BacklogTracer tracer = getCamelContext().getCamelContextExtension().getContextPlugin(BacklogTracer.class); + if (tracer != null) { + Collection<BacklogTracerEventMessage> queue = tracer.getLatestMessageHistory(); + for (BacklogTracerEventMessage t : queue) { + String json = t.toJSon(0); + sb.append(json).append("\n"); + } + } + + return sb.toString(); + } + + protected JsonObject doCallJson(Map<String, Object> options) { + JsonObject root = new JsonObject(); + + BacklogTracer tracer = getCamelContext().getCamelContextExtension().getContextPlugin(BacklogTracer.class); + if (tracer != null) { + JsonArray arr = new JsonArray(); + + Collection<BacklogTracerEventMessage> queue = tracer.getLatestMessageHistory(); + for (BacklogTracerEventMessage t : queue) { + JsonObject jo = (JsonObject) t.asJSon(); + arr.add(jo); + } + root.put("name", getCamelContext().getName()); + root.put("traces", arr); + } + + return root; + } + +} diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogTracerMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogTracerMBean.java index 3ea2086f0ff1..d254c9a27c6f 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogTracerMBean.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogTracerMBean.java @@ -120,12 +120,21 @@ public interface ManagedBacklogTracerMBean { @ManagedOperation(description = "Dumps all the traced messages") List<BacklogTracerEventMessage> dumpAllTracedMessages(); + @ManagedOperation(description = "Dumps latest completed exchange message history") + List<BacklogTracerEventMessage> dumpLatestMessageHistory(); + @ManagedOperation(description = "Dumps all the traced messages in XML format") String dumpAllTracedMessagesAsXml(); @ManagedOperation(description = "Dumps all the traced messages in JSon format") String dumpAllTracedMessagesAsJSon(); + @ManagedOperation(description = "Dumps latest completed exchange message history in XML format") + String dumpLatestMessageHistoryAsXml(); + + @ManagedOperation(description = "Dumps latest completed exchange message history in JSon format") + String dumpLatestMessageHistoryAsJSon(); + @ManagedOperation(description = "Clears the backlog") void clear(); diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogTracer.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogTracer.java index 5c31cc3c0cbc..098d954ec52c 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogTracer.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogTracer.java @@ -201,6 +201,11 @@ public class ManagedBacklogTracer implements ManagedBacklogTracerMBean { return backlogTracer.dumpAllTracedMessages(); } + @Override + public List<BacklogTracerEventMessage> dumpLatestMessageHistory() { + return backlogTracer.dumpLatestMessageHistory(); + } + @Override public String dumpTracedMessagesAsXml(String nodeOrRouteId) { return backlogTracer.dumpTracedMessagesAsXml(nodeOrRouteId); @@ -221,6 +226,16 @@ public class ManagedBacklogTracer implements ManagedBacklogTracerMBean { return backlogTracer.dumpAllTracedMessagesAsJSon(); } + @Override + public String dumpLatestMessageHistoryAsXml() { + return backlogTracer.dumpLatestMessageHistoryAsXml(); + } + + @Override + public String dumpLatestMessageHistoryAsJSon() { + return backlogTracer.dumpLatestMessageHistoryAsJSon(); + } + @Override public void clear() { backlogTracer.clear(); diff --git a/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryTest.java b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryTest.java new file mode 100644 index 000000000000..ea80f0620e59 --- /dev/null +++ b/core/camel-management/src/test/java/org/apache/camel/management/BacklogTracerMessageHistoryTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management; + +import java.util.List; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.spi.BacklogTracerEventMessage; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@DisabledOnOs(OS.AIX) +public class BacklogTracerMessageHistoryTest extends ManagementTestSupport { + + @SuppressWarnings("unchecked") + @Test + public void testBacklogTracerReplay() throws Exception { + MBeanServer mbeanServer = getMBeanServer(); + ObjectName on + = new ObjectName("org.apache.camel:context=" + context.getManagementName() + ",type=tracer,name=BacklogTracer"); + assertNotNull(on); + assertTrue(mbeanServer.isRegistered(on)); + + Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled"); + assertEquals(Boolean.TRUE, enabled, "Should be enabled"); + + Integer size = (Integer) mbeanServer.getAttribute(on, "BacklogSize"); + assertEquals(100, size.intValue(), "Should be 100"); + + Boolean removeOnDump = (Boolean) mbeanServer.getAttribute(on, "RemoveOnDump"); + assertEquals(Boolean.TRUE, removeOnDump); + + getMockEndpoint("mock:foo").expectedMessageCount(2); + getMockEndpoint("mock:bar").expectedMessageCount(2); + + template.sendBody("direct:start", "Hello World"); + template.sendBody("direct:start", "Bye World"); + + assertMockEndpointsSatisfied(); + + List<Exchange> exchanges = getMockEndpoint("mock:foo").getReceivedExchanges(); + + List<BacklogTracerEventMessage> events = (List<BacklogTracerEventMessage>) mbeanServer.invoke(on, "dumpTracedMessages", + new Object[] { "foo" }, new String[] { "java.lang.String" }); + + assertNotNull(events); + assertEquals(2, events.size()); + + BacklogTracerEventMessage event1 = events.get(0); + assertEquals("foo", event1.getToNode()); + assertEquals(" <message exchangeId=\"" + exchanges.get(0).getExchangeId() + + "\" exchangePattern=\"InOnly\" exchangeType=\"org.apache.camel.support.DefaultExchange\" messageType=\"org.apache.camel.support.DefaultMessage\">\n" + + " <exchangeProperties>\n" + + " <exchangeProperty key=\"CamelToEndpoint\" type=\"java.lang.String\">direct://start</exchangeProperty>\n" + + " </exchangeProperties>\n" + + " <body type=\"java.lang.String\">Hello World</body>\n" + + " </message>", + event1.getMessageAsXml()); + + BacklogTracerEventMessage event2 = events.get(1); + assertEquals("foo", event2.getToNode()); + assertEquals(" <message exchangeId=\"" + exchanges.get(1).getExchangeId() + + "\" exchangePattern=\"InOnly\" exchangeType=\"org.apache.camel.support.DefaultExchange\" messageType=\"org.apache.camel.support.DefaultMessage\">\n" + + " <exchangeProperties>\n" + + " <exchangeProperty key=\"CamelToEndpoint\" type=\"java.lang.String\">direct://start</exchangeProperty>\n" + + " </exchangeProperties>\n" + + " <body type=\"java.lang.String\">Bye World</body>\n" + + " </message>", + event2.getMessageAsXml()); + + events = (List<BacklogTracerEventMessage>) mbeanServer.invoke(on, "dumpLatestMessageHistory", null, null); + assertNotNull(events); + assertEquals(4, events.size()); + + assertTrue(events.get(0).isFirst()); + assertEquals("direct://start", events.get(0).getEndpointUri()); + assertNull(events.get(0).getToNode()); + + assertFalse(events.get(1).isFirst()); + assertFalse(events.get(1).isLast()); + assertEquals("foo", events.get(1).getToNode()); + assertEquals("to", events.get(1).getToNodeShortName()); + assertEquals("to[mock:foo]", events.get(1).getToNodeLabel()); + + assertFalse(events.get(2).isFirst()); + assertFalse(events.get(2).isLast()); + assertEquals("bar", events.get(2).getToNode()); + assertEquals("to", events.get(2).getToNodeShortName()); + assertEquals("to[mock:bar]", events.get(2).getToNodeLabel()); + + assertTrue(events.get(3).isLast()); + assertEquals("direct://start", events.get(3).getEndpointUri()); + assertNull(events.get(3).getToNode()); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + context.setUseBreadcrumb(false); + context.setBacklogTracing(true); + context.setMessageHistory(true); + + from("direct:start") + .to("mock:foo").id("foo") + .to("mock:bar").id("bar"); + + } + }; + } + +} diff --git a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java index d4ff037ac3bb..0e43279af66d 100644 --- a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java +++ b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java @@ -117,6 +117,7 @@ public class LocalCliConnector extends ServiceSupport implements CliConnector, C private File outputFile; private File traceFile; private long traceFilePos; // keep track of trace offset + private File messageHistoryFile; private File debugFile; private File receiveFile; private long receiveFilePos; // keep track of receive offset @@ -188,6 +189,7 @@ public class LocalCliConnector extends ServiceSupport implements CliConnector, C actionFile = createLockFile(lockFile.getName() + "-action.json"); outputFile = createLockFile(lockFile.getName() + "-output.json"); traceFile = createLockFile(lockFile.getName() + "-trace.json"); + messageHistoryFile = createLockFile(lockFile.getName() + "-history.json"); debugFile = createLockFile(lockFile.getName() + "-debug.json"); receiveFile = createLockFile(lockFile.getName() + "-receive.json"); executor.scheduleWithFixedDelay(this::task, 0, delay, TimeUnit.MILLISECONDS); @@ -1285,6 +1287,21 @@ public class LocalCliConnector extends ServiceSupport implements CliConnector, C LOG.trace("Error updating debug file: {} due to: {}. This exception is ignored.", debugFile, e.getMessage(), e); } + try { + DevConsole dc13b = camelContext.getCamelContextExtension().getContextPlugin(DevConsoleRegistry.class) + .resolveById("message-history"); + if (dc13b != null) { + JsonObject json = (JsonObject) dc13b.call(DevConsole.MediaType.JSON); + // store replays in a special file + LOG.trace("Updating message-history file: {}", messageHistoryFile); + String data = json.toJson() + System.lineSeparator(); + IOHelper.writeText(data, messageHistoryFile); + } + } catch (Exception e) { + // ignore + LOG.trace("Error updating message-history file: {} due to: {}. This exception is ignored.", + messageHistoryFile, e.getMessage(), e); + } try { DevConsole dc14 = camelContext.getCamelContextExtension().getContextPlugin(DevConsoleRegistry.class) .resolveById("receive"); @@ -1444,6 +1461,9 @@ public class LocalCliConnector extends ServiceSupport implements CliConnector, C if (traceFile != null) { FileUtil.deleteFile(traceFile); } + if (messageHistoryFile != null) { + FileUtil.deleteFile(messageHistoryFile); + } if (debugFile != null) { FileUtil.deleteFile(debugFile); } diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java index 03b2aa69ad60..583658b41383 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java @@ -117,6 +117,10 @@ public abstract class CamelCommand implements Callable<Integer> { return CommandLineHelper.getCamelDir().resolve(pid + "-debug.json"); } + public Path getMessageHistoryFile(String pid) { + return CommandLineHelper.getCamelDir().resolve(pid + "-history.json"); + } + public Path getRunBackgroundLogFile(String uuid) { return CommandLineHelper.getCamelDir().resolve(uuid + "-run.log"); } diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java index ed5147326db2..addbabb952dc 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java @@ -108,6 +108,7 @@ public class CamelJBangMain implements Callable<Integer> { .addSubcommand("producer", new CommandLine(new ListProducer(main))) .addSubcommand("endpoint", new CommandLine(new ListEndpoint(main))) .addSubcommand("event", new CommandLine(new ListEvent(main))) + .addSubcommand("history", new CommandLine(new CamelHistoryAction(main))) .addSubcommand("inflight", new CommandLine(new ListInflight(main))) .addSubcommand("blocked", new CommandLine(new ListBlocked(main))) .addSubcommand("internal-task", new CommandLine(new ListInternalTask(main))) diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelHistoryAction.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelHistoryAction.java new file mode 100644 index 000000000000..aabede65d651 --- /dev/null +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelHistoryAction.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.dsl.jbang.core.commands.action; + +import java.io.LineNumberReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import com.github.freva.asciitable.AsciiTable; +import com.github.freva.asciitable.Column; +import com.github.freva.asciitable.HorizontalAlign; +import com.github.freva.asciitable.OverflowBehaviour; +import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain; +import org.apache.camel.util.IOHelper; +import org.apache.camel.util.TimeUtils; +import org.apache.camel.util.URISupport; +import org.apache.camel.util.json.JsonArray; +import org.apache.camel.util.json.JsonObject; +import org.apache.camel.util.json.Jsoner; +import org.fusesource.jansi.Ansi; +import picocli.CommandLine; + [email protected](name = "history", + description = "History of latest completed exchange", sortOptions = false, showDefaultValues = true) +public class CamelHistoryAction extends ActionWatchCommand { + + @CommandLine.Parameters(description = "Name or pid of running Camel integration", arity = "0..1") + String name = "*"; + + @CommandLine.Option(names = { "--source" }, + description = "Prefer to display source filename/code instead of IDs") + boolean source; + + @CommandLine.Option(names = { "--mask" }, + description = "Whether to mask endpoint URIs to avoid printing sensitive information such as password or access keys") + boolean mask; + + public CamelHistoryAction(CamelJBangMain main) { + super(main); + } + + @Override + public Integer doWatchCall() throws Exception { + if (name == null) { + name = "*"; + } + + List<List<Row>> pids = loadRows(); + + if (!pids.isEmpty()) { + if (watch) { + clearScreen(); + } + for (List<Row> rows : pids) { + Row first = rows.get(0); + String ago = TimeUtils.printSince(first.timestamp); + Row last = rows.get(rows.size() - 1); + String status = last.failed ? "failed" : "success"; + String s = String.format("Message History of last completed (id:%s status:%s ago:%s pid:%d name:%s)", + first.exchangeId, status, ago, first.pid, first.name); + printer().println(s); + + printer().println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows, Arrays.asList( + new Column().header("").dataAlign(HorizontalAlign.LEFT) + .minWidth(6).maxWidth(6) + .with(this::getDirection), + new Column().header("ID").dataAlign(HorizontalAlign.LEFT) + .minWidth(10).maxWidth(20, OverflowBehaviour.ELLIPSIS_RIGHT) + .with(this::getId), + new Column().header("PROCESSOR").dataAlign(HorizontalAlign.LEFT) + .minWidth(40).maxWidth(55, OverflowBehaviour.ELLIPSIS_RIGHT) + .with(this::getProcessor), + new Column().header("ELAPSED").dataAlign(HorizontalAlign.RIGHT) + .maxWidth(10, OverflowBehaviour.ELLIPSIS_RIGHT) + .with(r -> "" + r.elapsed), + new Column().header("").dataAlign(HorizontalAlign.LEFT) + .maxWidth(60, OverflowBehaviour.NEWLINE) + .with(this::getMessage)))); + + JsonObject cause = last.exception; + if (cause != null) { + String st = cause.getString("stackTrace"); + if (st != null) { + st = Jsoner.unescape(st); + } + if (st != null) { + printer().println(); + printer().println("Stacktrace"); + printer().println("-".repeat(80)); + String text = Ansi.ansi().fgRed().a(st).reset().toString(); + printer().println(text); + printer().println(); + } + } + printer().println(); + } + } + + return 0; + } + + private String getId(Row r) { + String answer; + if (source && r.location != null) { + answer = r.location; + } else { + if (r.nodeId == null) { + answer = r.routeId; + } else { + answer = r.nodeId; + } + } + return answer; + } + + private String getProcessor(Row r) { + if (r.first || r.last) { + return "from[" + r.endpoint.getString("endpoint") + "]"; + } else if (r.nodeLabel != null) { + return r.nodeLabel; + } else { + return r.nodeId; + } + } + + private String getDirection(Row r) { + if (r.first) { + return "*-->"; + } else if (r.last) { + return "*<--"; + } else { + return ""; + } + } + + private String getMessage(Row r) { + if (r.failed && !r.last) { + return "Exception: " + r.exception.getString("message"); + } + if (r.last) { + return r.failed ? "failed" : "success"; + } + return null; + } + + protected List<List<Row>> loadRows() throws Exception { + List<List<Row>> answer = new ArrayList<>(); + List<Long> pids = findPids(name); + for (long pid : pids) { + Path p = getMessageHistoryFile(Long.toString(pid)); + if (Files.exists(p)) { + String line; + LineNumberReader reader = new LineNumberReader(Files.newBufferedReader(p)); + try { + List<Row> rows = new ArrayList<>(); + do { + line = reader.readLine(); + List<Row> load = parseTraceLine(pid, line); + if (load != null) { + rows.addAll(load); + } + } while (line != null); + if (!rows.isEmpty()) { + answer.add(rows); + } + } catch (Exception e) { + // ignore + } finally { + IOHelper.close(reader); + } + } + } + return answer; + } + + private List<Row> parseTraceLine(long pid, String line) { + JsonObject root = null; + try { + root = (JsonObject) Jsoner.deserialize(line); + } catch (Exception e) { + // ignore + } + if (root != null) { + List<Row> rows = new ArrayList<>(); + String name = root.getString("name"); + JsonArray arr = root.getCollection("traces"); + if (arr != null) { + for (Object o : arr) { + Row row = new Row(); + row.pid = pid; + row.name = name; + JsonObject jo = (JsonObject) o; + row.uid = jo.getLong("uid"); + row.first = jo.getBoolean("first"); + row.last = jo.getBoolean("last"); + row.location = jo.getString("location"); + row.routeId = jo.getString("routeId"); + row.nodeId = jo.getString("nodeId"); + row.nodeShortName = jo.getString("nodeShortName"); + row.nodeLabel = jo.getString("nodeLabel"); + if (mask) { + row.nodeLabel = URISupport.sanitizeUri(row.nodeLabel); + } + String uri = jo.getString("endpointUri"); + if (uri != null) { + row.endpoint = new JsonObject(); + if (mask) { + uri = URISupport.sanitizeUri(uri); + } + row.endpoint.put("endpoint", uri); + row.endpoint.put("remote", jo.getBooleanOrDefault("remoteEndpoint", true)); + } + JsonObject es = jo.getMap("endpointService"); + if (es != null) { + row.endpointService = es; + } + Long ts = jo.getLong("timestamp"); + if (ts != null) { + row.timestamp = ts; + } + row.elapsed = jo.getLong("elapsed"); + row.failed = jo.getBoolean("failed"); + row.done = jo.getBoolean("done"); + row.threadName = jo.getString("threadName"); + row.message = jo.getMap("message"); + row.exception = jo.getMap("exception"); + row.exchangeId = row.message.getString("exchangeId"); + row.exchangePattern = row.message.getString("exchangePattern"); + // we should exchangeId/pattern elsewhere + row.message.remove("exchangeId"); + row.message.remove("exchangePattern"); + rows.add(row); + } + } + return rows; + } + return null; + } + + private static class Row { + long pid; + String name; + boolean first; + boolean last; + long uid; + String exchangeId; + String exchangePattern; + String threadName; + String location; + String routeId; + String nodeId; + String nodeShortName; + String nodeLabel; + long timestamp; + long elapsed; + boolean done; + boolean failed; + JsonObject endpoint; + JsonObject endpointService; + JsonObject message; + JsonObject exception; + } + +}
