This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-2.25.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.25.x by this push:
new 70a6470 CAMEL-14951: WireTap - If thread pool reject task then Camel
error handler should be able to react
70a6470 is described below
commit 70a6470d8eb9e3dcec6e382d77ac406250309e95
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Apr 22 14:22:56 2020 +0200
CAMEL-14951: WireTap - If thread pool reject task then Camel error handler
should be able to react
---
.../apache/camel/processor/WireTapProcessor.java | 32 ++++---
.../camel/processor/WireTapAbortPolicyTest.java | 98 ++++++++++++++++++++++
2 files changed, 117 insertions(+), 13 deletions(-)
diff --git
a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index 63f7ae4..c269f63 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -152,20 +152,26 @@ public class WireTapProcessor extends ServiceSupport
implements AsyncProcessor,
final Exchange wireTapExchange = target;
// send the exchange to the destination using an executor service
- executorService.submit(new Callable<Exchange>() {
- public Exchange call() throws Exception {
- taskCount.increment();
- try {
- LOG.debug(">>>> (wiretap) {} {}", uri, wireTapExchange);
- processor.process(wireTapExchange);
- } catch (Throwable e) {
- LOG.warn("Error occurred during processing " +
wireTapExchange + " wiretap to " + uri + ". This exception will be ignored.",
e);
- } finally {
- taskCount.decrement();
+ try {
+ executorService.submit(new Callable<Exchange>() {
+ public Exchange call() throws Exception {
+ taskCount.increment();
+ try {
+ LOG.debug(">>>> (wiretap) {} {}", uri,
wireTapExchange);
+ processor.process(wireTapExchange);
+ } catch (Throwable e) {
+ LOG.warn("Error occurred during processing " +
wireTapExchange + " wiretap to " + uri + ". This exception will be ignored.",
e);
+ } finally {
+ taskCount.decrement();
+ }
+ return wireTapExchange;
}
- return wireTapExchange;
- }
- });
+ });
+ } catch (Throwable e) {
+ // in case the thread pool rejects or cannot submit the task then
we need to catch
+ // so camel error handler can react
+ exchange.setException(e);
+ }
// continue routing this synchronously
callback.done(true);
diff --git
a/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
b/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
new file mode 100644
index 0000000..0e0f9fa
--- /dev/null
+++
b/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ThreadPoolRejectedPolicy;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.builder.ThreadPoolBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Wire tap unit test
+ */
+public class WireTapAbortPolicyTest extends ContextTestSupport {
+ protected MockEndpoint tap;
+ protected MockEndpoint result;
+ protected ExecutorService pool;
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+ if (pool != null) {
+ pool.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testSend() throws Exception {
+ // hello must come first, as we have delay on the tapped route
+ result.expectedMinimumMessageCount(2);
+ tap.expectedMinimumMessageCount(1);
+
+ template.sendBody("direct:start", "A");
+ template.sendBody("direct:start", "B");
+ try {
+ template.sendBody("direct:start", "C");
+ fail("Task should be rejected");
+ } catch (Exception e) {
+ assertIsInstanceOf(RejectedExecutionException.class, e.getCause());
+ }
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ tap = getMockEndpoint("mock:tap");
+ result = getMockEndpoint("mock:result");
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ // START SNIPPET: e1
+ // use a custom thread pool for sending tapped messages
+ ExecutorService pool = new ThreadPoolBuilder(context)
+ // only allow 1 thread and 1 pending task
+ .poolSize(1)
+ .maxPoolSize(1)
+ .maxQueueSize(1)
+ // and about tasks
+ .rejectedPolicy(ThreadPoolRejectedPolicy.Abort)
+ .build("mypool");
+
+ from("direct:start").to("log:foo")
+ // pass in the custom pool to the wireTap DSL
+
.wireTap("direct:tap").executorService(pool).to("mock:result");
+ // END SNIPPET: e1
+
+ from("direct:tap").delay(1000).to("mock:tap");
+ }
+ };
+ }
+}
\ No newline at end of file