This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 41f580f CAMEL-16173: Camel Resilience4j Bulkhead seems to not limit
concurrent requests. Thanks to Jesper Duelund Isaksen for the excellent
reproducer example.
41f580f is described below
commit 41f580f720b042419d877c2e4aac07fecfebf7d9
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Mar 18 20:32:29 2021 +0100
CAMEL-16173: Camel Resilience4j Bulkhead seems to not limit concurrent
requests. Thanks to Jesper Duelund Isaksen for the excellent reproducer example.
---
.../resilience4j/ResilienceProcessor.java | 78 +++++++++++++++++-----
.../component/resilience4j/ResilienceReifier.java | 7 +-
.../model/Resilience4jConfigurationDefinition.java | 2 +-
3 files changed, 68 insertions(+), 19 deletions(-)
diff --git
a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 537d59e..26e6bfe 100644
---
a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++
b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -27,6 +27,7 @@ import java.util.function.Supplier;
import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadConfig;
+import io.github.resilience4j.bulkhead.BulkheadFullException;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
@@ -67,7 +68,9 @@ public class ResilienceProcessor extends AsyncProcessorSupport
private String id;
private final CircuitBreakerConfig circuitBreakerConfig;
private final BulkheadConfig bulkheadConfig;
+ private Bulkhead bulkhead;
private final TimeLimiterConfig timeLimiterConfig;
+ private TimeLimiter timeLimiter;
private final Processor processor;
private final Processor fallback;
private boolean shutdownExecutorService;
@@ -84,6 +87,17 @@ public class ResilienceProcessor extends
AsyncProcessorSupport
}
@Override
+ protected void doBuild() throws Exception {
+ super.doBuild();
+ if (timeLimiterConfig != null) {
+ timeLimiter = TimeLimiter.of(id, timeLimiterConfig);
+ }
+ if (bulkheadConfig != null) {
+ bulkhead = Bulkhead.of(id, bulkheadConfig);
+ }
+ }
+
+ @Override
public CamelContext getCamelContext() {
return camelContext;
}
@@ -358,31 +372,39 @@ public class ResilienceProcessor extends
AsyncProcessorSupport
Callable<Exchange> task;
- if (timeLimiterConfig != null) {
- // timeout handling is more complex with thread-pools
-
- TimeLimiter tl = TimeLimiter.of(id, timeLimiterConfig);
+ if (timeLimiter != null) {
Supplier<CompletableFuture<Exchange>> futureSupplier;
if (executorService == null) {
futureSupplier = () -> CompletableFuture.supplyAsync(() ->
processInCopy(exchange));
} else {
futureSupplier = () -> CompletableFuture.supplyAsync(() ->
processInCopy(exchange), executorService);
}
- task = TimeLimiter.decorateFutureSupplier(tl, futureSupplier);
+ task = TimeLimiter.decorateFutureSupplier(timeLimiter,
futureSupplier);
} else {
task = new CircuitBreakerTask(() -> processInCopy(exchange));
}
- if (bulkheadConfig != null) {
- Bulkhead bh = Bulkhead.of(id, bulkheadConfig);
- task = Bulkhead.decorateCallable(bh, task);
+ if (bulkhead != null) {
+ task = Bulkhead.decorateCallable(bulkhead, task);
}
task = CircuitBreaker.decorateCallable(circuitBreaker, task);
-
- Function<Throwable, Exchange> fallbackTask = new
CircuitBreakerFallbackTask(this.fallback, exchange);
- Try.ofCallable(task).recover(fallbackTask).andFinally(() ->
callback.done(false)).get();
- return false;
+ Function<Throwable, Exchange> fallbackTask = new
CircuitBreakerFallbackTask(this.id, this.fallback, exchange);
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing exchange: {} using circuit breaker: {}",
exchange.getExchangeId(), id);
+ }
+ Try.ofCallable(task).recover(fallbackTask).get();
+ } catch (Throwable e) {
+ exchange.setException(e);
+ }
+ if (LOG.isTraceEnabled()) {
+ boolean failed = exchange.isFailed();
+ LOG.trace("Processing exchange: {} using circuit breaker: {}
complete (failed: {})", exchange.getExchangeId(), id,
+ failed);
+ }
+ callback.done(true);
+ return true;
}
private Exchange processInCopy(Exchange exchange) {
@@ -443,16 +465,23 @@ public class ResilienceProcessor extends
AsyncProcessorSupport
private static final class CircuitBreakerFallbackTask implements
Function<Throwable, Exchange> {
+ private final String id;
private final Processor processor;
private final Exchange exchange;
- private CircuitBreakerFallbackTask(Processor processor, Exchange
exchange) {
+ private CircuitBreakerFallbackTask(String id, Processor processor,
Exchange exchange) {
+ this.id = id;
this.processor = processor;
this.exchange = exchange;
}
@Override
public Exchange apply(Throwable throwable) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing exchange: {} recover task using circuit
breaker: {} from: {}", exchange.getExchangeId(),
+ id, throwable);
+ }
+
if (processor == null) {
if (throwable instanceof TimeoutException) {
// the circuit breaker triggered a timeout (and there is no
@@ -465,14 +494,29 @@ public class ResilienceProcessor extends
AsyncProcessorSupport
return exchange;
} else if (throwable instanceof CallNotPermittedException) {
// the circuit breaker triggered a call rejected
+ // where the circuit breaker is half-open / open and
therefore
+ // we should just set properties and do not set any
exception
+
exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION,
false);
+
exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
+
exchange.setProperty(CircuitBreakerConstants.RESPONSE_SHORT_CIRCUITED, true);
+
exchange.setProperty(CircuitBreakerConstants.RESPONSE_REJECTED, true);
+ return exchange;
+ } else if (throwable instanceof BulkheadFullException) {
+ // the circuit breaker bulkhead is full
exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION,
false);
exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
exchange.setProperty(CircuitBreakerConstants.RESPONSE_SHORT_CIRCUITED, true);
exchange.setProperty(CircuitBreakerConstants.RESPONSE_REJECTED, true);
- throw
RuntimeExchangeException.wrapRuntimeException(throwable);
+ exchange.setException(throwable);
+ return exchange;
} else {
- // throw exception so resilient4j know it was a failure
- throw
RuntimeExchangeException.wrapRuntimeException(throwable);
+ // other kind of exception
+
exchange.setProperty(CircuitBreakerConstants.RESPONSE_SUCCESSFUL_EXECUTION,
false);
+
exchange.setProperty(CircuitBreakerConstants.RESPONSE_FROM_FALLBACK, false);
+
exchange.setProperty(CircuitBreakerConstants.RESPONSE_SHORT_CIRCUITED, true);
+
exchange.setProperty(CircuitBreakerConstants.RESPONSE_REJECTED, true);
+ exchange.setException(throwable);
+ return exchange;
}
}
@@ -500,7 +544,7 @@ public class ResilienceProcessor extends
AsyncProcessorSupport
// process the fallback until its fully done
processor.process(exchange);
LOG.debug("Running fallback: {} with exchange: {} done",
processor, exchange);
- } catch (Exception e) {
+ } catch (Throwable e) {
exchange.setException(e);
}
diff --git
a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
index 76c9597..c65eb29 100644
---
a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
+++
b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
@@ -120,7 +120,12 @@ public class ResilienceReifier extends
ProcessorReifier<CircuitBreakerDefinition
builder.maxConcurrentCalls(parseInt(config.getBulkheadMaxConcurrentCalls()));
}
if (config.getBulkheadMaxWaitDuration() != null) {
-
builder.maxWaitDuration(Duration.ofMillis(parseLong(config.getBulkheadMaxWaitDuration())));
+ long duration = parseLong(config.getBulkheadMaxWaitDuration());
+ if (duration <= 0) {
+ builder.maxWaitDuration(Duration.ZERO);
+ } else {
+ builder.maxWaitDuration(Duration.ofMillis(duration));
+ }
}
return builder.build();
}
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java
b/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java
index 9fb6891..77468c2 100644
---
a/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java
+++
b/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java
@@ -198,7 +198,7 @@ public class Resilience4jConfigurationDefinition extends
Resilience4jConfigurati
* Configures the max amount of concurrent calls the bulkhead will support.
*/
public Resilience4jConfigurationDefinition bulkheadMaxConcurrentCalls(int
bulkheadMaxConcurrentCalls) {
-
setBulkheadMaxWaitDuration(Integer.toString(bulkheadMaxConcurrentCalls));
+
setBulkheadMaxConcurrentCalls(Integer.toString(bulkheadMaxConcurrentCalls));
return this;
}