This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch browse-status in repository https://gitbox.apache.org/repos/asf/camel.git
commit 354e952e1b44fbbdbb68f8f36b1c7f0555902cf0 Author: Claus Ibsen <[email protected]> AuthorDate: Fri Sep 13 17:29:25 2024 +0200 CAMEL-21214: browsable endpoint - getBrowseStatus for quick status without need for dump --- .../camel/component/file/GenericFileConsumer.java | 26 ++++--- .../camel/component/file/GenericFileEndpoint.java | 41 ++++++++++ .../component/file/remote/RemoteFileConsumer.java | 6 +- .../component/jms/DefaultQueueBrowseStrategy.java | 40 ++++++++++ .../camel/component/jms/JmsQueueEndpoint.java | 10 +++ .../camel/component/jms/QueueBrowseStrategy.java | 16 ++++ .../org/apache/camel/spi/BrowsableEndpoint.java | 27 +++++++ .../camel/impl/console/BrowseDevConsole.java | 89 +++++++++++++--------- .../ManagedBrowsableEndpointAsJSonFileTest.java | 2 +- .../core/commands/action/CamelBrowseAction.java | 6 +- 10 files changed, 209 insertions(+), 54 deletions(-) diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java index 9774e748fdd..87d51223983 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java @@ -60,6 +60,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum private final Pattern excludePattern; private final String[] includeExt; private final String[] excludeExt; + private boolean retrieveFile = true; protected GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations, GenericFileProcessStrategy<T> processStrategy) { @@ -429,7 +430,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum try { if (isRetrieveFile()) { - if (tryRetrievingFile(exchange, name, target, absoluteFileName, file)) { + if (!tryRetrievingFile(exchange, name, target, absoluteFileName, file)) { return false; } } else { @@ -474,7 +475,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum return true; } - private boolean tryRetrievingFile( + boolean tryRetrievingFile( Exchange exchange, String name, GenericFile<T> target, String absoluteFileName, GenericFile<T> file) throws Exception { // retrieve the file using the stream @@ -496,7 +497,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum // remove file from the in progress list as we could not // retrieve it, but should ignore endpoint.getInProgressRepository().remove(absoluteFileName); - return true; + return false; } else { // throw exception to handle the problem with retrieving // the file @@ -513,7 +514,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum } LOG.trace("Retrieved file: {} from: {}", name, endpoint); - return false; + return true; } /** @@ -525,12 +526,19 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum protected abstract void updateFileHeaders(GenericFile<T> file, Message message); /** - * Override if required. Files are retrieved / returns true by default - * - * @return <tt>true</tt> to retrieve files, <tt>false</tt> to skip retrieval of files. + * Whether the consumer should retrieve/download files. If false then the message body is null as no file is + * retrieved. */ - protected boolean isRetrieveFile() { - return true; + public boolean isRetrieveFile() { + return retrieveFile; + } + + /** + * Whether the consumer should retrieve/download files. If false then the message body is null as no file is + * retrieved. + */ + public void setRetrieveFile(boolean retrieveFile) { + this.retrieveFile = retrieveFile; } /** diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java index 9fe4a158fad..392f44d9a82 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java @@ -500,6 +500,22 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple @Override public List<Exchange> getExchanges(int limit, java.util.function.Predicate filter) { + return getExchanges(limit, filter, false); + } + + @Override + public BrowseStatus getBrowseStatus(int limit) { + List<Exchange> list = getExchanges(limit, null, true); + long ts = 0; + long ts2 = 0; + if (!list.isEmpty()) { + ts = list.get(0).getMessage().getHeader(Exchange.MESSAGE_TIMESTAMP, 0, long.class); + ts2 = list.get(list.size() - 1).getMessage().getHeader(Exchange.MESSAGE_TIMESTAMP, 0, long.class); + } + return new BrowseStatus(list.size(), ts, ts2); + } + + private List<Exchange> getExchanges(int limit, java.util.function.Predicate filter, boolean status) { final List<Exchange> answer = new ArrayList<>(); GenericFileConsumer<?> consumer = null; @@ -510,6 +526,11 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple if (filter == null) { consumer.setMaxMessagesPerPoll(browseLimit); } + if (status) { + // optimize to not download files as we only want status + consumer.setRetrieveFile(false); + } + final GenericFileConsumer gfc = consumer; consumer.setCustomProcessor(new Processor() { @Override public void process(Exchange exchange) throws Exception { @@ -518,6 +539,26 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple include = filter.test(exchange); } if (include && answer.size() < browseLimit) { + if (!status) { + // ensure payload is downloaded (when not in status mode) + GenericFile<?> gf = exchange.getMessage().getBody(GenericFile.class); + if (gf != null) { + final String name = gf.getAbsoluteFilePath(); + try { + boolean downloaded = gfc.tryRetrievingFile(exchange, name, gf, name, gf); + if (downloaded) { + gf.getBinding().loadContent(exchange, gf); + Object data = gf.getBody(); + if (data != null) { + exchange.getMessage().setBody(data); + } + } + } catch (Exception e) { + LOG.debug("Error trying to retrieve file: {} due to: {}. This exception is ignored.", name, + e.getMessage(), e); + } + } + } answer.add(exchange); } } diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java index 62dc1726b21..0b268ae4ee3 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java @@ -45,6 +45,7 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> { GenericFileProcessStrategy processStrategy) { super(endpoint, processor, operations, processStrategy); this.setPollStrategy(new RemoteFilePollingConsumerPollStrategy()); + this.setRetrieveFile(endpoint.isDownload()); } @Override @@ -153,11 +154,6 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> { return super.processExchange(exchange); } - @Override - protected boolean isRetrieveFile() { - return getEndpoint().isDownload(); - } - /** * Whether there is a starting directory configured. */ diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultQueueBrowseStrategy.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultQueueBrowseStrategy.java index 0d3073958d5..ce24c49a2c5 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultQueueBrowseStrategy.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultQueueBrowseStrategy.java @@ -26,6 +26,7 @@ import jakarta.jms.QueueBrowser; import jakarta.jms.Session; import org.apache.camel.Exchange; +import org.apache.camel.spi.BrowsableEndpoint; import org.springframework.jms.core.BrowserCallback; import org.springframework.jms.core.JmsOperations; @@ -46,6 +47,17 @@ public class DefaultQueueBrowseStrategy implements QueueBrowseStrategy { } } + @Override + public BrowsableEndpoint.BrowseStatus browseStatus( + JmsOperations template, String queue, JmsBrowsableEndpoint endpoint, int limit) { + if (endpoint.getSelector() != null) { + return template.browseSelected(queue, endpoint.getSelector(), + (session, browser) -> doBrowseStatus(endpoint, session, browser, limit)); + } else { + return template.browse(queue, (session, browser) -> doBrowseStatus(endpoint, session, browser, limit)); + } + } + private static List<Exchange> doBrowse(JmsBrowsableEndpoint endpoint, Session session, QueueBrowser browser, int limit) throws JMSException { @@ -66,4 +78,32 @@ public class DefaultQueueBrowseStrategy implements QueueBrowseStrategy { return answer; } + private static BrowsableEndpoint.BrowseStatus doBrowseStatus( + JmsBrowsableEndpoint endpoint, Session session, QueueBrowser browser, int limit) + throws JMSException { + if (limit <= 0) { + limit = Integer.MAX_VALUE; + } + + // not the best implementation in the world as we have to browse + // the entire queue, which could be massive + Enumeration<?> iter = browser.getEnumeration(); + + int size = 0; + long ts1 = 0; + long ts2 = 0; + Message message = null; + for (int i = 0; i < limit && iter.hasMoreElements(); i++) { + message = (Message) iter.nextElement(); + if (i == 0) { + ts1 = message.getJMSTimestamp(); + } + size++; + } + if (message != null && size > 0) { + ts2 = message.getJMSTimestamp(); + } + return new BrowsableEndpoint.BrowseStatus(size, ts1, ts2); + } + } diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java index 8381b60f16e..dd64a343ac4 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java @@ -96,6 +96,16 @@ public class JmsQueueEndpoint extends JmsEndpoint implements JmsBrowsableEndpoin return getExchanges(maximumBrowseSize, null); } + @Override + public BrowseStatus getBrowseStatus(int limit) { + if (queueBrowseStrategy == null) { + return new BrowseStatus(0, 0, 0); + } + String queue = getDestinationName(); + JmsOperations template = getConfiguration().createInOnlyTemplate(this, false, queue); + return queueBrowseStrategy.browseStatus(template, queue, this, limit); + } + @Override public List<Exchange> getExchanges(int limit, Predicate filter) { if (queueBrowseStrategy == null) { diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/QueueBrowseStrategy.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/QueueBrowseStrategy.java index 0a73d2f0c0f..9daa12b2ab7 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/QueueBrowseStrategy.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/QueueBrowseStrategy.java @@ -19,6 +19,7 @@ package org.apache.camel.component.jms; import java.util.List; import org.apache.camel.Exchange; +import org.apache.camel.spi.BrowsableEndpoint; import org.springframework.jms.core.JmsOperations; /** @@ -31,4 +32,19 @@ public interface QueueBrowseStrategy { */ List<Exchange> browse(JmsOperations template, String queue, JmsBrowsableEndpoint endpoint, int limit); + /** + * Browse quick status of the given queue + */ + default BrowsableEndpoint.BrowseStatus browseStatus( + JmsOperations template, String queue, JmsBrowsableEndpoint endpoint, int limit) { + List<Exchange> list = browse(template, queue, endpoint, limit); + long ts = 0; + long ts2 = 0; + if (!list.isEmpty()) { + ts = list.get(0).getMessage().getHeader(Exchange.MESSAGE_TIMESTAMP, 0, long.class); + ts2 = list.get(list.size() - 1).getMessage().getHeader(Exchange.MESSAGE_TIMESTAMP, 0, long.class); + } + return new BrowsableEndpoint.BrowseStatus(list.size(), ts, ts2); + } + } diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/BrowsableEndpoint.java b/core/camel-api/src/main/java/org/apache/camel/spi/BrowsableEndpoint.java index b85ce619854..8292aae4c25 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/BrowsableEndpoint.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/BrowsableEndpoint.java @@ -29,6 +29,16 @@ import org.apache.camel.Exchange; */ public interface BrowsableEndpoint extends Endpoint { + /** + * A quick status of the browse queue + * + * @param size number of messages in the queue + * @param firstTimestamp timestamp of first message (0 if no information) + * @param lastTimestamp timestamp of last message (0 if no information) + */ + record BrowseStatus(int size, long firstTimestamp, long lastTimestamp) { + } + /** * Maximum number of messages to browse by default. */ @@ -39,6 +49,23 @@ public interface BrowsableEndpoint extends Endpoint { */ void setBrowseLimit(int browseLimit); + /** + * Returns a quick browse status + * + * @param limit to limit the result to a maximum. Use 0 for default limit. + * @return the status + */ + default BrowseStatus getBrowseStatus(int limit) { + List<Exchange> list = getExchanges(); + long ts = 0; + long ts2 = 0; + if (!list.isEmpty()) { + ts = list.get(0).getMessage().getHeader(Exchange.MESSAGE_TIMESTAMP, 0, long.class); + ts2 = list.get(list.size() - 1).getMessage().getHeader(Exchange.MESSAGE_TIMESTAMP, 0, long.class); + } + return new BrowseStatus(list.size(), ts, ts2); + } + /** * Return the exchanges available on this endpoint * diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/BrowseDevConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/BrowseDevConsole.java index 59d4880a846..b7e089eca63 100644 --- a/core/camel-console/src/main/java/org/apache/camel/impl/console/BrowseDevConsole.java +++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/BrowseDevConsole.java @@ -118,18 +118,19 @@ public class BrowseDevConsole extends AbstractDevConsole { for (Endpoint endpoint : endpoints) { if (endpoint instanceof BrowsableEndpoint be && (filter == null || PatternHelper.matchPattern(endpoint.getEndpointUri(), filter))) { - List<Exchange> list = freshSize ? be.getExchanges(Integer.MAX_VALUE, null) : be.getExchanges(max, null); - int queueSize = list != null ? list.size() : 0; - int begin = 0; - if (list != null && pos > 0) { - begin = Math.max(0, list.size() - pos); - list = list.subList(begin, list.size()); - } - if (list != null) { - sb.append("\n"); - sb.append(String.format("Browse: %s (size: %d limit: %d position: %d)%n", endpoint.getEndpointUri(), - queueSize, max, begin)); - if (dump) { + + if (dump) { + List<Exchange> list = freshSize ? be.getExchanges(Integer.MAX_VALUE, null) : be.getExchanges(max, null); + int queueSize = list != null ? list.size() : 0; + int begin = 0; + if (list != null && pos > 0) { + begin = Math.max(0, list.size() - pos); + list = list.subList(begin, list.size()); + } + if (list != null) { + sb.append("\n"); + sb.append(String.format("Browse: %s (size: %d limit: %d position: %d)%n", endpoint.getEndpointUri(), + queueSize, max, begin)); for (Exchange e : list) { String json = MessageHelper.dumpAsJSon(e.getMessage(), false, false, includeBody, 2, true, true, true, @@ -138,6 +139,9 @@ public class BrowseDevConsole extends AbstractDevConsole { sb.append("\n"); } } + } else { + BrowsableEndpoint.BrowseStatus status = be.getBrowseStatus(Integer.MAX_VALUE); + sb.append(String.format("Browse: %s (size: %d%n", endpoint.getEndpointUri(), status.size())); } } } @@ -166,33 +170,34 @@ public class BrowseDevConsole extends AbstractDevConsole { for (Endpoint endpoint : endpoints) { if (endpoint instanceof BrowsableEndpoint be && (filter == null || PatternHelper.matchPattern(endpoint.getEndpointUri(), filter))) { - List<Exchange> list = freshSize ? be.getExchanges(Integer.MAX_VALUE, null) : be.getExchanges(max, null); - int queueSize = list != null ? list.size() : 0; - int begin = 0; - if (list != null && pos > 0) { - begin = Math.max(0, list.size() - pos); - list = list.subList(begin, list.size()); - } - if (list != null) { - JsonObject jo = new JsonObject(); - jo.put("endpointUri", endpoint.getEndpointUri()); - jo.put("queueSize", queueSize); - jo.put("limit", max); - jo.put("position", begin); - if (!list.isEmpty()) { - long ts = list.get(0).getMessage().getHeader(Exchange.MESSAGE_TIMESTAMP, 0, long.class); - if (ts > 0) { - jo.put("firstTimestamp", ts); - } - if (list.size() > 1) { - ts = list.get(list.size() - 1).getMessage().getHeader(Exchange.MESSAGE_TIMESTAMP, 0, long.class); + if (dump) { + List<Exchange> list = freshSize ? be.getExchanges(Integer.MAX_VALUE, null) : be.getExchanges(max, null); + int queueSize = list != null ? list.size() : 0; + int begin = 0; + if (list != null && pos > 0) { + begin = Math.max(0, list.size() - pos); + list = list.subList(begin, list.size()); + } + if (list != null) { + JsonObject jo = new JsonObject(); + jo.put("endpointUri", endpoint.getEndpointUri()); + jo.put("queueSize", queueSize); + jo.put("limit", max); + jo.put("position", begin); + if (!list.isEmpty()) { + long ts = list.get(0).getMessage().getHeader(Exchange.MESSAGE_TIMESTAMP, 0, long.class); if (ts > 0) { - jo.put("lastTimestamp", ts); + jo.put("firstTimestamp", ts); + } + if (list.size() > 1) { + ts = list.get(list.size() - 1).getMessage().getHeader(Exchange.MESSAGE_TIMESTAMP, 0, + long.class); + if (ts > 0) { + jo.put("lastTimestamp", ts); + } } } - } - arr.add(jo); - if (dump) { + arr.add(jo); JsonArray arr2 = new JsonArray(); for (Exchange e : list) { arr2.add(MessageHelper.dumpAsJSonObject(e.getMessage(), false, false, includeBody, true, true, true, @@ -202,6 +207,18 @@ public class BrowseDevConsole extends AbstractDevConsole { jo.put("messages", arr2); } } + } else { + BrowsableEndpoint.BrowseStatus status = be.getBrowseStatus(Integer.MAX_VALUE); + JsonObject jo = new JsonObject(); + jo.put("endpointUri", endpoint.getEndpointUri()); + jo.put("queueSize", status.size()); + if (status.firstTimestamp() > 0) { + jo.put("firstTimestamp", status.firstTimestamp()); + } + if (status.lastTimestamp() > 0) { + jo.put("lastTimestamp", status.lastTimestamp()); + } + arr.add(jo); } } } diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedBrowsableEndpointAsJSonFileTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedBrowsableEndpointAsJSonFileTest.java index 4abcdcff063..79f2f20b935 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedBrowsableEndpointAsJSonFileTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedBrowsableEndpointAsJSonFileTest.java @@ -45,7 +45,7 @@ public class ManagedBrowsableEndpointAsJSonFileTest extends ManagementTestSuppor assertNotNull(out); log.info(out); assertTrue(out.contains("\"value\": \"Hello World\"")); - assertTrue(out.contains("\"type\": \"org.apache.camel.component.file.GenericFile\"")); + assertTrue(out.contains("\"type\": \"java.lang.String\"")); assertTrue(out.contains("\"value\": \"hello.txt\"")); } diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java index 4437109ad83..580118f5d60 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java @@ -184,8 +184,8 @@ public class CamelBrowseAction extends ActionBaseCommand { row.uri = URISupport.sanitizeUri(row.uri); } row.queueSize = o.getInteger("queueSize"); - row.limit = o.getInteger("limit"); - row.position = o.getInteger("position"); + row.limit = o.getIntegerOrDefault("limit", 0); + row.position = o.getIntegerOrDefault("position", 0); row.firstTimestamp = o.getLongOrDefault("firstTimestamp", 0); row.lastTimestamp = o.getLongOrDefault("lastTimestamp", 0); if (dump) { @@ -315,7 +315,7 @@ public class CamelBrowseAction extends ActionBaseCommand { if (freshSize) { return "" + r.queueSize; } - if (r.queueSize >= r.limit) { + if (r.limit > 0 && r.queueSize >= r.limit) { return r.queueSize + "+"; } return "" + r.queueSize;
