This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch CAMEL-13691 in repository https://gitbox.apache.org/repos/asf/camel.git
commit d8cec60d7ba8ff777917910d43c46126336cb6f2 Author: Claus Ibsen <[email protected]> AuthorDate: Sun Nov 17 08:50:22 2019 +0100 CAMEL-13691: camel-resilience4j - WIP --- bom/camel-bom/pom.xml | 10 +++ .../resilience4j/ResilienceProcessor.java | 78 +++++++++++++++++--- .../component/resilience4j/ResilienceReifier.java | 31 ++++++-- .../resilience4j/ResilienceManagementTest.java | 83 ++++++++++++++++++++++ ...va => ResilienceRouteBulkheadFallbackTest.java} | 10 ++- ...est.java => ResilienceRouteBulkheadOkTest.java} | 23 +++--- ... => ResilienceRouteFallbackViaNetworkTest.java} | 26 ++++--- .../model/Resilience4jConfigurationCommon.java | 78 +++++++++++++++----- .../model/Resilience4jConfigurationDefinition.java | 37 ++++++++-- docs/user-manual/modules/ROOT/nav.adoc | 2 + .../modules/ROOT/pages/hystrix-eip.adoc | 14 ++-- .../{hystrix-eip.adoc => resilience4j-eip.adoc} | 54 +++++++------- .../ROOT/pages/resilience4jConfiguration-eip.adoc | 7 ++ .../camel-spring-boot-dependencies/pom.xml | 10 +++ 14 files changed, 361 insertions(+), 102 deletions(-) diff --git a/bom/camel-bom/pom.xml b/bom/camel-bom/pom.xml index a1a49bc..9a34d63 100644 --- a/bom/camel-bom/pom.xml +++ b/bom/camel-bom/pom.xml @@ -2365,6 +2365,16 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-resilience4j</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-resilience4j-starter</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-rest</artifactId> <version>${project.version}</version> </dependency> diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java index 91fa38c..cbc57ec 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java @@ -21,6 +21,8 @@ import java.util.List; import java.util.function.Function; import java.util.function.Supplier; +import io.github.resilience4j.bulkhead.Bulkhead; +import io.github.resilience4j.bulkhead.BulkheadConfig; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; import io.vavr.control.Try; @@ -29,6 +31,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.RuntimeExchangeException; +import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.spi.IdAware; import org.apache.camel.support.AsyncProcessorSupport; @@ -45,16 +48,17 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga private static final Logger LOG = LoggerFactory.getLogger(ResilienceProcessor.class); private String id; - private CircuitBreakerConfig config; + private CircuitBreakerConfig circuitBreakerConfig; + private BulkheadConfig bulkheadConfig; private final Processor processor; private final Processor fallback; - private final boolean fallbackViaNetwork; - public ResilienceProcessor(CircuitBreakerConfig config, Processor processor, Processor fallback, boolean fallbackViaNetwork) { - this.config = config; + public ResilienceProcessor(CircuitBreakerConfig circuitBreakerConfig, BulkheadConfig bulkheadConfig, + Processor processor, Processor fallback) { + this.circuitBreakerConfig = circuitBreakerConfig; + this.bulkheadConfig = bulkheadConfig; this.processor = processor; this.fallback = fallback; - this.fallbackViaNetwork = fallbackViaNetwork; } @Override @@ -72,6 +76,56 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga return "resilience4j"; } + @ManagedAttribute + public float getCircuitBreakerFailureRateThreshold() { + return circuitBreakerConfig.getFailureRateThreshold(); + } + + @ManagedAttribute + public float getCircuitBreakerSlowCallRateThreshold() { + return circuitBreakerConfig.getSlowCallRateThreshold(); + } + + @ManagedAttribute + public int getCircuitBreakerMinimumNumberOfCalls() { + return circuitBreakerConfig.getMinimumNumberOfCalls(); + } + + @ManagedAttribute + public int getCircuitBreakerPermittedNumberOfCallsInHalfOpenState() { + return circuitBreakerConfig.getPermittedNumberOfCallsInHalfOpenState(); + } + + @ManagedAttribute + public int getCircuitBreakerSlidingWindowSize() { + return circuitBreakerConfig.getSlidingWindowSize(); + } + + @ManagedAttribute + public String getCircuitBreakerSlidingWindowType() { + return circuitBreakerConfig.getSlidingWindowType().name(); + } + + @ManagedAttribute + public long getCircuitBreakerWaitDurationInOpenState() { + return circuitBreakerConfig.getWaitDurationInOpenState().getSeconds(); + } + + @ManagedAttribute + public boolean isCircuitBreakerTransitionFromOpenToHalfOpenEnabled() { + return circuitBreakerConfig.isAutomaticTransitionFromOpenToHalfOpenEnabled(); + } + + @ManagedAttribute + public boolean isCircuitBreakerWritableStackTraceEnabled() { + return circuitBreakerConfig.isWritableStackTraceEnabled(); + } + + @ManagedAttribute + public boolean isBulkheadEnabled() { + return bulkheadConfig != null; + } + @Override public List<Processor> next() { if (!hasNext()) { @@ -99,19 +153,23 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga // Callable<String> callable = TimeLimiter.decorateFutureSupplier(TimeLimiter.of(Duration.ofMillis(500)), futureSupplier); // String result = CircuitBreaker.decorateCheckedSupplier(cb, callable::call).apply(); -// Bulkhead bh = Bulkhead.ofDefaults("ddd"); -// BulkheadConfig. - // TimeLimiter time = TimeLimiter.of(Duration.ofSeconds(1)); // Supplier<Future<Exchange>> task2 = time.decorateFutureSupplier(() -> { // task.get(); // Future // }); - CircuitBreaker cb = CircuitBreaker.of(id, config); + CircuitBreaker cb = CircuitBreaker.of(id, circuitBreakerConfig); + Supplier<Exchange> task = CircuitBreaker.decorateSupplier(cb, new CircuitBreakerTask(processor, exchange)); + Function<Throwable, Exchange> fallbackTask = new CircuitBreakerFallbackTask(fallback, exchange); + if (bulkheadConfig != null) { + Bulkhead bh = Bulkhead.of(id, bulkheadConfig); + task = Bulkhead.decorateSupplier(bh, task); + } + Try.ofSupplier(task) - .recover(new CircuitBreakerFallbackTask(fallback, exchange)) + .recover(fallbackTask) .andFinally(() -> callback.done(false)).get(); return false; diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java index 3704219..06399cd 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; +import io.github.resilience4j.bulkhead.BulkheadConfig; import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; import org.apache.camel.CamelContext; @@ -41,8 +42,9 @@ import static org.apache.camel.support.CamelContextHelper.mandatoryLookup; public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition> { + // TODO: metrics with state of CB + // TODO: expose metrics as JMX on processor // TODO: Timeout - // TODO: Bulkhead for viaNetwork public ResilienceReifier(CircuitBreakerDefinition definition) { super(definition); @@ -56,14 +58,18 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition if (definition.getOnFallback() != null) { fallback = ProcessorReifier.reifier(definition.getOnFallback()).createProcessor(routeContext); } - + boolean fallbackViaNetwork = definition.getOnFallback() != null && definition.getOnFallback().isFallbackViaNetwork(); + if (fallbackViaNetwork) { + throw new UnsupportedOperationException("camel-resilience4j does not support onFallbackViaNetwork"); + } final Resilience4jConfigurationCommon config = buildResilience4jConfiguration(routeContext.getCamelContext()); - CircuitBreakerConfig cfg = configureResilience4j(config); + CircuitBreakerConfig cbConfig = configureCircuitBreaker(config); + BulkheadConfig bhConfig = configureBulkHead(config); - return new ResilienceProcessor(cfg, processor, fallback, false); + return new ResilienceProcessor(cbConfig, bhConfig, processor, fallback); } - private CircuitBreakerConfig configureResilience4j(Resilience4jConfigurationCommon config) { + private CircuitBreakerConfig configureCircuitBreaker(Resilience4jConfigurationCommon config) { CircuitBreakerConfig.Builder builder = CircuitBreakerConfig.custom(); if (config.getAutomaticTransitionFromOpenToHalfOpenEnabled() != null) { builder.automaticTransitionFromOpenToHalfOpenEnabled(config.getAutomaticTransitionFromOpenToHalfOpenEnabled()); @@ -98,6 +104,21 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition return builder.build(); } + private BulkheadConfig configureBulkHead(Resilience4jConfigurationCommon config) { + if (config.getBulkheadEnabled() == null || !config.getBulkheadEnabled()) { + return null; + } + + BulkheadConfig.Builder builder = BulkheadConfig.custom(); + if (config.getBulkheadMaxConcurrentCalls() != null) { + builder.maxConcurrentCalls(config.getBulkheadMaxConcurrentCalls()); + } + if (config.getBulkheadMaxWaitDuration() != null) { + builder.maxWaitDuration(Duration.ofSeconds(config.getBulkheadMaxConcurrentCalls())); + } + return builder.build(); + } + // ******************************* // Helpers // ******************************* diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceManagementTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceManagementTest.java new file mode 100644 index 0000000..4b1046f --- /dev/null +++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceManagementTest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.resilience4j; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class ResilienceManagementTest extends CamelTestSupport { + + @Override + protected boolean useJmx() { + return true; + } + + protected MBeanServer getMBeanServer() { + return context.getManagementStrategy().getManagementAgent().getMBeanServer(); + } + + @Test + public void testResilience() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + + // look inside jmx + // get the stats for the route + MBeanServer mbeanServer = getMBeanServer(); + + // context name + String name = context.getManagementName(); + + // get the object name for the delayer + ObjectName on = ObjectName.getInstance("org.apache.camel:context=" + name + ",type=processors,name=\"myResilience\""); + + // should be on start + String routeId = (String) mbeanServer.getAttribute(on, "RouteId"); + assertEquals("start", routeId); + + // should be id of the node + Integer num = (Integer) mbeanServer.getAttribute(on, "CircuitBreakerMinimumNumberOfCalls"); + assertEquals("100", num.toString()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").routeId("start") + .circuitBreaker().id("myResilience") + .to("direct:foo") + .onFallback() + .transform().constant("Fallback message") + .end() + .to("mock:result"); + + from("direct:foo") + .transform().constant("Bye World"); + } + }; + } + +} diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/HystrixRouteFallbackViaNetworkTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteBulkheadFallbackTest.java similarity index 88% copy from components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/HystrixRouteFallbackViaNetworkTest.java copy to components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteBulkheadFallbackTest.java index 7f2ea21..69833b9 100644 --- a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/HystrixRouteFallbackViaNetworkTest.java +++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteBulkheadFallbackTest.java @@ -18,14 +18,12 @@ package org.apache.camel.component.resilience4j; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.junit4.CamelTestSupport; -import org.junit.Ignore; import org.junit.Test; -@Ignore -public class HystrixRouteFallbackViaNetworkTest extends CamelTestSupport { +public class ResilienceRouteBulkheadFallbackTest extends CamelTestSupport { @Test - public void testHystrix() throws Exception { + public void testResilience() throws Exception { getMockEndpoint("mock:result").expectedBodiesReceived("Fallback message"); getMockEndpoint("mock:result").expectedPropertyReceived(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false); getMockEndpoint("mock:result").expectedPropertyReceived(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, true); @@ -42,9 +40,9 @@ public class HystrixRouteFallbackViaNetworkTest extends CamelTestSupport { public void configure() throws Exception { from("direct:start") .to("log:start") - .circuitBreaker() + .circuitBreaker().resilience4jConfiguration().bulkheadEnabled(true).end() .throwException(new IllegalArgumentException("Forced")) - .onFallbackViaNetwork() + .onFallback() .transform().constant("Fallback message") .end() .to("log:result") diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/HystrixRouteFallbackViaNetworkTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteBulkheadOkTest.java similarity index 72% copy from components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/HystrixRouteFallbackViaNetworkTest.java copy to components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteBulkheadOkTest.java index 7f2ea21..3368884 100644 --- a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/HystrixRouteFallbackViaNetworkTest.java +++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteBulkheadOkTest.java @@ -18,17 +18,15 @@ package org.apache.camel.component.resilience4j; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.junit4.CamelTestSupport; -import org.junit.Ignore; import org.junit.Test; -@Ignore -public class HystrixRouteFallbackViaNetworkTest extends CamelTestSupport { +public class ResilienceRouteBulkheadOkTest extends CamelTestSupport { @Test - public void testHystrix() throws Exception { - getMockEndpoint("mock:result").expectedBodiesReceived("Fallback message"); - getMockEndpoint("mock:result").expectedPropertyReceived(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false); - getMockEndpoint("mock:result").expectedPropertyReceived(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, true); + public void testResilience() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + getMockEndpoint("mock:result").expectedPropertyReceived(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, true); + getMockEndpoint("mock:result").expectedPropertyReceived(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false); template.sendBody("direct:start", "Hello World"); @@ -41,14 +39,17 @@ public class HystrixRouteFallbackViaNetworkTest extends CamelTestSupport { @Override public void configure() throws Exception { from("direct:start") - .to("log:start") - .circuitBreaker() - .throwException(new IllegalArgumentException("Forced")) - .onFallbackViaNetwork() + .circuitBreaker().resilience4jConfiguration().bulkheadEnabled(true).end() + .to("direct:foo") + .to("log:foo") + .onFallback() .transform().constant("Fallback message") .end() .to("log:result") .to("mock:result"); + + from("direct:foo") + .transform().constant("Bye World"); } }; } diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/HystrixRouteFallbackViaNetworkTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteFallbackViaNetworkTest.java similarity index 73% rename from components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/HystrixRouteFallbackViaNetworkTest.java rename to components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteFallbackViaNetworkTest.java index 7f2ea21..012c219 100644 --- a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/HystrixRouteFallbackViaNetworkTest.java +++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRouteFallbackViaNetworkTest.java @@ -18,21 +18,25 @@ package org.apache.camel.component.resilience4j; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.junit4.CamelTestSupport; -import org.junit.Ignore; import org.junit.Test; -@Ignore -public class HystrixRouteFallbackViaNetworkTest extends CamelTestSupport { +public class ResilienceRouteFallbackViaNetworkTest extends CamelTestSupport { - @Test - public void testHystrix() throws Exception { - getMockEndpoint("mock:result").expectedBodiesReceived("Fallback message"); - getMockEndpoint("mock:result").expectedPropertyReceived(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false); - getMockEndpoint("mock:result").expectedPropertyReceived(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, true); - - template.sendBody("direct:start", "Hello World"); + @Override + public boolean isUseRouteBuilder() { + return false; + } - assertMockEndpointsSatisfied(); + @Test + public void testResilience() throws Exception { + try { + context.addRoutes(createRouteBuilder()); + context.start(); + fail("Should throw exception"); + } catch (Exception e) { + assertIsInstanceOf(UnsupportedOperationException.class, e.getCause()); + assertEquals("camel-resilience4j does not support onFallbackViaNetwork", e.getCause().getMessage()); + } } @Override diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationCommon.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationCommon.java index 3c5be79..771211d 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationCommon.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationCommon.java @@ -26,51 +26,58 @@ import org.apache.camel.spi.Metadata; public class Resilience4jConfigurationCommon extends IdentifiedType { @XmlAttribute - @Metadata(defaultValue = "Camel") - private String groupKey; + @Metadata(label = "circuitbreaker") + private String configRef; @XmlAttribute - @Metadata(defaultValue = "50") + @Metadata(label = "circuitbreaker", defaultValue = "50") private Float failureRateThreshold; @XmlAttribute - @Metadata(defaultValue = "10") + @Metadata(label = "circuitbreaker", defaultValue = "10") private Integer permittedNumberOfCallsInHalfOpenState; @XmlAttribute - @Metadata(defaultValue = "100") + @Metadata(label = "circuitbreaker", defaultValue = "100") private Integer slidingWindowSize; @XmlAttribute - @Metadata(defaultValue = "COUNT_BASED", enums = "TIME_BASED,COUNT_BASED") + @Metadata(label = "circuitbreaker", defaultValue = "COUNT_BASED", enums = "TIME_BASED,COUNT_BASED") private String slidingWindowType; @XmlAttribute - @Metadata(defaultValue = "100") + @Metadata(label = "circuitbreaker", defaultValue = "100") private Integer minimumNumberOfCalls; @XmlAttribute - @Metadata(defaultValue = "true") + @Metadata(label = "circuitbreaker", defaultValue = "true") private Boolean writableStackTraceEnabled; @XmlAttribute - @Metadata(defaultValue = "60") + @Metadata(label = "circuitbreaker", defaultValue = "60") private Integer waitDurationInOpenState; @XmlAttribute - @Metadata(defaultValue = "false") + @Metadata(label = "circuitbreaker", defaultValue = "false") private Boolean automaticTransitionFromOpenToHalfOpenEnabled; @XmlAttribute - @Metadata(defaultValue = "100") + @Metadata(label = "circuitbreaker", defaultValue = "100") private Float slowCallRateThreshold; @XmlAttribute - @Metadata(defaultValue = "60") + @Metadata(label = "circuitbreaker", defaultValue = "60") private Integer slowCallDurationThreshold; + @Metadata(label = "bulkhead", defaultValue = "false") + private Boolean bulkheadEnabled; + @Metadata(label = "bulkhead", defaultValue = "25") + private Integer bulkheadMaxConcurrentCalls; + @Metadata(label = "bulkhead", defaultValue = "0") + private Integer bulkheadMaxWaitDuration; // Getter/Setter // ------------------------------------------------------------------------- - public String getGroupKey() { - return groupKey; + public String getConfigRef() { + return configRef; } /** - * Sets the group key to use. The default value is Camel. + * Refers to an existing io.github.resilience4j.circuitbreaker.CircuitBreakerConfig instance + * to lookup and use from the registry. */ - public void setGroupKey(String groupKey) { - this.groupKey = groupKey; + public void setConfigRef(String configRef) { + this.configRef = configRef; } public Float getFailureRateThreshold() { @@ -215,4 +222,41 @@ public class Resilience4jConfigurationCommon extends IdentifiedType { public void setSlowCallDurationThreshold(Integer slowCallDurationThreshold) { this.slowCallDurationThreshold = slowCallDurationThreshold; } + + public Boolean getBulkheadEnabled() { + return bulkheadEnabled; + } + + /** + * Whether bulkhead is enabled or not on the circuit breaker. + */ + public void setBulkheadEnabled(Boolean bulkheadEnabled) { + this.bulkheadEnabled = bulkheadEnabled; + } + + public Integer getBulkheadMaxConcurrentCalls() { + return bulkheadMaxConcurrentCalls; + } + + /** + * Configures the max amount of concurrent calls the bulkhead will support. + */ + public void setBulkheadMaxConcurrentCalls(Integer bulkheadMaxConcurrentCalls) { + this.bulkheadMaxConcurrentCalls = bulkheadMaxConcurrentCalls; + } + + public Integer getBulkheadMaxWaitDuration() { + return bulkheadMaxWaitDuration; + } + + /** + * Configures a maximum amount of time which the calling thread will wait to enter the bulkhead. If bulkhead has space available, entry + * is guaranteed and immediate. If bulkhead is full, calling threads will contest for space, if it becomes available. maxWaitDuration can be set to 0. + * <p> + * Note: for threads running on an event-loop or equivalent (rx computation pool, etc), setting maxWaitDuration to 0 is highly recommended. Blocking + * an event-loop thread will most likely have a negative effect on application throughput. + */ + public void setBulkheadMaxWaitDuration(Integer bulkheadMaxWaitDuration) { + this.bulkheadMaxWaitDuration = bulkheadMaxWaitDuration; + } } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java index 2275140..015feeb 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java @@ -31,8 +31,6 @@ import org.apache.camel.spi.Metadata; @XmlAccessorType(XmlAccessType.FIELD) public class Resilience4jConfigurationDefinition extends Resilience4jConfigurationCommon { - public static final String DEFAULT_GROUP_KEY = "Camel"; - @XmlTransient private CircuitBreakerDefinition parent; @@ -47,10 +45,11 @@ public class Resilience4jConfigurationDefinition extends Resilience4jConfigurati // ------------------------------------------------------------------------- /** - * Sets the group key to use. The default value is Camel. + * Refers to an existing io.github.resilience4j.circuitbreaker.CircuitBreakerConfig instance + * to lookup and use from the registry. */ - public Resilience4jConfigurationDefinition groupKey(String groupKey) { - setGroupKey(groupKey); + public Resilience4jConfigurationDefinition configRef(String ref) { + setConfigRef(ref); return this; } @@ -168,6 +167,34 @@ public class Resilience4jConfigurationDefinition extends Resilience4jConfigurati } /** + * Whether bulkhead is enabled or not on the circuit breaker. + */ + public Resilience4jConfigurationDefinition bulkheadEnabled(Boolean bulkheadEnabled) { + setBulkheadEnabled(bulkheadEnabled); + return this; + } + + /** + * Configures the max amount of concurrent calls the bulkhead will support. + */ + public Resilience4jConfigurationDefinition bulkheadMaxConcurrentCalls(Integer bulkheadMaxConcurrentCalls) { + setBulkheadMaxWaitDuration(bulkheadMaxConcurrentCalls); + return this; + } + + /** + * Configures a maximum amount of time which the calling thread will wait to enter the bulkhead. If bulkhead has space available, entry + * is guaranteed and immediate. If bulkhead is full, calling threads will contest for space, if it becomes available. maxWaitDuration can be set to 0. + * <p> + * Note: for threads running on an event-loop or equivalent (rx computation pool, etc), setting maxWaitDuration to 0 is highly recommended. Blocking + * an event-loop thread will most likely have a negative effect on application throughput. + */ + public Resilience4jConfigurationDefinition bulkheadMaxWaitDuration(Integer bulkheadMaxWaitDuration) { + setBulkheadMaxWaitDuration(bulkheadMaxWaitDuration); + return this; + } + + /** * End of configuration. */ public CircuitBreakerDefinition end() { diff --git a/docs/user-manual/modules/ROOT/nav.adoc b/docs/user-manual/modules/ROOT/nav.adoc index 6d5904d..719f335 100644 --- a/docs/user-manual/modules/ROOT/nav.adoc +++ b/docs/user-manual/modules/ROOT/nav.adoc @@ -102,6 +102,8 @@ ** xref:removeProperty-eip.adoc[Remove Property EIP] ** xref:requestReply-eip.adoc[Request Reply] ** xref:resequence-eip.adoc[Resequence EIP] + ** xref:resilience4j-eip.adoc[Resilience4j EIP] + ** xref:resilience4jConfiguration-eip.adoc[Resilience4j Configuration EIP] ** xref:rollback-eip.adoc[Rollback EIP] ** xref:roundRobin-eip.adoc[Round Robin EIP] ** xref:routingSlip-eip.adoc[Routing Slip EIP] diff --git a/docs/user-manual/modules/ROOT/pages/hystrix-eip.adoc b/docs/user-manual/modules/ROOT/pages/hystrix-eip.adoc index 2cd1eb5..bd98db1 100644 --- a/docs/user-manual/modules/ROOT/pages/hystrix-eip.adoc +++ b/docs/user-manual/modules/ROOT/pages/hystrix-eip.adoc @@ -114,11 +114,11 @@ You can find an example with the source code: https://github.com/apache/camel/tr See the xref:hystrix-component.adoc[Hystrix Component]. -== Camel's Error Handler and Hystrix EIP +== Camel's Error Handler and Circuit Breaker EIP -By default the Hystrix EIP handles errors by itself. This means if the circuit breaker is open and +By default the Circuit Breaker EIP handles errors by itself. This means if the circuit breaker is open and the message fails, then Camel's error handler is not reacting also. -However, you can enable Camels error handler with Hystrix by enabling the `inheritErrorHandler` option, as shown: +However, you can enable Camels error handler with circuit breaker by enabling the `inheritErrorHandler` option, as shown: [source,java] ---- @@ -127,7 +127,7 @@ errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(3).redeliveryDel from("direct:start") .to("log:start") - // turn on Camel's error handler on hystrix so it can do redeliveries + // turn on Camel's error handler on circuit breaker so Camel can do redeliveries .circuitBreaker().inheritErrorHandler(true) .to("mock:a") .throwException(new IllegalArgumentException("Forced")) @@ -136,13 +136,13 @@ from("direct:start") .to("mock:result"); ---- -This example is from an unit test, where you can see the Hystrix EIP block has been hardcoded +This example is from an unit test, where you can see the Circuit Breaker EIP block has been hardcoded to always fail by throwing an exception. Because the `inheritErrorHandler` has been enabled, -then Camel's error handler will attempt to call the Hystrix EIP block again. +then Camel's error handler will attempt to call the Circuit Breaker EIP block again. That means the `mock:a` endpoint will receive the message again, and a total of 1 + 3 = 4 message (first time + 3 redeliveries). -If we turn off the `inheritErrorHandler` option (default) then the Hystrix EIP will only be +If we turn off the `inheritErrorHandler` option (default) then the Circuit Breaker EIP will only be executed once because it handled the error itself. diff --git a/docs/user-manual/modules/ROOT/pages/hystrix-eip.adoc b/docs/user-manual/modules/ROOT/pages/resilience4j-eip.adoc similarity index 56% copy from docs/user-manual/modules/ROOT/pages/hystrix-eip.adoc copy to docs/user-manual/modules/ROOT/pages/resilience4j-eip.adoc index 2cd1eb5..46a6a8c 100644 --- a/docs/user-manual/modules/ROOT/pages/hystrix-eip.adoc +++ b/docs/user-manual/modules/ROOT/pages/resilience4j-eip.adoc @@ -1,13 +1,13 @@ -[[hystrix-eip]] -= Hystrix EIP -:page-source: core/camel-core-engine/src/main/docs/eips/hystrix-eip.adoc +[[resilience4j-eip]] += Resilience4j EIP +:page-source: core/camel-core-engine/src/main/docs/eips/resilience4j-eip.adoc -*Available as of Camel 2.18* +*Available as of Camel 3.0* -The Hystrix EIP provides integration with Netflix https://github.com/Netflix/Hystrix[Hystrix] to be used as circuit breaker in the Camel routes. Hystrix is a latency and fault tolerance library designed to isolate points of access to remote systems, services and 3rd party libraries, stop cascading failure and enable resilience in complex distributed systems where failure is inevitable. +This component supports the Circuit Breaker EIP with the Resilience4j library. NOTE: Camel provides the Circuit Breaker EIP in the route model, which allows to plugin different implementations. -Hystrix is one such implementation. +Resilience4j is one such implementation. Maven users will need to add the following dependency to their pom.xml to use this EIP: @@ -15,7 +15,7 @@ Maven users will need to add the following dependency to their pom.xml to use th ---- <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-hystrix</artifactId> + <artifactId>camel-resilience4j</artifactId> <version>x.x.x</version><!-- use the same version as your Camel core version --> </dependency> ---- @@ -23,26 +23,18 @@ Maven users will need to add the following dependency to their pom.xml to use th == Configuration options // eip options: START -The Hystrix EIP supports 2 options which are listed below: - -[width="100%",cols="2,5,^1,2",options="header"] -|=== -| Name | Description | Default | Type -| *hystrixConfiguration* | Configures the Hystrix EIP Use end when configuration is complete, to return back to the Hystrix EIP. | | HystrixConfiguration Definition -| *hystrixConfigurationRef* | Refers to a Hystrix configuration to use for configuring the Hystrix EIP. | | String -|=== // eip options: END -See xref:hystrixConfiguration-eip.adoc[Hystrix Configuration] for all the configuration options on Hystrix EIP. +See xref:resilience4jConfiguration-eip.adoc[Resilience4j Configuration] for all the configuration options on Resilience Circuit Breaker. == Samples -Below is an example route showing an Hystrix endpoint that protects against slow operation by falling back to the in-lined fallback route. By default the timeout request is just *1000ms* so the HTTP endpoint has to be fairly quick to succeed. +Below is an example route showing a Resilience endpoint that protects against a downstream HTTP operation by falling back to the in-lined fallback route. [source,java] ---- from("direct:start") .circuitBreaker() - .to("http://fooservice.com/slow") + .to("http://fooservice.com/faulty") .onFallback() .transform().constant("Fallback message") .end() @@ -56,7 +48,7 @@ And in XML DSL: <route> <from uri="direct:start"/> <circuitBreaker> - <to uri="http://fooservice.com/slow"/> + <to uri="http://fooservice.com/faulty"/> <onFallback> <transform> <constant>Fallback message</constant> @@ -68,9 +60,11 @@ And in XML DSL: </camelContext> ---- -== Configuring Hystrix +== Configuring Resilienc4j -You can fine-tune Hystrix by the many xref:hystrixConfiguration-eip.adoc[Hystrix Configuration] options. +You can fine-tune Resilience4j by the many xref:resilience4jConfiguration-eip.adoc[Resilience4j Configuration] options. + +TODO: Update example!!! For example to use a 2 second execution timeout, you can do as follows: [source,java] @@ -110,15 +104,15 @@ See xref:onFallback-eip.adoc[onFallback]. You can find an example with the source code: https://github.com/apache/camel/tree/master/examples/camel-example-hystrix[camel-example-hystrix]. -== Using Hystrix with Spring Boot +== Using Resilience4j with Spring Boot -See the xref:hystrix-component.adoc[Hystrix Component]. +See the xref:components::resilience4j-component.adoc[Resilience4j Component]. -== Camel's Error Handler and Hystrix EIP +== Camel's Error Handler and Circuit Breaker EIP -By default the Hystrix EIP handles errors by itself. This means if the circuit breaker is open and +By default the Circuit Breaker EIP handles errors by itself. This means if the circuit breaker is open and the message fails, then Camel's error handler is not reacting also. -However, you can enable Camels error handler with Hystrix by enabling the `inheritErrorHandler` option, as shown: +However, you can enable Camels error handler with circuit breaker by enabling the `inheritErrorHandler` option, as shown: [source,java] ---- @@ -127,7 +121,7 @@ errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(3).redeliveryDel from("direct:start") .to("log:start") - // turn on Camel's error handler on hystrix so it can do redeliveries + // turn on Camel's error handler on circuit breaker so Camel can do redeliveries .circuitBreaker().inheritErrorHandler(true) .to("mock:a") .throwException(new IllegalArgumentException("Forced")) @@ -136,13 +130,13 @@ from("direct:start") .to("mock:result"); ---- -This example is from an unit test, where you can see the Hystrix EIP block has been hardcoded +This example is from an unit test, where you can see the Circuit Breaker EIP block has been hardcoded to always fail by throwing an exception. Because the `inheritErrorHandler` has been enabled, -then Camel's error handler will attempt to call the Hystrix EIP block again. +then Camel's error handler will attempt to call the Circuit Breaker EIP block again. That means the `mock:a` endpoint will receive the message again, and a total of 1 + 3 = 4 message (first time + 3 redeliveries). -If we turn off the `inheritErrorHandler` option (default) then the Hystrix EIP will only be +If we turn off the `inheritErrorHandler` option (default) then the Circuit Breaker EIP will only be executed once because it handled the error itself. diff --git a/docs/user-manual/modules/ROOT/pages/resilience4jConfiguration-eip.adoc b/docs/user-manual/modules/ROOT/pages/resilience4jConfiguration-eip.adoc new file mode 100644 index 0000000..997ee11 --- /dev/null +++ b/docs/user-manual/modules/ROOT/pages/resilience4jConfiguration-eip.adoc @@ -0,0 +1,7 @@ +[[resilience4jConfiguration-eip]] += Resilience4j Configuration EIP +:page-source: core/camel-core-engine/src/main/docs/eips/resilience4jConfiguration-eip.adoc + + +// eip options: START +// eip options: END diff --git a/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml b/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml index 17a15bf..2d82d3f 100644 --- a/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml +++ b/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml @@ -2585,6 +2585,16 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-resilience4j</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-resilience4j-starter</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-rest</artifactId> <version>${project.version}</version> </dependency>
