This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new f6a37d7 CAMEL-17121: converted camel-jms reply manager to repeatable
tasks
f6a37d7 is described below
commit f6a37d71228e8f3d8f099899a17927236cdff719
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Tue Nov 9 15:45:40 2021 +0100
CAMEL-17121: converted camel-jms reply manager to repeatable tasks
---
.../component/jms/reply/ReplyManagerSupport.java | 40 +++++++++-------------
1 file changed, 16 insertions(+), 24 deletions(-)
diff --git
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
index 6204398..40d73d0 100644
---
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
+++
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.jms.reply;
+import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@@ -36,6 +37,9 @@ import org.apache.camel.component.jms.JmsMessageHelper;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.support.task.ForegroundTask;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -230,33 +234,21 @@ public abstract class ReplyManagerSupport extends
ServiceSupport implements Repl
log.warn("Early reply received with correlationID [{}] -> {}",
correlationID, message);
}
- ReplyHandler answer = null;
-
// wait up until configured values
- boolean done = false;
- int counter = 0;
- while (!done && counter++ <
endpoint.getConfiguration().getWaitForProvisionCorrelationToBeUpdatedCounter())
{
- log.trace("Early reply not found handler at attempt {}. Waiting a
bit longer.", counter);
- try {
-
Thread.sleep(endpoint.getConfiguration().getWaitForProvisionCorrelationToBeUpdatedThreadSleepingTime());
- } catch (InterruptedException e) {
- // ignore
- }
-
- // try again
- answer = correlation.get(correlationID);
- done = answer != null;
+ long interval =
endpoint.getConfiguration().getWaitForProvisionCorrelationToBeUpdatedThreadSleepingTime();
+ ForegroundTask task =
Tasks.foregroundTask().withBudget(Budgets.iterationBudget()
+
.withMaxIterations(endpoint.getConfiguration().getWaitForProvisionCorrelationToBeUpdatedCounter())
+ .withInterval(Duration.ofMillis(interval))
+ .build())
+ .build();
+
+ return task.run(() -> getReplyHandler(correlationID), answer -> answer
!= null).orElse(null);
+ }
- if (answer != null) {
- if (log.isTraceEnabled()) {
- log.trace(
- "Early reply with correlationID [{}] has been
matched after {} attempts and can be processed using handler: {}",
- correlationID, counter, answer);
- }
- }
- }
+ private ReplyHandler getReplyHandler(String correlationID) {
+ log.trace("Early reply not found handler. Waiting a bit longer.");
- return answer;
+ return correlation.get(correlationID);
}
@Override