This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch CAMEL-13691 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 3693d53fe526f5bcd194ca528e1000aed6d997ad Author: Claus Ibsen <[email protected]> AuthorDate: Sat Nov 16 11:18:36 2019 +0100 CAMEL-13691: camel-resilience4j - WIP --- apache-camel/pom.xml | 10 ++ apache-camel/src/main/descriptors/common-bin.xml | 2 + components/camel-resilience4j/pom.xml | 10 ++ .../src/main/docs/resilience4j.adoc | 12 +- .../resilience4j/ResilienceProcessor.java | 132 ++++++++++++++------- .../component/resilience4j/ResilienceReifier.java | 4 + docs/components/modules/ROOT/nav.adoc | 1 + docs/components/modules/ROOT/pages/index.adoc | 4 +- .../modules/ROOT/pages}/resilience4j.adoc | 13 +- parent/pom.xml | 10 ++ 10 files changed, 140 insertions(+), 58 deletions(-) diff --git a/apache-camel/pom.xml b/apache-camel/pom.xml index d691527..cd3c1b0 100644 --- a/apache-camel/pom.xml +++ b/apache-camel/pom.xml @@ -1228,6 +1228,11 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-resilience4j</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-rest</artifactId> <version>${project.version}</version> </dependency> @@ -2782,6 +2787,11 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-resilience4j-starter</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-rest-starter</artifactId> <version>${project.version}</version> </dependency> diff --git a/apache-camel/src/main/descriptors/common-bin.xml b/apache-camel/src/main/descriptors/common-bin.xml index fe4a531..213ce4f 100644 --- a/apache-camel/src/main/descriptors/common-bin.xml +++ b/apache-camel/src/main/descriptors/common-bin.xml @@ -266,6 +266,7 @@ <include>org.apache.camel:camel-reactive-streams</include> <include>org.apache.camel:camel-reactor</include> <include>org.apache.camel:camel-ref</include> + <include>org.apache.camel:camel-resilience4j</include> <include>org.apache.camel:camel-rest</include> <include>org.apache.camel:camel-rest-swagger</include> <include>org.apache.camel:camel-ribbon</include> @@ -616,6 +617,7 @@ <include>org.apache.camel:camel-reactive-streams-starter</include> <include>org.apache.camel:camel-reactor-starter</include> <include>org.apache.camel:camel-ref-starter</include> + <include>org.apache.camel:camel-resilience4j-starter</include> <include>org.apache.camel:camel-rest-starter</include> <include>org.apache.camel:camel-rest-swagger-starter</include> <include>org.apache.camel:camel-ribbon-starter</include> diff --git a/components/camel-resilience4j/pom.xml b/components/camel-resilience4j/pom.xml index fa396b4..71f185f 100644 --- a/components/camel-resilience4j/pom.xml +++ b/components/camel-resilience4j/pom.xml @@ -50,6 +50,16 @@ <artifactId>resilience4j-circuitbreaker</artifactId> <version>1.1.0</version> </dependency> + <dependency> + <groupId>io.github.resilience4j</groupId> + <artifactId>resilience4j-bulkhead</artifactId> + <version>1.1.0</version> + </dependency> + <dependency> + <groupId>io.github.resilience4j</groupId> + <artifactId>resilience4j-timelimiter</artifactId> + <version>1.1.0</version> + </dependency> <!-- for testing --> <dependency> diff --git a/components/camel-resilience4j/src/main/docs/resilience4j.adoc b/components/camel-resilience4j/src/main/docs/resilience4j.adoc index 9afc96e..0bf1b56 100644 --- a/components/camel-resilience4j/src/main/docs/resilience4j.adoc +++ b/components/camel-resilience4j/src/main/docs/resilience4j.adoc @@ -1,10 +1,10 @@ -= Hystrix Component += Resilience4j Component -*Since Camel 2.18* +*Since Camel 3.0* -The Hystrix component integrates Netflix Hystrix circuit breaker in Camel routes. +This component supports the Circuit Breaker EIP with the Resilience4j library. -For more details see the Hystrix EIP documentation. +For more details see the Circuit Breaker EIP documentation. Maven users will need to add the following dependency to their `pom.xml` for this component: @@ -13,7 +13,7 @@ for this component: ---- <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-hystrix</artifactId> + <artifactId>camel-resilience4j</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency> @@ -28,7 +28,7 @@ When using Spring Boot make sure to use the following Maven dependency to have s ---- <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-hystrix-starter</artifactId> + <artifactId>camel-resilience4j-starter</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency> diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java index a9bd0e7..17be13f 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java @@ -18,7 +18,8 @@ package org.apache.camel.component.resilience4j; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Callable; +import java.util.function.Function; +import java.util.function.Supplier; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.vavr.control.Try; @@ -26,6 +27,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Navigate; import org.apache.camel.Processor; +import org.apache.camel.RuntimeExchangeException; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.spi.IdAware; import org.apache.camel.support.AsyncProcessorSupport; @@ -90,9 +92,50 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga // run this as if we run inside try .. catch so there is no regular Camel error handler exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true); +// Supplier<CompletableFuture<String>> futureSupplier = () -> CompletableFuture.supplyAsync(() -> "Hello"); +// Callable<String> callable = TimeLimiter.decorateFutureSupplier(TimeLimiter.of(Duration.ofMillis(500)), futureSupplier); +// String result = CircuitBreaker.decorateCheckedSupplier(cb, callable::call).apply(); + +// Bulkhead bh = Bulkhead.ofDefaults("ddd"); +// BulkheadConfig. + +// TimeLimiter time = TimeLimiter.of(Duration.ofSeconds(1)); +// Supplier<Future<Exchange>> task2 = time.decorateFutureSupplier(() -> { +// task.get(); +// Future +// }); + CircuitBreaker cb = CircuitBreaker.ofDefaults(id); + Supplier<Exchange> task = CircuitBreaker.decorateSupplier(cb, new CircuitBreakerTask(processor, exchange)); + Try.ofSupplier(task) + .recover(new CircuitBreakerFallbackTask(fallback, exchange)) + .andFinally(() -> callback.done(false)).get(); + + return false; + } + + @Override + protected void doStart() throws Exception { + // noop + } + + @Override + protected void doStop() throws Exception { + // noop + } + + private static class CircuitBreakerTask implements Supplier<Exchange> { + + private final Processor processor; + private final Exchange exchange; + + private CircuitBreakerTask(Processor processor, Exchange exchange) { + this.processor = processor; + this.exchange = exchange; + } - Callable task = CircuitBreaker.decorateCallable(cb, () -> { + @Override + public Exchange get() { try { LOG.debug("Running processor: {} with exchange: {}", processor, exchange); // prepare a copy of exchange so downstream processors don't cause side-effects if they mutate the exchange @@ -113,53 +156,52 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga } if (exchange.getException() != null) { // throw exception so resilient4j know it was a failure - throw exchange.getException(); + throw RuntimeExchangeException.wrapRuntimeException(exchange.getException()); } - return null; - }); - - Try.ofCallable(task) - .recover(f -> { - if (fallback != null) { - // store the last to endpoint as the failure endpoint - if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) { - exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); - } - // give the rest of the pipeline another chance - exchange.setProperty(Exchange.EXCEPTION_HANDLED, true); - exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exchange.getException()); - exchange.removeProperty(Exchange.ROUTE_STOP); - exchange.setException(null); - // and we should not be regarded as exhausted as we are in a try .. catch block - exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); - // run the fallback processor - try { - LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange); - // process the fallback until its fully done - fallback.process(exchange); - LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange); - } catch (Exception e) { - exchange.setException(e); - } - - exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false); - exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, true); - } - return null; - }) - .andFinally(() -> callback.done(false)).get(); - - return false; + return exchange; + } } - @Override - protected void doStart() throws Exception { - // noop - } + private static class CircuitBreakerFallbackTask implements Function<Throwable, Exchange> { - @Override - protected void doStop() throws Exception { - // noop + private final Processor processor; + private final Exchange exchange; + + private CircuitBreakerFallbackTask(Processor processor, Exchange exchange) { + this.processor = processor; + this.exchange = exchange; + } + + @Override + public Exchange apply(Throwable throwable) { + if (processor != null) { + // store the last to endpoint as the failure endpoint + if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) { + exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); + } + // give the rest of the pipeline another chance + exchange.setProperty(Exchange.EXCEPTION_HANDLED, true); + exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exchange.getException()); + exchange.removeProperty(Exchange.ROUTE_STOP); + exchange.setException(null); + // and we should not be regarded as exhausted as we are in a try .. catch block + exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); + // run the fallback processor + try { + LOG.debug("Running fallback: {} with exchange: {}", processor, exchange); + // process the fallback until its fully done + processor.process(exchange); + LOG.debug("Running fallback: {} with exchange: {} done", processor, exchange); + } catch (Exception e) { + exchange.setException(e); + } + + exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION, false); + exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, true); + } + + return exchange; + } } } diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java index 1a07f57..4bd1278 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java @@ -23,6 +23,10 @@ import org.apache.camel.spi.RouteContext; public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition> { + // TODO: Resilience configuration in camel-core / model + // TODO: Timeout + // TODO: Bulkhead for viaNetwork + public ResilienceReifier(CircuitBreakerDefinition definition) { super(definition); } diff --git a/docs/components/modules/ROOT/nav.adoc b/docs/components/modules/ROOT/nav.adoc index 4473bc1..d53b85c 100644 --- a/docs/components/modules/ROOT/nav.adoc +++ b/docs/components/modules/ROOT/nav.adoc @@ -288,6 +288,7 @@ * xref:reactive-streams-component.adoc[Reactive Streams Component] * xref:reactor.adoc[Reactor Component] * xref:ref-component.adoc[Ref Component] +* xref:resilience4j.adoc[Resilience4j Component] * xref:rest-swagger-component.adoc[REST Swagger Component] * xref:rest-api-component.adoc[REST API Component] * xref:rest-component.adoc[REST Component] diff --git a/docs/components/modules/ROOT/pages/index.adoc b/docs/components/modules/ROOT/pages/index.adoc index e80b4f3..0c7a425 100644 --- a/docs/components/modules/ROOT/pages/index.adoc +++ b/docs/components/modules/ROOT/pages/index.adoc @@ -772,7 +772,7 @@ Number of Languages: 17 in 11 JAR artifacts (0 deprecated) == Miscellaneous Components // others: START -Number of Miscellaneous Components: 38 in 38 JAR artifacts (0 deprecated) +Number of Miscellaneous Components: 39 in 39 JAR artifacts (0 deprecated) [width="100%",cols="4,1,5",options="header"] |=== @@ -808,6 +808,8 @@ Number of Miscellaneous Components: 38 in 38 JAR artifacts (0 deprecated) | xref:reactor.adoc[Reactor] (camel-reactor) | 2.20 | Reactor based back-end for Camel's reactive streams component +| xref:resilience4j.adoc[Resilience4j] (camel-resilience4j) | 3.0 | Circuit Breaker EIP using Resilience4j + | xref:ribbon.adoc[Ribbon] (camel-ribbon) | 2.18 | Using Netflix Ribbon for client side load balancing | xref:rxjava.adoc[RxJava] (camel-rxjava) | 2.22 | RxJava based back-end for Camel's reactive streams component diff --git a/components/camel-resilience4j/src/main/docs/resilience4j.adoc b/docs/components/modules/ROOT/pages/resilience4j.adoc similarity index 77% copy from components/camel-resilience4j/src/main/docs/resilience4j.adoc copy to docs/components/modules/ROOT/pages/resilience4j.adoc index 9afc96e..6801015 100644 --- a/components/camel-resilience4j/src/main/docs/resilience4j.adoc +++ b/docs/components/modules/ROOT/pages/resilience4j.adoc @@ -1,10 +1,11 @@ -= Hystrix Component += Resilience4j Component +:page-source: components/camel-resilience4j/src/main/docs/resilience4j.adoc -*Since Camel 2.18* +*Since Camel 3.0* -The Hystrix component integrates Netflix Hystrix circuit breaker in Camel routes. +This component supports the Circuit Breaker EIP with the Resilience4j library. -For more details see the Hystrix EIP documentation. +For more details see the Circuit Breaker EIP documentation. Maven users will need to add the following dependency to their `pom.xml` for this component: @@ -13,7 +14,7 @@ for this component: ---- <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-hystrix</artifactId> + <artifactId>camel-resilience4j</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency> @@ -28,7 +29,7 @@ When using Spring Boot make sure to use the following Maven dependency to have s ---- <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-hystrix-starter</artifactId> + <artifactId>camel-resilience4j-starter</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency> diff --git a/parent/pom.xml b/parent/pom.xml index 6bbc2a4..4c02d65 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -1963,6 +1963,11 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-resilience4j</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-rest</artifactId> <version>${project.version}</version> </dependency> @@ -3537,6 +3542,11 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-resilience4j-starter</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-rest-starter</artifactId> <version>${project.version}</version> </dependency>
