This is an automated email from the ASF dual-hosted git repository. jamesnetherton pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new a6865a21a26 CAMEL-20684: Improve MicroProfile Fault Tolerance timer setup a6865a21a26 is described below commit a6865a21a26359a478f0e7d88a4f13b3ab3a43a6 Author: James Netherton <jamesnether...@gmail.com> AuthorDate: Wed Apr 17 11:26:24 2024 +0100 CAMEL-20684: Improve MicroProfile Fault Tolerance timer setup --- .../faulttolerance/FaultToleranceProcessor.java | 31 ++++----- .../faulttolerance/FaultToleranceReifier.java | 22 +++++++ .../faulttolerance/FaultToleranceTimerService.java | 74 ++++++++++++++++++++++ 3 files changed, 108 insertions(+), 19 deletions(-) diff --git a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java index 7f6e6003374..765e9faa827 100644 --- a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java +++ b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java @@ -29,7 +29,7 @@ import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreaker; import io.smallrye.faulttolerance.core.fallback.Fallback; import io.smallrye.faulttolerance.core.stopwatch.SystemStopwatch; import io.smallrye.faulttolerance.core.timeout.Timeout; -import io.smallrye.faulttolerance.core.timer.ThreadTimer; +import io.smallrye.faulttolerance.core.timer.Timer; import io.smallrye.faulttolerance.core.util.ExceptionDecision; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; @@ -83,12 +83,10 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport private boolean shutdownScheduledExecutorService; private ExecutorService executorService; private boolean shutdownExecutorService; - private ExecutorService threadTimerExecutorService; - private boolean shutdownThreadTimerExecutorService; private ProcessorExchangeFactory processorExchangeFactory; private PooledExchangeTaskFactory taskFactory; private PooledExchangeTaskFactory fallbackTaskFactory; - private ThreadTimer timer; + private Timer timer; public FaultToleranceProcessor(FaultToleranceConfiguration config, Processor processor, Processor fallbackProcessor) { @@ -151,6 +149,14 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport this.executorService = executorService; } + public Timer getTimer() { + return timer; + } + + public void setTimer(Timer timer) { + this.timer = timer; + } + @Override public String getTraceLabel() { return "faultTolerance"; @@ -262,7 +268,7 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport } // 2. timeout if (config.isTimeoutEnabled()) { - target = new Timeout<>(target, "timeout", config.getTimeoutDuration(), timer); + target = new Timeout<>(target, "timeout", config.getTimeoutDuration(), getTimer()); } // 3. fallback if (fallbackProcessor != null) { @@ -349,15 +355,10 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport protected void doInit() throws Exception { ObjectHelper.notNull(camelContext, "CamelContext", this); if (circuitBreaker == null) { - threadTimerExecutorService - = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "CircuitBreakerThreadTimer"); - shutdownThreadTimerExecutorService = true; - - timer = ThreadTimer.create(threadTimerExecutorService); circuitBreaker = new CircuitBreaker<>( invocation(), id, ExceptionDecision.ALWAYS_FAILURE, config.getDelay(), config.getRequestVolumeThreshold(), config.getFailureRatio(), - config.getSuccessThreshold(), SystemStopwatch.INSTANCE, timer); + config.getSuccessThreshold(), SystemStopwatch.INSTANCE, getTimer()); } ServiceHelper.initService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor); @@ -389,14 +390,6 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport getCamelContext().getExecutorServiceManager().shutdownNow(executorService); executorService = null; } - if (timer != null) { - timer.shutdown(); - timer = null; - } - if (shutdownThreadTimerExecutorService && threadTimerExecutorService != null) { - getCamelContext().getExecutorServiceManager().shutdownNow(threadTimerExecutorService); - threadTimerExecutorService = null; - } ServiceHelper.stopService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor); } diff --git a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java index f966ad288da..d5b19e986d0 100644 --- a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java +++ b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java @@ -21,7 +21,9 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutorService; +import io.smallrye.faulttolerance.ExecutorHolder; import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreaker; +import io.smallrye.faulttolerance.core.timer.Timer; import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.model.CircuitBreakerDefinition; @@ -69,6 +71,7 @@ public class FaultToleranceReifier extends ProcessorReifier<CircuitBreakerDefini answer.setCircuitBreaker(cb); } configureBulkheadExecutorService(answer, config); + configureTimer(answer); return answer; } @@ -126,6 +129,25 @@ public class FaultToleranceReifier extends ProcessorReifier<CircuitBreakerDefini } } + private void configureTimer(FaultToleranceProcessor answer) throws Exception { + Timer timer; + + // If running in a CDI container, try to find the singleton scoped ExecutorHolder. Else we have to manage the Timer ourselves + ExecutorHolder executorHolder = findSingleByType(ExecutorHolder.class); + if (executorHolder != null) { + timer = executorHolder.getTimer(); + } else { + FaultToleranceTimerService threadTimerService = camelContext.hasService(FaultToleranceTimerService.class); + if (threadTimerService == null) { + threadTimerService = new FaultToleranceTimerService(); + camelContext.addService(threadTimerService); + } + timer = threadTimerService.getTimer(); + } + + answer.setTimer(timer); + } + // ******************************* // Helpers // ******************************* diff --git a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceTimerService.java b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceTimerService.java new file mode 100644 index 00000000000..973aa1cf66b --- /dev/null +++ b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceTimerService.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.microprofile.faulttolerance; + +import java.util.concurrent.ExecutorService; + +import io.smallrye.faulttolerance.core.timer.ThreadTimer; +import io.smallrye.faulttolerance.core.timer.Timer; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.support.service.ServiceSupport; + +/** + * Service to manage the lifecycle of the SmallRye Fault Tolerance Timer. Primarily used when running without CDI + * container support. + */ +public class FaultToleranceTimerService extends ServiceSupport implements CamelContextAware { + private ExecutorService threadTimerExecutorService; + private Timer timer; + private CamelContext camelContext; + + @Override + protected void doInit() throws Exception { + threadTimerExecutorService + = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "CircuitBreakerThreadTimer"); + timer = ThreadTimer.create(threadTimerExecutorService); + } + + @Override + protected void doStop() throws Exception { + if (timer != null) { + try { + timer.shutdown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + timer = null; + } + } + + if (threadTimerExecutorService != null) { + getCamelContext().getExecutorServiceManager().shutdownNow(threadTimerExecutorService); + threadTimerExecutorService = null; + } + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public CamelContext getCamelContext() { + return this.camelContext; + } + + public Timer getTimer() { + return timer; + } +}