This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new f50459385bc CAMEL-21214: browsable endpoint - getBrowseStatus for
quick status without need for dump (#15559)
f50459385bc is described below
commit f50459385bc348630022fd57b9d6fbcba9b34af8
Author: Claus Ibsen <[email protected]>
AuthorDate: Fri Sep 13 17:57:37 2024 +0200
CAMEL-21214: browsable endpoint - getBrowseStatus for quick status without
need for dump (#15559)
---
.../camel/component/file/GenericFileConsumer.java | 26 ++++---
.../camel/component/file/GenericFileEndpoint.java | 41 ++++++++++
.../component/file/remote/RemoteFileConsumer.java | 6 +-
.../component/jms/DefaultQueueBrowseStrategy.java | 40 ++++++++++
.../camel/component/jms/JmsQueueEndpoint.java | 10 +++
.../camel/component/jms/QueueBrowseStrategy.java | 16 ++++
.../org/apache/camel/spi/BrowsableEndpoint.java | 27 +++++++
.../camel/impl/console/BrowseDevConsole.java | 89 +++++++++++++---------
.../ManagedBrowsableEndpointAsJSonFileTest.java | 2 +-
.../core/commands/action/CamelBrowseAction.java | 6 +-
10 files changed, 209 insertions(+), 54 deletions(-)
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index 9774e748fdd..87d51223983 100644
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -60,6 +60,7 @@ public abstract class GenericFileConsumer<T> extends
ScheduledBatchPollingConsum
private final Pattern excludePattern;
private final String[] includeExt;
private final String[] excludeExt;
+ private boolean retrieveFile = true;
protected GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor
processor, GenericFileOperations<T> operations,
GenericFileProcessStrategy<T>
processStrategy) {
@@ -429,7 +430,7 @@ public abstract class GenericFileConsumer<T> extends
ScheduledBatchPollingConsum
try {
if (isRetrieveFile()) {
- if (tryRetrievingFile(exchange, name, target,
absoluteFileName, file)) {
+ if (!tryRetrievingFile(exchange, name, target,
absoluteFileName, file)) {
return false;
}
} else {
@@ -474,7 +475,7 @@ public abstract class GenericFileConsumer<T> extends
ScheduledBatchPollingConsum
return true;
}
- private boolean tryRetrievingFile(
+ boolean tryRetrievingFile(
Exchange exchange, String name, GenericFile<T> target, String
absoluteFileName, GenericFile<T> file)
throws Exception {
// retrieve the file using the stream
@@ -496,7 +497,7 @@ public abstract class GenericFileConsumer<T> extends
ScheduledBatchPollingConsum
// remove file from the in progress list as we could not
// retrieve it, but should ignore
endpoint.getInProgressRepository().remove(absoluteFileName);
- return true;
+ return false;
} else {
// throw exception to handle the problem with retrieving
// the file
@@ -513,7 +514,7 @@ public abstract class GenericFileConsumer<T> extends
ScheduledBatchPollingConsum
}
LOG.trace("Retrieved file: {} from: {}", name, endpoint);
- return false;
+ return true;
}
/**
@@ -525,12 +526,19 @@ public abstract class GenericFileConsumer<T> extends
ScheduledBatchPollingConsum
protected abstract void updateFileHeaders(GenericFile<T> file, Message
message);
/**
- * Override if required. Files are retrieved / returns true by default
- *
- * @return <tt>true</tt> to retrieve files, <tt>false</tt> to skip
retrieval of files.
+ * Whether the consumer should retrieve/download files. If false then the
message body is null as no file is
+ * retrieved.
*/
- protected boolean isRetrieveFile() {
- return true;
+ public boolean isRetrieveFile() {
+ return retrieveFile;
+ }
+
+ /**
+ * Whether the consumer should retrieve/download files. If false then the
message body is null as no file is
+ * retrieved.
+ */
+ public void setRetrieveFile(boolean retrieveFile) {
+ this.retrieveFile = retrieveFile;
}
/**
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
index 9fe4a158fad..392f44d9a82 100644
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
@@ -500,6 +500,22 @@ public abstract class GenericFileEndpoint<T> extends
ScheduledPollEndpoint imple
@Override
public List<Exchange> getExchanges(int limit, java.util.function.Predicate
filter) {
+ return getExchanges(limit, filter, false);
+ }
+
+    /**
+     * Quick status of the files that can be browsed on this endpoint. The files are listed in
+     * status mode, which tells the consumer not to download their content.
+     *
+     * @param  limit maximum number of files to include, handed on to the internal browse
+     * @return       number of files found together with the message timestamp header of the
+     *               first and the last exchange (0 when the header is absent or nothing was found)
+     */
+    @Override
+    public BrowseStatus getBrowseStatus(int limit) {
+        final List<Exchange> found = getExchanges(limit, null, true);
+        if (found.isEmpty()) {
+            // nothing to browse, so there are no timestamps to report
+            return new BrowseStatus(0, 0, 0);
+        }
+        final Exchange first = found.get(0);
+        final Exchange last = found.get(found.size() - 1);
+        final long firstTimestamp = first.getMessage().getHeader(Exchange.MESSAGE_TIMESTAMP, 0, long.class);
+        final long lastTimestamp = last.getMessage().getHeader(Exchange.MESSAGE_TIMESTAMP, 0, long.class);
+        return new BrowseStatus(found.size(), firstTimestamp, lastTimestamp);
+    }
+
+ private List<Exchange> getExchanges(int limit,
java.util.function.Predicate filter, boolean status) {
final List<Exchange> answer = new ArrayList<>();
GenericFileConsumer<?> consumer = null;
@@ -510,6 +526,11 @@ public abstract class GenericFileEndpoint<T> extends
ScheduledPollEndpoint imple
if (filter == null) {
consumer.setMaxMessagesPerPoll(browseLimit);
}
+ if (status) {
+ // optimize to not download files as we only want status
+ consumer.setRetrieveFile(false);
+ }
+ final GenericFileConsumer gfc = consumer;
consumer.setCustomProcessor(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
@@ -518,6 +539,26 @@ public abstract class GenericFileEndpoint<T> extends
ScheduledPollEndpoint imple
include = filter.test(exchange);
}
if (include && answer.size() < browseLimit) {
+ if (!status) {
+ // ensure payload is downloaded (when not in
status mode)
+ GenericFile<?> gf =
exchange.getMessage().getBody(GenericFile.class);
+ if (gf != null) {
+ final String name = gf.getAbsoluteFilePath();
+ try {
+ boolean downloaded =
gfc.tryRetrievingFile(exchange, name, gf, name, gf);
+ if (downloaded) {
+ gf.getBinding().loadContent(exchange,
gf);
+ Object data = gf.getBody();
+ if (data != null) {
+
exchange.getMessage().setBody(data);
+ }
+ }
+ } catch (Exception e) {
+ LOG.debug("Error trying to retrieve file:
{} due to: {}. This exception is ignored.", name,
+ e.getMessage(), e);
+ }
+ }
+ }
answer.add(exchange);
}
}
diff --git
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
index 62dc1726b21..0b268ae4ee3 100644
---
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
+++
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
@@ -45,6 +45,7 @@ public abstract class RemoteFileConsumer<T> extends
GenericFileConsumer<T> {
GenericFileProcessStrategy processStrategy) {
super(endpoint, processor, operations, processStrategy);
this.setPollStrategy(new RemoteFilePollingConsumerPollStrategy());
+ this.setRetrieveFile(endpoint.isDownload());
}
@Override
@@ -153,11 +154,6 @@ public abstract class RemoteFileConsumer<T> extends
GenericFileConsumer<T> {
return super.processExchange(exchange);
}
- @Override
- protected boolean isRetrieveFile() {
- return getEndpoint().isDownload();
- }
-
/**
* Whether there is a starting directory configured.
*/
diff --git
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultQueueBrowseStrategy.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultQueueBrowseStrategy.java
index 0d3073958d5..ce24c49a2c5 100644
---
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultQueueBrowseStrategy.java
+++
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultQueueBrowseStrategy.java
@@ -26,6 +26,7 @@ import jakarta.jms.QueueBrowser;
import jakarta.jms.Session;
import org.apache.camel.Exchange;
+import org.apache.camel.spi.BrowsableEndpoint;
import org.springframework.jms.core.BrowserCallback;
import org.springframework.jms.core.JmsOperations;
@@ -46,6 +47,17 @@ public class DefaultQueueBrowseStrategy implements
QueueBrowseStrategy {
}
}
+    /**
+     * Computes the quick status by browsing the queue through the given template, using the
+     * endpoint's JMS selector when one is configured.
+     */
+    @Override
+    public BrowsableEndpoint.BrowseStatus browseStatus(
+            JmsOperations template, String queue, JmsBrowsableEndpoint endpoint, int limit) {
+        final String selector = endpoint.getSelector();
+        if (selector == null) {
+            // no selector: browse every message on the queue
+            return template.browse(queue, (session, browser) -> doBrowseStatus(endpoint, session, browser, limit));
+        }
+        return template.browseSelected(queue, selector,
+                (session, browser) -> doBrowseStatus(endpoint, session, browser, limit));
+    }
+
private static List<Exchange> doBrowse(JmsBrowsableEndpoint endpoint,
Session session, QueueBrowser browser, int limit)
throws JMSException {
@@ -66,4 +78,32 @@ public class DefaultQueueBrowseStrategy implements
QueueBrowseStrategy {
return answer;
}
+    /**
+     * Walks the browser's enumeration, counting messages and capturing the JMS timestamp of the
+     * first and the last message visited.
+     *
+     * @param  limit        maximum number of messages to visit; zero or negative means no maximum
+     * @return              the message count with the first/last JMS timestamps (0 when no message was seen)
+     * @throws JMSException if the browser or a message cannot be read
+     */
+    private static BrowsableEndpoint.BrowseStatus doBrowseStatus(
+            JmsBrowsableEndpoint endpoint, Session session, QueueBrowser browser, int limit)
+            throws JMSException {
+        final int max = limit <= 0 ? Integer.MAX_VALUE : limit;
+
+        // not the best implementation in the world as we have to browse
+        // the entire queue, which could be massive
+        final Enumeration<?> messages = browser.getEnumeration();
+
+        int count = 0;
+        long firstTimestamp = 0;
+        long lastTimestamp = 0;
+        Message current = null;
+        while (count < max && messages.hasMoreElements()) {
+            current = (Message) messages.nextElement();
+            if (count == 0) {
+                firstTimestamp = current.getJMSTimestamp();
+            }
+            count++;
+        }
+        // a non-null message here implies at least one element was counted
+        if (current != null) {
+            lastTimestamp = current.getJMSTimestamp();
+        }
+        return new BrowsableEndpoint.BrowseStatus(count, firstTimestamp, lastTimestamp);
+    }
+
}
diff --git
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
index 8381b60f16e..dd64a343ac4 100644
---
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
+++
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
@@ -96,6 +96,16 @@ public class JmsQueueEndpoint extends JmsEndpoint implements
JmsBrowsableEndpoin
return getExchanges(maximumBrowseSize, null);
}
+    /**
+     * Quick status of this queue, delegated to the configured queue browse strategy.
+     *
+     * @param  limit maximum number of messages the strategy should look at
+     * @return       the strategy's status, or an all-zero status when no strategy is configured
+     */
+    @Override
+    public BrowseStatus getBrowseStatus(int limit) {
+        if (queueBrowseStrategy != null) {
+            final String destination = getDestinationName();
+            final JmsOperations operations = getConfiguration().createInOnlyTemplate(this, false, destination);
+            return queueBrowseStrategy.browseStatus(operations, destination, this, limit);
+        }
+        return new BrowseStatus(0, 0, 0);
+    }
+
@Override
public List<Exchange> getExchanges(int limit, Predicate filter) {
if (queueBrowseStrategy == null) {
diff --git
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/QueueBrowseStrategy.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/QueueBrowseStrategy.java
index 0a73d2f0c0f..9daa12b2ab7 100644
---
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/QueueBrowseStrategy.java
+++
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/QueueBrowseStrategy.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.jms;
import java.util.List;
import org.apache.camel.Exchange;
+import org.apache.camel.spi.BrowsableEndpoint;
import org.springframework.jms.core.JmsOperations;
/**
@@ -31,4 +32,19 @@ public interface QueueBrowseStrategy {
*/
List<Exchange> browse(JmsOperations template, String queue,
JmsBrowsableEndpoint endpoint, int limit);
+    /**
+     * Browse quick status of the given queue.
+     * <p>
+     * This default implementation performs a regular {@code browse} and derives the status from
+     * the returned exchanges: their count, and the message timestamp header of the first and the
+     * last one (0 when the header is absent or the queue is empty).
+     */
+    default BrowsableEndpoint.BrowseStatus browseStatus(
+            JmsOperations template, String queue, JmsBrowsableEndpoint endpoint, int limit) {
+        final List<Exchange> browsed = browse(template, queue, endpoint, limit);
+        if (browsed.isEmpty()) {
+            return new BrowsableEndpoint.BrowseStatus(0, 0, 0);
+        }
+        final Exchange first = browsed.get(0);
+        final Exchange last = browsed.get(browsed.size() - 1);
+        final long firstTimestamp = first.getMessage().getHeader(Exchange.MESSAGE_TIMESTAMP, 0, long.class);
+        final long lastTimestamp = last.getMessage().getHeader(Exchange.MESSAGE_TIMESTAMP, 0, long.class);
+        return new BrowsableEndpoint.BrowseStatus(browsed.size(), firstTimestamp, lastTimestamp);
+    }
+
}
diff --git
a/core/camel-api/src/main/java/org/apache/camel/spi/BrowsableEndpoint.java
b/core/camel-api/src/main/java/org/apache/camel/spi/BrowsableEndpoint.java
index b85ce619854..8292aae4c25 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/BrowsableEndpoint.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/BrowsableEndpoint.java
@@ -29,6 +29,16 @@ import org.apache.camel.Exchange;
*/
public interface BrowsableEndpoint extends Endpoint {
+ /**
+ * A quick status of the browse queue: how many messages were seen and the timestamps of the
+ * first and the last of them, so callers can report on an endpoint without the messages themselves.
+ *
+ * @param size number of messages in the queue (as counted by the browse, so it cannot exceed
+ * a limit the browse was given)
+ * @param firstTimestamp timestamp of first message (0 if no information)
+ * @param lastTimestamp timestamp of last message (0 if no information)
+ */
+ record BrowseStatus(int size, long firstTimestamp, long lastTimestamp) {
+ }
+
/**
* Maximum number of messages to browse by default.
*/
@@ -39,6 +49,23 @@ public interface BrowsableEndpoint extends Endpoint {
*/
void setBrowseLimit(int browseLimit);
+    /**
+     * Returns a quick browse status.
+     * <p>
+     * This default implementation browses the exchanges and derives the status from them: their
+     * count, and the message timestamp header of the first and the last exchange. Endpoints that
+     * can compute the status more cheaply should override it.
+     *
+     * @param  limit to limit the result to a maximum. Use 0 for default limit.
+     * @return       the status (all zeros when there is nothing to browse)
+     */
+    default BrowseStatus getBrowseStatus(int limit) {
+        // honor the requested maximum; a non-positive limit means "use the endpoint default"
+        List<Exchange> list = limit > 0 ? getExchanges(limit, null) : getExchanges();
+        if (list == null || list.isEmpty()) {
+            // callers elsewhere guard against a null list, so treat it like an empty one
+            return new BrowseStatus(0, 0, 0);
+        }
+        long ts = list.get(0).getMessage().getHeader(Exchange.MESSAGE_TIMESTAMP, 0, long.class);
+        long ts2 = list.get(list.size() - 1).getMessage().getHeader(Exchange.MESSAGE_TIMESTAMP, 0, long.class);
+        return new BrowseStatus(list.size(), ts, ts2);
+    }
+
/**
* Return the exchanges available on this endpoint
*
diff --git
a/core/camel-console/src/main/java/org/apache/camel/impl/console/BrowseDevConsole.java
b/core/camel-console/src/main/java/org/apache/camel/impl/console/BrowseDevConsole.java
index 59d4880a846..b7e089eca63 100644
---
a/core/camel-console/src/main/java/org/apache/camel/impl/console/BrowseDevConsole.java
+++
b/core/camel-console/src/main/java/org/apache/camel/impl/console/BrowseDevConsole.java
@@ -118,18 +118,19 @@ public class BrowseDevConsole extends AbstractDevConsole {
for (Endpoint endpoint : endpoints) {
if (endpoint instanceof BrowsableEndpoint be
&& (filter == null ||
PatternHelper.matchPattern(endpoint.getEndpointUri(), filter))) {
- List<Exchange> list = freshSize ?
be.getExchanges(Integer.MAX_VALUE, null) : be.getExchanges(max, null);
- int queueSize = list != null ? list.size() : 0;
- int begin = 0;
- if (list != null && pos > 0) {
- begin = Math.max(0, list.size() - pos);
- list = list.subList(begin, list.size());
- }
- if (list != null) {
- sb.append("\n");
- sb.append(String.format("Browse: %s (size: %d limit: %d
position: %d)%n", endpoint.getEndpointUri(),
- queueSize, max, begin));
- if (dump) {
+
+ if (dump) {
+ List<Exchange> list = freshSize ?
be.getExchanges(Integer.MAX_VALUE, null) : be.getExchanges(max, null);
+ int queueSize = list != null ? list.size() : 0;
+ int begin = 0;
+ if (list != null && pos > 0) {
+ begin = Math.max(0, list.size() - pos);
+ list = list.subList(begin, list.size());
+ }
+ if (list != null) {
+ sb.append("\n");
+ sb.append(String.format("Browse: %s (size: %d limit:
%d position: %d)%n", endpoint.getEndpointUri(),
+ queueSize, max, begin));
for (Exchange e : list) {
String json
= MessageHelper.dumpAsJSon(e.getMessage(),
false, false, includeBody, 2, true, true, true,
@@ -138,6 +139,9 @@ public class BrowseDevConsole extends AbstractDevConsole {
sb.append("\n");
}
}
+ } else {
+ BrowsableEndpoint.BrowseStatus status = be.getBrowseStatus(Integer.MAX_VALUE);
+ // status-only output: close the parenthesis after the size so the line is well-formed
+ sb.append(String.format("Browse: %s (size: %d)%n", endpoint.getEndpointUri(), status.size()));
}
}
}
@@ -166,33 +170,34 @@ public class BrowseDevConsole extends AbstractDevConsole {
for (Endpoint endpoint : endpoints) {
if (endpoint instanceof BrowsableEndpoint be
&& (filter == null ||
PatternHelper.matchPattern(endpoint.getEndpointUri(), filter))) {
- List<Exchange> list = freshSize ?
be.getExchanges(Integer.MAX_VALUE, null) : be.getExchanges(max, null);
- int queueSize = list != null ? list.size() : 0;
- int begin = 0;
- if (list != null && pos > 0) {
- begin = Math.max(0, list.size() - pos);
- list = list.subList(begin, list.size());
- }
- if (list != null) {
- JsonObject jo = new JsonObject();
- jo.put("endpointUri", endpoint.getEndpointUri());
- jo.put("queueSize", queueSize);
- jo.put("limit", max);
- jo.put("position", begin);
- if (!list.isEmpty()) {
- long ts =
list.get(0).getMessage().getHeader(Exchange.MESSAGE_TIMESTAMP, 0, long.class);
- if (ts > 0) {
- jo.put("firstTimestamp", ts);
- }
- if (list.size() > 1) {
- ts = list.get(list.size() -
1).getMessage().getHeader(Exchange.MESSAGE_TIMESTAMP, 0, long.class);
+ if (dump) {
+ List<Exchange> list = freshSize ?
be.getExchanges(Integer.MAX_VALUE, null) : be.getExchanges(max, null);
+ int queueSize = list != null ? list.size() : 0;
+ int begin = 0;
+ if (list != null && pos > 0) {
+ begin = Math.max(0, list.size() - pos);
+ list = list.subList(begin, list.size());
+ }
+ if (list != null) {
+ JsonObject jo = new JsonObject();
+ jo.put("endpointUri", endpoint.getEndpointUri());
+ jo.put("queueSize", queueSize);
+ jo.put("limit", max);
+ jo.put("position", begin);
+ if (!list.isEmpty()) {
+ long ts =
list.get(0).getMessage().getHeader(Exchange.MESSAGE_TIMESTAMP, 0, long.class);
if (ts > 0) {
- jo.put("lastTimestamp", ts);
+ jo.put("firstTimestamp", ts);
+ }
+ if (list.size() > 1) {
+ ts = list.get(list.size() -
1).getMessage().getHeader(Exchange.MESSAGE_TIMESTAMP, 0,
+ long.class);
+ if (ts > 0) {
+ jo.put("lastTimestamp", ts);
+ }
}
}
- }
- arr.add(jo);
- if (dump) {
+ arr.add(jo);
JsonArray arr2 = new JsonArray();
for (Exchange e : list) {
arr2.add(MessageHelper.dumpAsJSonObject(e.getMessage(), false, false,
includeBody, true, true, true,
@@ -202,6 +207,18 @@ public class BrowseDevConsole extends AbstractDevConsole {
jo.put("messages", arr2);
}
}
+ } else {
+ BrowsableEndpoint.BrowseStatus status =
be.getBrowseStatus(Integer.MAX_VALUE);
+ JsonObject jo = new JsonObject();
+ jo.put("endpointUri", endpoint.getEndpointUri());
+ jo.put("queueSize", status.size());
+ if (status.firstTimestamp() > 0) {
+ jo.put("firstTimestamp", status.firstTimestamp());
+ }
+ if (status.lastTimestamp() > 0) {
+ jo.put("lastTimestamp", status.lastTimestamp());
+ }
+ arr.add(jo);
}
}
}
diff --git
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedBrowsableEndpointAsJSonFileTest.java
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedBrowsableEndpointAsJSonFileTest.java
index 4abcdcff063..79f2f20b935 100644
---
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedBrowsableEndpointAsJSonFileTest.java
+++
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedBrowsableEndpointAsJSonFileTest.java
@@ -45,7 +45,7 @@ public class ManagedBrowsableEndpointAsJSonFileTest extends
ManagementTestSuppor
assertNotNull(out);
log.info(out);
assertTrue(out.contains("\"value\": \"Hello World\""));
- assertTrue(out.contains("\"type\":
\"org.apache.camel.component.file.GenericFile\""));
+ assertTrue(out.contains("\"type\": \"java.lang.String\""));
assertTrue(out.contains("\"value\": \"hello.txt\""));
}
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java
index 4437109ad83..580118f5d60 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java
@@ -184,8 +184,8 @@ public class CamelBrowseAction extends ActionBaseCommand {
row.uri = URISupport.sanitizeUri(row.uri);
}
row.queueSize = o.getInteger("queueSize");
- row.limit = o.getInteger("limit");
- row.position = o.getInteger("position");
+ row.limit = o.getIntegerOrDefault("limit", 0);
+ row.position = o.getIntegerOrDefault("position", 0);
row.firstTimestamp = o.getLongOrDefault("firstTimestamp",
0);
row.lastTimestamp = o.getLongOrDefault("lastTimestamp", 0);
if (dump) {
@@ -315,7 +315,7 @@ public class CamelBrowseAction extends ActionBaseCommand {
if (freshSize) {
return "" + r.queueSize;
}
- if (r.queueSize >= r.limit) {
+ if (r.limit > 0 && r.queueSize >= r.limit) {
return r.queueSize + "+";
}
return "" + r.queueSize;