This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 71f6737d342c901471154900754a3ed7853ca038 Author: Claus Ibsen <[email protected]> AuthorDate: Thu Apr 8 11:42:32 2021 +0200 CAMEL-16455: Optimize CircuitBreaker EIP with task pooling --- .../resilience4j/ResilienceProcessor.java | 86 +++++++++++++++++----- 1 file changed, 68 insertions(+), 18 deletions(-) diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java index 57f6f49..864bd0f 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java @@ -47,6 +47,10 @@ import org.apache.camel.RuntimeExchangeException; import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedOperation; import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.processor.PooledExchangeTask; +import org.apache.camel.processor.PooledExchangeTaskFactory; +import org.apache.camel.processor.PooledTaskFactory; +import org.apache.camel.processor.PrototypeTaskFactory; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.ProcessorExchangeFactory; import org.apache.camel.spi.RouteIdAware; @@ -82,6 +86,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport private boolean shutdownExecutorService; private ExecutorService executorService; private ProcessorExchangeFactory processorExchangeFactory; + private PooledExchangeTaskFactory taskFactory; public ResilienceProcessor(CircuitBreakerConfig circuitBreakerConfig, BulkheadConfig bulkheadConfig, TimeLimiterConfig timeLimiterConfig, Processor processor, @@ -104,13 +109,33 @@ public class ResilienceProcessor extends AsyncProcessorSupport bulkhead = Bulkhead.of(id, bulkheadConfig); } + boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled(); + if (pooled) { + taskFactory = new PooledTaskFactory(getId()) { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return new CircuitBreakerTask(); + } + }; + int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity(); + taskFactory.setCapacity(capacity); + } else { + taskFactory = new PrototypeTaskFactory() { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return new CircuitBreakerTask(); + } + }; + } + LOG.trace("Using TaskFactory: {}", taskFactory); + // create a per processor exchange factory this.processorExchangeFactory = getCamelContext().adapt(ExtendedCamelContext.class) .getProcessorExchangeFactory().newProcessorExchangeFactory(this); this.processorExchangeFactory.setRouteId(getRouteId()); this.processorExchangeFactory.setId(getId()); - ServiceHelper.buildService(processorExchangeFactory, processor); + ServiceHelper.buildService(processorExchangeFactory, taskFactory, processor); } @Override @@ -119,7 +144,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport circuitBreaker = CircuitBreaker.of(id, circuitBreakerConfig); } - ServiceHelper.startService(processorExchangeFactory, processor); + ServiceHelper.startService(processorExchangeFactory, taskFactory, processor); } @Override @@ -128,12 +153,12 @@ public class ResilienceProcessor extends AsyncProcessorSupport getCamelContext().getExecutorServiceManager().shutdownNow(executorService); } - ServiceHelper.stopService(processorExchangeFactory, processor); + ServiceHelper.stopService(processorExchangeFactory, taskFactory, processor); } @Override protected void doShutdown() throws Exception { - ServiceHelper.stopAndShutdownServices(processorExchangeFactory, processor); + ServiceHelper.stopAndShutdownServices(processorExchangeFactory, taskFactory, processor); } @Override @@ -419,39 +444,43 @@ public class ResilienceProcessor extends AsyncProcessorSupport // Camel error handler exchange.setProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK, true); - Callable<Exchange> task; + CircuitBreakerTask task = (CircuitBreakerTask) taskFactory.acquire(exchange, callback); + Callable<Exchange> callable; if (timeLimiter != null) { Supplier<CompletableFuture<Exchange>> futureSupplier; if (executorService == null) { - futureSupplier = () -> CompletableFuture.supplyAsync(() -> processTask(exchange)); + futureSupplier = () -> CompletableFuture.supplyAsync(task); } else { - futureSupplier = () -> CompletableFuture.supplyAsync(() -> processTask(exchange), executorService); + futureSupplier = () -> CompletableFuture.supplyAsync(task, executorService); } - task = TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier); + callable = TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier); } else { - task = new CircuitBreakerTask(() -> processTask(exchange)); + callable = task; } - if (bulkhead != null) { - task = Bulkhead.decorateCallable(bulkhead, task); + callable = Bulkhead.decorateCallable(bulkhead, task); } - task = CircuitBreaker.decorateCallable(circuitBreaker, task); + callable = CircuitBreaker.decorateCallable(circuitBreaker, callable); Function<Throwable, Exchange> fallbackTask = new CircuitBreakerFallbackTask(this.id, this.fallback, exchange); try { if (LOG.isTraceEnabled()) { LOG.trace("Processing exchange: {} using circuit breaker: {}", exchange.getExchangeId(), id); } - Try.ofCallable(task).recover(fallbackTask).get(); + Try.ofCallable(callable).recover(fallbackTask).get(); } catch (Exception e) { exchange.setException(e); + } finally { + taskFactory.release(task); } + if (LOG.isTraceEnabled()) { boolean failed = exchange.isFailed(); LOG.trace("Processing exchange: {} using circuit breaker: {} complete (failed: {})", exchange.getExchangeId(), id, failed); } + callback.done(true); return true; } @@ -505,17 +534,38 @@ public class ResilienceProcessor extends AsyncProcessorSupport return exchange; } - private static final class CircuitBreakerTask implements Callable<Exchange> { + private final class CircuitBreakerTask implements PooledExchangeTask, Callable<Exchange>, Supplier<Exchange> { + + private Exchange exchange; - Supplier<Exchange> supplier; + @Override + public void prepare(Exchange exchange, AsyncCallback callback) { + this.exchange = exchange; + // callback not in use + } + + @Override + public void reset() { + this.exchange = null; + } - public CircuitBreakerTask(Supplier<Exchange> supplier) { - this.supplier = supplier; + @Override + public void run() { + // not in use } @Override public Exchange call() throws Exception { - return supplier.get(); + // this task is either use as callable or supplier + // therefore we must call process task before returning the response + return processTask(exchange); + } + + @Override + public Exchange get() { + // this task is either use as callable or supplier + // therefore we must call process task before returning the response + return processTask(exchange); } }
