This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b0fca17c045 CAMEL-22949: Migrate components from Thread.sleep() to Camel's Task API (#21215)
1b0fca17c045 is described below

commit 1b0fca17c045f743bf433e8196f2e9d1b172607d
Author: Guillaume Nodet <[email protected]>
AuthorDate: Mon Feb 2 15:50:05 2026 +0100

    CAMEL-22949: Migrate components from Thread.sleep() to Camel's Task API (#21215)
    
    - Migrate 5 components: salesforce, aws2-athena, zookeeper, hazelcast, iggy
    - Replace Thread.sleep() with Tasks.foregroundTask() for better lifecycle
    - Use withInitialDelay() for one-time delays
---
 .../component/aws2/athena/Athena2QueryHelper.java  | 24 +++++++++++++-------
 .../hazelcast/seda/HazelcastSedaConsumer.java      | 19 +++++++++++-----
 .../camel/component/iggy/IggyFetchRecords.java     | 19 +++++++++++-----
 .../internal/streaming/SubscriptionHelper.java     | 26 +++++++++++++++-------
 .../component/zookeeper/ZooKeeperConsumer.java     | 18 ++++++++++++---
 5 files changed, 77 insertions(+), 29 deletions(-)

diff --git a/components/camel-aws/camel-aws2-athena/src/main/java/org/apache/camel/component/aws2/athena/Athena2QueryHelper.java b/components/camel-aws/camel-aws2-athena/src/main/java/org/apache/camel/component/aws2/athena/Athena2QueryHelper.java
index 6883807b9866..f4cc50e95c06 100644
--- a/components/camel-aws/camel-aws2-athena/src/main/java/org/apache/camel/component/aws2/athena/Athena2QueryHelper.java
+++ b/components/camel-aws/camel-aws2-athena/src/main/java/org/apache/camel/component/aws2/athena/Athena2QueryHelper.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.aws2.athena;
 
 import java.time.Clock;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -25,6 +26,8 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +41,7 @@ class Athena2QueryHelper {
     private static final Logger LOG = LoggerFactory.getLogger(Athena2QueryHelper.class);
 
     // configuration ======================
+    private final Exchange exchange;
     private final Clock clock = Clock.systemUTC();
     private final long waitTimeout;
     private final long delay;
@@ -56,6 +60,7 @@ class Athena2QueryHelper {
     private boolean interrupted;
 
     Athena2QueryHelper(Exchange exchange, Athena2Configuration configuration) {
+        this.exchange = exchange;
         this.waitTimeout = determineWaitTimeout(exchange, configuration);
         this.delay = determineDelay(exchange, configuration);
         this.maxAttempts = determineMaxAttempts(exchange, configuration);
@@ -162,15 +167,18 @@ class Athena2QueryHelper {
     }
 
     void doWait() {
-        try {
-            Thread.sleep(this.currentDelay);
-        } catch (InterruptedException e) {
-            this.interrupted = Thread.interrupted(); // store, then clear, interrupt status
-            LOG.trace(
-                    "AWS Athena start query execution wait thread was interrupted; will return at earliest opportunity");
+        // Use Camel's task API for polling delay instead of Thread.sleep()
+        // We use initialDelay for the actual delay, and maxIterations(1) to run once
+        Tasks.foregroundTask()
+                .withBudget(Budgets.iterationBudget()
+                        .withMaxIterations(1)
+                        .withInitialDelay(Duration.ofMillis(this.currentDelay))
+                        .withInterval(Duration.ZERO)
+                        .build())
+                .withName("AthenaQueryPollingDelay")
+                .build()
+                .run(exchange.getContext(), () -> true);
 
-            Thread.currentThread().interrupt();
-        }
         this.currentDelay = this.delay;
     }
 
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
index fd811f305e81..81110a3ff295 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.hazelcast.seda;
 
+import java.time.Duration;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -30,6 +31,8 @@ import org.apache.camel.Processor;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.DefaultExchangeHolder;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -148,11 +151,17 @@ public class HazelcastSedaConsumer extends DefaultConsumer implements Runnable {
                     }
                 }
                 getExceptionHandler().handleException("Error processing exchange", exchange, e);
-                try {
-                    Thread.sleep(endpoint.getConfiguration().getOnErrorDelay());
-                } catch (InterruptedException ie) {
-                    Thread.currentThread().interrupt();
-                }
+                // Use Camel's task API for error recovery delay instead of Thread.sleep()
+                // We use initialDelay for the actual delay, and maxIterations(1) to run once
+                Tasks.foregroundTask()
+                        .withBudget(Budgets.iterationBudget()
+                                .withMaxIterations(1)
+                                .withInitialDelay(Duration.ofMillis(endpoint.getConfiguration().getOnErrorDelay()))
+                                .withInterval(Duration.ZERO)
+                                .build())
+                        .withName("HazelcastSedaErrorRecoveryDelay")
+                        .build()
+                        .run(getEndpoint().getCamelContext(), () -> true);
             }
         }
     }
diff --git a/components/camel-iggy/src/main/java/org/apache/camel/component/iggy/IggyFetchRecords.java b/components/camel-iggy/src/main/java/org/apache/camel/component/iggy/IggyFetchRecords.java
index f9a97a7b27d5..dd7b35efa151 100644
--- a/components/camel-iggy/src/main/java/org/apache/camel/component/iggy/IggyFetchRecords.java
+++ b/components/camel-iggy/src/main/java/org/apache/camel/component/iggy/IggyFetchRecords.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.iggy;
 
 import java.math.BigInteger;
+import java.time.Duration;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,6 +26,8 @@ import java.util.stream.Collectors;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.iggy.client.IggyClientConnectionPool;
 import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
 import org.apache.iggy.client.blocking.IggyBaseClient;
 import org.apache.iggy.consumergroup.Consumer;
 import org.apache.iggy.identifier.ConsumerId;
@@ -66,11 +69,17 @@ public class IggyFetchRecords implements Runnable {
         while (running) {
             if (iggyConsumer.isSuspending() || iggyConsumer.isSuspended()) {
                 LOG.trace("Consumer is suspended. Skipping message polling.");
-                try {
-                    Thread.sleep(1000); // Sleep for a bit to avoid busy-waiting
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                }
+                // Use Camel's task API to avoid busy-waiting instead of Thread.sleep()
+                // We use initialDelay for the actual delay, and maxIterations(1) to run once
+                Tasks.foregroundTask()
+                        .withBudget(Budgets.iterationBudget()
+                                .withMaxIterations(1)
+                                .withInitialDelay(Duration.ofSeconds(1))
+                                .withInterval(Duration.ZERO)
+                                .build())
+                        .withName("IggySuspendedDelay")
+                        .build()
+                        .run(endpoint.getCamelContext(), () -> true);
                 continue;
             }
 
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 26de3104ce4e..d62e539d3226 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -21,6 +21,7 @@ import java.net.CookieManager;
 import java.net.CookiePolicy;
 import java.net.CookieStore;
 import java.net.URI;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -40,6 +41,8 @@ import org.apache.camel.component.salesforce.StreamingApiConsumer;
 import org.apache.camel.component.salesforce.api.SalesforceException;
 import org.apache.camel.component.salesforce.internal.SalesforceSession;
 import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
 import org.cometd.bayeux.Message;
 import org.cometd.bayeux.client.ClientSessionChannel;
 import org.cometd.bayeux.client.ClientSessionChannel.MessageListener;
@@ -215,14 +218,21 @@ public class SubscriptionHelper extends ServiceSupport {
             } else {
                 abort = false;
 
-                try {
-                    LOG.debug("Pausing for {} msecs before subscribe attempt", backoff);
-                    Thread.sleep(backoff);
-                    for (var consumer : consumers) {
-                        subscribe(consumer);
-                    }
-                } catch (InterruptedException e) {
-                    LOG.warn("Aborting subscribe on interrupt!", e);
+                LOG.debug("Pausing for {} msecs before subscribe attempt", backoff);
+                // Use Camel's task API for backoff delay instead of Thread.sleep()
+                // We use initialDelay for the actual delay, and maxIterations(1) to run once
+                Tasks.foregroundTask()
+                        .withBudget(Budgets.iterationBudget()
+                                .withMaxIterations(1)
+                                .withInitialDelay(Duration.ofMillis(backoff))
+                                .withInterval(Duration.ZERO)
+                                .build())
+                        .withName("SalesforceSubscribeRetryDelay")
+                        .build()
+                        .run(component.getCamelContext(), () -> true);
+
+                for (var consumer : consumers) {
+                    subscribe(consumer);
                 }
             }
         } else if (error.matches(INVALID_REPLAY_ID_PATTERN)) {
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
index a7540c3274a7..16f81de19fff 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.zookeeper;
 
+import java.time.Duration;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -31,6 +32,8 @@ import org.apache.camel.component.zookeeper.operations.GetDataOperation;
 import org.apache.camel.component.zookeeper.operations.OperationResult;
 import org.apache.camel.component.zookeeper.operations.ZooKeeperOperation;
 import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
@@ -174,11 +177,20 @@ public class ZooKeeperConsumer extends DefaultConsumer {
         private void backoffAndThenRestart() {
             try {
                 if (isRunAllowed()) {
-                    Thread.sleep(configuration.getBackoff());
+                    // Use Camel's task API for reconnection backoff delay instead of Thread.sleep()
+                    // We use initialDelay for the actual delay, and maxIterations(1) to run once
+                    Tasks.foregroundTask()
+                            .withBudget(Budgets.iterationBudget()
+                                    .withMaxIterations(1)
+                                    .withInitialDelay(Duration.ofMillis(configuration.getBackoff()))
+                                    .withInterval(Duration.ZERO)
+                                    .build())
+                            .withName("ZooKeeperReconnectBackoff")
+                            .build()
+                            .run(getEndpoint().getCamelContext(), () -> true);
+
                     initializeConsumer();
                 }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
             } catch (Exception e) {
                 // ignore
             }

Reply via email to