This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch once
in repository https://gitbox.apache.org/repos/asf/camel.git

commit cee0db7c8f560b6f48d4b3549b527b65827fe56d
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Dec 17 16:05:46 2025 +0100

    CAMEL-22431: camel-once - A component for development to trigger only once
---
 .../apache/camel/component/once/OnceConsumer.java  | 70 +++++++++++++++++++++-
 1 file changed, 68 insertions(+), 2 deletions(-)

diff --git 
a/components/camel-once/src/main/java/org/apache/camel/component/once/OnceConsumer.java
 
b/components/camel-once/src/main/java/org/apache/camel/component/once/OnceConsumer.java
index 776bcdbceda1..b27745fc48ce 100644
--- 
a/components/camel-once/src/main/java/org/apache/camel/component/once/OnceConsumer.java
+++ 
b/components/camel-once/src/main/java/org/apache/camel/component/once/OnceConsumer.java
@@ -16,20 +16,86 @@
  */
 package org.apache.camel.component.once;
 
+import java.util.Timer;
+import java.util.TimerTask;
+
 import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.StartupListener;
-import org.apache.camel.Suspendable;
 import org.apache.camel.support.DefaultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OnceConsumer extends DefaultConsumer implements StartupListener {
 
-public class OnceConsumer extends DefaultConsumer implements StartupListener, 
Suspendable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(OnceConsumer.class);
+
+    private final OnceEndpoint endpoint;
+    private final Timer timer;
+    private TimerTask task;
+    private volatile boolean scheduled;
 
     public OnceConsumer(OnceEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
+        this.endpoint = endpoint;
+        this.timer = new Timer(endpoint.getName());
+    }
+
+    @Override
+    public void doInit() throws Exception {
+        task = new TimerTask() {
+            @Override
+            public void run() {
+                Exchange exchange = createExchange(false);
+                try {
+                    getProcessor().process(exchange);
+                } catch (Exception e) {
+                    exchange.setException(e);
+                }
+
+                // handle any thrown exception
+                try {
+                    if (exchange.getException() != null) {
+                        getExceptionHandler().handleException("Error 
processing exchange", exchange, exchange.getException());
+                    }
+                } finally {
+                    releaseExchange(exchange, false);
+                }
+            }
+        };
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        if (task != null && !scheduled && 
endpoint.getCamelContext().getStatus().isStarted()) {
+            scheduleTask(task, timer);
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (task != null) {
+            task.cancel();
+        }
+        task = null;
+        scheduled = false;
+
+        super.doStop();
     }
 
     @Override
     public void onCamelContextStarted(CamelContext context, boolean 
alreadyStarted) throws Exception {
+        if (task != null && !scheduled) {
+            scheduleTask(task, timer);
+        }
+    }
 
+    protected void scheduleTask(TimerTask task, Timer timer) {
+        LOG.debug("Scheduled once after: {} mills for task: {} ", 
endpoint.getDelay(), task);
+        timer.schedule(task, endpoint.getDelay());
+        scheduled = true;
     }
 }

Reply via email to