This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 7512a95f54d8467fe7848939040c64cc8c249d66 Author: Claus Ibsen <[email protected]> AuthorDate: Wed Feb 15 08:19:45 2023 +0100 CAMEL-19033: camel-jbang - get trace -> trace --- .../camel/spi/BacklogTracerEventMessage.java | 5 + .../camel/impl/debugger/BacklogDebugger.java | 10 +- .../debugger/DefaultBacklogTracerEventMessage.java | 16 +- .../camel/impl/engine/CamelInternalProcessor.java | 8 +- .../apache/camel/impl/console/TraceDevConsole.java | 5 +- .../main/java/org/apache/camel/util/IOHelper.java | 13 + .../camel/cli/connector/LocalCliConnector.java | 27 +- .../dsl/jbang/core/commands/CamelJBangMain.java | 4 +- .../core/commands/action/CamelTraceAction.java | 617 +++++++++++++++++++++ .../dsl/jbang/core/commands/process/ListTrace.java | 358 ------------ 10 files changed, 681 insertions(+), 382 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java index 95e7da612c7..d13dda1aac4 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/BacklogTracerEventMessage.java @@ -46,6 +46,11 @@ public interface BacklogTracerEventMessage { */ long getTimestamp(); + /** + * The location of the trace (source code name:line) if possible. + */ + String getLocation(); + /** * Route id */ diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java index cb1e41fc048..b67edf2ee6c 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java @@ -42,6 +42,7 @@ import org.apache.camel.spi.Condition; import org.apache.camel.spi.Debugger; import org.apache.camel.support.BreakpointSupport; import org.apache.camel.support.CamelContextHelper; +import org.apache.camel.support.LoggerHelper; import org.apache.camel.support.MessageHelper; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.support.service.ServiceSupport; @@ -608,7 +609,8 @@ public final class BacklogDebugger extends ServiceSupport { suspendedBreakpointMessages.computeIfPresent( nodeId, (nId, message) -> new DefaultBacklogTracerEventMessage( - false, false, message.getUid(), message.getTimestamp(), message.getRouteId(), message.getToNode(), + false, false, message.getUid(), message.getTimestamp(), message.getLocation(), message.getRouteId(), + message.getToNode(), message.getExchangeId(), dumpAsXml(suspendedExchange.getExchange()), dumpAsJSon(suspendedExchange.getExchange()))); @@ -667,10 +669,11 @@ public final class BacklogDebugger extends ServiceSupport { String messageAsXml = dumpAsXml(exchange); String messageAsJSon = dumpAsJSon(exchange); long uid = debugCounter.incrementAndGet(); + String source = LoggerHelper.getLineNumberLoggerName(definition); BacklogTracerEventMessage msg = new DefaultBacklogTracerEventMessage( - false, false, uid, timestamp, routeId, toNode, exchangeId, messageAsXml, messageAsJSon); + false, false, uid, timestamp, source, routeId, toNode, exchangeId, messageAsXml, messageAsJSon); suspendedBreakpointMessages.put(nodeId, msg); // suspend at this breakpoint @@ -735,10 +738,11 @@ public final class BacklogDebugger extends ServiceSupport { String messageAsXml = dumpAsXml(exchange); String messageAsJSon = dumpAsJSon(exchange); long uid = debugCounter.incrementAndGet(); + String source = LoggerHelper.getLineNumberLoggerName(definition); BacklogTracerEventMessage msg = new DefaultBacklogTracerEventMessage( - false, false, uid, timestamp, routeId, toNode, exchangeId, messageAsXml, messageAsJSon); + false, false, uid, timestamp, source, routeId, toNode, exchangeId, messageAsXml, messageAsJSon); suspendedBreakpointMessages.put(toNode, msg); // suspend at this breakpoint diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java index 8ff42681e28..ae9b44fbefb 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogTracerEventMessage.java @@ -35,6 +35,7 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven private final boolean last; private final long uid; private final long timestamp; + private String location; private final String routeId; private final String toNode; private final String exchangeId; @@ -44,16 +45,16 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven private String exceptionAsJSon; private long duration; private boolean done; - private boolean failed; public DefaultBacklogTracerEventMessage(boolean first, boolean last, long uid, long timestamp, - String routeId, String toNode, String exchangeId, + String location, String routeId, String toNode, String exchangeId, String messageAsXml, String messageAsJSon) { this.watch = new StopWatch(); this.first = first; this.last = last; this.uid = uid; this.timestamp = timestamp; + this.location = location; this.routeId = routeId; this.toNode = toNode; this.exchangeId = exchangeId; @@ -89,6 +90,11 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven return timestamp; } + @Override + public String getLocation() { + return location; + } + @Override public String getRouteId() { return routeId; @@ -184,6 +190,9 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven sb.append(prefix).append(" <elapsed>").append(getElapsed()).append("</elapsed>\n"); sb.append(prefix).append(" <done>").append(isDone()).append("</done>\n"); sb.append(prefix).append(" <failed>").append(isFailed()).append("</failed>\n"); + if (getLocation() != null) { + sb.append(prefix).append(" <location>").append(getLocation()).append("</location>\n"); + } // route id is optional and we then use an empty value for no route id sb.append(prefix).append(" <routeId>").append(routeId != null ? routeId : "").append("</routeId>\n"); if (toNode != null) { @@ -217,6 +226,9 @@ public final class DefaultBacklogTracerEventMessage implements BacklogTracerEven jo.put("uid", uid); jo.put("first", first); jo.put("last", last); + if (location != null) { + jo.put("location", location); + } if (routeId != null) { jo.put("routeId", routeId); } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java index 48bf5702c20..6fc1722509d 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java @@ -594,10 +594,11 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In // if first we should add a pseudo trace message as well, so we have a starting message (eg from the route) String routeId = routeDefinition != null ? routeDefinition.getRouteId() : null; + String source = LoggerHelper.getLineNumberLoggerName(processorDefinition); if (first) { long created = exchange.getCreated(); DefaultBacklogTracerEventMessage pseudoFirst = new DefaultBacklogTracerEventMessage( - true, false, backlogTracer.incrementTraceCounter(), created, routeId, null, exchangeId, + true, false, backlogTracer.incrementTraceCounter(), created, source, routeId, null, exchangeId, messageAsXml, messageAsJSon); backlogTracer.traceEvent(pseudoFirst); @@ -617,7 +618,8 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In true, backlogTracer.isBodyIncludeStreams(), backlogTracer.isBodyIncludeFiles(), backlogTracer.getBodyMaxChars(), true); DefaultBacklogTracerEventMessage pseudoLast = new DefaultBacklogTracerEventMessage( - false, true, backlogTracer.incrementTraceCounter(), created, routeId, null, exchangeId, + false, true, backlogTracer.incrementTraceCounter(), created, source, routeId, null, + exchangeId, messageAsXml, messageAsJSon); backlogTracer.traceEvent(pseudoLast); @@ -630,7 +632,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In }); } DefaultBacklogTracerEventMessage event = new DefaultBacklogTracerEventMessage( - false, false, backlogTracer.incrementTraceCounter(), timestamp, routeId, toNode, exchangeId, + false, false, backlogTracer.incrementTraceCounter(), timestamp, source, routeId, toNode, exchangeId, messageAsXml, messageAsJSon); backlogTracer.traceEvent(event); diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/TraceDevConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/TraceDevConsole.java index 77dd5f02e40..b4019cefb15 100644 --- a/core/camel-console/src/main/java/org/apache/camel/impl/console/TraceDevConsole.java +++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/TraceDevConsole.java @@ -16,8 +16,6 @@ */ package org.apache.camel.impl.console; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; @@ -28,6 +26,7 @@ import org.apache.camel.spi.Configurer; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.annotations.DevConsole; import org.apache.camel.support.console.AbstractDevConsole; +import org.apache.camel.util.json.JsonArray; import org.apache.camel.util.json.JsonObject; @DevConsole("trace") @@ -97,7 +96,7 @@ public class TraceDevConsole extends AbstractDevConsole { addMessage(t); } - List<JsonObject> arr = new ArrayList<>(); + JsonArray arr = new JsonArray(); root.put("traces", arr); for (BacklogTracerEventMessage t : queue) { JsonObject jo = (JsonObject) t.asJSon(); diff --git a/core/camel-util/src/main/java/org/apache/camel/util/IOHelper.java b/core/camel-util/src/main/java/org/apache/camel/util/IOHelper.java index b6d9b8f9e6d..ba1cfffcb9a 100644 --- a/core/camel-util/src/main/java/org/apache/camel/util/IOHelper.java +++ b/core/camel-util/src/main/java/org/apache/camel/util/IOHelper.java @@ -472,6 +472,19 @@ public final class IOHelper { } } + /** + * Appends the text to the file. + */ + public static void appendText(String text, File file) throws IOException { + if (!file.exists()) { + String path = FileUtil.onlyPath(file.getPath()); + if (path != null) { + new File(path).mkdirs(); + } + } + writeText(text, new FileOutputStream(file, true)); + } + /** * Writes the text to the file. */ diff --git a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java index 75c19f72ad0..a7a9af67fa2 100644 --- a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java +++ b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java @@ -53,6 +53,7 @@ import org.apache.camel.support.service.ServiceSupport; import org.apache.camel.util.FileUtil; import org.apache.camel.util.IOHelper; import org.apache.camel.util.concurrent.ThreadHelper; +import org.apache.camel.util.json.JsonArray; import org.apache.camel.util.json.JsonObject; import org.apache.camel.util.json.Jsoner; import org.slf4j.Logger; @@ -79,6 +80,7 @@ public class LocalCliConnector extends ServiceSupport implements CliConnector, C private File actionFile; private File outputFile; private File traceFile; + private long traceFilePos; // keep track of trace offset public LocalCliConnector(CliConnectorFactory cliConnectorFactory) { this.cliConnectorFactory = cliConnectorFactory; @@ -288,13 +290,6 @@ public class LocalCliConnector extends ServiceSupport implements CliConnector, C LOG.trace("Updating output file: {}", outputFile); IOHelper.writeText(json.toJson(), outputFile); } - } else if ("trace".equals(action)) { - DevConsole dc = camelContext.getExtension(DevConsoleRegistry.class).resolveById("trace"); - if (dc != null) { - JsonObject json = (JsonObject) dc.call(DevConsole.MediaType.JSON); - LOG.trace("Updating trace file: {}", traceFile); - IOHelper.writeText(json.toJson(), traceFile); - } } // action done so delete file @@ -427,13 +422,23 @@ public class LocalCliConnector extends ServiceSupport implements CliConnector, C root.put("fault-tolerance", json); } } - DevConsole dc12 = dcr.resolveById("trace"); + DevConsole dc12 = camelContext.getExtension(DevConsoleRegistry.class).resolveById("trace"); if (dc12 != null) { JsonObject json = (JsonObject) dc12.call(DevConsole.MediaType.JSON); - if (json != null && !json.isEmpty()) { - // special for trace messages which is stored in its own file + JsonArray arr = json.getCollection("traces"); + // filter based on last uid + if (traceFilePos > 0) { + arr.removeIf(r -> { + JsonObject jo = (JsonObject) r; + return jo.getLong("uid") <= traceFilePos; + }); + } + if (!arr.isEmpty()) { + // store traces in a special file LOG.trace("Updating trace file: {}", traceFile); - IOHelper.writeText(json.toJson(), traceFile); + IOHelper.appendText(json.toJson(), traceFile); + json = arr.getMap(arr.size() - 1); + traceFilePos = json.getLong("uid"); } } } diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java index d28dc582c79..96088a9b3a3 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java @@ -30,6 +30,7 @@ import org.apache.camel.dsl.jbang.core.commands.action.CamelRouteStopAction; import org.apache.camel.dsl.jbang.core.commands.action.CamelSourceAction; import org.apache.camel.dsl.jbang.core.commands.action.CamelSourceTop; import org.apache.camel.dsl.jbang.core.commands.action.CamelThreadDump; +import org.apache.camel.dsl.jbang.core.commands.action.CamelTraceAction; import org.apache.camel.dsl.jbang.core.commands.action.LoggerAction; import org.apache.camel.dsl.jbang.core.commands.action.RouteControllerAction; import org.apache.camel.dsl.jbang.core.commands.catalog.CatalogCommand; @@ -59,7 +60,6 @@ import org.apache.camel.dsl.jbang.core.commands.process.ListInflight; import org.apache.camel.dsl.jbang.core.commands.process.ListMetric; import org.apache.camel.dsl.jbang.core.commands.process.ListProcess; import org.apache.camel.dsl.jbang.core.commands.process.ListService; -import org.apache.camel.dsl.jbang.core.commands.process.ListTrace; import org.apache.camel.dsl.jbang.core.commands.process.ListVault; import org.apache.camel.dsl.jbang.core.commands.process.StopProcess; import picocli.CommandLine; @@ -77,6 +77,7 @@ public class CamelJBangMain implements Callable<Integer> { .addSubcommand("log", new CommandLine(new CamelLogAction(main))) .addSubcommand("ps", new CommandLine(new ListProcess(main))) .addSubcommand("stop", new CommandLine(new StopProcess(main))) + .addSubcommand("trace", new CommandLine(new CamelTraceAction(main))) .addSubcommand("get", new CommandLine(new CamelStatus(main)) .addSubcommand("context", new CommandLine(new CamelContextStatus(main))) .addSubcommand("route", new CommandLine(new CamelRouteStatus(main))) @@ -85,7 +86,6 @@ public class CamelJBangMain implements Callable<Integer> { .addSubcommand("health", new CommandLine(new ListHealth(main))) .addSubcommand("endpoint", new CommandLine(new ListEndpoint(main))) .addSubcommand("event", new CommandLine(new ListEvent(main))) - .addSubcommand("trace", new CommandLine(new ListTrace(main))) .addSubcommand("inflight", new CommandLine(new ListInflight(main))) .addSubcommand("blocked", new CommandLine(new ListBlocked(main))) .addSubcommand("route-controller", new CommandLine(new RouteControllerAction(main))) diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelTraceAction.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelTraceAction.java new file mode 100644 index 00000000000..da874e9dcec --- /dev/null +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelTraceAction.java @@ -0,0 +1,617 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.dsl.jbang.core.commands.action; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.io.LineNumberReader; +import java.text.SimpleDateFormat; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.regex.Pattern; + +import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain; +import org.apache.camel.dsl.jbang.core.common.JSonHelper; +import org.apache.camel.dsl.jbang.core.common.ProcessHelper; +import org.apache.camel.util.StopWatch; +import org.apache.camel.util.TimeUtils; +import org.apache.camel.util.json.JsonArray; +import org.apache.camel.util.json.JsonObject; +import org.apache.camel.util.json.Jsoner; +import org.fusesource.jansi.Ansi; +import org.fusesource.jansi.AnsiConsole; +import picocli.CommandLine; + [email protected](name = "trace", + description = "Tail message traces from running Camel integrations") +public class CamelTraceAction extends ActionBaseCommand { + + // TODO: message dump in json or not (option) + // TODO: since + + private static final int NAME_MAX_WIDTH = 25; + private static final int NAME_MIN_WIDTH = 10; + + @CommandLine.Parameters(description = "Name or pid of running Camel integration. (default selects all)", arity = "0..1") + String name = "*"; + + @CommandLine.Option(names = { "--logging-color" }, defaultValue = "true", description = "Use colored logging") + boolean loggingColor = true; + + @CommandLine.Option(names = { "--timestamp" }, defaultValue = "true", + description = "Print timestamp.") + boolean timestamp = true; + + @CommandLine.Option(names = { "--pretty" }, defaultValue = "false", + description = "Pretty print traced message") + boolean pretty; + + @CommandLine.Option(names = { "--follow" }, defaultValue = "true", + description = "Keep following and outputting new log lines (use ctrl + c to exit).") + boolean follow = true; + + @CommandLine.Option(names = { "--prefix" }, defaultValue = "true", + description = "Print prefix with running Camel integration name.") + boolean prefix = true; + + @CommandLine.Option(names = { "--tail" }, + description = "The number of traces from the end of the trace to show. Defaults to showing all traces.") + int tail; + + @CommandLine.Option(names = { "--since" }, + description = "Return traces newer than a relative duration like 5s, 2m, or 1h. The value is in seconds if no unit specified.") + String since; + + @CommandLine.Option(names = { "--find" }, + description = "Find and highlight matching text (ignore case).", arity = "0..*") + String[] find; + + @CommandLine.Option(names = { "--grep" }, + description = "Filter traces to only output trace matching text (ignore case).", arity = "0..*") + String[] grep; + + @CommandLine.Option(names = { "--show-exchange-properties", "showExchangeProperties" }, defaultValue = "false", + description = "Show exchange properties in traced messages") + boolean showExchangeProperties; + + @CommandLine.Option(names = { "--show-message-headers", "showMessageHeaders" }, defaultValue = "true", + description = "Show message headers in traced messages") + boolean showMessageHeaders = true; + + @CommandLine.Option(names = { "--show-message-body", "showMessageBody" }, defaultValue = "true", + description = "Show message body in traced messages") + boolean showMessageBody = true; + + @CommandLine.Option(names = { "--show-exception", "showException" }, defaultValue = "true", + description = "Show exception and stacktrace for failed messages") + boolean showException = true; + + String findAnsi; + + private int nameMaxWidth; + + private final Map<String, Ansi.Color> colors = new HashMap<>(); + + public CamelTraceAction(CamelJBangMain main) { + super(main); + } + + @Override + public Integer call() throws Exception { + Map<Long, Pid> pids = new LinkedHashMap<>(); + + // find new pids + updatePids(pids); + if (!pids.isEmpty()) { + // read existing trace files (skip by tail) + if (find != null) { + findAnsi = Ansi.ansi().fg(Ansi.Color.BLACK).bg(Ansi.Color.YELLOW).a("$0").reset().toString(); + for (int i = 0; i < find.length; i++) { + String f = find[i]; + f = Pattern.quote(f); + find[i] = f; + } + } + if (grep != null) { + findAnsi = Ansi.ansi().fg(Ansi.Color.BLACK).bg(Ansi.Color.YELLOW).a("$0").reset().toString(); + for (int i = 0; i < grep.length; i++) { + String f = grep[i]; + f = Pattern.quote(f); + grep[i] = f; + } + } + // dump existing traces + tailTraceFiles(pids, tail); + dumpTraceFiles(pids, tail); + } + + if (follow) { + boolean waitMessage = true; + StopWatch watch = new StopWatch(); + do { + if (pids.isEmpty()) { + if (waitMessage) { + System.out.println("Waiting for traces ..."); + waitMessage = false; + } + Thread.sleep(500); + updatePids(pids); + } else { + waitMessage = true; + if (watch.taken() > 500) { + // check for new traces + updatePids(pids); + watch.restart(); + } + int lines = readTraceFiles(pids); + if (lines > 0) { + dumpTraceFiles(pids, 0); + } else { + Thread.sleep(100); + } + } + } while (true); + } + + return 0; + } + + private void tailTraceFiles(Map<Long, Pid> pids, int tail) throws Exception { + for (Pid pid : pids.values()) { + File log = getTraceFile(pid.pid); + if (log.exists()) { + pid.reader = new LineNumberReader(new FileReader(log)); + String line; + if (tail == 0) { + pid.fifo = new ArrayDeque<>(); + } else { + pid.fifo = new ArrayBlockingQueue<>(tail); + } + do { + line = pid.reader.readLine(); + if (line != null) { + while (!pid.fifo.offer(line)) { + pid.fifo.poll(); + } + } + } while (line != null); + } + } + } + + private void updatePids(Map<Long, Pid> rows) { + List<Long> pids = findPids(name); + ProcessHandle.allProcesses() + .filter(ph -> pids.contains(ph.pid())) + .forEach(ph -> { + JsonObject root = loadStatus(ph.pid()); + if (root != null) { + Pid row = new Pid(); + row.pid = "" + ph.pid(); + JsonObject context = (JsonObject) root.get("context"); + if (context == null) { + return; + } + row.name = context.getString("name"); + if ("CamelJBang".equals(row.name)) { + row.name = ProcessHelper.extractName(root, ph); + } + int len = row.name.length(); + if (len < NAME_MIN_WIDTH) { + len = NAME_MIN_WIDTH; + } + if (len > NAME_MAX_WIDTH) { + len = NAME_MAX_WIDTH; + } + if (len > nameMaxWidth) { + nameMaxWidth = len; + } + if (!rows.containsKey(ph.pid())) { + rows.put(ph.pid(), row); + } + } + }); + + // remove pids that are no long active from the rows + Set<Long> remove = new HashSet<>(); + for (long pid : rows.keySet()) { + if (!pids.contains(pid)) { + remove.add(pid); + } + } + for (long pid : remove) { + rows.remove(pid); + } + } + + private int readTraceFiles(Map<Long, Pid> pids) throws Exception { + int lines = 0; + + for (Pid pid : pids.values()) { + if (pid.reader == null) { + File file = getTraceFile(pid.pid); + if (file.exists()) { + pid.reader = new LineNumberReader(new FileReader(file)); + } + } + if (pid.reader != null) { + String line; + do { + try { + line = pid.reader.readLine(); + if (line != null) { + lines++; + // switch fifo to be unlimited as we use it for new log lines + if (pid.fifo == null || pid.fifo instanceof ArrayBlockingQueue) { + pid.fifo = new ArrayDeque<>(); + } + pid.fifo.offer(line); + } + } catch (IOException e) { + // ignore + line = null; + } + } while (line != null); + } + } + + return lines; + } + + private List<Row> parseTraceLine(Pid pid, String line) { + JsonObject root = null; + try { + root = (JsonObject) Jsoner.deserialize(line); + } catch (Exception e) { + // ignore + } + if (root != null) { + List<Row> rows = new ArrayList<>(); + JsonArray arr = root.getCollection("traces"); + if (arr != null) { + for (Object o : arr) { + Row row = new Row(); + row.pid = pid.pid; + row.name = pid.name; + JsonObject jo = (JsonObject) o; + row.uid = jo.getLong("uid"); + row.first = jo.getBoolean("first"); + row.last = jo.getBoolean("last"); + row.location = jo.getString("location"); + row.routeId = jo.getString("routeId"); + row.nodeId = jo.getString("nodeId"); + Long ts = jo.getLong("timestamp"); + if (ts != null) { + row.timestamp = ts; + } + row.elapsed = jo.getLong("elapsed"); + row.failed = jo.getBoolean("failed"); + row.done = jo.getBoolean("done"); + row.message = jo.getMap("message"); + row.exception = jo.getMap("exception"); + row.exchangeId = row.message.getString("exchangeId"); + row.message.remove("exchangeId"); + if (!showExchangeProperties) { + row.message.remove("exchangeProperties"); + } + if (!showMessageHeaders) { + row.message.remove("headers"); + } + if (!showMessageBody) { + row.message.remove("body"); + } + if (!showException) { + row.exception = null; + } + rows.add(row); + } + } + return rows; + } + return null; + } + + private void dumpTraceFiles(Map<Long, Pid> pids, int tail) { + Set<String> names = new HashSet<>(); + List<Row> rows = new ArrayList<>(); + for (Pid pid : pids.values()) { + Queue<String> queue = pid.fifo; + if (queue != null) { + for (String l : queue) { + names.add(pid.name); + List<Row> parsed = parseTraceLine(pid, l); + if (parsed != null && !parsed.isEmpty()) { + rows.addAll(parsed); + } + } + pid.fifo.clear(); + } + } + + // only sort if there are multiple Camels running + if (names.size() > 1) { + // sort lines + final Map<String, Long> lastTimestamp = new HashMap<>(); + rows.sort((r1, r2) -> { + long t1 = r1.timestamp; + long t2 = r2.timestamp; + if (t1 == 0) { + t1 = lastTimestamp.get(r1.name); + } + if (t1 == 0) { + t1 = lastTimestamp.get(r2.name); + } + if (t1 == 0 && t2 == 0) { + return 0; + } else if (t1 == 0) { + return -1; + } else if (t2 == 0) { + return 1; + } + lastTimestamp.put(r1.name, t1); + lastTimestamp.put(r2.name, t2); + return Long.compare(t1, t2); + }); + } + if (tail > 0) { + // cut according to tail + int pos = rows.size() - tail; + if (pos > 0) { + rows = rows.subList(pos, rows.size()); + } + } + + rows.forEach(r -> { + printTrace(r.name, r); + }); + } + + private boolean isValidGrep(String line) { + if (grep == null) { + return true; + } + for (String g : grep) { + boolean m = Pattern.compile("(?i)" + g).matcher(line).find(); + if (m) { + return true; + } + } + return false; + } + + protected void printTrace(String name, Row row) { + if (!prefix) { + name = null; + } + + String json = getDataAsJSon(row); + boolean valid = isValidGrep(json); + if (!valid) { + return; + } + + if (name != null) { + if (loggingColor) { + Ansi.Color color = colors.get(name); + if (color == null) { + // grab a new color + int idx = (colors.size() % 6) + 1; + color = Ansi.Color.values()[idx]; + colors.put(name, color); + } + String n = String.format("%-" + nameMaxWidth + "s", name); + AnsiConsole.out().print(Ansi.ansi().fg(color).a(n).a("| ").reset()); + } else { + String n = String.format("%-" + nameMaxWidth + "s", name); + System.out.print(n); + System.out.print("| "); + } + } + if (timestamp) { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + String ts = sdf.format(new Date(row.timestamp)); + if (loggingColor) { + AnsiConsole.out().print(Ansi.ansi().fgBrightDefault().a(ts).reset()); + } else { + System.out.print(ts); + } + System.out.print(" "); + } + // pid + String p = String.format("%5.5s", row.pid); + if (loggingColor) { + AnsiConsole.out().print(Ansi.ansi().fgMagenta().a(p).reset()); + AnsiConsole.out().print(Ansi.ansi().fgBrightDefault().a(" --- ").reset()); + } else { + System.out.print(p); + System.out.print(" --- "); + } + // route/node id + String ids = String.format("[%25.25s]", row.routeId + "/" + getId(row)); + if (loggingColor) { + AnsiConsole.out().print(Ansi.ansi().fgBrightDefault().a(ids).reset()); + } else { + System.out.print(ids); + } + System.out.print(" "); + // source location + String code = String.format("%-25.25s", row.location != null ? row.location : ""); + if (loggingColor) { + AnsiConsole.out().print(Ansi.ansi().fgCyan().a(code).reset()); + AnsiConsole.out().print(Ansi.ansi().fgBrightDefault().a(" : ").reset()); + } else { + System.out.print(code); + System.out.print(" : "); + } + // uuid + String u = String.format("%5.5s", row.uid); + if (loggingColor) { + AnsiConsole.out().print(Ansi.ansi().fgMagenta().a(u).reset()); + } else { + System.out.print(u); + } + System.out.print(" - "); + // status + System.out.print(getStatus(row)); + // elapsed + String e = getElapsed(row); + if (e != null) { + if (loggingColor) { + AnsiConsole.out().print(Ansi.ansi().fgBrightDefault().a(" (" + e + ")").reset()); + } else { + System.out.print("(" + e + ")"); + } + } + // trace message as json + String[] lines = json.split(System.lineSeparator()); + if (lines.length > 0) { + System.out.println(); + for (String line : lines) { + if (find != null) { + for (String f : find) { + line = line.replaceAll("(?i)" + f, findAnsi); + } + } + if (grep != null) { + for (String g : grep) { + line = line.replaceAll("(?i)" + g, findAnsi); + } + } + System.out.println(line); + } + System.out.println(); + } + } + + private String getDataAsJSon(Row r) { + String s = r.message.toJson(); + if (loggingColor) { + s = JSonHelper.colorPrint(s, 2, pretty); + } else if (pretty) { + s = JSonHelper.prettyPrint(s, 2); + } + String st = null; + if (r.exception != null) { + // include stacktrace + st = Jsoner.unescape(r.exception.getString("stackTrace")); + if (loggingColor) { + st = Ansi.ansi().fg(Ansi.Color.RED).bold().a(st).reset().toString(); + } + } + if (st != null) { + return s + System.lineSeparator() + st; + } else { + return s; + } + } + + private String getElapsed(Row r) { + if (!r.first) { + return TimeUtils.printDuration(r.elapsed, true); + } + return null; + } + + private String getStatus(Row r) { + if (r.first) { + if (loggingColor) { + return Ansi.ansi().fg(Ansi.Color.GREEN).a("Input").reset().toString(); + } else { + return "Input"; + } + } else if (r.last) { + if (loggingColor) { + return Ansi.ansi().fg(r.failed ? Ansi.Color.RED : Ansi.Color.GREEN).a("Output").reset().toString(); + } else { + return "Output"; + } + } + if (!r.done) { + if (loggingColor) { + return Ansi.ansi().fg(Ansi.Color.BLUE).a("Processing").reset().toString(); + } else { + return "Processing"; + } + } else if (r.failed) { + if (loggingColor) { + return Ansi.ansi().fg(Ansi.Color.RED).a("Failed").reset().toString(); + } else { + return "Failed"; + } + } else { + if (loggingColor) { + return Ansi.ansi().fg(Ansi.Color.GREEN).a("Success").reset().toString(); + } else { + return "Success"; + } + } + } + + private String getId(Row r) { + if (r.first) { + return "*-->"; + } else if (r.last) { + return "*<--"; + } else { + return "" + r.nodeId; + } + } + + private static class Pid implements Cloneable { + String pid; + String name; + Queue<String> fifo; + LineNumberReader reader; + } + + private static class Row implements Cloneable { + String pid; + String name; + boolean first; + boolean last; + long uid; + String exchangeId; + String location; + String routeId; + String nodeId; + long timestamp; + long elapsed; + boolean done; + boolean failed; + JsonObject message; + JsonObject exception; + + Row copy() { + try { + return (Row) clone(); + } catch (CloneNotSupportedException e) { + return null; + } + } + } + +} diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListTrace.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListTrace.java deleted file mode 100644 index eb28e878adf..00000000000 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListTrace.java +++ /dev/null @@ -1,358 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.dsl.jbang.core.commands.process; - -import java.io.File; -import java.io.FileInputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; - -import com.github.freva.asciitable.AsciiTable; -import com.github.freva.asciitable.Column; -import com.github.freva.asciitable.HorizontalAlign; -import com.github.freva.asciitable.OverflowBehaviour; -import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain; -import org.apache.camel.dsl.jbang.core.common.JSonHelper; -import org.apache.camel.dsl.jbang.core.common.ProcessHelper; -import org.apache.camel.util.IOHelper; -import org.apache.camel.util.TimeUtils; -import org.apache.camel.util.json.JsonArray; -import org.apache.camel.util.json.JsonObject; -import org.apache.camel.util.json.Jsoner; -import org.fusesource.jansi.Ansi; -import picocli.CommandLine; -import picocli.CommandLine.Command; - -@Command(name = "trace", - description = "Get latest traced messages of Camel integrations") -public class ListTrace extends ProcessWatchCommand { - - @CommandLine.Parameters(description = "Name or pid of running Camel integration", arity = "0..1") - String name = "*"; - - @CommandLine.Option(names = { "--sort" }, - description = "Sort by pid, name or age", defaultValue = "pid") - String sort; - - @CommandLine.Option(names = { "--latest" }, defaultValue = "false", - description = "Only output traces of latest message") - boolean latest; - - @CommandLine.Option(names = { "--brief" }, defaultValue = "false", - description = "Brief mode to only show traces of input and output (no intermediate processing steps)") - boolean brief; - - @CommandLine.Option(names = { "--pretty" }, defaultValue = "false", - description = "Pretty print traced message") - boolean pretty; - - @CommandLine.Option(names = { "--show-exchange-properties", "showExchangeProperties" }, defaultValue = "false", - description = "Show exchange properties in traced messages") - boolean showExchangeProperties; - - @CommandLine.Option(names = { "--show-message-headers", "showMessageHeaders" }, defaultValue = "true", - description = "Show message headers in traced messages") - boolean showMessageHeaders = true; - - @CommandLine.Option(names = { "--show-message-body", "showMessageBody" }, defaultValue = "true", - description = "Show message body in traced messages") - boolean showMessageBody = true; - - @CommandLine.Option(names = { "--show-exception", "showException" }, defaultValue = "true", - description = "Show exception and stacktrace for failed messages") - boolean showException = true; - - @CommandLine.Option(names = { "--logging-color" }, defaultValue = "true", description = "Use colored logging") - boolean loggingColor = true; - - public ListTrace(CamelJBangMain main) { - super(main); - } - - @Override - public Integer doCall() throws Exception { - List<Row> rows = new ArrayList<>(); - - List<Long> pids = findPids(name); - ProcessHandle.allProcesses() - .filter(ph -> pids.contains(ph.pid())) - .forEach(ph -> { - JsonObject root = loadStatus(ph.pid()); - // there must be a status file for the running Camel integration - if (root != null) { - Row row = new Row(); - JsonObject context = (JsonObject) root.get("context"); - if (context == null) { - return; - } - row.name = context.getString("name"); - if ("CamelJBang".equals(row.name)) { - row.name = ProcessHelper.extractName(root, ph); - } - row.pid = "" + ph.pid(); - fetchTraces(row, rows); - } - }); - - // sort rows - rows.sort(this::sortRow); - - if (!rows.isEmpty()) { - String data = AsciiTable.getTable(AsciiTable.NO_BORDERS, rows, Arrays.asList( - new Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid), - new Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30, OverflowBehaviour.ELLIPSIS_RIGHT) - .with(r -> r.name), - new Column().header("EXCHANGE-ID").dataAlign(HorizontalAlign.LEFT).with(r -> r.exchangeId), - new Column().header("UID").headerAlign(HorizontalAlign.CENTER).dataAlign(HorizontalAlign.RIGHT) - .maxWidth(6).with(this::getUid), - new Column().header("ROUTE").dataAlign(HorizontalAlign.LEFT).maxWidth(25, OverflowBehaviour.ELLIPSIS_RIGHT) - .with(r -> r.routeId), - new Column().header("ID").dataAlign(HorizontalAlign.LEFT).maxWidth(25, OverflowBehaviour.ELLIPSIS_RIGHT) - .with(this::getId), - new Column().header("AGE").dataAlign(HorizontalAlign.RIGHT).with(this::getTimestamp), - new Column().header("ELAPSED").dataAlign(HorizontalAlign.RIGHT).with(this::getElapsed), - new Column().header("STATUS").dataAlign(HorizontalAlign.LEFT).with(this::getStatus))); - - String[] arr = data.split(System.lineSeparator()); - String header = arr[0]; - if (loggingColor) { - header = Ansi.ansi().bgDefault().bold().a(header).reset().toString(); - } - System.out.println(header); - // mix column and message (master/detail) mode - for (int i = 0; i < rows.size(); i++) { - String s = arr[i + 1]; - if (i > 0 && pretty) { - // print header per trace in pretty mode - System.out.println(header); - } - System.out.println(s); - String json = getDataAsJSon(rows.get(i)); - // pad with 8 spaces to indent json data - String[] lines = json.split(System.lineSeparator()); - for (String line : lines) { - System.out.print(" "); - System.out.println(line); - } - } - } - - return 0; - } - - private void fetchTraces(Row row, List<Row> rows) { - // load trace file if exists - JsonObject root = loadTrace(row.pid); - String lastId = null; - if (root != null) { - List<Row> local = new ArrayList<>(); - JsonArray arr = root.getCollection("traces"); - if (arr != null) { - for (Object o : arr) { - row = row.copy(); - JsonObject jo = (JsonObject) o; - row.uid = jo.getLong("uid"); - row.first = jo.getBoolean("first"); - row.last = jo.getBoolean("last"); - row.routeId = jo.getString("routeId"); - row.nodeId = jo.getString("nodeId"); - Long ts = jo.getLong("timestamp"); - if (ts != null) { - row.timestamp = ts; - } - row.elapsed = jo.getLong("elapsed"); - row.failed = jo.getBoolean("failed"); - row.done = jo.getBoolean("done"); - row.message = jo.getMap("message"); - row.exception = jo.getMap("exception"); - row.exchangeId = row.message.getString("exchangeId"); - row.message.remove("exchangeId"); - if (!showExchangeProperties) { - row.message.remove("exchangeProperties"); - } - if (!showMessageHeaders) { - row.message.remove("headers"); - } - if (!showMessageBody) { - row.message.remove("body"); - } - if (!showException) { - row.exception = null; - } - lastId = row.exchangeId; - local.add(row); - } - } - if (latest && lastId != null) { - // filter out all that does not match last exchange id - final String target = lastId; - local.removeIf(r -> !Objects.equals(target, r.exchangeId)); - } - if (brief) { - local.removeIf(r -> !r.first && !r.last); - } - rows.addAll(local); - } - } - - private JsonObject loadTrace(String pid) { - try { - File f = getTraceFile(pid); - if (f != null && f.exists()) { - FileInputStream fis = new FileInputStream(f); - String text = IOHelper.loadText(fis); - IOHelper.close(fis); - return (JsonObject) Jsoner.deserialize(text); - } - } catch (Throwable e) { - // ignore - } - return null; - } - - private String getTimestamp(Row r) { - if (r.timestamp > 0) { - return TimeUtils.printSince(r.timestamp); - } - return ""; - } - - private String getElapsed(Row r) { - if (!r.first) { - return TimeUtils.printDuration(r.elapsed, true); - } - return ""; - } - - private String getStatus(Row r) { - if (r.first) { - if (loggingColor) { - return Ansi.ansi().fg(Ansi.Color.GREEN).bold().a("Input").reset().toString(); - } else { - return "Input"; - } - } else if (r.last) { - if (loggingColor) { - return Ansi.ansi().fg(r.failed ? Ansi.Color.RED : Ansi.Color.GREEN).bold().a("Output").reset().toString(); - } else { - return "Output"; - } - } - if (!r.done) { - if (loggingColor) { - return Ansi.ansi().fg(Ansi.Color.BLUE).bold().a("Processing").reset().toString(); - } else { - return "Processing"; - } - } else if (r.failed) { - if (loggingColor) { - return Ansi.ansi().fg(Ansi.Color.RED).bold().a("Failed").reset().toString(); - } else { - return "Failed"; - } - } else { - if (loggingColor) { - return Ansi.ansi().fg(Ansi.Color.GREEN).bold().a("Success").reset().toString(); - } else { - return "Success"; - } - } - } - - private String getUid(Row r) { - return "" + r.uid; - } - - private String getId(Row r) { - if (r.first) { - return "*-->"; - } else if (r.last) { - return "*<--"; - } else { - return "" + r.nodeId; - } - } - - private String getDataAsJSon(Row r) { - String s = r.message.toJson(); - if (loggingColor) { - s = JSonHelper.colorPrint(s, 2, pretty); - } else if (pretty) { - s = JSonHelper.prettyPrint(s, 2); - } - String st = null; - if (r.exception != null) { - // include stacktrace - st = Jsoner.unescape(r.exception.getString("stackTrace")); - if (loggingColor) { - st = Ansi.ansi().fg(Ansi.Color.RED).bold().a(st).reset().toString(); - } - } - if (st != null) { - return s + System.lineSeparator() + st; - } else { - return s; - } - } - - protected int sortRow(Row o1, Row o2) { - String s = sort; - int negate = 1; - if (s.startsWith("-")) { - s = s.substring(1); - negate = -1; - } - switch (s) { - case "pid": - return Long.compare(Long.parseLong(o1.pid), Long.parseLong(o2.pid)) * negate; - case "name": - return o1.name.compareToIgnoreCase(o2.name) * negate; - case "age": - return Long.compare(o1.timestamp, o2.timestamp) * negate; - default: - return 0; - } - } - - private static class Row implements Cloneable { - String pid; - String name; - boolean first; - boolean last; - long uid; - String exchangeId; - String routeId; - String nodeId; - long timestamp; - long elapsed; - boolean done; - boolean failed; - JsonObject message; - JsonObject exception; - - Row copy() { - try { - return (Row) clone(); - } catch (CloneNotSupportedException e) { - return null; - } - } - } - -}
