This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch rcmd
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 47c1d35e2259c40a2dfda64a4fe3f8f17a509d4b
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Oct 10 12:16:12 2024 +0200

    CAMEL-21193: camel-jbang - Add listen command
---
 .../camel/impl/console/ReceiveDevConsole.java      | 198 ++++++++++-----------
 .../core/commands/action/CamelListenAction.java    |  36 +++-
 2 files changed, 127 insertions(+), 107 deletions(-)

diff --git 
a/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java
 
b/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java
index c9a7e291cd6..79898f98aa4 100644
--- 
a/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java
+++ 
b/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java
@@ -17,6 +17,7 @@
 package org.apache.camel.impl.console;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -29,17 +30,13 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.Channel;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
-import org.apache.camel.EndpointAware;
 import org.apache.camel.Exchange;
-import org.apache.camel.Navigate;
-import org.apache.camel.Processor;
 import org.apache.camel.spi.Configurer;
-import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.annotations.DevConsole;
+import org.apache.camel.support.ExceptionHelper;
 import org.apache.camel.support.MessageHelper;
 import org.apache.camel.support.PatternHelper;
 import org.apache.camel.support.console.AbstractDevConsole;
@@ -47,6 +44,7 @@ import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.json.JsonArray;
 import org.apache.camel.util.json.JsonObject;
+import org.apache.camel.util.json.Jsoner;
 
 @DevConsole(name = "receive", displayName = "Camel Receive", description = 
"Consume messages from endpoints")
 @Configurer(bootstrap = true, extended = true)
@@ -78,7 +76,6 @@ public class ReceiveDevConsole extends AbstractDevConsole {
     public static final String ENDPOINT = "endpoint";
 
     private final List<Consumer> consumers = new ArrayList<>();
-
     private final AtomicBoolean enabled = new AtomicBoolean();
     private final AtomicLong uuid = new AtomicLong();
     private Queue<JsonObject> queue;
@@ -121,6 +118,11 @@ public class ReceiveDevConsole extends AbstractDevConsole {
         this.queue = new LinkedBlockingQueue<>(capacity);
     }
 
+    /**
+     * Stops this console by stopping and shutting down every consumer registered in {@code consumers}.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        stopConsumers();
+    }
+
     protected void stopConsumers() {
         for (Consumer c : consumers) {
             ServiceHelper.stopAndShutdownServices(c);
@@ -162,7 +164,7 @@ public class ReceiveDevConsole extends AbstractDevConsole {
             Endpoint target = findMatchingEndpoint(getCamelContext(), pattern);
             if (target != null) {
                 try {
-                    Consumer consumer = createConsumer(getCamelContext(), 
target);
+                    Consumer consumer = createConsumer(target);
                     if (!consumers.contains(consumer)) {
                         consumers.add(consumer);
                         ServiceHelper.startService(consumer);
@@ -181,7 +183,74 @@ public class ReceiveDevConsole extends AbstractDevConsole {
         return sb.toString();
     }
 
-    private Consumer createConsumer(CamelContext camelContext, Endpoint 
target) throws Exception {
+    /**
+     * Handles a JSON request to the receive console.
+     * <p>
+     * The options are evaluated in order and the first match wins:
+     * <ul>
+     * <li>{@code DUMP} is "true": returns the queued messages under "messages" (clearing the queue when
+     * {@code removeOnDump} is set) and records the timestamps of the first and last dumped message</li>
+     * <li>{@code ENABLED} is "false": stops all consumers and reports the console as disabled</li>
+     * <li>otherwise: starts a consumer for the endpoint matching {@code ENDPOINT} (when given) and returns
+     * the current status: "enabled", "total", "firstTimestamp", "lastTimestamp" and, when any consumer is
+     * registered, "endpoints"</li>
+     * </ul>
+     *
+     * @param  options the request options
+     * @return         the JSON response; when starting a consumer fails it also carries "error" and
+     *                 "stackTrace"
+     */
+    protected JsonObject doCallJson(Map<String, Object> options) {
+        JsonObject root = new JsonObject();
+
+        String dump = (String) options.get(DUMP);
+        if ("true".equals(dump)) {
+            JsonArray arr = new JsonArray();
+            arr.addAll(queue);
+            if (removeOnDump) {
+                queue.clear();
+            }
+            root.put("messages", arr);
+            // an empty dump has no first/last message, so keep the previous timestamps
+            // instead of failing with IndexOutOfBoundsException on arr.get(0)
+            if (!arr.isEmpty()) {
+                JsonObject jo = (JsonObject) arr.get(0);
+                firstTimestamp = jo.getLongOrDefault("timestamp", 0);
+                jo = (JsonObject) arr.get(arr.size() - 1);
+                lastTimestamp = jo.getLongOrDefault("timestamp", 0);
+            }
+            return root;
+        }
+
+        String enabled = (String) options.get(ENABLED);
+        if ("false".equals(enabled)) {
+            // turn off all consumers
+            stopConsumers();
+            this.enabled.set(false);
+            root.put("enabled", false);
+            return root;
+        }
+
+        String pattern = (String) options.get(ENDPOINT);
+        if (pattern != null) {
+            try {
+                Endpoint target = findMatchingEndpoint(getCamelContext(), pattern);
+                if (target != null) {
+                    root.put("url", target.getEndpointUri());
+                    Consumer consumer = createConsumer(target);
+                    if (!consumers.contains(consumer)) {
+                        consumers.add(consumer);
+                        ServiceHelper.startService(consumer);
+                    }
+                }
+                // only mark as enabled when no failure occurred above
+                this.enabled.set(true);
+            } catch (Exception e) {
+                // getMessage() may be null (e.g. for a NullPointerException); fall back to the
+                // exception class name so the response always carries a non-null "error"
+                String message = e.getMessage() != null ? e.getMessage() : e.getClass().getName();
+                root.put("error", Jsoner.escape(message));
+                // report the stack trace as an array with one entry per line
+                JsonArray arr2 = new JsonArray();
+                final String trace = ExceptionHelper.stackTraceToString(e);
+                root.put("stackTrace", arr2);
+                Collections.addAll(arr2, trace.split("\n"));
+            }
+        }
+
+        root.put("enabled", this.enabled.get());
+        root.put("total", uuid.get());
+        root.put("firstTimestamp", firstTimestamp);
+        root.put("lastTimestamp", lastTimestamp);
+
+        JsonArray arr = new JsonArray();
+        for (Consumer c : consumers) {
+            JsonObject jo = new JsonObject();
+            jo.put("uri", c.getEndpoint().toString());
+            jo.put("remote", c.getEndpoint().isRemote());
+            arr.add(jo);
+        }
+        // omit the "endpoints" key entirely when nothing is being consumed
+        if (!arr.isEmpty()) {
+            root.put("endpoints", arr);
+        }
+        return root;
+    }
+
+    private Consumer createConsumer(Endpoint target) throws Exception {
         for (Consumer c : consumers) {
             if (c.getEndpoint() == target) {
                 return c;
@@ -209,7 +278,7 @@ public class ReceiveDevConsole extends AbstractDevConsole {
         queue.add(json);
     }
 
-    protected Endpoint findMatchingEndpoint(CamelContext camelContext, String 
endpoint) {
+    protected static Endpoint findMatchingEndpoint(CamelContext camelContext, 
String endpoint) {
         Endpoint target = null;
         boolean scheme = endpoint.contains(":");
         boolean pattern = endpoint.endsWith("*");
@@ -217,23 +286,23 @@ public class ReceiveDevConsole extends AbstractDevConsole 
{
             if (!scheme && !endpoint.endsWith("*")) {
                 endpoint = endpoint + "*";
             }
-            MBeanServer mbeanServer = 
getCamelContext().getManagementStrategy().getManagementAgent().getMBeanServer();
+            // find all producers for this camel context via JMX mbeans (this 
allows to find also producers created via dynamic EIPs)
+            MBeanServer mbeanServer = 
camelContext.getManagementStrategy().getManagementAgent().getMBeanServer();
             if (mbeanServer != null) {
                 try {
-                    // find all producers for this camel context mbean
                     String jmxDomain
-                            = 
getCamelContext().getManagementStrategy().getManagementAgent().getMBeanObjectDomainName();
+                            = 
camelContext.getManagementStrategy().getManagementAgent().getMBeanObjectDomainName();
                     String prefix
-                            = 
getCamelContext().getManagementStrategy().getManagementAgent().getIncludeHostName()
 ? "*/" : "";
+                            = 
camelContext.getManagementStrategy().getManagementAgent().getIncludeHostName() 
? "*/" : "";
                     ObjectName query = ObjectName.getInstance(
-                            jmxDomain + ":context=" + prefix + 
getCamelContext().getManagementName() + ",type=producers,*");
+                            jmxDomain + ":context=" + prefix + 
camelContext.getManagementName() + ",type=producers,*");
                     Set<ObjectName> set = mbeanServer.queryNames(query, null);
                     if (set != null && !set.isEmpty()) {
                         for (ObjectName on : set) {
                             String uri = (String) mbeanServer.getAttribute(on, 
"EndpointUri");
                             if (PatternHelper.matchPattern(uri, endpoint)) {
                                 // is the endpoint able to create a consumer
-                                target = getCamelContext().getEndpoint(uri);
+                                target = camelContext.getEndpoint(uri);
                                 // is the target able to create a consumer
                                 org.apache.camel.spi.UriEndpoint ann
                                         = 
ObjectHelper.getAnnotationDeep(target, org.apache.camel.spi.UriEndpoint.class);
@@ -259,100 +328,17 @@ public class ReceiveDevConsole extends 
AbstractDevConsole {
             }
         } else {
             target = camelContext.getEndpoint(endpoint);
-        }
-        return target;
-    }
-
-    protected JsonObject doCallJson(Map<String, Object> options) {
-        JsonObject root = new JsonObject();
-
-        String dump = (String) options.get(DUMP);
-        if ("true".equals(dump)) {
-            JsonArray arr = new JsonArray();
-            arr.addAll(queue);
-            if (removeOnDump) {
-                queue.clear();
-            }
-            root.put("messages", arr);
-            JsonObject jo = (JsonObject) arr.get(0);
-            firstTimestamp = jo.getLongOrDefault("timestamp", 0);
-            jo = (JsonObject) arr.get(arr.size() - 1);
-            lastTimestamp = jo.getLongOrDefault("timestamp", 0);
-            return root;
-        }
-
-        String enabled = (String) options.get(ENABLED);
-        if ("false".equals(enabled)) {
-            // turn off all consumers
-            stopConsumers();
-            this.enabled.set(false);
-            root.put("enabled", false);
-            return root;
-        }
-
-        String pattern = (String) options.get(ENDPOINT);
-        if (pattern != null) {
-            this.enabled.set(true);
-            Endpoint target = findMatchingEndpoint(getCamelContext(), pattern);
-            if (target != null) {
-                try {
-                    Consumer consumer = createConsumer(getCamelContext(), 
target);
-                    if (!consumers.contains(consumer)) {
-                        consumers.add(consumer);
-                        ServiceHelper.startService(consumer);
-                    }
-                } catch (Exception e) {
-                    // ignore
-                }
-            }
-        }
-
-        root.put("enabled", this.enabled.get());
-        root.put("total", uuid.get());
-        root.put("firstTimestamp", firstTimestamp);
-        root.put("lastTimestamp", lastTimestamp);
-
-        JsonArray arr = new JsonArray();
-        for (Consumer c : consumers) {
-            JsonObject jo = new JsonObject();
-            jo.put("uri", c.getEndpoint().toString());
-            jo.put("remote", c.getEndpoint().isRemote());
-            arr.add(jo);
-        }
-        root.put("endpoints", arr);
-        return root;
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        super.doStop();
-        stopConsumers();
-    }
-
-    @SuppressWarnings("unchecked")
-    private void doFilter(String pattern, Navigate<Processor> nav, 
List<EndpointAware> match) {
-        List<Processor> list = nav.next();
-        if (list != null) {
-            for (Processor proc : list) {
-                if (proc instanceof Channel channel) {
-                    proc = channel.getNextProcessor();
-                }
-                if (proc instanceof EndpointAware ea) {
-                    String id = null;
-                    if (proc instanceof IdAware idAware) {
-                        id = idAware.getId();
-                    }
-                    String uri = ea.getEndpoint().getEndpointUri();
-                    if (PatternHelper.matchPattern(id, pattern) || 
PatternHelper.matchPattern(uri, pattern)) {
-                        match.add(ea);
-                    }
-                }
-                if (proc instanceof Navigate) {
-                    Navigate<Processor> child = (Navigate<Processor>) proc;
-                    doFilter(pattern, child, match);
+            // is the target able to create a consumer
+            org.apache.camel.spi.UriEndpoint ann
+                    = ObjectHelper.getAnnotationDeep(target, 
org.apache.camel.spi.UriEndpoint.class);
+            if (ann != null) {
+                if (ann.producerOnly()) {
+                    // skip if the endpoint cannot consume (we need to be able 
to consume to receive)
+                    throw new IllegalArgumentException("Cannot consume from 
endpoint: " + endpoint);
                 }
             }
         }
+        return target;
     }
 
 }
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelListenAction.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelListenAction.java
index b09bb50809e..f392ea3d6a6 100644
--- 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelListenAction.java
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelListenAction.java
@@ -43,8 +43,10 @@ import com.github.freva.asciitable.HorizontalAlign;
 import com.github.freva.asciitable.OverflowBehaviour;
 import org.apache.camel.catalog.impl.TimePatternConverter;
 import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain;
+import org.apache.camel.dsl.jbang.core.common.JSonHelper;
 import org.apache.camel.dsl.jbang.core.common.PidNameAgeCompletionCandidates;
 import org.apache.camel.dsl.jbang.core.common.ProcessHelper;
+import org.apache.camel.util.FileUtil;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.StopWatch;
 import org.apache.camel.util.StringHelper;
@@ -195,6 +197,10 @@ public class CamelListenAction extends ActionBaseCommand {
                     IOHelper.writeText("{}", f);
                 }
             } else {
+                // ensure output file is deleted before executing action
+                File outputFile = getOutputFile(Long.toString(pid));
+                FileUtil.deleteFile(outputFile);
+
                 JsonObject root = new JsonObject();
                 root.put("action", "receive");
                 if ("start".equals(action)) {
@@ -209,12 +215,40 @@ public class CamelListenAction extends ActionBaseCommand {
                 }
                 File f = getActionFile(Long.toString(pid));
                 IOHelper.writeText(root.toJson(), f);
+
+                JsonObject jo = waitForOutputFile(outputFile);
+                if (jo != null) {
+                    String error = jo.getString("error");
+                    if (error != null) {
+                        error = Jsoner.unescape(error);
+                        String url = jo.getString("url");
+                        List<String> stackTrace = 
jo.getCollection("stackTrace");
+                        if (url != null) {
+                            printer().println("Error starting listening on: " 
+ url + " due to: " + error);
+
+                        } else {
+                            printer().println("Error starting listening due 
to: " + error);
+                        }
+                        printer().println(StringHelper.fillChars('-', 120));
+                        printer().println(StringHelper.padString(1, 55) + 
"STACK-TRACE");
+                        printer().println(StringHelper.fillChars('-', 120));
+                        StringBuilder sb = new StringBuilder();
+                        for (int i = 0; i < stackTrace.size(); i++) {
+                            sb.append(String.format("\t%s%n", 
stackTrace.get(i)));
+                        }
+                        printer().println(String.valueOf(sb));
+                    }
+                }
             }
         }
 
         return 0;
     }
 
+    /**
+     * Obtains the JSON result of the action from the given output file.
+     * <p>
+     * NOTE(review): this only delegates to {@code getJsonObject}, so any actual waiting or polling for the
+     * file presumably happens in that helper; confirm there.
+     *
+     * @param  outputFile the output file to read
+     * @return            the JSON read from the file; the caller treats {@code null} as "no result"
+     */
+    protected JsonObject waitForOutputFile(File outputFile) {
+        return getJsonObject(outputFile);
+    }
+
     protected Integer doStatusCall() {
         List<StatusRow> rows = new ArrayList<>();
 
@@ -270,7 +304,7 @@ public class CamelListenAction extends ActionBaseCommand {
                             .with(r -> r.name),
                     new 
Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.age),
                     new Column().header("STATUS").with(this::getStatus),
-                    new Column().header("TOTAL").with(r -> "" + r.counter),
+                    new Column().header("TOTAL").with(r -> r.enabled ? "" + 
r.counter : ""),
                     new 
Column().header("SINCE").headerAlign(HorizontalAlign.CENTER)
                             .with(this::getMessageAgo),
                     new 
Column().header("ENDPOINT").visible(!wideUri).dataAlign(HorizontalAlign.LEFT)

Reply via email to