This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new e5f15cb6f15 CAMEL-21136: Added --poll to poll messages via jbang and 
/q/send HTTP server
e5f15cb6f15 is described below

commit e5f15cb6f150bc30de73eb46443c21a7f3790e74
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Sep 5 14:36:01 2024 +0200

    CAMEL-21136: Added --poll to poll messages via jbang and /q/send HTTP server
---
 .../platform/http/main/MainHttpServer.java         | 47 ++++++++++++++++------
 .../modules/ROOT/pages/camel-jbang.adoc            | 18 +++++++++
 .../camel/cli/connector/LocalCliConnector.java     | 42 +++++++++++++------
 .../core/commands/action/CamelSendAction.java      | 38 ++++++++++++++---
 4 files changed, 115 insertions(+), 30 deletions(-)

diff --git 
a/components/camel-platform-http-main/src/main/java/org/apache/camel/component/platform/http/main/MainHttpServer.java
 
b/components/camel-platform-http-main/src/main/java/org/apache/camel/component/platform/http/main/MainHttpServer.java
index fcf9ac76969..5a57c98a440 100644
--- 
a/components/camel-platform-http-main/src/main/java/org/apache/camel/component/platform/http/main/MainHttpServer.java
+++ 
b/components/camel-platform-http-main/src/main/java/org/apache/camel/component/platform/http/main/MainHttpServer.java
@@ -48,6 +48,7 @@ import io.vertx.ext.web.handler.BodyHandler;
 import io.vertx.ext.web.impl.BlockingHandlerDecorator;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
+import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
@@ -105,9 +106,11 @@ public class MainHttpServer extends ServiceSupport 
implements CamelContextAware,
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MainHttpServer.class);
     private static final int BODY_MAX_CHARS = 128 * 1024;
+    private static final int DEFAULT_POLL_TIMEOUT = 20000;
 
     private final HeaderFilterStrategy filter = new 
HttpProtocolHeaderFilterStrategy();
     private ProducerTemplate producer;
+    private ConsumerTemplate consumer;
 
     private VertxPlatformHttpServer server;
     private VertxPlatformHttpRouter router;
@@ -357,6 +360,9 @@ public class MainHttpServer extends ServiceSupport 
implements CamelContextAware,
         if (sendEnabled && producer == null) {
             producer = camelContext.createProducerTemplate();
         }
+        if (sendEnabled && consumer == null) {
+            consumer = camelContext.createConsumerTemplate();
+        }
 
         server = new VertxPlatformHttpServer(configuration);
         // adding server to camel-context which will manage shutdown the 
server, so we should not do this here
@@ -368,14 +374,14 @@ public class MainHttpServer extends ServiceSupport 
implements CamelContextAware,
             pluginRegistry.setCamelContext(getCamelContext());
             
getCamelContext().getCamelContextExtension().addContextPlugin(PlatformHttpPluginRegistry.class,
 pluginRegistry);
         }
-        ServiceHelper.initService(pluginRegistry, producer);
+        ServiceHelper.initService(pluginRegistry, producer, consumer);
     }
 
     @Override
     protected void doStart() throws Exception {
         ObjectHelper.notNull(camelContext, "CamelContext");
 
-        ServiceHelper.startService(server, pluginRegistry, producer);
+        ServiceHelper.startService(server, pluginRegistry, producer, consumer);
         router = VertxPlatformHttpRouter.lookup(camelContext);
         platformHttpComponent = camelContext.getComponent("platform-http", 
PlatformHttpComponent.class);
 
@@ -385,7 +391,7 @@ public class MainHttpServer extends ServiceSupport 
implements CamelContextAware,
 
     @Override
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownServices(pluginRegistry, producer);
+        ServiceHelper.stopAndShutdownServices(pluginRegistry, producer, 
consumer);
     }
 
     private boolean pluginsEnabled() {
@@ -1235,11 +1241,14 @@ public class MainHttpServer extends ServiceSupport 
implements CamelContextAware,
         String endpoint = ctx.request().getHeader("endpoint");
         String exchangePattern = ctx.request().getHeader("exchangePattern");
         String resultType = ctx.request().getHeader("resultType");
+        String poll = ctx.request().getHeader("poll");
+        String pollTimeout = ctx.request().getHeader("pollTimeout");
         final Map<String, Object> headers = new LinkedHashMap<>();
         for (var entry : ctx.request().headers()) {
             String k = entry.getKey();
             boolean exclude
-                    = "endpoint".equals(k) || "exchangePattern".equals(k) || 
"resultType".equals(k) || "Accept".equals(k)
+                    = "endpoint".equals(k) || "exchangePattern".equals(k) || 
"poll".equals(k)
+                            || "pollTimeout".equals(k) || 
"resultType".equals(k) || "Accept".equals(k)
                             || 
filter.applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), null);
             if (!exclude) {
                 headers.put(entry.getKey(), entry.getValue());
@@ -1301,18 +1310,24 @@ public class MainHttpServer extends ServiceSupport 
implements CamelContextAware,
                     exchangePattern = "InOnly"; // use in-only by default
                 }
                 final ExchangePattern mep = 
ExchangePattern.valueOf(exchangePattern);
-                out = producer.send(target, exchange -> {
-                    exchange.setPattern(mep);
-                    exchange.getMessage().setBody(body);
-                    if (!headers.isEmpty()) {
-                        exchange.getMessage().setHeaders(headers);
-                    }
-                });
-                if (clazz != null) {
+                long timeout = pollTimeout != null ? 
Long.parseLong(pollTimeout) : DEFAULT_POLL_TIMEOUT;
+                if ("true".equals(poll)) {
+                    exchangePattern = "InOut"; // we want to receive the data 
so enable out mode
+                    out = consumer.receive(target, timeout);
+                } else {
+                    out = producer.send(target, exchange -> {
+                        exchange.setPattern(mep);
+                        exchange.getMessage().setBody(body);
+                        if (!headers.isEmpty()) {
+                            exchange.getMessage().setHeaders(headers);
+                        }
+                    });
+                }
+                if (clazz != null && out != null) {
                     Object b = out.getMessage().getBody(clazz);
                     out.getMessage().setBody(b);
                 }
-            } catch (ClassNotFoundException e) {
+            } catch (Exception e) {
                 jo.put("endpoint", target.getEndpointUri());
                 jo.put("exchangePattern", exchangePattern);
                 jo.put("timestamp", timestamp);
@@ -1353,6 +1368,12 @@ public class MainHttpServer extends ServiceSupport 
implements CamelContextAware,
                 jo.put("timestamp", timestamp);
                 jo.put("elapsed", watch.taken());
                 jo.put("status", "success");
+            } else {
+                // timeout as there is no data
+                jo.put("endpoint", target.getEndpointUri());
+                jo.put("timestamp", timestamp);
+                jo.put("elapsed", watch.taken());
+                jo.put("status", "timeout");
             }
         } else {
             // there is no valid endpoint
diff --git a/docs/user-manual/modules/ROOT/pages/camel-jbang.adoc 
b/docs/user-manual/modules/ROOT/pages/camel-jbang.adoc
index 1a201c620f6..90f8e0fb1f8 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-jbang.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-jbang.adoc
@@ -1382,6 +1382,24 @@ TIP: See more options with `camel cmd send --help`.
 
 The source for this example is provided on GitHub at 
https://github.com/apache/camel-kamelets-examples/tree/main/jbang/mqtt[camel-jbang
 MQTT example].
 
+==== Poll messages via Camel
+
+*Available since Camel 4.8*
+
+The `camel cmd send` command has been improved to also _poll_ messages from 
Camel. This is needed
+if you want to poll the latest messages from a Kafka topic, JMS queue, or 
download a file from FTP etc.
+
+The poll uses a Camel consumer to poll the message (timing out if no message is 
received) instead of a producer.
+
+For example, to poll a message from an ActiveMQ queue named cheese you can do:
+
+[source,bash]
+----
+$ camel cmd send --poll --endpoint=activemq:cheese
+----
+
+When you poll, you do not send any payload (body or headers).
+
 === Controlling local Camel integrations
 
 To list the currently running Camel integrations you use the `ps` command:
diff --git 
a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
 
b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
index 203520a614d..7ad6ecff054 100644
--- 
a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
+++ 
b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
@@ -43,12 +43,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
+import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Expression;
 import org.apache.camel.NoSuchEndpointException;
-import org.apache.camel.Processor;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.Route;
 import org.apache.camel.RoutesBuilder;
@@ -108,6 +108,7 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
     private ScheduledExecutorService executor;
     private volatile ExecutorService terminateExecutor;
     private ProducerTemplate producer;
+    private ConsumerTemplate consumer;
     private File lockFile;
     private File statusFile;
     private File actionFile;
@@ -163,6 +164,7 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
         }
         platformVersion = cliConnectorFactory.getRuntimeVersion();
         producer = camelContext.createProducerTemplate();
+        consumer = camelContext.createConsumerTemplate();
 
         // create thread from JDK so it is not managed by Camel because we 
want the pool to be independent when
         // camel is being stopped which otherwise can lead to stopping the 
thread pool while the task is running
@@ -500,8 +502,12 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
         StopWatch watch = new StopWatch();
         long timestamp = System.currentTimeMillis();
         String endpoint = root.getString("endpoint");
-        String body = root.getString("body");
+        String body = root.getStringOrDefault("body", "");
         String exchangePattern = root.getString("exchangePattern");
+        boolean poll = root.getBooleanOrDefault("poll", false);
+        long pollTimeout = root.getLongOrDefault("pollTimeout", 20000L);
+        // give extra time as jbang need to have response
+        pollTimeout += 5000;
         Collection<JsonObject> headers = root.getCollection("headers");
         if (body != null) {
             InputStream is = null;
@@ -560,20 +566,23 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
             }
 
             if (target != null) {
-                out = producer.send(target, new Processor() {
-                    @Override
-                    public void process(Exchange exchange) throws Exception {
+                if (poll) {
+                    exchangePattern = "InOut";
+                    out = consumer.receive(target, pollTimeout);
+                } else {
+                    final String mep = exchangePattern;
+                    out = producer.send(target, exchange -> {
                         exchange.getMessage().setBody(inputBody);
                         if (inputHeaders != null) {
                             exchange.getMessage().setHeaders(inputHeaders);
                         }
                         exchange.setPattern(
-                                "InOut".equals(exchangePattern) ? 
ExchangePattern.InOut : ExchangePattern.InOnly);
-                    }
-                });
+                                "InOut".equals(mep) ? ExchangePattern.InOut : 
ExchangePattern.InOnly);
+                    });
+                }
                 IOHelper.close(is);
                 LOG.trace("Updating output file: {}", outputFile);
-                if (out.getException() != null) {
+                if (out != null && out.getException() != null) {
                     JsonObject jo = new JsonObject();
                     jo.put("endpoint", target.getEndpointUri());
                     jo.put("exchangeId", out.getExchangeId());
@@ -585,7 +594,7 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
                     jo.put("exception",
                             
MessageHelper.dumpExceptionAsJSonObject(out.getException()).getMap("exception"));
                     IOHelper.writeText(jo.toJson(), outputFile);
-                } else if ("InOut".equals(exchangePattern)) {
+                } else if (out != null && "InOut".equals(exchangePattern)) {
                     JsonObject jo = new JsonObject();
                     jo.put("endpoint", target.getEndpointUri());
                     jo.put("exchangeId", out.getExchangeId());
@@ -597,7 +606,7 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
                     jo.put("message", 
MessageHelper.dumpAsJSonObject(out.getMessage(), true, true, true, true, true, 
true,
                             BODY_MAX_CHARS).getMap("message"));
                     IOHelper.writeText(jo.toJson(), outputFile);
-                } else {
+                } else if (out != null) {
                     JsonObject jo = new JsonObject();
                     jo.put("endpoint", target.getEndpointUri());
                     jo.put("exchangeId", out.getExchangeId());
@@ -606,6 +615,15 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
                     jo.put("elapsed", watch.taken());
                     jo.put("status", "success");
                     IOHelper.writeText(jo.toJson(), outputFile);
+                } else {
+                    JsonObject jo = new JsonObject();
+                    jo.put("endpoint", target.getEndpointUri());
+                    jo.put("exchangeId", "");
+                    jo.put("exchangePattern", exchangePattern);
+                    jo.put("timestamp", timestamp);
+                    jo.put("elapsed", watch.taken());
+                    jo.put("status", "timeout");
+                    IOHelper.writeText(jo.toJson(), outputFile);
                 }
             } else {
                 // there is no valid endpoint
@@ -1267,7 +1285,7 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
             camelContext.getExecutorServiceManager().shutdown(executor);
             executor = null;
         }
-        ServiceHelper.stopService(producer);
+        ServiceHelper.stopService(producer, consumer);
     }
 
     private static String getPid() {
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelSendAction.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelSendAction.java
index dfdb67dfc79..e9578d82661 100644
--- 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelSendAction.java
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelSendAction.java
@@ -46,6 +46,10 @@ public class CamelSendAction extends ActionBaseCommand {
                         description = "Endpoint where to send the message (can 
be uri, pattern, or refer to a route id)")
     String endpoint;
 
+    @CommandLine.Option(names = { "--poll" },
+                        description = "Poll instead of sending a message. This 
can be used to receive latest message from a Kafka topic or JMS queue.")
+    boolean poll;
+
     @CommandLine.Option(names = { "--reply" },
                         description = "Whether to expect a reply message 
(InOut vs InOut messaging style)")
     boolean reply;
@@ -54,7 +58,7 @@ public class CamelSendAction extends ActionBaseCommand {
                         description = "Saves reply message to the file with 
the given name (override if exists)")
     String replyFile;
 
-    @CommandLine.Option(names = { "--body" }, required = true,
+    @CommandLine.Option(names = { "--body" },
                         description = "Message body to send (prefix with file: 
to refer to loading message body from file)")
     String body;
 
@@ -117,9 +121,17 @@ public class CamelSendAction extends ActionBaseCommand {
         JsonObject root = new JsonObject();
         root.put("action", "send");
         root.put("endpoint", endpoint);
+        root.put("poll", poll);
+        // timeout cannot be too low
+        if (timeout < 5000) {
+            timeout = 5000;
+        }
+        root.put("pollTimeout", Math.min(1000, timeout - 1000)); // poll 
timeout should be shorter than jbang timeout
         String mep = (reply || replyFile != null) ? "InOut" : "InOnly";
         root.put("exchangePattern", mep);
-        root.put("body", body);
+        if (body != null) {
+            root.put("body", body);
+        }
         if (headers != null) {
             JsonArray arr = new JsonArray();
             for (String h : headers) {
@@ -230,19 +242,35 @@ public class CamelSendAction extends ActionBaseCommand {
 
     private String getStatus(JsonObject r) {
         boolean failed = "failed".equals(r.getString("status"));
+        boolean timeout = "timeout".equals(r.getString("status"));
         boolean reply = r.containsKey("message");
         String status;
+        Ansi.Color c = Ansi.Color.GREEN;
         if (failed) {
             status = "Failed (exception)";
+            c = Ansi.Color.RED;
         } else if (replyFile != null) {
-            status = "Reply saved to file (success)";
+            if (poll) {
+                status = "Poll save to fill (success)";
+            } else {
+                status = "Reply save to file (success)";
+            }
         } else if (reply) {
-            status = "Reply received (success)";
+            if (poll) {
+                status = "Poll received (success)";
+            } else {
+                status = "Reply received (success)";
+            }
+        } else if (timeout) {
+            status = "Timeout";
+            c = Ansi.Color.YELLOW;
+        } else if (poll) {
+            status = "Poll (success)";
         } else {
             status = "Sent (success)";
         }
         if (loggingColor) {
-            return Ansi.ansi().fg(failed ? Ansi.Color.RED : 
Ansi.Color.GREEN).a(status).reset().toString();
+            return Ansi.ansi().fg(c).a(status).reset().toString();
         } else {
             return status;
         }

Reply via email to