This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch rcmd in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5126aad2a8c8446d00103d8e31bcdac058d71ce5 Author: Claus Ibsen <[email protected]> AuthorDate: Wed Oct 9 16:26:45 2024 +0200 CAMEL-21193: camel-jbang - Add listen command --- .../impl/console/ReceiveDevConsoleConfigurer.java | 86 ++++++ .../org/apache/camel/dev-console/receive.json | 15 ++ ...org.apache.camel.impl.console.ReceiveDevConsole | 2 + .../services/org/apache/camel/dev-console/receive | 2 + .../org/apache/camel/dev-consoles.properties | 2 +- .../camel/impl/console/ReceiveDevConsole.java | 282 ++++++++++++++++++++ .../camel/cli/connector/LocalCliConnector.java | 57 +++- .../dsl/jbang/core/commands/CamelCommand.java | 4 + .../core/commands/action/CamelBrowseAction.java | 10 - .../core/commands/action/CamelReceiveAction.java | 296 +++++++++++++++++++++ 10 files changed, 743 insertions(+), 13 deletions(-) diff --git a/core/camel-console/src/generated/java/org/apache/camel/impl/console/ReceiveDevConsoleConfigurer.java b/core/camel-console/src/generated/java/org/apache/camel/impl/console/ReceiveDevConsoleConfigurer.java new file mode 100644 index 00000000000..0129a9fae6a --- /dev/null +++ b/core/camel-console/src/generated/java/org/apache/camel/impl/console/ReceiveDevConsoleConfigurer.java @@ -0,0 +1,86 @@ +/* Generated by camel build tools - do NOT edit this file! */ +package org.apache.camel.impl.console; + +import javax.annotation.processing.Generated; +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.spi.ExtendedPropertyConfigurerGetter; +import org.apache.camel.spi.PropertyConfigurerGetter; +import org.apache.camel.spi.ConfigurerStrategy; +import org.apache.camel.spi.GeneratedPropertyConfigurer; +import org.apache.camel.util.CaseInsensitiveMap; +import org.apache.camel.impl.console.ReceiveDevConsole; + +/** + * Generated by camel build tools - do NOT edit this file! + */ +@Generated("org.apache.camel.maven.packaging.GenerateConfigurerMojo") +@SuppressWarnings("unchecked") +public class ReceiveDevConsoleConfigurer extends org.apache.camel.support.component.PropertyConfigurerSupport implements GeneratedPropertyConfigurer, ExtendedPropertyConfigurerGetter { + + private static final Map<String, Object> ALL_OPTIONS; + static { + Map<String, Object> map = new CaseInsensitiveMap(); + map.put("BodyMaxChars", int.class); + map.put("CamelContext", org.apache.camel.CamelContext.class); + map.put("Capacity", int.class); + map.put("RemoveOnDump", boolean.class); + ALL_OPTIONS = map; + ConfigurerStrategy.addBootstrapConfigurerClearer(ReceiveDevConsoleConfigurer::clearBootstrapConfigurers); + } + + @Override + public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) { + org.apache.camel.impl.console.ReceiveDevConsole target = (org.apache.camel.impl.console.ReceiveDevConsole) obj; + switch (ignoreCase ? name.toLowerCase() : name) { + case "bodymaxchars": + case "bodyMaxChars": target.setBodyMaxChars(property(camelContext, int.class, value)); return true; + case "camelcontext": + case "camelContext": target.setCamelContext(property(camelContext, org.apache.camel.CamelContext.class, value)); return true; + case "capacity": target.setCapacity(property(camelContext, int.class, value)); return true; + case "removeondump": + case "removeOnDump": target.setRemoveOnDump(property(camelContext, boolean.class, value)); return true; + default: return false; + } + } + + @Override + public Map<String, Object> getAllOptions(Object target) { + return ALL_OPTIONS; + } + + public static void clearBootstrapConfigurers() { + ALL_OPTIONS.clear(); + } + + @Override + public Class<?> getOptionType(String name, boolean ignoreCase) { + switch (ignoreCase ? name.toLowerCase() : name) { + case "bodymaxchars": + case "bodyMaxChars": return int.class; + case "camelcontext": + case "camelContext": return org.apache.camel.CamelContext.class; + case "capacity": return int.class; + case "removeondump": + case "removeOnDump": return boolean.class; + default: return null; + } + } + + @Override + public Object getOptionValue(Object obj, String name, boolean ignoreCase) { + org.apache.camel.impl.console.ReceiveDevConsole target = (org.apache.camel.impl.console.ReceiveDevConsole) obj; + switch (ignoreCase ? name.toLowerCase() : name) { + case "bodymaxchars": + case "bodyMaxChars": return target.getBodyMaxChars(); + case "camelcontext": + case "camelContext": return target.getCamelContext(); + case "capacity": return target.getCapacity(); + case "removeondump": + case "removeOnDump": return target.isRemoveOnDump(); + default: return null; + } + } +} + diff --git a/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/receive.json b/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/receive.json new file mode 100644 index 00000000000..978a1f23579 --- /dev/null +++ b/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/receive.json @@ -0,0 +1,15 @@ +{ + "console": { + "kind": "console", + "group": "camel", + "name": "receive", + "title": "Camel Receive", + "description": "Consume messages from endpoints", + "deprecated": false, + "javaType": "org.apache.camel.impl.console.ReceiveDevConsole", + "groupId": "org.apache.camel", + "artifactId": "camel-console", + "version": "4.9.0-SNAPSHOT" + } +} + diff --git a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.impl.console.ReceiveDevConsole b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.impl.console.ReceiveDevConsole new file mode 100644 index 00000000000..3a11d5ff2c9 --- /dev/null +++ b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.impl.console.ReceiveDevConsole @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.impl.console.ReceiveDevConsoleConfigurer diff --git a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/receive b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/receive new file mode 100644 index 00000000000..8454801f9f9 --- /dev/null +++ b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/receive @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.impl.console.ReceiveDevConsole diff --git a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties index fd1b9890684..8d3e19577ae 100644 --- a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties +++ b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties @@ -1,5 +1,5 @@ # Generated by camel build tools - do NOT edit this file! -dev-consoles=bean blocked browse circuit-breaker consumer context debug endpoint event gc health inflight java-security jvm log memory properties reload rest route route-controller route-dump service source startup-recorder thread top trace transformers type-converters variables +dev-consoles=bean blocked browse circuit-breaker consumer context debug endpoint event gc health inflight java-security jvm log memory properties receive reload rest route route-controller route-dump service source startup-recorder thread top trace transformers type-converters variables groupId=org.apache.camel artifactId=camel-console version=4.9.0-SNAPSHOT diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java new file mode 100644 index 00000000000..d1cd9b09c85 --- /dev/null +++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.console; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.camel.CamelContext; +import org.apache.camel.Consumer; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Route; +import org.apache.camel.spi.Configurer; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.annotations.DevConsole; +import org.apache.camel.support.EndpointHelper; +import org.apache.camel.support.MessageHelper; +import org.apache.camel.support.console.AbstractDevConsole; +import org.apache.camel.support.service.ServiceHelper; +import org.apache.camel.util.json.JsonArray; +import org.apache.camel.util.json.JsonObject; + +@DevConsole(name = "receive", displayName = "Camel Receive", description = "Consume messages from endpoints") +@Configurer(bootstrap = true, extended = true) +public class ReceiveDevConsole extends AbstractDevConsole { + + @Metadata(defaultValue = "100", + description = "Maximum capacity of last number of messages to capture (capacity must be between 50 and 1000)") + private int capacity = 100; + @Metadata(defaultValue = "32768", label = "advanced", + description = "To limit the message body to a maximum size in the received message. Use 0 or negative value to use unlimited size.") + private int bodyMaxChars = 32 * 1024; + @Metadata(defaultValue = "true", label = "advanced", + description = "Whether all received messages should be removed when dumping. By default, the messages are removed, which means that dumping will not contain previous dumped messages.") + private boolean removeOnDump = true; + + /** + * Whether to enable or disable receive mode + */ + public static final String ENABLED = "enabled"; + + /** + * Whether to dump received messages + */ + public static final String DUMP = "dump"; + + /** + * Endpoint for where to receive messages (can also refer to a route id, endpoint pattern). + */ + public static final String ENDPOINT = "endpoint"; + + private final List<Consumer> consumers = new ArrayList<>(); + + private final AtomicBoolean enabled = new AtomicBoolean(); + private final AtomicLong uuid = new AtomicLong(); + private Queue<JsonObject> queue; + + public ReceiveDevConsole() { + super("camel", "receive", "Camel Receive", "Consume messages from endpoints"); + } + + public int getCapacity() { + return capacity; + } + + public void setCapacity(int capacity) { + this.capacity = capacity; + } + + public int getBodyMaxChars() { + return bodyMaxChars; + } + + public void setBodyMaxChars(int bodyMaxChars) { + this.bodyMaxChars = bodyMaxChars; + } + + public boolean isRemoveOnDump() { + return removeOnDump; + } + + public void setRemoveOnDump(boolean removeOnDump) { + this.removeOnDump = removeOnDump; + } + + @Override + protected void doInit() throws Exception { + if (capacity > 1000 || capacity < 50) { + throw new IllegalArgumentException("Capacity must be between 50 and 1000"); + } + this.queue = new LinkedBlockingQueue<>(capacity); + } + + protected void stopConsumers() { + for (Consumer c : consumers) { + ServiceHelper.stopAndShutdownServices(c); + } + consumers.clear(); + } + + protected String doCallText(Map<String, Object> options) { + StringBuilder sb = new StringBuilder(); + + String dump = (String) options.get(DUMP); + if ("true".equals(dump)) { + JsonArray arr = new JsonArray(); + arr.addAll(queue); + if (removeOnDump) { + queue.clear(); + } + String json = arr.toJson(); + sb.append(json).append("\n"); + return sb.toString(); + } + + String enabled = (String) options.get(ENABLED); + if ("false".equals(enabled)) { + // turn off all consumers + stopConsumers(); + this.enabled.set(false); + sb.append("Enabled: ").append("false").append("\n"); + return sb.toString(); + } + + String pattern = (String) options.get(ENDPOINT); + if (pattern != null) { + this.enabled.set(true); + Endpoint target = findMatchingEndpoint(getCamelContext(), pattern); + if (target != null) { + try { + Consumer consumer = createConsumer(getCamelContext(), target); + if (!consumers.contains(consumer)) { + consumers.add(consumer); + ServiceHelper.startService(consumer); + } + } catch (Exception e) { + // ignore + } + } + } + + sb.append("Enabled: ").append(this.enabled.get()).append("\n"); + sb.append("Total: ").append(this.uuid.get()).append("\n"); + for (Consumer c : consumers) { + sb.append(" ").append(c.getEndpoint().toString()).append("\n"); + } + return sb.toString(); + } + + private Consumer createConsumer(CamelContext camelContext, Endpoint target) throws Exception { + for (Consumer c : consumers) { + if (c.getEndpoint() == target) { + return c; + } + } + return target.createConsumer(this::addMessage); + } + + private void addMessage(Exchange exchange) { + JsonObject json + = MessageHelper.dumpAsJSonObject(exchange.getMessage(), false, false, true, true, true, true, bodyMaxChars); + json.put("uuid", uuid.incrementAndGet()); + + // ensure there is space on the queue by polling until at least single slot is free + int drain = queue.size() - capacity + 1; + if (drain > 0) { + for (int i = 0; i < drain; i++) { + queue.poll(); + } + } + queue.add(json); + } + + protected Endpoint findMatchingEndpoint(CamelContext camelContext, String endpoint) { + // TODO: find all processors that are endpoint aware and match pattern + + + Endpoint target = null; + // is the endpoint a pattern or route id + boolean scheme = endpoint.contains(":"); + boolean pattern = endpoint.endsWith("*"); + if (!scheme || pattern) { + if (!scheme) { + endpoint = endpoint + "*"; + } + for (Route route : camelContext.getRoutes()) { + // find last output + Endpoint e = route.getEndpoint(); + if (EndpointHelper.matchEndpoint(camelContext, e.getEndpointUri(), endpoint)) { + target = e; + break; + } + } + if (target == null) { + // okay it may refer to a route id + for (Route route : camelContext.getRoutes()) { + String id = route.getRouteId(); + Endpoint e = route.getEndpoint(); + if (EndpointHelper.matchEndpoint(camelContext, id, endpoint)) { + target = e; + break; + } + } + } + } else { + target = camelContext.getEndpoint(endpoint); + } + return target; + } + + protected JsonObject doCallJson(Map<String, Object> options) { + JsonObject root = new JsonObject(); + + String dump = (String) options.get(DUMP); + if ("true".equals(dump)) { + JsonArray arr = new JsonArray(); + arr.addAll(queue); + if (removeOnDump) { + queue.clear(); + } + root.put("messages", arr); + return root; + } + + String enabled = (String) options.get(ENABLED); + if ("false".equals(enabled)) { + // turn off all consumers + stopConsumers(); + this.enabled.set(false); + root.put("enabled", false); + return root; + } + + String pattern = (String) options.get(ENDPOINT); + if (pattern != null) { + this.enabled.set(true); + Endpoint target = findMatchingEndpoint(getCamelContext(), pattern); + if (target != null) { + try { + Consumer consumer = createConsumer(getCamelContext(), target); + if (!consumers.contains(consumer)) { + consumers.add(consumer); + ServiceHelper.startService(consumer); + } + } catch (Exception e) { + // ignore + } + } + } + + root.put("enabled", true); + root.put("total", uuid.get()); + JsonArray arr = new JsonArray(); + for (Consumer c : consumers) { + JsonObject jo = new JsonObject(); + jo.put("uri", c.getEndpoint().toString()); + arr.add(jo); + } + root.put("endpoints", arr); + return root; + } + +} diff --git a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java index 4a74f68d444..e7d6d3620e5 100644 --- a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java +++ b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java @@ -114,8 +114,10 @@ public class LocalCliConnector extends ServiceSupport implements CliConnector, C private File actionFile; private File outputFile; private File traceFile; + private long traceFilePos; // keep track of trace offset private File debugFile; - private long traceFilePos; // keep track of trace offset + private File receiveFile; + private long receiveFilePos; // keep track of receive offset private byte[] lastSource; private ExpressionDefinition lastSourceExpression; @@ -180,6 +182,7 @@ public class LocalCliConnector extends ServiceSupport implements CliConnector, C outputFile = createLockFile(lockFile.getName() + "-output.json"); traceFile = createLockFile(lockFile.getName() + "-trace.json"); debugFile = createLockFile(lockFile.getName() + "-debug.json"); + receiveFile = createLockFile(lockFile.getName() + "-receive.json"); executor.scheduleWithFixedDelay(this::task, 0, delay, TimeUnit.MILLISECONDS); LOG.info("Camel JBang CLI enabled"); } else { @@ -223,7 +226,7 @@ public class LocalCliConnector extends ServiceSupport implements CliConnector, C actionTask(); statusTask(); // only run this every 2nd time as gathering this data has more overhead - // and are only needed when doing tracing or debugging + // and are only needed when doing tracing/debugging/receive if (++counter % 2 == 0) { traceTask(); } @@ -280,6 +283,8 @@ public class LocalCliConnector extends ServiceSupport implements CliConnector, C doActionTraceTask(root); } else if ("browse".equals(action)) { doActionBrowseTask(root); + } else if ("receive".equals(action)) { + doActionReceiveTask(root); } } catch (Exception e) { // ignore @@ -811,6 +816,24 @@ public class LocalCliConnector extends ServiceSupport implements CliConnector, C } } + private void doActionReceiveTask(JsonObject root) throws IOException { + DevConsole dc = camelContext.getCamelContextExtension().getContextPlugin(DevConsoleRegistry.class) + .resolveById("receive"); + if (dc != null) { + JsonObject json; + String endpoint = root.getString("endpoint"); + if (endpoint != null) { + json = (JsonObject) dc.call(DevConsole.MediaType.JSON, Map.of("enabled", "true", "endpoint", endpoint)); + } else { + json = (JsonObject) dc.call(DevConsole.MediaType.JSON, Map.of("enabled", "false")); + } + LOG.trace("Updating output file: {}", outputFile); + IOHelper.writeText(json.toJson(), outputFile); + } else { + IOHelper.writeText("{}", outputFile); + } + } + private void doActionBeanTask(JsonObject root) throws IOException { String filter = root.getStringOrDefault("filter", ""); String properties = root.getStringOrDefault("properties", "true"); @@ -1206,6 +1229,33 @@ public class LocalCliConnector extends ServiceSupport implements CliConnector, C LOG.trace("Error updating debug file: {} due to: {}. This exception is ignored.", debugFile, e.getMessage(), e); } + try { + DevConsole dc14 = camelContext.getCamelContextExtension().getContextPlugin(DevConsoleRegistry.class) + .resolveById("receive"); + if (dc14 != null) { + JsonObject json = (JsonObject) dc14.call(DevConsole.MediaType.JSON, Map.of("dump", "true")); + JsonArray arr = json.getCollection("messages"); + // filter based on last uid + if (receiveFilePos > 0) { + arr.removeIf(r -> { + JsonObject jo = (JsonObject) r; + return jo.getLong("uid") <= receiveFilePos; + }); + } + if (arr != null && !arr.isEmpty()) { + // store messages in a special file + LOG.trace("Updating receive file: {}", receiveFile); + String data = json.toJson() + System.lineSeparator(); + IOHelper.appendText(data, receiveFile); + json = arr.getMap(arr.size() - 1); + receiveFilePos = json.getLong("uid"); + } + } + } catch (Exception e) { + // ignore + LOG.trace("Error updating receive file: {} due to: {}. This exception is ignored.", + receiveFile, e.getMessage(), e); + } } private JsonObject collectMemory() { @@ -1331,6 +1381,9 @@ public class LocalCliConnector extends ServiceSupport implements CliConnector, C if (debugFile != null) { FileUtil.deleteFile(debugFile); } + if (receiveFile != null) { + FileUtil.deleteFile(receiveFile); + } if (executor != null) { camelContext.getExecutorServiceManager().shutdown(executor); executor = null; diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java index 41ae9f33f67..7413250e008 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java @@ -110,6 +110,10 @@ public abstract class CamelCommand implements Callable<Integer> { return new File(CommandLineHelper.getCamelDir(), pid + "-trace.json"); } + public File getReceiveFile(String pid) { + return new File(CommandLineHelper.getCamelDir(), pid + "-receive.json"); + } + public File getDebugFile(String pid) { return new File(CommandLineHelper.getCamelDir(), pid + "-debug.json"); } diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java index 580118f5d60..9076f27f97d 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java @@ -212,16 +212,6 @@ public class CamelBrowseAction extends ActionBaseCommand { return 0; } - private static long getLongValueFromCollection(JsonArray arr, String key) { - for (Object o : arr) { - JsonObject jo = (JsonObject) o; - if (key.equalsIgnoreCase(jo.getString("key"))) { - return jo.getLong("value"); - } - } - return 0; - } - protected void dumpMessages(List<Row> rows, boolean onlyBody) { MessageTableHelper tableHelper = new MessageTableHelper(); tableHelper.setPretty(pretty); diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelReceiveAction.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelReceiveAction.java new file mode 100644 index 00000000000..b9f859fb4f2 --- /dev/null +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelReceiveAction.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.dsl.jbang.core.commands.action; + +import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain; +import org.apache.camel.util.FileUtil; +import org.apache.camel.util.IOHelper; +import org.apache.camel.util.StopWatch; +import org.apache.camel.util.TimeUtils; +import org.apache.camel.util.json.JsonObject; +import org.apache.camel.util.json.Jsoner; +import org.fusesource.jansi.Ansi; +import org.fusesource.jansi.AnsiConsole; +import picocli.CommandLine; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.LineNumberReader; +import java.text.SimpleDateFormat; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; + [email protected](name = "receive", + description = "Receive messages from endpoints", sortOptions = false) +public class CamelReceiveAction extends ActionBaseCommand { + + public static class ActionCompletionCandidates implements Iterable<String> { + + public ActionCompletionCandidates() { + } + + @Override + public Iterator<String> iterator() { + return List.of("dump", "start", "stop").iterator(); + } + } + + @CommandLine.Parameters(description = "Name or pid of running Camel integration", arity = "0..1") + String name = "*"; + + @CommandLine.Option(names = { "--action" }, completionCandidates = ActionCompletionCandidates.class, + defaultValue = "dump", + description = "Action to start, stop, or dump messages") + String action; + + @CommandLine.Option(names = { "--endpoint" }, + description = "Endpoint where to receive the messages from (can be uri, pattern, or refer to a route id)") + String endpoint; + + @CommandLine.Option(names = { "--output-file" }, + description = "Saves messages received to the file with the given name (override if exists)") + String outputFile; + + @CommandLine.Option(names = { "--tail" }, defaultValue = "-1", + description = "The number of messages from the end of the receive to show. Use -1 to read from the beginning. Use 0 to read only new lines. Defaults to showing all messages from beginning.") + int tail = -1; + + @CommandLine.Option(names = { "--show-exchange-properties" }, defaultValue = "false", + description = "Show exchange properties in traced messages") + boolean showExchangeProperties; + + @CommandLine.Option(names = { "--show-headers" }, defaultValue = "true", + description = "Show message headers in traced messages") + boolean showHeaders = true; + + @CommandLine.Option(names = { "--show-body" }, defaultValue = "true", + description = "Show message body in traced messages") + boolean showBody = true; + + @CommandLine.Option(names = { "--show-exception" }, defaultValue = "true", + description = "Show exception and stacktrace for failed messages") + boolean showException = true; + + @CommandLine.Option(names = { "--logging-color" }, defaultValue = "true", description = "Use colored logging") + boolean loggingColor = true; + + @CommandLine.Option(names = { "--pretty" }, + description = "Pretty print message body when using JSon or XML format") + boolean pretty; + + @CommandLine.Option(names = { "--follow" }, defaultValue = "true", + description = "Keep following and outputting new received messages (use ctrl + c to exit).") + boolean follow = true; + + private volatile long pid; + + private MessageTableHelper tableHelper; + + private static class Pid { + String pid; + String name; + Queue<String> fifo; + int depth; + LineNumberReader reader; + } + + public CamelReceiveAction(CamelJBangMain main) { + super(main); + } + + @Override + public Integer doCall() throws Exception { + if ("dump".equals(action)) { + return doDumpCall(); + } else { + return doStartStopCall(action); + } + } + + protected Integer doStartStopCall(String action) throws Exception { + // ensure output file is deleted before executing action + File outputFile = getOutputFile(Long.toString(pid)); + FileUtil.deleteFile(outputFile); + + JsonObject root = new JsonObject(); + root.put("action", "receive"); + root.put("endpoint", endpoint); + root.put("enabled", "start".equals(action) ? "true" : "false"); + + File f = getActionFile(Long.toString(pid)); + try { + IOHelper.writeText(root.toJson(), f); + } catch (Exception e) { + // ignore + } + waitForOutputFile(outputFile); + + // delete output file after use + FileUtil.deleteFile(outputFile); + + return 0; + } + + protected Integer doDumpCall() throws Exception { + List<Long> pids = findPids(name); + if (pids.isEmpty()) { + return 0; + } else if (pids.size() > 1) { + printer().println("Name or pid " + name + " matches " + pids.size() + + " running Camel integrations. Specify a name or PID that matches exactly one."); + return 0; + } + + this.pid = pids.get(0); + + boolean waitMessage = true; + StopWatch watch = new StopWatch(); + boolean more = true; + Pid pid = new Pid(); + pid.pid = "" + this.pid; + + + if (tail != 0) { + tailTraceFiles(pids, tail); + } else { + + } + + do { + waitMessage = true; + int lines = readReceiveFiles(pid); + if (lines > 0) { + more = dumpReceivedFiles(pids, 0, null); + } else if (lines == 0) { + Thread.sleep(100); + } else { + break; + } + } while (follow || more); + + return 0; + } + + private int readReceiveFiles(Pid pid) throws Exception { + int lines = 0; + if (pid.reader == null) { + File file = getReceiveFile(pid.pid); + if (file.exists()) { + pid.reader = new LineNumberReader(new FileReader(file)); + if (tail == 0) { + // only read new lines so forward to end of reader + long size = file.length(); + pid.reader.skip(size); + } + } + } + if (pid.reader != null) { + String line; + do { + try { + line = pid.reader.readLine(); + if (line != null) { + lines++; + // switch fifo to be unlimited as we use it for new traces + if (pid.fifo == null || pid.fifo instanceof ArrayBlockingQueue) { + pid.fifo = new ArrayDeque<>(); + } + pid.fifo.offer(line); + } + } catch (IOException e) { + // ignore + line = null; + } + } while (line != null); + } + + return lines; + } + + private boolean dumpReceivedFiles(Pid pid) { + List<Row> rows = new ArrayList<>(); + + Queue<String> queue = pid.fifo; + + + + + } + + private void tailTraceFiles(Map<Long, Pid> pids, int tail) throws Exception { + for (Pid pid : pids.values()) { + File file = getReceiveFile(pid.pid); + if (file.exists()) { + pid.reader = new LineNumberReader(new FileReader(file)); + String line; + if (tail <= 0) { + pid.fifo = new ArrayDeque<>(); + } else { + pid.fifo = new ArrayBlockingQueue<>(tail); + } + do { + line = pid.reader.readLine(); + if (line != null) { + while (!pid.fifo.offer(line)) { + pid.fifo.poll(); + } + } + } while (line != null); + } + } + } + + protected JsonObject waitForOutputFile(File outputFile) { + StopWatch watch = new StopWatch(); + while (watch.taken() < 5000) { + try { + // give time for response to be ready + Thread.sleep(20); + + if (outputFile.exists()) { + FileInputStream fis = new FileInputStream(outputFile); + String text = IOHelper.loadText(fis); + IOHelper.close(fis); + return (JsonObject) Jsoner.deserialize(text); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + // ignore + } + } + return null; + } + + private static class Row { + String pid; + long uid; + JsonObject message; + } + +}
