This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.8.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.8.x by this push:
new cbc03ef4d20 CAMEL-22195: camel-resilience4j - Fix using record and
ignore exceptions (which can be wrapped) and only trigger fallback accordingly.
(#18455)
cbc03ef4d20 is described below
commit cbc03ef4d2044f28ddea2a42330e16bce90b6cf9
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Jun 24 20:56:11 2025 +0200
CAMEL-22195: camel-resilience4j - Fix using record and ignore exceptions
(which can be wrapped) and only trigger fallback accordingly. (#18455)
---
.../resilience4j/ResilienceProcessor.java | 35 ++++++++-
.../component/resilience4j/ResilienceReifier.java | 35 +++++++--
.../ResilienceIgnoreExceptionTest.java | 84 ++++++++++++++++++++++
.../ResilienceRecordExceptionTest.java | 84 ++++++++++++++++++++++
.../ResilienceRecordIgnoreExceptionTest.java | 84 ++++++++++++++++++++++
.../model/Resilience4jConfigurationDefinition.java | 25 +++++++
6 files changed, 339 insertions(+), 8 deletions(-)
diff --git
a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
index 4e8762ef047..c3b0c4d1e5a 100644
---
a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
+++
b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceProcessor.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import io.github.resilience4j.bulkhead.Bulkhead;
@@ -85,6 +86,8 @@ public class ResilienceProcessor extends AsyncProcessorSupport
private final Processor processor;
private final Processor fallback;
private final boolean throwExceptionWhenHalfOpenOrOpenState;
+ private final Predicate<Throwable> recordPredicate;
+ private final Predicate<Throwable> ignorePredicate;
private boolean shutdownExecutorService;
private ExecutorService executorService;
private ProcessorExchangeFactory processorExchangeFactory;
@@ -93,13 +96,16 @@ public class ResilienceProcessor extends
AsyncProcessorSupport
public ResilienceProcessor(CircuitBreakerConfig circuitBreakerConfig,
BulkheadConfig bulkheadConfig,
TimeLimiterConfig timeLimiterConfig, Processor
processor,
- Processor fallback, boolean
throwExceptionWhenHalfOpenOrOpenState) {
+ Processor fallback, boolean
throwExceptionWhenHalfOpenOrOpenState,
+ Predicate<Throwable> recordPredicate,
Predicate<Throwable> ignorePredicate) {
this.circuitBreakerConfig = circuitBreakerConfig;
this.bulkheadConfig = bulkheadConfig;
this.timeLimiterConfig = timeLimiterConfig;
this.processor = processor;
this.fallback = fallback;
this.throwExceptionWhenHalfOpenOrOpenState =
throwExceptionWhenHalfOpenOrOpenState;
+ this.recordPredicate = recordPredicate;
+ this.ignorePredicate = ignorePredicate;
}
@Override
@@ -621,8 +627,31 @@ public class ResilienceProcessor extends
AsyncProcessorSupport
@Override
public Exchange apply(Throwable throwable) {
+ // check again if we should ignore or not record the throw
exception as a failure
+ if (ignorePredicate != null && ignorePredicate.test(throwable)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing exchange: {} recover task using
circuit breaker: {} ignored exception: {}",
+ exchange.getExchangeId(),
+ id, throwable);
+ }
+ // exception should be ignored
+ exchange.setException(null);
+ return exchange;
+ }
+ if (recordPredicate != null && !recordPredicate.test(throwable)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing exchange: {} recover task using
circuit breaker: {} success exception: {}",
+ exchange.getExchangeId(),
+ id, throwable);
+ }
+ // exception is a success
+ exchange.setException(null);
+ return exchange;
+ }
+
if (LOG.isTraceEnabled()) {
- LOG.trace("Processing exchange: {} recover task using circuit
breaker: {} from: {}", exchange.getExchangeId(),
+ LOG.trace("Processing exchange: {} recover task using circuit
breaker: {} failed exception: {}",
+ exchange.getExchangeId(),
id, throwable);
}
@@ -688,7 +717,7 @@ public class ResilienceProcessor extends
AsyncProcessorSupport
LOG.debug("Running fallback: {} with exchange: {}", fallback,
exchange);
// process the fallback until its fully done
fallback.process(exchange);
- LOG.debug("Running fallback: {} with exchange: {} done",
fallback, exchange);
+ LOG.trace("Running fallback: {} with exchange: {} done",
fallback, exchange);
} catch (Throwable e) {
exchange.setException(e);
}
diff --git
a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
index a93909b6d21..9607d9a43e4 100644
---
a/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
+++
b/components/camel-resilience4j/src/main/java/org/apache/camel/component/resilience4j/ResilienceReifier.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
+import java.util.function.Predicate;
import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
@@ -41,6 +42,7 @@ import org.apache.camel.spi.PropertyConfigurer;
import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.PluginHelper;
import org.apache.camel.support.PropertyBindingSupport;
+import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.function.Suppliers;
public class ResilienceReifier extends
ProcessorReifier<CircuitBreakerDefinition> {
@@ -71,9 +73,18 @@ public class ResilienceReifier extends
ProcessorReifier<CircuitBreakerDefinition
if (b != null) {
throwExceptionWhenHalfOpenOrOpenState = b;
}
+ Predicate<Throwable> recordPredicate = null;
+ if (!config.getRecordExceptions().isEmpty()) {
+ recordPredicate = cbConfig.getRecordExceptionPredicate();
+ }
+ Predicate<Throwable> ignorePredicate = null;
+ if (!config.getIgnoreExceptions().isEmpty()) {
+ ignorePredicate = cbConfig.getIgnoreExceptionPredicate();
+ }
ResilienceProcessor answer = new ResilienceProcessor(
- cbConfig, bhConfig, tlConfig, processor, fallback,
throwExceptionWhenHalfOpenOrOpenState);
+ cbConfig, bhConfig, tlConfig, processor, fallback,
throwExceptionWhenHalfOpenOrOpenState, recordPredicate,
+ ignorePredicate);
configureTimeoutExecutorService(answer, config);
// using any existing circuit breakers?
if (config.getCircuitBreaker() != null) {
@@ -116,11 +127,11 @@ public class ResilienceReifier extends
ProcessorReifier<CircuitBreakerDefinition
if (config.getWritableStackTraceEnabled() != null) {
builder.writableStackTraceEnabled(parseBoolean(config.getWritableStackTraceEnabled()));
}
- if (config.getRecordExceptions() != null) {
- builder.recordExceptions(createRecordExceptionClasses());
+ if (!config.getRecordExceptions().isEmpty()) {
+
builder.recordException(createExceptionPredicate(createRecordExceptionClasses()));
}
- if (config.getIgnoreExceptions() != null) {
- builder.ignoreExceptions(createIgnoreExceptionClasses());
+ if (!config.getIgnoreExceptions().isEmpty()) {
+
builder.ignoreException(createExceptionPredicate(createIgnoreExceptionClasses()));
}
return builder.build();
}
@@ -261,4 +272,18 @@ public class ResilienceReifier extends
ProcessorReifier<CircuitBreakerDefinition
}
return answer.toArray(new Class[0]);
}
+
+ private Predicate<Throwable> createExceptionPredicate(final Class<?
extends Throwable>[] exceptions) {
+ return t -> {
+ for (Throwable te : ObjectHelper.createExceptionIterable(t)) {
+ for (var ex : exceptions) {
+ if (ex.isInstance(te)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ };
+ }
+
}
diff --git
a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceIgnoreExceptionTest.java
b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceIgnoreExceptionTest.java
new file mode 100644
index 00000000000..016e23c4b22
--- /dev/null
+++
b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceIgnoreExceptionTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.resilience4j;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+
+public class ResilienceIgnoreExceptionTest extends CamelTestSupport {
+
+ @Test
+ public void testHello() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+ template.sendBody("direct:start", "Hello World");
+ MockEndpoint.assertIsSatisfied(context);
+ }
+
+ @Test
+ public void testFile() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("file");
+ template.sendBody("direct:start", "file");
+ MockEndpoint.assertIsSatisfied(context);
+ }
+
+ @Test
+ public void testKaboom() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Fallback
message");
+ template.sendBody("direct:start", "kaboom");
+ MockEndpoint.assertIsSatisfied(context);
+ }
+
+ @Test
+ public void testIo() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("io");
+ template.sendBody("direct:start", "io");
+ MockEndpoint.assertIsSatisfied(context);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:start")
+ .to("log:start")
+
.circuitBreaker().resilience4jConfiguration().ignoreException(IOException.class).end()
+ .process(e -> {
+ String b = e.getMessage().getBody(String.class);
+ if ("kaboom".equals(b)) {
+ throw new NullPointerException();
+ } else if ("file".equals(b)) {
+ throw new FileNotFoundException("unknown.txt");
+ } else if ("io".equals(b)) {
+ throw new IOException("Host not found");
+ }
+ })
+ .onFallback()
+ .transform().constant("Fallback message")
+ .end()
+ .to("log:result")
+ .to("mock:result");
+ }
+ };
+ }
+
+}
diff --git
a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRecordExceptionTest.java
b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRecordExceptionTest.java
new file mode 100644
index 00000000000..72d20cd8058
--- /dev/null
+++
b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRecordExceptionTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.resilience4j;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+
+public class ResilienceRecordExceptionTest extends CamelTestSupport {
+
+ @Test
+ public void testHello() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+ template.sendBody("direct:start", "Hello World");
+ MockEndpoint.assertIsSatisfied(context);
+ }
+
+ @Test
+ public void testFile() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Fallback
message");
+ template.sendBody("direct:start", "file");
+ MockEndpoint.assertIsSatisfied(context);
+ }
+
+ @Test
+ public void testKaboom() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("kaboom");
+ template.sendBody("direct:start", "kaboom");
+ MockEndpoint.assertIsSatisfied(context);
+ }
+
+ @Test
+ public void testIo() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Fallback
message");
+ template.sendBody("direct:start", "io");
+ MockEndpoint.assertIsSatisfied(context);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:start")
+ .to("log:start")
+
.circuitBreaker().resilience4jConfiguration().recordException(IOException.class).end()
+ .process(e -> {
+ String b = e.getMessage().getBody(String.class);
+ if ("kaboom".equals(b)) {
+ throw new NullPointerException();
+ } else if ("file".equals(b)) {
+ throw new FileNotFoundException("unknown.txt");
+ } else if ("io".equals(b)) {
+ throw new IOException("Host not found");
+ }
+ })
+ .onFallback()
+ .transform().constant("Fallback message")
+ .end()
+ .to("log:result")
+ .to("mock:result");
+ }
+ };
+ }
+
+}
diff --git
a/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRecordIgnoreExceptionTest.java
b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRecordIgnoreExceptionTest.java
new file mode 100644
index 00000000000..1517d027f4e
--- /dev/null
+++
b/components/camel-resilience4j/src/test/java/org/apache/camel/component/resilience4j/ResilienceRecordIgnoreExceptionTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.resilience4j;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+
+public class ResilienceRecordIgnoreExceptionTest extends CamelTestSupport {
+
+ @Test
+ public void testHello() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+ template.sendBody("direct:start", "Hello World");
+ MockEndpoint.assertIsSatisfied(context);
+ }
+
+ @Test
+ public void testFile() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("file");
+ template.sendBody("direct:start", "file");
+ MockEndpoint.assertIsSatisfied(context);
+ }
+
+ @Test
+ public void testIo() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Fallback
message");
+ template.sendBody("direct:start", "io");
+ MockEndpoint.assertIsSatisfied(context);
+ }
+
+ @Test
+ public void testKaboom() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("kaboom");
+ template.sendBody("direct:start", "kaboom");
+ MockEndpoint.assertIsSatisfied(context);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:start")
+ .to("log:start")
+
.circuitBreaker().resilience4jConfiguration().recordException(IOException.class).ignoreException(FileNotFoundException.class).end()
+ .process(e -> {
+ String b = e.getMessage().getBody(String.class);
+ if ("kaboom".equals(b)) {
+ throw new NullPointerException();
+ } else if ("file".equals(b)) {
+ throw new FileNotFoundException("unknown.txt");
+ } else if ("io".equals(b)) {
+ throw new IOException("Host not found");
+ }
+ })
+ .onFallback()
+ .transform().constant("Fallback message")
+ .end()
+ .to("log:result")
+ .to("mock:result");
+ }
+ };
+ }
+
+}
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java
b/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java
index 1d632ab77c7..71481510fbc 100644
---
a/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java
+++
b/core/camel-core-model/src/main/java/org/apache/camel/model/Resilience4jConfigurationDefinition.java
@@ -271,6 +271,18 @@ public class Resilience4jConfigurationDefinition extends
Resilience4jConfigurati
return this;
}
+ /**
+ * Configure a list of exceptions that are recorded as a failure and thus
increase the failure rate. Any exception
+ * matching or inheriting from one of the list counts as a failure, unless
explicitly ignored via ignoreExceptions.
+ */
+ @SafeVarargs
+ public final Resilience4jConfigurationDefinition recordException(Class<?
extends Throwable>... exception) {
+ for (Class<? extends Throwable> t : exception) {
+ getRecordExceptions().add(t.getName());
+ }
+ return this;
+ }
+
/**
* Configure a list of exceptions that are ignored and neither count as a
failure nor success. Any exception
* matching or inheriting from one of the list will not count as a failure
nor success, even if the exceptions is
@@ -283,6 +295,19 @@ public class Resilience4jConfigurationDefinition extends
Resilience4jConfigurati
return this;
}
+ /**
+ * Configure a list of exceptions that are ignored and neither count as a
failure nor success. Any exception
+ * matching or inheriting from one of the list will not count as a failure
nor success, even if the exceptions is
+ * part of recordExceptions.
+ */
+ @SafeVarargs
+ public final Resilience4jConfigurationDefinition ignoreException(Class<?
extends Throwable>... exception) {
+ for (Class<? extends Throwable> t : exception) {
+ getIgnoreExceptions().add(t.getName());
+ }
+ return this;
+ }
+
/**
* End of configuration.
*/