This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch CAMEL-13691 in repository https://gitbox.apache.org/repos/asf/camel.git
commit db108f86bc75aaf2420d979792b19fbc314417a8 Author: Claus Ibsen <[email protected]> AuthorDate: Sun Nov 17 11:46:21 2019 +0100 CAMEL-13691: camel-resilience4j - WIP --- .../resilience4j/ResilienceProcessor.java | 158 ++++++++++++++++++++- .../component/resilience4j/ResilienceReifier.java | 5 +- .../resilience4j/ResilienceManagementTest.java | 14 +- 3 files changed, 169 insertions(+), 8 deletions(-) diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java index 50acd37..b4ab2ae 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java @@ -37,6 +37,7 @@ import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.RuntimeExchangeException; import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedOperation; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.spi.IdAware; import org.apache.camel.support.AsyncProcessorSupport; @@ -52,6 +53,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga private static final Logger LOG = LoggerFactory.getLogger(ResilienceProcessor.class); + private volatile CircuitBreaker circuitBreaker; private String id; private CircuitBreakerConfig circuitBreakerConfig; private BulkheadConfig bulkheadConfig; @@ -83,6 +85,124 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga return "resilience4j"; } + @ManagedAttribute(description = "Returns the current failure rate in percentage.") + public float getFailureRate() { + if (circuitBreaker != null) { + return circuitBreaker.getMetrics().getFailureRate(); + } else { + return 0; + } + } + + @ManagedAttribute(description = "Returns the current percentage of calls which were slower than a certain threshold.") + public float getSlowCallRate() { + if (circuitBreaker != null) { + return circuitBreaker.getMetrics().getSlowCallRate(); + } else { + return 0; + } + } + + @ManagedAttribute(description = "Returns the current total number of calls which were slower than a certain threshold.") + public int getNumberOfSlowCalls() { + if (circuitBreaker != null) { + return circuitBreaker.getMetrics().getNumberOfSlowCalls(); + } else { + return 0; + } + } + + @ManagedAttribute(description = "Returns the current number of successful calls which were slower than a certain threshold.") + public int getNumberOfSlowSuccessfulCalls() { + if (circuitBreaker != null) { + return circuitBreaker.getMetrics().getNumberOfSlowCalls(); + } else { + return 0; + } + } + + @ManagedAttribute(description = "Returns the current number of failed calls which were slower than a certain threshold.") + public int getNumberOfSlowFailedCalls() { + if (circuitBreaker != null) { + return circuitBreaker.getMetrics().getNumberOfSlowFailedCalls(); + } else { + return 0; + } + } + + @ManagedAttribute(description = "Returns the current total number of buffered calls in the ring buffer.") + public int getNumberOfBufferedCalls() { + if (circuitBreaker != null) { + return circuitBreaker.getMetrics().getNumberOfBufferedCalls(); + } else { + return 0; + } + } + + @ManagedAttribute(description = "Returns the current number of failed buffered calls in the ring buffer.") + public int getNumberOfFailedCalls() { + if (circuitBreaker != null) { + return circuitBreaker.getMetrics().getNumberOfFailedCalls(); + } else { + return 0; + } + } + + @ManagedAttribute(description = "Returns the current number of successful buffered calls in the ring buffer") + public int getNumberOfSuccessfulCalls() { + if (circuitBreaker != null) { + return circuitBreaker.getMetrics().getNumberOfSuccessfulCalls(); + } else { + return 0; + } + } + + @ManagedAttribute(description = "Returns the current number of not permitted calls, when the state is OPEN.") + public long getNumberOfNotPermittedCalls() { + if (circuitBreaker != null) { + return circuitBreaker.getMetrics().getNumberOfNotPermittedCalls(); + } else { + return 0; + } + } + + @ManagedAttribute(description = "Returns the current state of the circuit breaker") + public String getCircuitBreakerState() { + if (circuitBreaker != null) { + return circuitBreaker.getState().name(); + } else { + return null; + } + } + + @ManagedOperation(description = "Transitions the circuit breaker to CLOSED state.") + public void transitionToCloseState() { + if (circuitBreaker != null) { + circuitBreaker.transitionToClosedState(); + } + } + + @ManagedOperation(description = "Transitions the circuit breaker to OPEN state.") + public void transitionToOpenState() { + if (circuitBreaker != null) { + circuitBreaker.transitionToOpenState(); + } + } + + @ManagedOperation(description = "Transitions the circuit breaker to HALF_OPEN state.") + public void transitionToHalfOpenState() { + if (circuitBreaker != null) { + circuitBreaker.transitionToHalfOpenState(); + } + } + + @ManagedOperation(description = "Transitions the state machine to a FORCED_OPEN state, stopping state transition, metrics and event publishing.") + public void transitionToForceOpenState() { + if (circuitBreaker != null) { + circuitBreaker.transitionToForcedOpenState(); + } + } + @ManagedAttribute public float getCircuitBreakerFailureRateThreshold() { return circuitBreakerConfig.getFailureRateThreshold(); @@ -133,6 +253,38 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga return bulkheadConfig != null; } + @ManagedAttribute + public int getBulkheadMaxConcurrentCalls() { + if (bulkheadConfig != null) { + return bulkheadConfig.getMaxConcurrentCalls(); + } else { + return 0; + } + } + + @ManagedAttribute() + public long getBulkheadMaxWaitDuration() { + if (bulkheadConfig != null) { + return bulkheadConfig.getMaxWaitDuration().toMillis(); + } else { + return 0; + } + } + + @ManagedAttribute + public boolean isTimeoutEnabled() { + return timeLimiterConfig != null; + } + + @ManagedAttribute + public long getTimeoutDuration() { + if (timeLimiterConfig != null) { + return timeLimiterConfig.getTimeoutDuration().toMillis(); + } else { + return 0; + } + } + @Override public List<Processor> next() { if (!hasNext()) { @@ -156,9 +308,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga // run this as if we run inside try .. catch so there is no regular Camel error handler exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true); - CircuitBreaker cb = CircuitBreaker.of(id, circuitBreakerConfig); - - Callable<Exchange> task = CircuitBreaker.decorateCallable(cb, new CircuitBreakerTask(processor, exchange)); + Callable<Exchange> task = CircuitBreaker.decorateCallable(circuitBreaker, new CircuitBreakerTask(processor, exchange)); Function<Throwable, Exchange> fallbackTask = new CircuitBreakerFallbackTask(fallback, exchange); if (bulkheadConfig != null) { Bulkhead bh = Bulkhead.of(id, bulkheadConfig); @@ -189,7 +339,7 @@ public class ResilienceProcessor extends AsyncProcessorSupport implements Naviga @Override protected void doStart() throws Exception { - // noop + circuitBreaker = CircuitBreaker.of(id, circuitBreakerConfig); } @Override diff --git a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java index 95da2a4..da157fd 100644 --- a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java +++ b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java @@ -43,9 +43,8 @@ import static org.apache.camel.support.CamelContextHelper.mandatoryLookup; public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition> { - // TODO: metrics with state of CB - // TODO: expose metrics as JMX on processor // TODO: thread pool bulkhead + // TODO: Configure timeout thread-pool globally // TODO: spring-boot allow to configure via resilience4j-spring-boot // TODO: example // TODO: camel-main - configure hystrix/resilience/rest via java code fluent builder (does it work) @@ -119,7 +118,7 @@ public class ResilienceReifier extends ProcessorReifier<CircuitBreakerDefinition builder.maxConcurrentCalls(config.getBulkheadMaxConcurrentCalls()); } if (config.getBulkheadMaxWaitDuration() != null) { - builder.maxWaitDuration(Duration.ofSeconds(config.getBulkheadMaxConcurrentCalls())); + builder.maxWaitDuration(Duration.ofMillis(config.getBulkheadMaxWaitDuration())); } return builder.build(); } diff --git a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceManagementTest.java b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceManagementTest.java index 4b1046f..388c4d5 100644 --- a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceManagementTest.java +++ b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceManagementTest.java @@ -56,9 +56,21 @@ public class ResilienceManagementTest extends CamelTestSupport { String routeId = (String) mbeanServer.getAttribute(on, "RouteId"); assertEquals("start", routeId); - // should be id of the node Integer num = (Integer) mbeanServer.getAttribute(on, "CircuitBreakerMinimumNumberOfCalls"); assertEquals("100", num.toString()); + + Integer totalRequests = (Integer) mbeanServer.getAttribute(on, "NumberOfSuccessfulCalls"); + assertEquals(1, totalRequests.intValue()); + + Integer errorCount = (Integer) mbeanServer.getAttribute(on, "NumberOfFailedCalls"); + assertEquals(0, errorCount.intValue()); + + String state = (String) mbeanServer.getAttribute(on, "CircuitBreakerState"); + assertEquals("CLOSED", state); + + mbeanServer.invoke(on, "transitionToOpenState", null, null); + state = (String) mbeanServer.getAttribute(on, "CircuitBreakerState"); + assertEquals("OPEN", state); } @Override
