This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 57bb48bac7a617d6b6f886cb7b1b9753ddbf21f3 Author: Claus Ibsen <[email protected]> AuthorDate: Mon Mar 29 10:21:19 2021 +0200 CAMEL-16418: Circuit breakers should ensure UoW is done --- .../faulttolerance/FaultToleranceProcessor.java | 27 ++++++++++++++-------- .../resilience4j/ResilienceProcessor.java | 8 +++---- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java index 73c9e40..e2dfc7b 100644 --- a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java +++ b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java @@ -37,6 +37,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.ExtendedExchange; import org.apache.camel.Navigate; import org.apache.camel.Processor; @@ -45,7 +46,7 @@ import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.spi.CircuitBreakerConstants; import org.apache.camel.spi.IdAware; -import org.apache.camel.spi.Synchronization; +import org.apache.camel.spi.UnitOfWork; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.UnitOfWorkHelper; @@ -301,17 +302,27 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport @Override public Exchange call() throws Exception { + Exchange copy = null; + UnitOfWork uow = null; + // turn of interruption to allow fault tolerance to process the exchange under its handling exchange.adapt(ExtendedExchange.class).setInterruptable(false); try { LOG.debug("Running processor: {} with exchange: {}", processor, exchange); + // prepare a copy of exchange so downstream processors don't // cause side-effects if they mutate the exchange // in case timeout processing and continue with the fallback etc - Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false); + copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false); + // prepare uow on copy + uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy); + copy.adapt(ExtendedExchange.class).setUnitOfWork(uow); + // process the processor until its fully done processor.process(copy); + + // handle the processing result if (copy.getException() != null) { exchange.setException(copy.getException()); } else { @@ -320,17 +331,13 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, true); exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false); } - if (copy.getUnitOfWork() == null) { - // handover completions and done them manually to ensure they are being executed - List<Synchronization> synchronizations = copy.adapt(ExtendedExchange.class).handoverCompletions(); - UnitOfWorkHelper.doneSynchronizations(copy, synchronizations, LOG); - } else { - // done the unit of work - copy.getUnitOfWork().done(exchange); - } } catch (Exception e) { exchange.setException(e); + } finally { + // must done uow + UnitOfWorkHelper.doneUow(uow, copy); } + if (exchange.getException() != null) { // force exception so the circuit breaker can react throw exchange.getException(); diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java index 37b36f8..a541b61 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java @@ -378,13 +378,13 @@ public class ResilienceProcessor extends AsyncProcessorSupport if (timeLimiter != null) { Supplier<CompletableFuture<Exchange>> futureSupplier; if (executorService == null) { - futureSupplier = () -> CompletableFuture.supplyAsync(() -> processInCopy(exchange)); + futureSupplier = () -> CompletableFuture.supplyAsync(() -> processTask(exchange)); } else { - futureSupplier = () -> CompletableFuture.supplyAsync(() -> processInCopy(exchange), executorService); + futureSupplier = () -> CompletableFuture.supplyAsync(() -> processTask(exchange), executorService); } task = TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier); } else { - task = new CircuitBreakerTask(() -> processInCopy(exchange)); + task = new CircuitBreakerTask(() -> processTask(exchange)); } if (bulkhead != null) { @@ -410,7 +410,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport return true; } - private Exchange processInCopy(Exchange exchange) { + private Exchange processTask(Exchange exchange) { Exchange copy = null; UnitOfWork uow = null; try {
