This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch CAMEL-13691
in repository https://gitbox.apache.org/repos/asf/camel.git

commit db108f86bc75aaf2420d979792b19fbc314417a8
Author: Claus Ibsen <[email protected]>
AuthorDate: Sun Nov 17 11:46:21 2019 +0100

    CAMEL-13691: camel-resilience4j - WIP
---
 .../resilience4j/ResilienceProcessor.java          | 158 ++++++++++++++++++++-
 .../component/resilience4j/ResilienceReifier.java  |   5 +-
 .../resilience4j/ResilienceManagementTest.java     |  14 +-
 3 files changed, 169 insertions(+), 8 deletions(-)

diff --git 
a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
 
b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 50acd37..b4ab2ae 100644
--- 
a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++ 
b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -37,6 +37,7 @@ import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.RuntimeExchangeException;
 import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.AsyncProcessorSupport;
@@ -52,6 +53,7 @@ public class ResilienceProcessor extends 
AsyncProcessorSupport implements Naviga
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ResilienceProcessor.class);
 
+    private volatile CircuitBreaker circuitBreaker;
     private String id;
     private CircuitBreakerConfig circuitBreakerConfig;
     private BulkheadConfig bulkheadConfig;
@@ -83,6 +85,124 @@ public class ResilienceProcessor extends 
AsyncProcessorSupport implements Naviga
         return "resilience4j";
     }
 
+    @ManagedAttribute(description = "Returns the current failure rate in 
percentage.")
+    public float getFailureRate() {
+        if (circuitBreaker != null) {
+            return circuitBreaker.getMetrics().getFailureRate();
+        } else {
+            return 0;
+        }
+    }
+
+    @ManagedAttribute(description = "Returns the current percentage of calls 
which were slower than a certain threshold.")
+    public float getSlowCallRate() {
+        if (circuitBreaker != null) {
+            return circuitBreaker.getMetrics().getSlowCallRate();
+        } else {
+            return 0;
+        }
+    }
+
+    @ManagedAttribute(description = "Returns the current total number of calls 
which were slower than a certain threshold.")
+    public int getNumberOfSlowCalls() {
+        if (circuitBreaker != null) {
+            return circuitBreaker.getMetrics().getNumberOfSlowCalls();
+        } else {
+            return 0;
+        }
+    }
+
+    @ManagedAttribute(description = "Returns the current number of successful 
calls which were slower than a certain threshold.")
+    public int getNumberOfSlowSuccessfulCalls() {
+        if (circuitBreaker != null) {
+            return circuitBreaker.getMetrics().getNumberOfSlowSuccessfulCalls();
+        } else {
+            return 0; // no breaker instance yet (it is assigned in doStart)
+        }
+    }
+
+    @ManagedAttribute(description = "Returns the current number of failed 
calls which were slower than a certain threshold.")
+    public int getNumberOfSlowFailedCalls() {
+        if (circuitBreaker != null) {
+            return circuitBreaker.getMetrics().getNumberOfSlowFailedCalls();
+        } else {
+            return 0;
+        }
+    }
+
+    @ManagedAttribute(description = "Returns the current total number of 
buffered calls in the ring buffer.")
+    public int getNumberOfBufferedCalls() {
+        if (circuitBreaker != null) {
+            return circuitBreaker.getMetrics().getNumberOfBufferedCalls();
+        } else {
+            return 0;
+        }
+    }
+
+    @ManagedAttribute(description = "Returns the current number of failed 
buffered calls in the ring buffer.")
+    public int getNumberOfFailedCalls() {
+        if (circuitBreaker != null) {
+            return circuitBreaker.getMetrics().getNumberOfFailedCalls();
+        } else {
+            return 0;
+        }
+    }
+
+    @ManagedAttribute(description = "Returns the current number of successful 
buffered calls in the ring buffer")
+    public int getNumberOfSuccessfulCalls() {
+        if (circuitBreaker != null) {
+            return circuitBreaker.getMetrics().getNumberOfSuccessfulCalls();
+        } else {
+            return 0;
+        }
+    }
+
+    @ManagedAttribute(description = "Returns the current number of not 
permitted calls, when the state is OPEN.")
+    public long getNumberOfNotPermittedCalls() {
+        if (circuitBreaker != null) {
+            return circuitBreaker.getMetrics().getNumberOfNotPermittedCalls();
+        } else {
+            return 0;
+        }
+    }
+
+    @ManagedAttribute(description = "Returns the current state of the circuit 
breaker")
+    public String getCircuitBreakerState() {
+        if (circuitBreaker != null) {
+            return circuitBreaker.getState().name();
+        } else {
+            return null;
+        }
+    }
+
+    @ManagedOperation(description = "Transitions the circuit breaker to CLOSED 
state.")
+    public void transitionToCloseState() {
+        if (circuitBreaker != null) {
+            circuitBreaker.transitionToClosedState();
+        }
+    }
+
+    @ManagedOperation(description = "Transitions the circuit breaker to OPEN 
state.")
+    public void transitionToOpenState() {
+        if (circuitBreaker != null) {
+            circuitBreaker.transitionToOpenState();
+        }
+    }
+
+    @ManagedOperation(description = "Transitions the circuit breaker to 
HALF_OPEN state.")
+    public void transitionToHalfOpenState() {
+        if (circuitBreaker != null) {
+            circuitBreaker.transitionToHalfOpenState();
+        }
+    }
+
+    @ManagedOperation(description = "Transitions the state machine to a 
FORCED_OPEN state, stopping state transition, metrics and event publishing.")
+    public void transitionToForceOpenState() {
+        if (circuitBreaker != null) {
+            circuitBreaker.transitionToForcedOpenState();
+        }
+    }
+
     @ManagedAttribute
     public float getCircuitBreakerFailureRateThreshold() {
         return circuitBreakerConfig.getFailureRateThreshold();
@@ -133,6 +253,38 @@ public class ResilienceProcessor extends 
AsyncProcessorSupport implements Naviga
         return bulkheadConfig != null;
     }
 
+    @ManagedAttribute
+    public int getBulkheadMaxConcurrentCalls() {
+        if (bulkheadConfig != null) {
+            return bulkheadConfig.getMaxConcurrentCalls();
+        } else {
+            return 0;
+        }
+    }
+
+    @ManagedAttribute()
+    public long getBulkheadMaxWaitDuration() {
+        if (bulkheadConfig != null) {
+            return bulkheadConfig.getMaxWaitDuration().toMillis();
+        } else {
+            return 0;
+        }
+    }
+
+    @ManagedAttribute
+    public boolean isTimeoutEnabled() {
+        return timeLimiterConfig != null;
+    }
+
+    @ManagedAttribute
+    public long getTimeoutDuration() {
+        if (timeLimiterConfig != null) {
+            return timeLimiterConfig.getTimeoutDuration().toMillis();
+        } else {
+            return 0;
+        }
+    }
+
     @Override
     public List<Processor> next() {
         if (!hasNext()) {
@@ -156,9 +308,7 @@ public class ResilienceProcessor extends 
AsyncProcessorSupport implements Naviga
         // run this as if we run inside try .. catch so there is no regular 
Camel error handler
         exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true);
 
-        CircuitBreaker cb = CircuitBreaker.of(id, circuitBreakerConfig);
-
-        Callable<Exchange> task = CircuitBreaker.decorateCallable(cb, new 
CircuitBreakerTask(processor, exchange));
+        Callable<Exchange> task = 
CircuitBreaker.decorateCallable(circuitBreaker, new 
CircuitBreakerTask(processor, exchange));
         Function<Throwable, Exchange> fallbackTask = new 
CircuitBreakerFallbackTask(fallback, exchange);
         if (bulkheadConfig != null) {
             Bulkhead bh = Bulkhead.of(id, bulkheadConfig);
@@ -189,7 +339,7 @@ public class ResilienceProcessor extends 
AsyncProcessorSupport implements Naviga
 
     @Override
     protected void doStart() throws Exception {
-        // noop
+        circuitBreaker = CircuitBreaker.of(id, circuitBreakerConfig);
     }
 
     @Override
diff --git 
a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
 
b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
index 95da2a4..da157fd 100644
--- 
a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
+++ 
b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
@@ -43,9 +43,8 @@ import static 
org.apache.camel.support.CamelContextHelper.mandatoryLookup;
 
 public class ResilienceReifier extends 
ProcessorReifier<CircuitBreakerDefinition> {
 
-    // TODO: metrics with state of CB
-    // TODO: expose metrics as JMX on processor
     // TODO: thread pool bulkhead
+    // TODO: Configure timeout thread-pool globally
     // TODO: spring-boot allow to configure via resilience4j-spring-boot
     // TODO: example
     // TODO: camel-main - configure hystrix/resilience/rest via java code 
fluent builder (does it work)
@@ -119,7 +118,7 @@ public class ResilienceReifier extends 
ProcessorReifier<CircuitBreakerDefinition
             builder.maxConcurrentCalls(config.getBulkheadMaxConcurrentCalls());
         }
         if (config.getBulkheadMaxWaitDuration() != null) {
-            
builder.maxWaitDuration(Duration.ofSeconds(config.getBulkheadMaxConcurrentCalls()));
+            
builder.maxWaitDuration(Duration.ofMillis(config.getBulkheadMaxWaitDuration()));
         }
         return builder.build();
     }
diff --git 
a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceManagementTest.java
 
b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceManagementTest.java
index 4b1046f..388c4d5 100644
--- 
a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceManagementTest.java
+++ 
b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceManagementTest.java
@@ -56,9 +56,21 @@ public class ResilienceManagementTest extends 
CamelTestSupport {
         String routeId = (String) mbeanServer.getAttribute(on, "RouteId");
         assertEquals("start", routeId);
 
-        // should be id of the node
         Integer num = (Integer) mbeanServer.getAttribute(on, 
"CircuitBreakerMinimumNumberOfCalls");
         assertEquals("100", num.toString());
+
+        Integer totalRequests = (Integer) mbeanServer.getAttribute(on, 
"NumberOfSuccessfulCalls");
+        assertEquals(1, totalRequests.intValue());
+
+        Integer errorCount = (Integer) mbeanServer.getAttribute(on, 
"NumberOfFailedCalls");
+        assertEquals(0, errorCount.intValue());
+
+        String state = (String) mbeanServer.getAttribute(on, 
"CircuitBreakerState");
+        assertEquals("CLOSED", state);
+
+        mbeanServer.invoke(on, "transitionToOpenState", null, null);
+        state = (String) mbeanServer.getAttribute(on, "CircuitBreakerState");
+        assertEquals("OPEN", state);
     }
 
     @Override

Reply via email to