This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 593b4614cf38b7da89357e9d35851d7d53584de9
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Mar 29 10:24:50 2021 +0200

    CAMEL-16418: Circuit breakers should ensure UoW is done
---
 .../hystrix/processor/HystrixProcessorCommand.java | 23 +++++++++++++++++++++-
 1 file changed, 22 insertions(+), 1 deletion(-)

diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
index 0a1d04e..4201850 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
@@ -23,10 +23,13 @@ import com.netflix.hystrix.exception.HystrixBadRequestException;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePropertyKey;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.UnitOfWorkHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,11 +115,18 @@ public class HystrixProcessorCommand extends HystrixCommand {
 
     @Override
     protected Message run() throws Exception {
+        Exchange copy = null;
+        UnitOfWork uow = null;
+
         LOG.debug("Running processor: {} with exchange: {}", processor, exchange);
 
         // prepare a copy of exchange so downstream processors don't cause side-effects if they mutate the exchange
         // in case Hystrix timeout processing and continue with the fallback etc
-        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false);
+        copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false);
+
+        // prepare uow on copy
+        uow = copy.getContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory().createUnitOfWork(copy);
+        copy.adapt(ExtendedExchange.class).setUnitOfWork(uow);
         try {
             // process the processor until its fully done
             // (we do not hav any hystrix callback to leverage so we need to complete all work in this run method)
@@ -131,6 +141,8 @@ public class HystrixProcessorCommand extends HystrixCommand {
                 && getProperties().fallbackEnabled().get()
                 && isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
             LOG.debug("Exiting run command due to a hystrix execution timeout in processing exchange: {}", exchange);
+            // must done uow
+            UnitOfWorkHelper.doneUow(uow, copy);
             return null;
         }
 
@@ -138,6 +150,8 @@ public class HystrixProcessorCommand extends HystrixCommand {
         // and therefore we need this thread to not do anymore if fallback is already in process
         if (fallbackInUse.get()) {
             LOG.debug("Exiting run command as fallback is already in use processing exchange: {}", exchange);
+            // must done uow
+            UnitOfWorkHelper.doneUow(uow, copy);
             return null;
         }
 
@@ -150,6 +164,8 @@ public class HystrixProcessorCommand extends HystrixCommand {
             // and therefore we need this thread to not do anymore if fallback is already in process
             if (fallbackInUse.get()) {
                 LOG.debug("Exiting run command as fallback is already in use processing exchange: {}", exchange);
+                // must done uow
+                UnitOfWorkHelper.doneUow(uow, copy);
                 return null;
             }
 
@@ -164,12 +180,17 @@ public class HystrixProcessorCommand extends HystrixCommand {
             if (camelExchangeException instanceof HystrixBadRequestException) {
                 LOG.debug("Running processor: {} with exchange: {} done as bad request", processor, exchange);
                 exchange.setException(camelExchangeException);
+                // must done uow
+                UnitOfWorkHelper.doneUow(uow, copy);
                 throw camelExchangeException;
             }
 
             // copy the result before its regarded as success
             ExchangeHelper.copyResults(exchange, copy);
 
+            // must done uow
+            UnitOfWorkHelper.doneUow(uow, copy);
+
             // in case of an exception in the exchange
             // we need to trigger this by throwing the exception so hystrix will execute the fallback
             // or open the circuit

Reply via email to