This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch once
in repository https://gitbox.apache.org/repos/asf/camel.git
commit cee0db7c8f560b6f48d4b3549b527b65827fe56d
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Dec 17 16:05:46 2025 +0100

    CAMEL-22431: camel-once - A component for development to trigger only once
---
 .../apache/camel/component/once/OnceConsumer.java | 70 +++++++++++++++++++++-
 1 file changed, 68 insertions(+), 2 deletions(-)

diff --git a/components/camel-once/src/main/java/org/apache/camel/component/once/OnceConsumer.java b/components/camel-once/src/main/java/org/apache/camel/component/once/OnceConsumer.java
index 776bcdbceda1..b27745fc48ce 100644
--- a/components/camel-once/src/main/java/org/apache/camel/component/once/OnceConsumer.java
+++ b/components/camel-once/src/main/java/org/apache/camel/component/once/OnceConsumer.java
@@ -16,20 +16,86 @@
  */
 package org.apache.camel.component.once;
 
+import java.util.Timer;
+import java.util.TimerTask;
+
 import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.StartupListener;
-import org.apache.camel.Suspendable;
 import org.apache.camel.support.DefaultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OnceConsumer extends DefaultConsumer implements StartupListener {
 
-public class OnceConsumer extends DefaultConsumer implements StartupListener, Suspendable {
+    private static final Logger LOG = LoggerFactory.getLogger(OnceConsumer.class);
+
+    private final OnceEndpoint endpoint;
+    private final Timer timer;
+    private TimerTask task;
+    private volatile boolean scheduled;
 
     public OnceConsumer(OnceEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
+        this.endpoint = endpoint;
+        this.timer = new Timer(endpoint.getName());
+    }
+
+    @Override
+    public void doInit() throws Exception {
+        task = new TimerTask() {
+            @Override
+            public void run() {
+                Exchange exchange = createExchange(false);
+                try {
+                    getProcessor().process(exchange);
+                } catch (Exception e) {
+                    exchange.setException(e);
+                }
+
+                // handle any thrown exception
+                try {
+                    if (exchange.getException() != null) {
+                        getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
+                    }
+                } finally {
+                    releaseExchange(exchange, false);
+                }
+            }
+        };
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        if (task != null && !scheduled && endpoint.getCamelContext().getStatus().isStarted()) {
+            scheduleTask(task, timer);
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (task != null) {
+            task.cancel();
+        }
+        task = null;
+        scheduled = false;
+
+        super.doStop();
     }
 
     @Override
     public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception {
+        if (task != null && !scheduled) {
+            scheduleTask(task, timer);
+        }
+    }
 
+    protected void scheduleTask(TimerTask task, Timer timer) {
+        LOG.debug("Scheduled once after: {} mills for task: {} ", endpoint.getDelay(), task);
+        timer.schedule(task, endpoint.getDelay());
+        scheduled = true;
     }
 }
