This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new fe14d16 CAMEL-17121: converted camel-splunk to repeatable tasks where appropriate fe14d16 is described below commit fe14d164ac23f99c3c5da815a56c1fae7fd6a178 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Tue Nov 9 11:06:31 2021 +0100 CAMEL-17121: converted camel-splunk to repeatable tasks where appropriate --- .../component/splunk/support/SplunkDataReader.java | 28 +++++++++++++++------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataReader.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataReader.java index 9ecebab..1e4c679 100644 --- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataReader.java +++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataReader.java @@ -19,10 +19,12 @@ package org.apache.camel.component.splunk.support; import java.io.InputStream; import java.text.DateFormat; import java.text.SimpleDateFormat; +import java.time.Duration; import java.util.ArrayList; import java.util.Calendar; import java.util.HashMap; import java.util.List; +import java.util.function.BooleanSupplier; import com.splunk.Job; import com.splunk.JobArgs; @@ -40,6 +42,9 @@ import com.splunk.ServiceArgs; import org.apache.camel.component.splunk.ConsumerType; import org.apache.camel.component.splunk.SplunkEndpoint; import org.apache.camel.component.splunk.event.SplunkEvent; +import org.apache.camel.support.task.Tasks; +import org.apache.camel.support.task.budget.Budgets; +import org.apache.camel.support.task.budget.IterationBoundedBudget; import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; @@ -222,15 +227,23 @@ public class SplunkDataReader { } else { throw new RuntimeException("Unable to find saved search '" + getSavedSearch() + "'."); } - while (!job.isDone()) { - Thread.sleep(2000); - } + + waitForJob(2000, job::isDone); + List<SplunkEvent> data = extractData(job, false, callback); this.lastSuccessfulReadTime = startTime; return data; } + private void waitForJob(long interval, BooleanSupplier supplier) { + Tasks.foregroundTask().withBudget(Budgets.iterationBudget() + .withMaxIterations(IterationBoundedBudget.UNLIMITED_ITERATIONS) + .withInterval(Duration.ofMillis(interval)) + .build()) + .build().run(supplier); + } + private List<SplunkEvent> nonBlockingSearch(SplunkResultProcessor callback) throws Exception { LOG.debug("non block search start"); @@ -263,18 +276,15 @@ public class SplunkDataReader { Job job = service.getJobs().create(getSearch(), queryArgs); LOG.debug("Running search : {} with queryArgs : {}", getSearch(), queryArgs); if (realtime) { - while (!job.isReady()) { - Thread.sleep(500); - } + waitForJob(500, job::isReady); // Besides job.isReady there must be some delay before real time job // is ready // TODO seems that the realtime stream is not quite isReady to be // read Thread.sleep(4000); } else { - while (!job.isDone()) { - Thread.sleep(500); - } + waitForJob(500, job::isDone); + } return extractData(job, realtime, callback); }