This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 1b0fca17c045 CAMEL-22949: Migrate components from Thread.sleep() to Camel's Task API (#21215)
1b0fca17c045 is described below
commit 1b0fca17c045f743bf433e8196f2e9d1b172607d
Author: Guillaume Nodet <[email protected]>
AuthorDate: Mon Feb 2 15:50:05 2026 +0100
CAMEL-22949: Migrate components from Thread.sleep() to Camel's Task API (#21215)
- Migrate 5 components: salesforce, aws2-athena, zookeeper, hazelcast, iggy
- Replace Thread.sleep() with Tasks.foregroundTask() for better lifecycle management
- Use withInitialDelay() for one-time delays
---
.../component/aws2/athena/Athena2QueryHelper.java | 24 +++++++++++++-------
.../hazelcast/seda/HazelcastSedaConsumer.java | 19 +++++++++++-----
.../camel/component/iggy/IggyFetchRecords.java | 19 +++++++++++-----
.../internal/streaming/SubscriptionHelper.java | 26 +++++++++++++++-------
.../component/zookeeper/ZooKeeperConsumer.java | 18 ++++++++++++---
5 files changed, 77 insertions(+), 29 deletions(-)
diff --git a/components/camel-aws/camel-aws2-athena/src/main/java/org/apache/camel/component/aws2/athena/Athena2QueryHelper.java b/components/camel-aws/camel-aws2-athena/src/main/java/org/apache/camel/component/aws2/athena/Athena2QueryHelper.java
index 6883807b9866..f4cc50e95c06 100644
--- a/components/camel-aws/camel-aws2-athena/src/main/java/org/apache/camel/component/aws2/athena/Athena2QueryHelper.java
+++ b/components/camel-aws/camel-aws2-athena/src/main/java/org/apache/camel/component/aws2/athena/Athena2QueryHelper.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.aws2.athena;
import java.time.Clock;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -25,6 +26,8 @@ import java.util.List;
import java.util.Set;
import org.apache.camel.Exchange;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +41,7 @@ class Athena2QueryHelper {
private static final Logger LOG = LoggerFactory.getLogger(Athena2QueryHelper.class);
// configuration ======================
+ private final Exchange exchange;
private final Clock clock = Clock.systemUTC();
private final long waitTimeout;
private final long delay;
@@ -56,6 +60,7 @@ class Athena2QueryHelper {
private boolean interrupted;
Athena2QueryHelper(Exchange exchange, Athena2Configuration configuration) {
+ this.exchange = exchange;
this.waitTimeout = determineWaitTimeout(exchange, configuration);
this.delay = determineDelay(exchange, configuration);
this.maxAttempts = determineMaxAttempts(exchange, configuration);
@@ -162,15 +167,18 @@ class Athena2QueryHelper {
}
void doWait() {
- try {
- Thread.sleep(this.currentDelay);
- } catch (InterruptedException e) {
- this.interrupted = Thread.interrupted(); // store, then clear, interrupt status
- LOG.trace(
- "AWS Athena start query execution wait thread was interrupted; will return at earliest opportunity");
+ // Use Camel's task API for polling delay instead of Thread.sleep()
+ // We use initialDelay for the actual delay, and maxIterations(1) to run once
+ Tasks.foregroundTask()
+ .withBudget(Budgets.iterationBudget()
+ .withMaxIterations(1)
+ .withInitialDelay(Duration.ofMillis(this.currentDelay))
+ .withInterval(Duration.ZERO)
+ .build())
+ .withName("AthenaQueryPollingDelay")
+ .build()
+ .run(exchange.getContext(), () -> true);
- Thread.currentThread().interrupt();
- }
this.currentDelay = this.delay;
}
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
index fd811f305e81..81110a3ff295 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.hazelcast.seda;
+import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -30,6 +31,8 @@ import org.apache.camel.Processor;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.DefaultExchangeHolder;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -148,11 +151,17 @@ public class HazelcastSedaConsumer extends DefaultConsumer implements Runnable {
}
}
getExceptionHandler().handleException("Error processing exchange", exchange, e);
- try {
- Thread.sleep(endpoint.getConfiguration().getOnErrorDelay());
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
+ // Use Camel's task API for error recovery delay instead of Thread.sleep()
+ // We use initialDelay for the actual delay, and maxIterations(1) to run once
+ Tasks.foregroundTask()
+ .withBudget(Budgets.iterationBudget()
+ .withMaxIterations(1)
+ .withInitialDelay(Duration.ofMillis(endpoint.getConfiguration().getOnErrorDelay()))
+ .withInterval(Duration.ZERO)
+ .build())
+ .withName("HazelcastSedaErrorRecoveryDelay")
+ .build()
+ .run(getEndpoint().getCamelContext(), () -> true);
}
}
}
diff --git a/components/camel-iggy/src/main/java/org/apache/camel/component/iggy/IggyFetchRecords.java b/components/camel-iggy/src/main/java/org/apache/camel/component/iggy/IggyFetchRecords.java
index f9a97a7b27d5..dd7b35efa151 100644
--- a/components/camel-iggy/src/main/java/org/apache/camel/component/iggy/IggyFetchRecords.java
+++ b/components/camel-iggy/src/main/java/org/apache/camel/component/iggy/IggyFetchRecords.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.iggy;
import java.math.BigInteger;
+import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,6 +26,8 @@ import java.util.stream.Collectors;
import org.apache.camel.Exchange;
import org.apache.camel.component.iggy.client.IggyClientConnectionPool;
import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
import org.apache.iggy.client.blocking.IggyBaseClient;
import org.apache.iggy.consumergroup.Consumer;
import org.apache.iggy.identifier.ConsumerId;
@@ -66,11 +69,17 @@ public class IggyFetchRecords implements Runnable {
while (running) {
if (iggyConsumer.isSuspending() || iggyConsumer.isSuspended()) {
LOG.trace("Consumer is suspended. Skipping message polling.");
- try {
- Thread.sleep(1000); // Sleep for a bit to avoid busy-waiting
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ // Use Camel's task API to avoid busy-waiting instead of Thread.sleep()
+ // We use initialDelay for the actual delay, and maxIterations(1) to run once
+ Tasks.foregroundTask()
+ .withBudget(Budgets.iterationBudget()
+ .withMaxIterations(1)
+ .withInitialDelay(Duration.ofSeconds(1))
+ .withInterval(Duration.ZERO)
+ .build())
+ .withName("IggySuspendedDelay")
+ .build()
+ .run(endpoint.getCamelContext(), () -> true);
continue;
}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 26de3104ce4e..d62e539d3226 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -21,6 +21,7 @@ import java.net.CookieManager;
import java.net.CookiePolicy;
import java.net.CookieStore;
import java.net.URI;
+import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -40,6 +41,8 @@ import org.apache.camel.component.salesforce.StreamingApiConsumer;
import org.apache.camel.component.salesforce.api.SalesforceException;
import org.apache.camel.component.salesforce.internal.SalesforceSession;
import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.bayeux.client.ClientSessionChannel.MessageListener;
@@ -215,14 +218,21 @@ public class SubscriptionHelper extends ServiceSupport {
} else {
abort = false;
- try {
- LOG.debug("Pausing for {} msecs before subscribe attempt", backoff);
- Thread.sleep(backoff);
- for (var consumer : consumers) {
- subscribe(consumer);
- }
- } catch (InterruptedException e) {
- LOG.warn("Aborting subscribe on interrupt!", e);
+ LOG.debug("Pausing for {} msecs before subscribe attempt", backoff);
+ // Use Camel's task API for backoff delay instead of Thread.sleep()
+ // We use initialDelay for the actual delay, and maxIterations(1) to run once
+ Tasks.foregroundTask()
+ .withBudget(Budgets.iterationBudget()
+ .withMaxIterations(1)
+ .withInitialDelay(Duration.ofMillis(backoff))
+ .withInterval(Duration.ZERO)
+ .build())
+ .withName("SalesforceSubscribeRetryDelay")
+ .build()
+ .run(component.getCamelContext(), () -> true);
+
+ for (var consumer : consumers) {
+ subscribe(consumer);
}
}
} else if (error.matches(INVALID_REPLAY_ID_PATTERN)) {
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
index a7540c3274a7..16f81de19fff 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperConsumer.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.zookeeper;
+import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -31,6 +32,8 @@ import org.apache.camel.component.zookeeper.operations.GetDataOperation;
import org.apache.camel.component.zookeeper.operations.OperationResult;
import org.apache.camel.component.zookeeper.operations.ZooKeeperOperation;
import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
@@ -174,11 +177,20 @@ public class ZooKeeperConsumer extends DefaultConsumer {
private void backoffAndThenRestart() {
try {
if (isRunAllowed()) {
- Thread.sleep(configuration.getBackoff());
+ // Use Camel's task API for reconnection backoff delay instead of Thread.sleep()
+ // We use initialDelay for the actual delay, and maxIterations(1) to run once
+ Tasks.foregroundTask()
+ .withBudget(Budgets.iterationBudget()
+ .withMaxIterations(1)
+ .withInitialDelay(Duration.ofMillis(configuration.getBackoff()))
+ .withInterval(Duration.ZERO)
+ .build())
+ .withName("ZooKeeperReconnectBackoff")
+ .build()
+ .run(getEndpoint().getCamelContext(), () -> true);
+
initializeConsumer();
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
} catch (Exception e) {
// ignore
}