This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new a6865a21a26 CAMEL-20684: Improve MicroProfile Fault Tolerance timer 
setup
a6865a21a26 is described below

commit a6865a21a26359a478f0e7d88a4f13b3ab3a43a6
Author: James Netherton <jamesnether...@gmail.com>
AuthorDate: Wed Apr 17 11:26:24 2024 +0100

    CAMEL-20684: Improve MicroProfile Fault Tolerance timer setup
---
 .../faulttolerance/FaultToleranceProcessor.java    | 31 ++++-----
 .../faulttolerance/FaultToleranceReifier.java      | 22 +++++++
 .../faulttolerance/FaultToleranceTimerService.java | 74 ++++++++++++++++++++++
 3 files changed, 108 insertions(+), 19 deletions(-)

diff --git 
a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
 
b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
index 7f6e6003374..765e9faa827 100644
--- 
a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
+++ 
b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
@@ -29,7 +29,7 @@ import 
io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreaker;
 import io.smallrye.faulttolerance.core.fallback.Fallback;
 import io.smallrye.faulttolerance.core.stopwatch.SystemStopwatch;
 import io.smallrye.faulttolerance.core.timeout.Timeout;
-import io.smallrye.faulttolerance.core.timer.ThreadTimer;
+import io.smallrye.faulttolerance.core.timer.Timer;
 import io.smallrye.faulttolerance.core.util.ExceptionDecision;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
@@ -83,12 +83,10 @@ public class FaultToleranceProcessor extends 
AsyncProcessorSupport
     private boolean shutdownScheduledExecutorService;
     private ExecutorService executorService;
     private boolean shutdownExecutorService;
-    private ExecutorService threadTimerExecutorService;
-    private boolean shutdownThreadTimerExecutorService;
     private ProcessorExchangeFactory processorExchangeFactory;
     private PooledExchangeTaskFactory taskFactory;
     private PooledExchangeTaskFactory fallbackTaskFactory;
-    private ThreadTimer timer;
+    private Timer timer;
 
     public FaultToleranceProcessor(FaultToleranceConfiguration config, 
Processor processor,
                                    Processor fallbackProcessor) {
@@ -151,6 +149,14 @@ public class FaultToleranceProcessor extends 
AsyncProcessorSupport
         this.executorService = executorService;
     }
 
+    public Timer getTimer() {
+        return timer;
+    }
+
+    public void setTimer(Timer timer) {
+        this.timer = timer;
+    }
+
     @Override
     public String getTraceLabel() {
         return "faultTolerance";
@@ -262,7 +268,7 @@ public class FaultToleranceProcessor extends 
AsyncProcessorSupport
             }
             // 2. timeout
             if (config.isTimeoutEnabled()) {
-                target = new Timeout<>(target, "timeout", 
config.getTimeoutDuration(), timer);
+                target = new Timeout<>(target, "timeout", 
config.getTimeoutDuration(), getTimer());
             }
             // 3. fallback
             if (fallbackProcessor != null) {
@@ -349,15 +355,10 @@ public class FaultToleranceProcessor extends 
AsyncProcessorSupport
     protected void doInit() throws Exception {
         ObjectHelper.notNull(camelContext, "CamelContext", this);
         if (circuitBreaker == null) {
-            threadTimerExecutorService
-                    = 
getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, 
"CircuitBreakerThreadTimer");
-            shutdownThreadTimerExecutorService = true;
-
-            timer = ThreadTimer.create(threadTimerExecutorService);
             circuitBreaker = new CircuitBreaker<>(
                     invocation(), id, ExceptionDecision.ALWAYS_FAILURE, 
config.getDelay(), config.getRequestVolumeThreshold(),
                     config.getFailureRatio(),
-                    config.getSuccessThreshold(), SystemStopwatch.INSTANCE, 
timer);
+                    config.getSuccessThreshold(), SystemStopwatch.INSTANCE, 
getTimer());
         }
 
         ServiceHelper.initService(processorExchangeFactory, taskFactory, 
fallbackTaskFactory, processor);
@@ -389,14 +390,6 @@ public class FaultToleranceProcessor extends 
AsyncProcessorSupport
             
getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
             executorService = null;
         }
-        if (timer != null) {
-            timer.shutdown();
-            timer = null;
-        }
-        if (shutdownThreadTimerExecutorService && threadTimerExecutorService 
!= null) {
-            
getCamelContext().getExecutorServiceManager().shutdownNow(threadTimerExecutorService);
-            threadTimerExecutorService = null;
-        }
 
         ServiceHelper.stopService(processorExchangeFactory, taskFactory, 
fallbackTaskFactory, processor);
     }
diff --git 
a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java
 
b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java
index f966ad288da..d5b19e986d0 100644
--- 
a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java
+++ 
b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java
@@ -21,7 +21,9 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 
+import io.smallrye.faulttolerance.ExecutorHolder;
 import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreaker;
+import io.smallrye.faulttolerance.core.timer.Timer;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.model.CircuitBreakerDefinition;
@@ -69,6 +71,7 @@ public class FaultToleranceReifier extends 
ProcessorReifier<CircuitBreakerDefini
             answer.setCircuitBreaker(cb);
         }
         configureBulkheadExecutorService(answer, config);
+        configureTimer(answer);
         return answer;
     }
 
@@ -126,6 +129,25 @@ public class FaultToleranceReifier extends 
ProcessorReifier<CircuitBreakerDefini
         }
     }
 
+    private void configureTimer(FaultToleranceProcessor answer) throws 
Exception {
+        Timer timer;
+
+        // If running in a CDI container, try to find the singleton scoped 
ExecutorHolder. Else we have to manage the Timer ourselves
+        ExecutorHolder executorHolder = findSingleByType(ExecutorHolder.class);
+        if (executorHolder != null) {
+            timer = executorHolder.getTimer();
+        } else {
+            FaultToleranceTimerService threadTimerService = 
camelContext.hasService(FaultToleranceTimerService.class);
+            if (threadTimerService == null) {
+                threadTimerService = new FaultToleranceTimerService();
+                camelContext.addService(threadTimerService);
+            }
+            timer = threadTimerService.getTimer();
+        }
+
+        answer.setTimer(timer);
+    }
+
     // *******************************
     // Helpers
     // *******************************
diff --git 
a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceTimerService.java
 
b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceTimerService.java
new file mode 100644
index 00000000000..973aa1cf66b
--- /dev/null
+++ 
b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceTimerService.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.microprofile.faulttolerance;
+
+import java.util.concurrent.ExecutorService;
+
+import io.smallrye.faulttolerance.core.timer.ThreadTimer;
+import io.smallrye.faulttolerance.core.timer.Timer;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.support.service.ServiceSupport;
+
+/**
+ * Service to manage the lifecycle of the SmallRye Fault Tolerance Timer. 
Primarily used when running without CDI
+ * container support.
+ */
+public class FaultToleranceTimerService extends ServiceSupport implements 
CamelContextAware {
+    private ExecutorService threadTimerExecutorService;
+    private Timer timer;
+    private CamelContext camelContext;
+
+    @Override
+    protected void doInit() throws Exception {
+        threadTimerExecutorService
+                = 
getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, 
"CircuitBreakerThreadTimer");
+        timer = ThreadTimer.create(threadTimerExecutorService);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (timer != null) {
+            try {
+                timer.shutdown();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            } finally {
+                timer = null;
+            }
+        }
+
+        if (threadTimerExecutorService != null) {
+            
getCamelContext().getExecutorServiceManager().shutdownNow(threadTimerExecutorService);
+            threadTimerExecutorService = null;
+        }
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return this.camelContext;
+    }
+
+    public Timer getTimer() {
+        return timer;
+    }
+}

Reply via email to