This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 57bb48bac7a617d6b6f886cb7b1b9753ddbf21f3
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Mar 29 10:21:19 2021 +0200

    CAMEL-16418: Circuit breakers should ensure UoW is done
---
 .../faulttolerance/FaultToleranceProcessor.java    | 27 ++++++++++++++--------
 .../resilience4j/ResilienceProcessor.java          |  8 +++----
 2 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
index 73c9e40..e2dfc7b 100644
--- a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
+++ b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
@@ -37,6 +37,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePropertyKey;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
@@ -45,7 +46,7 @@ import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.spi.CircuitBreakerConstants;
 import org.apache.camel.spi.IdAware;
-import org.apache.camel.spi.Synchronization;
+import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.UnitOfWorkHelper;
@@ -301,17 +302,27 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
 
         @Override
         public Exchange call() throws Exception {
+            Exchange copy = null;
+            UnitOfWork uow = null;
+
             // turn of interruption to allow fault tolerance to process the exchange under its handling
             exchange.adapt(ExtendedExchange.class).setInterruptable(false);
 
             try {
                 LOG.debug("Running processor: {} with exchange: {}", 
processor, exchange);
+
                 // prepare a copy of exchange so downstream processors don't
                 // cause side-effects if they mutate the exchange
                 // in case timeout processing and continue with the fallback etc
-                Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, 
false, false);
+                copy = ExchangeHelper.createCorrelatedCopy(exchange, false, 
false);
+                // prepare uow on copy
+                uow = 
copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy);
+                copy.adapt(ExtendedExchange.class).setUnitOfWork(uow);
+
                 // process the processor until its fully done
                 processor.process(copy);
+
+                // handle the processing result
                 if (copy.getException() != null) {
                     exchange.setException(copy.getException());
                 } else {
@@ -320,17 +331,13 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
                     
exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, 
true);
                     
exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
                 }
-                if (copy.getUnitOfWork() == null) {
-                    // handover completions and done them manually to ensure they are being executed
-                    List<Synchronization> synchronizations = 
copy.adapt(ExtendedExchange.class).handoverCompletions();
-                    UnitOfWorkHelper.doneSynchronizations(copy, 
synchronizations, LOG);
-                } else {
-                    // done the unit of work
-                    copy.getUnitOfWork().done(exchange);
-                }
             } catch (Exception e) {
                 exchange.setException(e);
+            } finally {
+                // must done uow
+                UnitOfWorkHelper.doneUow(uow, copy);
             }
+
             if (exchange.getException() != null) {
                 // force exception so the circuit breaker can react
                 throw exchange.getException();
diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 37b36f8..a541b61 100644
--- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -378,13 +378,13 @@ public class ResilienceProcessor extends AsyncProcessorSupport
         if (timeLimiter != null) {
             Supplier<CompletableFuture<Exchange>> futureSupplier;
             if (executorService == null) {
-                futureSupplier = () -> CompletableFuture.supplyAsync(() -> 
processInCopy(exchange));
+                futureSupplier = () -> CompletableFuture.supplyAsync(() -> 
processTask(exchange));
             } else {
-                futureSupplier = () -> CompletableFuture.supplyAsync(() -> 
processInCopy(exchange), executorService);
+                futureSupplier = () -> CompletableFuture.supplyAsync(() -> 
processTask(exchange), executorService);
             }
             task = TimeLimiter.decorateFutureSupplier(timeLimiter, 
futureSupplier);
         } else {
-            task = new CircuitBreakerTask(() -> processInCopy(exchange));
+            task = new CircuitBreakerTask(() -> processTask(exchange));
         }
 
         if (bulkhead != null) {
@@ -410,7 +410,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport
         return true;
     }
 
-    private Exchange processInCopy(Exchange exchange) {
+    private Exchange processTask(Exchange exchange) {
         Exchange copy = null;
         UnitOfWork uow = null;
         try {

Reply via email to