This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit e8bbb704187ba031cf82a2e329a68b6a3bc7e64b Author: Claus Ibsen <[email protected]> AuthorDate: Thu Apr 8 12:01:35 2021 +0200 CAMEL-16455: Optimize CircuitBreaker EIP with task pooling --- .../resilience4j/ResilienceProcessor.java | 58 +++++++++++++++------- 1 file changed, 40 insertions(+), 18 deletions(-) diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java index 864bd0f..05377ba 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java @@ -87,6 +87,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport private ExecutorService executorService; private ProcessorExchangeFactory processorExchangeFactory; private PooledExchangeTaskFactory taskFactory; + private PooledExchangeTaskFactory fallbackTaskFactory; public ResilienceProcessor(CircuitBreakerConfig circuitBreakerConfig, BulkheadConfig bulkheadConfig, TimeLimiterConfig timeLimiterConfig, Processor processor, @@ -111,14 +112,21 @@ public class ResilienceProcessor extends AsyncProcessorSupport boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled(); if (pooled) { + int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity(); taskFactory = new PooledTaskFactory(getId()) { @Override public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { return new CircuitBreakerTask(); } }; - int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity(); taskFactory.setCapacity(capacity); + fallbackTaskFactory = new PooledTaskFactory(getId()) { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return new CircuitBreakerFallbackTask(); + } + }; + fallbackTaskFactory.setCapacity(capacity); } else { taskFactory = new PrototypeTaskFactory() { @Override @@ -126,8 +134,13 @@ public class ResilienceProcessor extends AsyncProcessorSupport return new CircuitBreakerTask(); } }; + fallbackTaskFactory = new PrototypeTaskFactory() { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return new CircuitBreakerFallbackTask(); + } + }; } - LOG.trace("Using TaskFactory: {}", taskFactory); // create a per processor exchange factory this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class) @@ -135,7 +148,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport this.processorExchangeFactory.setRouteId(getRouteId()); this.processorExchangeFactory.setId(getId()); - ServiceHelper.buildService(processorExchangeFactory, taskFactory, processor); + ServiceHelper.buildService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor); } @Override @@ -144,7 +157,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport circuitBreaker = CircuitBreaker.of(id, circuitBreakerConfig); } - ServiceHelper.startService(processorExchangeFactory, taskFactory, processor); + ServiceHelper.startService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor); } @Override @@ -153,12 +166,12 @@ public class ResilienceProcessor extends AsyncProcessorSupport getCamelContext().getExecutorServiceManager().shutdownNow(executorService); } - ServiceHelper.stopService(processorExchangeFactory, taskFactory, processor); + ServiceHelper.stopService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor); } @Override protected void doShutdown() throws Exception { - ServiceHelper.stopAndShutdownServices(processorExchangeFactory, taskFactory, processor); + ServiceHelper.stopAndShutdownServices(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor); } @Override @@ -444,6 +457,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport // Camel error handler exchange.setProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK, true); + CircuitBreakerFallbackTask fallbackTask = (CircuitBreakerFallbackTask) fallbackTaskFactory.acquire(exchange, callback); CircuitBreakerTask task = (CircuitBreakerTask) taskFactory.acquire(exchange, callback); Callable<Exchange> callable; @@ -463,7 +477,6 @@ public class ResilienceProcessor extends AsyncProcessorSupport } callable = CircuitBreaker.decorateCallable(circuitBreaker, callable); - Function<Throwable, Exchange> fallbackTask = new CircuitBreakerFallbackTask(this.id, this.fallback, exchange); try { if (LOG.isTraceEnabled()) { LOG.trace("Processing exchange: {} using circuit breaker: {}", exchange.getExchangeId(), id); @@ -473,6 +486,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport exchange.setException(e); } finally { taskFactory.release(task); + fallbackTaskFactory.release(fallbackTask); } if (LOG.isTraceEnabled()) { @@ -569,16 +583,24 @@ public class ResilienceProcessor extends AsyncProcessorSupport } } - private static final class CircuitBreakerFallbackTask implements Function<Throwable, Exchange> { + private final class CircuitBreakerFallbackTask implements PooledExchangeTask, Function<Throwable, Exchange> { - private final String id; - private final Processor processor; - private final Exchange exchange; + private Exchange exchange; - private CircuitBreakerFallbackTask(String id, Processor processor, Exchange exchange) { - this.id = id; - this.processor = processor; + @Override + public void prepare(Exchange exchange, AsyncCallback callback) { this.exchange = exchange; + // callback not in use + } + + @Override + public void reset() { + this.exchange = null; + } + + @Override + public void run() { + // not in use } @Override @@ -588,7 +610,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport id, throwable); } - if (processor == null) { + if (fallback == null) { if (throwable instanceof TimeoutException) { // the circuit breaker triggered a timeout (and there is no // fallback) so lets mark the exchange as failed @@ -646,10 +668,10 @@ public class ResilienceProcessor extends AsyncProcessorSupport exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false); // run the fallback processor try { - LOG.debug("Running fallback: {} with exchange: {}", processor, exchange); + LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange); // process the fallback until its fully done - processor.process(exchange); - LOG.debug("Running fallback: {} with exchange: {} done", processor, exchange); + fallback.process(exchange); + LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange); } catch (Throwable e) { exchange.setException(e); }
