This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch poll-dyn in repository https://gitbox.apache.org/repos/asf/camel.git
commit 01f690efecadb7ecf17d2cc93b25f8d3c75a9733 Author: Claus Ibsen <[email protected]> AuthorDate: Fri Feb 7 22:12:17 2025 +0100 CAMEL-21733: camel-core - Poll EIP to support DynamicAware to reuse endpoints during dynamic poll EIP --- .../apache/camel/component/file/FileConsumer.java | 25 +++++---- .../camel/component/file/GenericFileConsumer.java | 46 ++++++++++----- .../component/file/GenericFilePollingConsumer.java | 34 ++++++++--- .../org/apache/camel/DynamicPollingConsumer.java | 65 ++++++++++++++++++++++ .../apache/camel/model/PollEnrichDefinition.java | 20 +++++++ .../org/apache/camel/processor/PollEnricher.java | 13 ++++- ...> PollDynamicFileNameOptimizeDisabledTest.java} | 31 ++++++++++- .../enricher/PollDynamicFileNameTest.java | 26 ++++++++- .../camel/support/ScheduledPollConsumer.java | 12 ++++ 9 files changed, 232 insertions(+), 40 deletions(-) diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java index d2b6dfbd57b..58df9d374c2 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java @@ -74,7 +74,7 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa return exchange; } - private boolean pollDirectory(File directory, List<GenericFile<File>> fileList, int depth) { + private boolean pollDirectory(Exchange dynamic, File directory, List<GenericFile<File>> fileList, int depth) { depth++; if (LOG.isTraceEnabled()) { @@ -89,14 +89,14 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa Arrays.sort(files, Comparator.comparing(File::getAbsoluteFile)); } - if (processPolledFiles(fileList, depth, files)) { + if (processPolledFiles(dynamic, fileList, depth, files)) { return false; } return true; } - private boolean 
processPolledFiles(List<GenericFile<File>> fileList, int depth, File[] files) { + private boolean processPolledFiles(Exchange dynamic, List<GenericFile<File>> fileList, int depth, File[] files) { for (File file : files) { // check if we can continue polling in files if (!canPollMoreFiles(fileList)) { @@ -125,7 +125,7 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa } } - if (processEntry(fileList, depth, file, gf, files)) { + if (processEntry(dynamic, fileList, depth, file, gf, files)) { return true; } } @@ -133,23 +133,25 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa } private boolean processEntry( + Exchange dynamic, List<GenericFile<File>> fileList, int depth, File file, Supplier<GenericFile<File>> gf, File[] files) { if (file.isDirectory()) { - return processDirectoryEntry(fileList, depth, file, gf, files); + return processDirectoryEntry(dynamic, fileList, depth, file, gf, files); } else { - processFileEntry(fileList, depth, file, gf, files); + processFileEntry(dynamic, fileList, depth, file, gf, files); } return false; } private void processFileEntry( + Exchange dynamic, List<GenericFile<File>> fileList, int depth, File file, Supplier<GenericFile<File>> gf, File[] files) { // Windows can report false to a file on a share so regard it // always as a file (if it is not a directory) if (depth >= endpoint.minDepth) { boolean valid - = isValidFile(gf, file.getName(), file.getAbsolutePath(), + = isValidFile(dynamic, gf, file.getName(), file.getAbsolutePath(), getRelativeFilePath(endpointPath, null, null, file), false, files); if (valid) { @@ -168,14 +170,15 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa } private boolean processDirectoryEntry( + Exchange dynamic, List<GenericFile<File>> fileList, int depth, File file, Supplier<GenericFile<File>> gf, File[] files) { if (endpoint.isRecursive() && depth < endpoint.getMaxDepth()) { boolean valid - = 
isValidFile(gf, file.getName(), file.getAbsolutePath(), + = isValidFile(dynamic, gf, file.getName(), file.getAbsolutePath(), getRelativeFilePath(endpointPath, null, null, file), true, files); if (valid) { - boolean canPollMore = pollDirectory(file, fileList, depth); + boolean canPollMore = pollDirectory(dynamic, file, fileList, depth); return !canPollMore; } } @@ -194,7 +197,7 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa } @Override - protected boolean pollDirectory(String fileName, List<GenericFile<File>> fileList, int depth) { + protected boolean pollDirectory(Exchange dynamic, String fileName, List<GenericFile<File>> fileList, int depth) { LOG.trace("pollDirectory from fileName: {}", fileName); File directory = new File(fileName); @@ -206,7 +209,7 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa return true; } - return pollDirectory(directory, fileList, depth); + return pollDirectory(dynamic, directory, fileList, depth); } private File[] listFiles(File directory) { diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java index 7f87cb2af1d..de0ffcf65a4 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java @@ -33,6 +33,7 @@ import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; import org.apache.camel.ShutdownRunningTask; import org.apache.camel.support.EmptyAsyncCallback; +import org.apache.camel.support.MessageHelper; import org.apache.camel.support.ScheduledBatchPollingConsumer; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.CastUtils; @@ -107,11 +108,16 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum 
*/ protected abstract Exchange createExchange(GenericFile<T> file); + @Override + protected int poll() throws Exception { + return poll(null); + } + /** * Poll for files */ @Override - public int poll() throws Exception { + protected int poll(Exchange dynamic) throws Exception { // must prepare on startup the very first time if (!prepareOnStartup) { // prepare on startup @@ -138,7 +144,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum StopWatch stop = new StopWatch(); boolean limitHit; try { - limitHit = !pollDirectory(name, files, 0); + limitHit = !pollDirectory(dynamic, name, files, 0); } catch (Exception e) { // during poll directory we add files to the in progress repository, // in case of any exception thrown after this work @@ -336,7 +342,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum * @return whether or not to continue polling, <tt>false</tt> means the maxMessagesPerPoll limit has been * hit */ - protected abstract boolean pollDirectory(String fileName, List<GenericFile<T>> fileList, int depth); + protected abstract boolean pollDirectory(Exchange dynamic, String fileName, List<GenericFile<T>> fileList, int depth); /** * Sets the operations to be used. 
@@ -429,7 +435,6 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum // must use full name when downloading so we have the correct path final String name = target.getAbsoluteFilePath(); try { - if (isRetrieveFile()) { if (!tryRetrievingFile(exchange, name, target, absoluteFileName, file)) { return false; @@ -587,9 +592,10 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it */ protected boolean isValidFile( + Exchange dynamic, Supplier<GenericFile<T>> file, String name, String absoluteFilePath, Supplier<String> relativeFilePath, boolean isDirectory, T[] files) { - if (!isMatched(file, name, absoluteFilePath, relativeFilePath, isDirectory, files)) { + if (!isMatched(dynamic, file, name, absoluteFilePath, relativeFilePath, isDirectory, files)) { LOG.trace("File did not match. Will skip this file: {}", name); return false; } @@ -610,7 +616,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum // if it is a file then check we have the file in the idempotent registry // already if (Boolean.TRUE.equals(endpoint.isIdempotent())) { - if (notUnique(file, absoluteFilePath)) { + if (notUnique(dynamic, file, absoluteFilePath)) { return false; } } @@ -621,13 +627,13 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum return endpoint.getInProgressRepository().add(absoluteFilePath); } - private boolean notUnique(Supplier<GenericFile<T>> file, String absoluteFilePath) { + private boolean notUnique(Exchange dynamic, Supplier<GenericFile<T>> file, String absoluteFilePath) { boolean answer = false; // use absolute file path as default key, but evaluate if an // expression key was configured String key = absoluteFilePath; if (endpoint.getIdempotentKey() != null) { - Exchange dummy = endpoint.createExchange(file.get()); + Exchange dummy = createDummy(dynamic, file); key = 
endpoint.getIdempotentKey().evaluate(dummy, String.class); LOG.trace("Evaluated idempotentKey: {} for file: {}", key, file); } @@ -686,6 +692,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not */ protected boolean isMatched( + Exchange dynamic, Supplier<GenericFile<T>> file, String name, String absoluteFilePath, Supplier<String> relativeFilePath, boolean isDirectory, T[] files) { @@ -722,9 +729,8 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum } if (isDirectory && endpoint.getFilterDirectory() != null) { - // create a dummy exchange as Exchange is needed for expression - // evaluation - Exchange dummy = endpoint.createExchange(file.get()); + // create a dummy exchange as Exchange is needed for expression evaluation + Exchange dummy = createDummy(dynamic, file); boolean matches = endpoint.getFilterDirectory().matches(dummy); if (!matches) { return false; @@ -742,7 +748,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum if (endpoint.getFileName() != null) { // create a dummy exchange as Exchange is needed for expression evaluation - Exchange dummy = endpoint.createExchange(file.get()); + Exchange dummy = createDummy(dynamic, file); String result = evaluateFileExpression(dummy); if (result != null) { if (!name.equals(result)) { @@ -753,7 +759,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum if (endpoint.getFilterFile() != null) { // create a dummy exchange as Exchange is needed for expression evaluation - Exchange dummy = endpoint.createExchange(file.get()); + Exchange dummy = createDummy(dynamic, file); boolean matches = endpoint.getFilterFile().matches(dummy); if (!matches) { return false; @@ -849,6 +855,20 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum return result; } + protected Exchange createDummy(Exchange dynamic, 
Supplier<GenericFile<T>> file) { + Exchange dummy = endpoint.createExchange(file.get()); + if (dynamic != null) { + // enrich with data from dynamic source + if (dynamic.getMessage().hasHeaders()) { + MessageHelper.copyHeaders(dynamic.getMessage(), dummy.getMessage(), true); + if (dynamic.hasVariables()) { + dummy.getVariables().putAll(dynamic.getVariables()); + } + } + } + return dummy; + } + @SuppressWarnings("unchecked") private GenericFile<T> getExchangeFileProperty(Exchange exchange) { return (GenericFile<T>) exchange.getProperty(ExchangePropertyKey.FILE_EXCHANGE_FILE); diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java index 68c9474b44e..4f29e6358ac 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java @@ -17,6 +17,7 @@ package org.apache.camel.component.file; import org.apache.camel.Consumer; +import org.apache.camel.DynamicPollingConsumer; import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; import org.apache.camel.spi.PollingConsumerPollStrategy; @@ -26,7 +27,7 @@ import org.apache.camel.util.StopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class GenericFilePollingConsumer extends EventDrivenPollingConsumer { +public class GenericFilePollingConsumer extends EventDrivenPollingConsumer implements DynamicPollingConsumer { private static final Logger LOG = LoggerFactory.getLogger(GenericFilePollingConsumer.class); private final long delay; @@ -65,11 +66,11 @@ public class GenericFilePollingConsumer extends EventDrivenPollingConsumer { } @Override - public Exchange receiveNoWait() { + public Exchange receiveNoWait(Exchange exchange) { if (LOG.isTraceEnabled()) { 
LOG.trace("receiveNoWait polling file: {}", getConsumer().getEndpoint()); } - int polled = doReceive(0); + int polled = doReceive(exchange, 0); if (polled > 0) { return super.receive(0); } else { @@ -78,11 +79,11 @@ public class GenericFilePollingConsumer extends EventDrivenPollingConsumer { } @Override - public Exchange receive() { + public Exchange receive(Exchange exchange) { if (LOG.isTraceEnabled()) { LOG.trace("receive polling file: {}", getConsumer().getEndpoint()); } - int polled = doReceive(Long.MAX_VALUE); + int polled = doReceive(exchange, Long.MAX_VALUE); if (polled > 0) { return super.receive(); } else { @@ -91,11 +92,11 @@ public class GenericFilePollingConsumer extends EventDrivenPollingConsumer { } @Override - public Exchange receive(long timeout) { + public Exchange receive(Exchange exchange, long timeout) { if (LOG.isTraceEnabled()) { LOG.trace("receive({}) polling file: {}", timeout, getConsumer().getEndpoint()); } - int polled = doReceive(timeout); + int polled = doReceive(exchange, timeout); if (polled > 0) { return super.receive(timeout); } else { @@ -103,7 +104,22 @@ public class GenericFilePollingConsumer extends EventDrivenPollingConsumer { } } - protected int doReceive(long timeout) { + @Override + public Exchange receiveNoWait() { + return receiveNoWait(null); + } + + @Override + public Exchange receive() { + return receive(null); + } + + @Override + public Exchange receive(long timeout) { + return receive(null, timeout); + } + + protected int doReceive(Exchange exchange, long timeout) { int retryCounter = -1; boolean done = false; Throwable cause = null; @@ -130,7 +146,7 @@ public class GenericFilePollingConsumer extends EventDrivenPollingConsumer { boolean begin = pollStrategy.begin(getConsumer(), getEndpoint()); if (begin) { retryCounter++; - polledMessages = getConsumer().poll(); + polledMessages = getConsumer().poll(exchange); LOG.trace("Polled {} messages", polledMessages); if (polledMessages == 0 && sendEmptyMessageWhenIdle) { diff 
--git a/core/camel-api/src/main/java/org/apache/camel/DynamicPollingConsumer.java b/core/camel-api/src/main/java/org/apache/camel/DynamicPollingConsumer.java new file mode 100644 index 00000000000..c939c607375 --- /dev/null +++ b/core/camel-api/src/main/java/org/apache/camel/DynamicPollingConsumer.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel; + +/** + * A {@link PollingConsumer} that is used by dynamic Poll and PollEnrich EIPs to facilitate components that can use + * information from the current {@link Exchange} during the poll. + */ +public interface DynamicPollingConsumer extends PollingConsumer { + + /** + * Waits until a message is available and then returns it. Warning that this method could block indefinitely if no + * messages are available. + * <p/> + * Will return <tt>null</tt> if the consumer is not started + * <p/> + * <b>Important: </b> See the class javadoc about the need to call done on the {@link org.apache.camel.spi.UnitOfWork} of + * the returned {@link Exchange} + * + * @param exchange the current exchange + * + * @return the message exchange received. 
+ */ + Exchange receive(Exchange exchange); + + /** + * Attempts to receive a message exchange immediately without waiting and returning <tt>null</tt> if a message + * exchange is not available yet. + * <p/> + * <b>Important: </b> See the class javadoc about the need to call done on the {@link org.apache.camel.spi.UnitOfWork} of + * the returned {@link Exchange} + * + * @return the message exchange if one is immediately available otherwise <tt>null</tt> + */ + Exchange receiveNoWait(Exchange exchange); + + /** + * Attempts to receive a message exchange, waiting up to the given timeout to expire if a message is not yet + * available. + * <p/> + * <b>Important: </b> See the class javadoc about the need to call done on the {@link org.apache.camel.spi.UnitOfWork} of + * the returned {@link Exchange} + * + * @param timeout the amount of time in milliseconds to wait for a message before timing out and returning + * <tt>null</tt> + * + * @return the message exchange if one was available within the timeout period, or <tt>null</tt> if the + * timeout expired + */ + Exchange receive(Exchange exchange, long timeout); +} diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/PollEnrichDefinition.java index a13f9df7f69..8ba0f887054 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/PollEnrichDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/PollEnrichDefinition.java @@ -280,6 +280,16 @@ public class PollEnrichDefinition extends ExpressionNode return this; } + /** + * Whether to auto startup components when poll enricher is starting up. + * + * @return the builder + */ + public PollEnrichDefinition autoStartComponents(boolean autoStartComponents) { + setAutoStartComponents(Boolean.toString(autoStartComponents)); + return this; + } + /** * Whether to allow components to optimise if they are {@link org.apache.camel.spi.SendDynamicAware}. 
* @@ -290,6 +300,16 @@ public class PollEnrichDefinition extends ExpressionNode return this; } + /** + * Whether to allow components to optimise if they are {@link org.apache.camel.spi.SendDynamicAware}. + * + * @return the builder + */ + public PollEnrichDefinition allowOptimisedComponents(boolean allowOptimisedComponents) { + setAllowOptimisedComponents(Boolean.toString(allowOptimisedComponents)); + return this; + } + // Properties // ------------------------------------------------------------------------- diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java index e2aeb8f66b7..0e81b621b02 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java @@ -25,6 +25,7 @@ import org.apache.camel.CamelContextAware; import org.apache.camel.CamelExchangeException; import org.apache.camel.Component; import org.apache.camel.Consumer; +import org.apache.camel.DynamicPollingConsumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; @@ -323,6 +324,11 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout final Processor preProcessor = preAwareProcessor; final Processor postProcessor = postAwareProcessor; + DynamicPollingConsumer dynamicConsumer = null; + if (consumer instanceof DynamicPollingConsumer dyn) { + dynamicConsumer = dyn; + } + Exchange resourceExchange; try { if (preProcessor != null) { @@ -331,15 +337,16 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout if (timeout < 0) { LOG.debug("Consumer receive: {}", consumer); - resourceExchange = consumer.receive(); + resourceExchange = dynamicConsumer != null ? 
dynamicConsumer.receive(exchange) : consumer.receive(); } else if (timeout == 0) { LOG.debug("Consumer receiveNoWait: {}", consumer); - resourceExchange = consumer.receiveNoWait(); + resourceExchange = dynamicConsumer != null ? dynamicConsumer.receiveNoWait(exchange) : consumer.receiveNoWait(); } else { if (LOG.isDebugEnabled()) { LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer); } - resourceExchange = consumer.receive(timeout); + resourceExchange + = dynamicConsumer != null ? dynamicConsumer.receive(exchange, timeout) : consumer.receive(timeout); } if (resourceExchange == null) { diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollDynamicFileNameTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollDynamicFileNameOptimizeDisabledTest.java similarity index 54% copy from core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollDynamicFileNameTest.java copy to core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollDynamicFileNameOptimizeDisabledTest.java index 7eaf817514b..fcf2c588d51 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollDynamicFileNameTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollDynamicFileNameOptimizeDisabledTest.java @@ -19,12 +19,13 @@ package org.apache.camel.processor.enricher; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -public class PollDynamicFileNameTest extends ContextTestSupport { +public class PollDynamicFileNameOptimizeDisabledTest extends ContextTestSupport { @Test - public void testPollEnrichFile() throws Exception { + public void testPollEnrichFileOne() throws Exception { getMockEndpoint("mock:result").expectedMessageCount(2); getMockEndpoint("mock:result").message(0).body().isEqualTo("Hello World"); 
getMockEndpoint("mock:result").message(1).body().isNull(); @@ -35,6 +36,29 @@ public class PollDynamicFileNameTest extends ContextTestSupport { template.sendBodyAndHeader("direct:start", "Bar", "target", "unknown.txt"); assertMockEndpointsSatisfied(); + + // optimisation is disabled so there should be 2 file endpoints + long c = context.getEndpoints().stream() + .filter(e -> e.getEndpointKey().startsWith("file") && e.getEndpointUri().contains("?fileName=")).count(); + Assertions.assertEquals(2, c, "There should only be 2 file endpoints"); + } + + @Test + public void testPollEnrichFileTwo() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceivedInAnyOrder("Hello World", "Bye World"); + + template.sendBodyAndHeader(fileUri(), "Hello World", Exchange.FILE_NAME, "myfile.txt"); + template.sendBodyAndHeader(fileUri(), "Bye World", Exchange.FILE_NAME, "myfile2.txt"); + + template.sendBodyAndHeader("direct:start", "Foo", "target", "myfile.txt"); + template.sendBodyAndHeader("direct:start", "Bar", "target", "myfile2.txt"); + + assertMockEndpointsSatisfied(); + + // optimisation is disabled so there should be 2 file endpoints + long c = context.getEndpoints().stream() + .filter(e -> e.getEndpointKey().startsWith("file") && e.getEndpointUri().contains("?fileName=")).count(); + Assertions.assertEquals(2, c, "There should only be 2 file endpoints"); } @Override @@ -43,7 +67,8 @@ public class PollDynamicFileNameTest extends ContextTestSupport { @Override public void configure() { from("direct:start") - .poll(fileUri() + "?noop=true&fileName=${header.target}", 500) + .pollEnrich().simple(fileUri() + "?noop=true&fileName=${header.target}") + .allowOptimisedComponents(false).timeout(500) .to("mock:result"); } }; diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollDynamicFileNameTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollDynamicFileNameTest.java index 7eaf817514b..cd80d65f042 100644 --- 
a/core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollDynamicFileNameTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollDynamicFileNameTest.java @@ -19,12 +19,13 @@ package org.apache.camel.processor.enricher; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; public class PollDynamicFileNameTest extends ContextTestSupport { @Test - public void testPollEnrichFile() throws Exception { + public void testPollEnrichFileOne() throws Exception { getMockEndpoint("mock:result").expectedMessageCount(2); getMockEndpoint("mock:result").message(0).body().isEqualTo("Hello World"); getMockEndpoint("mock:result").message(1).body().isNull(); @@ -35,6 +36,29 @@ public class PollDynamicFileNameTest extends ContextTestSupport { template.sendBodyAndHeader("direct:start", "Bar", "target", "unknown.txt"); assertMockEndpointsSatisfied(); + + // there should only be 1 file endpoint + long c = context.getEndpoints().stream() + .filter(e -> e.getEndpointKey().startsWith("file") && e.getEndpointUri().contains("?fileName=")).count(); + Assertions.assertEquals(1, c, "There should only be 1 file endpoint"); + } + + @Test + public void testPollEnrichFileTwo() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceivedInAnyOrder("Hello World", "Bye World"); + + template.sendBodyAndHeader(fileUri(), "Hello World", Exchange.FILE_NAME, "myfile.txt"); + template.sendBodyAndHeader(fileUri(), "Bye World", Exchange.FILE_NAME, "myfile2.txt"); + + template.sendBodyAndHeader("direct:start", "Foo", "target", "myfile.txt"); + template.sendBodyAndHeader("direct:start", "Bar", "target", "myfile2.txt"); + + assertMockEndpointsSatisfied(); + + // there should only be 1 file endpoint + long c = context.getEndpoints().stream() + .filter(e -> e.getEndpointKey().startsWith("file") && 
e.getEndpointUri().contains("?fileName=")).count(); + Assertions.assertEquals(1, c, "There should only be 1 file endpoint"); } @Override diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java index 658f7896d25..c6f852bbec2 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java @@ -568,6 +568,18 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer */ protected abstract int poll() throws Exception; + /** + * The polling method which is invoked periodically to poll this consumer, for components that support + * {@link org.apache.camel.DynamicPollingConsumer} such as camel-file. + * + * @param dynamic the current exchange when being used from Poll and PollEnrich EIPs in dynamic mode, or <tt>null</tt> otherwise + * @return number of messages polled, will be <tt>0</tt> if no message was polled at all. + * @throws Exception can be thrown if an exception occurred during polling + */ + protected int poll(Exchange dynamic) throws Exception { + return poll(); + } + @Override protected void doBuild() throws Exception { if (getHealthCheck() == null) {
