This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch CAMEL-13691 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 2cf5880c599de47900bcb9dc616b96a64c65dc6f Author: Claus Ibsen <[email protected]> AuthorDate: Sun Nov 17 12:24:02 2019 +0100 CAMEL-13691: camel-resilience4j - WIP --- .../resilience4j/ResilienceProcessor.java | 89 +++++++++++--- .../component/resilience4j/ResilienceReifier.java | 26 ++++- .../resilience4j/HystrixCircuitOpenTest.java | 130 --------------------- .../ResilienceTimeoutThreadPoolTest.java | 124 ++++++++++++++++++++ .../model/Resilience4jConfigurationCommon.java | 14 +++ .../model/Resilience4jConfigurationDefinition.java | 9 ++ .../main/Resilience4jConfigurationProperties.java | 22 ++++ .../camel-main-configuration-metadata.json | 8 +- 8 files changed, 272 insertions(+), 150 deletions(-) diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java index b4ab2ae..1b1a039 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java @@ -20,6 +20,8 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.function.Supplier; @@ -32,6 +34,8 @@ import io.github.resilience4j.timelimiter.TimeLimiter; import io.github.resilience4j.timelimiter.TimeLimiterConfig; import io.vavr.control.Try; import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; import org.apache.camel.Navigate; import org.apache.camel.Processor; @@ -42,6 +46,7 @@ import org.apache.camel.api.management.ManagedResource; import org.apache.camel.spi.IdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.support.ExchangeHelper; +import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,17 +54,20 @@ import org.slf4j.LoggerFactory; * Implementation of Circuit Breaker EIP using resilience4j. */ @ManagedResource(description = "Managed Resilience Processor") -public class ResilienceProcessor extends AsyncProcessorSupport implements Navigate<Processor>, org.apache.camel.Traceable, IdAware { +public class ResilienceProcessor extends AsyncProcessorSupport implements CamelContextAware, Navigate<Processor>, org.apache.camel.Traceable, IdAware { private static final Logger LOG = LoggerFactory.getLogger(ResilienceProcessor.class); private volatile CircuitBreaker circuitBreaker; + private CamelContext camelContext; private String id; - private CircuitBreakerConfig circuitBreakerConfig; - private BulkheadConfig bulkheadConfig; - private TimeLimiterConfig timeLimiterConfig; + private final CircuitBreakerConfig circuitBreakerConfig; + private final BulkheadConfig bulkheadConfig; + private final TimeLimiterConfig timeLimiterConfig; private final Processor processor; private final Processor fallback; + private boolean shutdownExecutorService; + private ExecutorService executorService; public ResilienceProcessor(CircuitBreakerConfig circuitBreakerConfig, BulkheadConfig bulkheadConfig, TimeLimiterConfig timeLimiterConfig, Processor processor, Processor fallback) { @@ -71,6 +79,16 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga } @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override public String getId() { return id; } @@ -80,6 +98,22 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga this.id = id; } + public boolean isShutdownExecutorService() { + return shutdownExecutorService; + } + + public void setShutdownExecutorService(boolean shutdownExecutorService) { + this.shutdownExecutorService = shutdownExecutorService; + } + + public ExecutorService getExecutorService() { + return executorService; + } + + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } + @Override public String getTraceLabel() { return "resilience4j"; @@ -314,18 +348,17 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga Bulkhead bh = Bulkhead.of(id, bulkheadConfig); task = Bulkhead.decorateCallable(bh, task); } - // timeout handling is more complex with thread-pools - // TODO: Allow to plugin custom thread-pool instead of JDKs + if (timeLimiterConfig != null) { - final Callable<Exchange> future = task; - Supplier<CompletableFuture<Exchange>> futureSupplier = () -> CompletableFuture.supplyAsync(() -> { - try { - return future.call(); - } catch (Exception e) { - exchange.setException(e); - } - return exchange; - }); + // timeout handling is more complex with thread-pools + final CircuitBreakerTimeoutTask timeoutTask = new CircuitBreakerTimeoutTask(task, exchange); + Supplier<CompletableFuture<Exchange>> futureSupplier; + if (executorService == null) { + futureSupplier = () -> CompletableFuture.supplyAsync(timeoutTask::get); + } else { + futureSupplier = () -> CompletableFuture.supplyAsync(timeoutTask::get, executorService); + } + TimeLimiter tl = TimeLimiter.of(id, timeLimiterConfig); task = TimeLimiter.decorateFutureSupplier(tl, futureSupplier); } @@ -339,12 +372,15 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga @Override protected void doStart() throws Exception { + ObjectHelper.notNull(camelContext, "CamelContext", this); circuitBreaker = CircuitBreaker.of(id, circuitBreakerConfig); } @Override protected void doStop() throws Exception { - // noop + if (shutdownExecutorService && executorService != null) { + getCamelContext().getExecutorServiceManager().shutdownNow(executorService); + } } private static class CircuitBreakerTask implements Callable<Exchange> { @@ -441,4 +477,25 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga } } + private static class CircuitBreakerTimeoutTask implements Supplier<Exchange> { + + private final Callable<Exchange> future; + private final Exchange exchange; + + private CircuitBreakerTimeoutTask(Callable<Exchange> future, Exchange exchange) { + this.future = future; + this.exchange = exchange; + } + + @Override + public Exchange get() { + try { + return future.call(); + } catch (Exception e) { + exchange.setException(e); + } + return exchange; + } + } + } diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java index da157fd..e9aa90c 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java @@ -20,6 +20,7 @@ import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutorService; import io.github.resilience4j.bulkhead.BulkheadConfig; import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; @@ -30,6 +31,7 @@ import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Processor; import org.apache.camel.model.CircuitBreakerDefinition; import org.apache.camel.model.Model; +import org.apache.camel.model.ProcessorDefinitionHelper; import org.apache.camel.model.Resilience4jConfigurationCommon; import org.apache.camel.model.Resilience4jConfigurationDefinition; import org.apache.camel.reifier.ProcessorReifier; @@ -43,8 +45,6 @@ import static org.apache.camel.support.CamelContextHelper.mandatoryLookup; public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition> { - // TODO: thread pool bulkhead - // TODO: Configure timeout thread-pool globally // TODO: spring-boot allow to configure via resilience4j-spring-boot // TODO: example // TODO: camel-main - configure hystrix/resilience/rest via java code fluent builder (does it work) @@ -70,7 +70,9 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition BulkheadConfig bhConfig = configureBulkHead(config); TimeLimiterConfig tlConfig = configureTimeLimiter(config); - return new ResilienceProcessor(cbConfig, bhConfig, tlConfig, processor, fallback); + ResilienceProcessor answer = new ResilienceProcessor(cbConfig, bhConfig, tlConfig, processor, fallback); + configureTimeoutExecutorService(answer, routeContext, config); + return answer; } private CircuitBreakerConfig configureCircuitBreaker(Resilience4jConfigurationCommon config) { @@ -138,6 +140,24 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition return builder.build(); } + private void configureTimeoutExecutorService(ResilienceProcessor processor, RouteContext routeContext, Resilience4jConfigurationCommon config) { + if (config.getTimeoutEnabled() == null || !config.getTimeoutEnabled()) { + return; + } + + if (config.getTimeoutExecutorServiceRef() != null) { + String ref = config.getTimeoutExecutorServiceRef(); + boolean shutdownThreadPool = false; + ExecutorService executorService = routeContext.lookup(ref, ExecutorService.class); + if (executorService == null) { + executorService = ProcessorDefinitionHelper.lookupExecutorServiceRef(routeContext, "CircuitBreaker", definition, ref); + shutdownThreadPool = true; + } + processor.setExecutorService(executorService); + processor.setShutdownExecutorService(shutdownThreadPool); + } + } + // ******************************* // Helpers // ******************************* diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/HystrixCircuitOpenTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/HystrixCircuitOpenTest.java deleted file mode 100644 index e9e7bcf..0000000 --- a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/HystrixCircuitOpenTest.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.resilience4j; - -import java.io.IOException; - -import org.apache.camel.CamelExecutionException; -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.camel.RoutesBuilder; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.test.junit4.CamelTestSupport; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.camel.component.resilience4j.CircuitBreakerConstants.RESPONSE_SHORT_CIRCUITED; -import static org.apache.camel.component.resilience4j.CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION; - -@Ignore -public class HystrixCircuitOpenTest extends CamelTestSupport { - public static final Integer REQUEST_VOLUME_THRESHOLD = 4; - private static final Logger LOG = LoggerFactory.getLogger(HystrixCircuitOpenTest.class); - - private HystrixExceptionRoute route = new HystrixExceptionRoute(); - - @Test - public void testCircuitOpen() throws Exception { - LOG.info("testCircuitOpen start"); - // failing requests - route.throwException = true; - for (int i = 0; i < 2 * REQUEST_VOLUME_THRESHOLD; i++) { - try { - template.asyncRequestBody("direct:start", "Request Body"); - } catch (CamelExecutionException e) { - LOG.info(e.toString()); - } - } - Thread.sleep(1500); - - resetMocks(); - - // notice this can be flaky due timing when using thread sleeps in unit tests - getMockEndpoint("mock:result").expectedPropertyReceived(RESPONSE_SHORT_CIRCUITED, true); - - route.throwException = false; - try { - template.requestBody("direct:start", "Request Body"); - LOG.info("Instead circuit open expected"); - } catch (CamelExecutionException e) { - LOG.info("Circuit open expected ", e); - } - - assertMockEndpointsSatisfied(); - - // wait for the circuit to try an other request - Thread.sleep(500); - for (int i = 0; i < 2 * REQUEST_VOLUME_THRESHOLD; i++) { - try { - template.requestBody("direct:start", "Request Body"); - LOG.info("Circuit has closed"); - } catch (CamelExecutionException e) { - Thread.sleep(i * 100); - LOG.info("Circuit will be closed soon " + e.toString()); - } - } - - resetMocks(); - - getMockEndpoint("mock:result").expectedPropertyReceived(RESPONSE_SHORT_CIRCUITED, false); - getMockEndpoint("mock:result").expectedPropertyReceived(RESPONSE_SUCCESSFUL_EXECUTION, true); - - template.requestBody("direct:start", "Request Body"); - - assertMockEndpointsSatisfied(); - } - - @Override - protected RoutesBuilder createRouteBuilder() throws Exception { - return route; - } - - class HystrixExceptionRoute extends RouteBuilder { - volatile boolean throwException = true; - - @Override - public void configure() throws Exception { - from("direct:start") - .circuitBreaker() - .hystrixConfiguration() - .executionTimeoutInMilliseconds(100) - .circuitBreakerRequestVolumeThreshold(REQUEST_VOLUME_THRESHOLD) - .metricsRollingStatisticalWindowInMilliseconds(1000) - .circuitBreakerSleepWindowInMilliseconds(2000) - .end() - .log("Hystrix processing start: ${threadName}") - .process(new Processor() { - @Override - public void process(Exchange exchange) throws Exception { - if (throwException) { - LOG.info("Will throw exception"); - throw new IOException("Route has failed"); - } else { - LOG.info("Will NOT throw exception"); - } - } - }) - .log("Hystrix processing end: ${threadName}") - .end() - .log(RESPONSE_SHORT_CIRCUITED + " = ${exchangeProperty." + RESPONSE_SHORT_CIRCUITED + "}") - .to("mock:result"); - } - } -} - diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceTimeoutThreadPoolTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceTimeoutThreadPoolTest.java new file mode 100644 index 0000000..58e0c33 --- /dev/null +++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceTimeoutThreadPoolTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.resilience4j; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeoutException; + +import org.apache.camel.BindToRegistry; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * Resilience using timeout and custom thread pool with Java DSL + */ +public class ResilienceTimeoutThreadPoolTest extends CamelTestSupport { + + @BindToRegistry + public ExecutorService myThreadPool() { + return context().getExecutorServiceManager().newFixedThreadPool(this, "myThreadPool", 2); + } + + @Test + public void testFast() throws Exception { + // this calls the fast route and therefore we get a response + Object out = template.requestBody("direct:start", "fast"); + assertEquals("Fast response", out); + + + ThreadPoolExecutor pte = context().getRegistry().lookupByNameAndType("myThreadPool", ThreadPoolExecutor.class); + assertNotNull(pte); + assertEquals(2, pte.getCorePoolSize()); + assertEquals(1, pte.getCompletedTaskCount()); + + assertFalse(pte.isShutdown()); + } + + @Test + public void testSlow() throws Exception { + // this calls the slow route and therefore causes a timeout which triggers an exception + try { + template.requestBody("direct:start", "slow"); + fail("Should fail due to timeout"); + } catch (Exception e) { + // expected a timeout + assertIsInstanceOf(TimeoutException.class, e.getCause()); + } + + ThreadPoolExecutor pte = context().getRegistry().lookupByNameAndType("myThreadPool", ThreadPoolExecutor.class); + assertNotNull(pte); + assertEquals(2, pte.getCorePoolSize()); + assertEquals(0, pte.getCompletedTaskCount()); + assertEquals(1, pte.getActiveCount()); + + // stop camel and thread pool is also stopped + context().stop(); + + assertTrue(pte.isShutdown()); + } + + @Test + public void testSlowLoop() throws Exception { + // this calls the slow route and therefore causes a timeout which triggers an exception + for (int i = 0; i < 10; i++) { + try { + log.info(">>> test run " + i + " <<<"); + template.requestBody("direct:start", "slow"); + fail("Should fail due to timeout"); + } catch (Exception e) { + // expected a timeout + assertIsInstanceOf(TimeoutException.class, e.getCause()); + } + } + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .circuitBreaker() + // enable and use 2 second timeout + .resilience4jConfiguration().timeoutEnabled(true).timeoutDuration(2000).timeoutExecutorServiceRef("myThreadPool").end() + .log("Resilience processing start: ${threadName}") + .toD("direct:${body}") + .log("Resilience processing end: ${threadName}") + .end() + .log("After Resilience ${body}"); + + from("direct:fast") + // this is a fast route and takes 1 second to respond + .log("Fast processing start: ${threadName}") + .delay(1000) + .transform().constant("Fast response") + .log("Fast processing end: ${threadName}"); + + from("direct:slow") + // this is a slow route and takes 3 second to respond + .log("Slow processing start: ${threadName}") + .delay(3000) + .transform().constant("Slow response") + .log("Slow processing end: ${threadName}"); + } + }; + } + +} diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationCommon.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationCommon.java index 208f8b0..decdd34 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationCommon.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationCommon.java @@ -16,6 +16,7 @@ */ package org.apache.camel.model; +import java.util.concurrent.ForkJoinPool; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAttribute; @@ -66,6 +67,8 @@ public class Resilience4jConfigurationCommon extends IdentifiedType { private Integer bulkheadMaxWaitDuration; @Metadata(label = "timeout", defaultValue = "false") private Boolean timeoutEnabled; + @Metadata(label = "timeout") + private String timeoutExecutorServiceRef; @Metadata(label = "timeout", defaultValue = "1000") private Integer timeoutDuration; @Metadata(label = "timeout", defaultValue = "true") @@ -279,6 +282,17 @@ public class Resilience4jConfigurationCommon extends IdentifiedType { this.timeoutEnabled = timeoutEnabled; } + public String getTimeoutExecutorServiceRef() { + return timeoutExecutorServiceRef; + } + + /** + * References to a custom thread pool to use when timeout is enabled (uses {@link ForkJoinPool#commonPool()} by default) + */ + public void setTimeoutExecutorServiceRef(String timeoutExecutorServiceRef) { + this.timeoutExecutorServiceRef = timeoutExecutorServiceRef; + } + public Integer getTimeoutDuration() { return timeoutDuration; } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java index 76eadb0..54d4f0d 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java @@ -16,6 +16,7 @@ */ package org.apache.camel.model; +import java.util.concurrent.ForkJoinPool; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; @@ -205,6 +206,14 @@ public class Resilience4jConfigurationDefinition extends Resilience4jConfigurati } /** + * References to a custom thread pool to use when timeout is enabled (uses {@link ForkJoinPool#commonPool()} by default) + */ + public Resilience4jConfigurationDefinition timeoutExecutorServiceRef(String executorServiceRef) { + setTimeoutExecutorServiceRef(executorServiceRef); + return this; + } + + /** * Configures the thread execution timeout (millis). * Default value is 1000 millis (1 second). */ diff --git a/core/camel-main/src/main/java/org/apache/camel/main/Resilience4jConfigurationProperties.java b/core/camel-main/src/main/java/org/apache/camel/main/Resilience4jConfigurationProperties.java index 81c1ca9..9292287 100644 --- a/core/camel-main/src/main/java/org/apache/camel/main/Resilience4jConfigurationProperties.java +++ b/core/camel-main/src/main/java/org/apache/camel/main/Resilience4jConfigurationProperties.java @@ -16,6 +16,8 @@ */ package org.apache.camel.main; +import java.util.concurrent.ForkJoinPool; + /** * Global configuration for Resilience EIP circuit breaker. */ @@ -38,6 +40,7 @@ public class Resilience4jConfigurationProperties { private Integer bulkheadMaxConcurrentCalls; private Integer bulkheadMaxWaitDuration; private Boolean timeoutEnabled; + private String timeoutExecutorServiceRef; private Integer timeoutDuration; private Boolean timeoutCancelRunningFuture; @@ -256,6 +259,17 @@ public class Resilience4jConfigurationProperties { this.timeoutEnabled = timeoutEnabled; } + public String getTimeoutExecutorServiceRef() { + return timeoutExecutorServiceRef; + } + + /** + * References to a custom thread pool to use when timeout is enabled (uses {@link ForkJoinPool#commonPool()} by default) + */ + public void setTimeoutExecutorServiceRef(String timeoutExecutorServiceRef) { + this.timeoutExecutorServiceRef = timeoutExecutorServiceRef; + } + public Integer getTimeoutDuration() { return timeoutDuration; } @@ -437,6 +451,14 @@ public class Resilience4jConfigurationProperties { } /** + * References to a custom thread pool to use when timeout is enabled (uses {@link ForkJoinPool#commonPool()} by default) + */ + public Resilience4jConfigurationProperties withTimeoutExecutorServiceRef(String timeoutExecutorServiceRef) { + this.timeoutExecutorServiceRef = timeoutExecutorServiceRef; + return this; + } + + /** * Configures the thread execution timeout (millis). * Default value is 1000 millis (1 second). */ diff --git a/core/camel-main/src/main/resources/META-INF/camel-main-configuration-metadata.json b/core/camel-main/src/main/resources/META-INF/camel-main-configuration-metadata.json index 6aac40d..983efdb 100644 --- a/core/camel-main/src/main/resources/META-INF/camel-main-configuration-metadata.json +++ b/core/camel-main/src/main/resources/META-INF/camel-main-configuration-metadata.json @@ -703,7 +703,7 @@ "name":"camel.resilience4j.timeout-duration", "type":"java.lang.Integer", "sourceType":"org.apache.camel.main.Resilience4jConfigurationProperties", - "description":"Configures the thread execution timeout. Default value is 1 second." + "description":"Configures the thread execution timeout (millis). Default value is 1000 millis (1 second)." }, { "name":"camel.resilience4j.timeout-enabled", @@ -712,6 +712,12 @@ "description":"Whether timeout is enabled or not on the circuit breaker. Default is false." }, { + "name":"camel.resilience4j.timeout-executor-service-ref", + "type":"java.lang.String", + "sourceType":"org.apache.camel.main.Resilience4jConfigurationProperties", + "description":"References to a custom thread pool to use when timeout is enabled (uses ForkJoinPool#commonPool() by default)" + }, + { "name":"camel.resilience4j.wait-duration-in-open-state", "type":"java.lang.Integer", "sourceType":"org.apache.camel.main.Resilience4jConfigurationProperties",
