This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 593b4614cf38b7da89357e9d35851d7d53584de9 Author: Claus Ibsen <[email protected]> AuthorDate: Mon Mar 29 10:24:50 2021 +0200 CAMEL-16418: Circuit breakers should ensure UoW is done --- .../hystrix/processor/HystrixProcessorCommand.java | 23 +++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java index 0a1d04e..4201850 100644 --- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java +++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java @@ -23,10 +23,13 @@ import com.netflix.hystrix.exception.HystrixBadRequestException; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; +import org.apache.camel.spi.UnitOfWork; import org.apache.camel.support.ExchangeHelper; +import org.apache.camel.support.UnitOfWorkHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,11 +115,18 @@ public class HystrixProcessorCommand extends HystrixCommand { @Override protected Message run() throws Exception { + Exchange copy = null; + UnitOfWork uow = null; + LOG.debug("Running processor: {} with exchange: {}", processor, exchange); // prepare a copy of exchange so downstream processors don't cause side-effects if they mutate the exchange // in case Hystrix timeout processing and continue with the fallback etc - Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false); + copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false); + + // prepare uow on copy + uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy); + copy.adapt(ExtendedExchange.class).setUnitOfWork(uow); try { // process the processor until its fully done // (we do not hav any hystrix callback to leverage so we need to complete all work in this run method) @@ -131,6 +141,8 @@ public class HystrixProcessorCommand extends HystrixCommand { && getProperties().fallbackEnabled().get() && isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) { LOG.debug("Exiting run command due to a hystrix execution timeout in processing exchange: {}", exchange); + // must done uow + UnitOfWorkHelper.doneUow(uow, copy); return null; } @@ -138,6 +150,8 @@ public class HystrixProcessorCommand extends HystrixCommand { // and therefore we need this thread to not do anymore if fallback is already in process if (fallbackInUse.get()) { LOG.debug("Exiting run command as fallback is already in use processing exchange: {}", exchange); + // must done uow + UnitOfWorkHelper.doneUow(uow, copy); return null; } @@ -150,6 +164,8 @@ public class HystrixProcessorCommand extends HystrixCommand { // and therefore we need this thread to not do anymore if fallback is already in process if (fallbackInUse.get()) { LOG.debug("Exiting run command as fallback is already in use processing exchange: {}", exchange); + // must done uow + UnitOfWorkHelper.doneUow(uow, copy); return null; } @@ -164,12 +180,17 @@ public class HystrixProcessorCommand extends HystrixCommand { if (camelExchangeException instanceof HystrixBadRequestException) { LOG.debug("Running processor: {} with exchange: {} done as bad request", processor, exchange); exchange.setException(camelExchangeException); + // must done uow + UnitOfWorkHelper.doneUow(uow, copy); throw camelExchangeException; } // copy the result before its regarded as success ExchangeHelper.copyResults(exchange, copy); + // must done uow + UnitOfWorkHelper.doneUow(uow, copy); + // in case of an exception in the exchange // we need to trigger this by throwing the exception so hystrix will execute the fallback // or open the circuit
