This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 6ff4429185a68546aee8c76ac76a6516b9e376a6 Author: Claus Ibsen <[email protected]> AuthorDate: Mon Mar 29 10:17:02 2021 +0200 CAMEL-16418: Circuit breakers should ensure UoW is done --- .../resilience4j/ResilienceProcessor.java | 25 +++++++++++++--------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java index 3dc5b4d..37b36f8 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java @@ -39,6 +39,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.ExtendedExchange; import org.apache.camel.Navigate; import org.apache.camel.Processor; @@ -48,7 +49,7 @@ import org.apache.camel.api.management.ManagedOperation; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.spi.CircuitBreakerConstants; import org.apache.camel.spi.IdAware; -import org.apache.camel.spi.Synchronization; +import org.apache.camel.spi.UnitOfWork; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.UnitOfWorkHelper; @@ -410,14 +411,22 @@ public class ResilienceProcessor extends AsyncProcessorSupport } private Exchange processInCopy(Exchange exchange) { + Exchange copy = null; + UnitOfWork uow = null; try { LOG.debug("Running processor: {} with exchange: {}", processor, exchange); // prepare a copy of exchange so downstream processors don't // cause side-effects if they mutate the exchange // in case timeout processing and continue with the fallback etc - Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false); + copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false); + // prepare uow on copy + uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy); + copy.adapt(ExtendedExchange.class).setUnitOfWork(uow); + // process the processor until its fully done processor.process(copy); + + // handle the processing result if (copy.getException() != null) { exchange.setException(copy.getException()); } else { @@ -426,17 +435,13 @@ public class ResilienceProcessor extends AsyncProcessorSupport exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, true); exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false); } - if (copy.getUnitOfWork() == null) { - // handover completions and done them manually to ensure they are being executed - List<Synchronization> synchronizations = copy.adapt(ExtendedExchange.class).handoverCompletions(); - UnitOfWorkHelper.doneSynchronizations(copy, synchronizations, LOG); - } else { - // done the unit of work - copy.getUnitOfWork().done(exchange); - } } catch (Exception e) { exchange.setException(e); + } finally { + // must done uow + UnitOfWorkHelper.doneUow(uow, copy); } + if (exchange.getException() != null) { // throw exception so resilient4j know it was a failure throw RuntimeExchangeException.wrapRuntimeException(exchange.getException());
