Repository: camel Updated Branches: refs/heads/master 4a533134d -> b3ebc297f
CAMEL-7905 added failIfNoConsumers option to the direct-vm component Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b3ebc297 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b3ebc297 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b3ebc297 Branch: refs/heads/master Commit: b3ebc297fb94e1791ba72213d9ebea10c37477d3 Parents: 4a53313 Author: boday <[email protected]> Authored: Wed Apr 1 12:41:59 2015 -0700 Committer: boday <[email protected]> Committed: Wed Apr 1 12:46:53 2015 -0700 ---------------------------------------------------------------------- .../component/directvm/DirectVmEndpoint.java | 14 ++ .../component/directvm/DirectVmProducer.java | 12 +- .../directvm/DirectVmNoConsumerTest.java | 168 +++++++++++++++++++ 3 files changed, 192 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b3ebc297/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java index b616f67..8489c6c 100644 --- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java @@ -39,6 +39,8 @@ public class DirectVmEndpoint extends DefaultEndpoint { private boolean block; @UriParam(label = "producer", defaultValue = "30000") private long timeout = 30000L; + @UriParam(label = "producer") + private boolean failIfNoConsumers = true; public DirectVmEndpoint(String endpointUri, DirectVmComponent component) { super(endpointUri, component); @@ -96,4 +98,16 @@ public class DirectVmEndpoint extends DefaultEndpoint { public void setTimeout(long timeout) { this.timeout = timeout; } + + public boolean isFailIfNoConsumers() { + return failIfNoConsumers; + } + + /** + * Whether the producer should fail by throwing an exception, when sending to a DIRECT-VM endpoint with no active consumers. + */ + public void setFailIfNoConsumers(boolean failIfNoConsumers) { + this.failIfNoConsumers = failIfNoConsumers; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/b3ebc297/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java index 14a2138..32fb395 100644 --- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java +++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java @@ -37,7 +37,11 @@ public class DirectVmProducer extends DefaultAsyncProducer { // send to consumer DirectVmConsumer consumer = endpoint.getComponent().getConsumer(endpoint); if (consumer == null) { - throw new DirectVmConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange); + if (endpoint.isFailIfNoConsumers()) { + throw new DirectVmConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange); + } else { + log.debug("message ignored, no consumers available on endpoint: " + endpoint); + } } else { consumer.getProcessor().process(exchange); } @@ -48,7 +52,11 @@ public class DirectVmProducer extends DefaultAsyncProducer { // send to consumer DirectVmConsumer consumer = endpoint.getComponent().getConsumer(endpoint); if (consumer == null) { - exchange.setException(new DirectVmConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange)); + if (endpoint.isFailIfNoConsumers()) { + exchange.setException(new DirectVmConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange)); + } else { + log.debug("message ignored, no consumers available on endpoint: " + endpoint); + } callback.done(true); return true; } else { http://git-wip-us.apache.org/repos/asf/camel/blob/b3ebc297/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoConsumerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoConsumerTest.java b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoConsumerTest.java new file mode 100644 index 0000000..ae97899 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoConsumerTest.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.directvm; + +import java.util.concurrent.TimeUnit; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +/** + * @version + */ +public class DirectVmNoConsumerTest extends ContextTestSupport { + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + public void testInOnly() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct-vm:start").to("direct-vm:foo"); + } + }); + + context.start(); + + try { + template.sendBody("direct-vm:start", "Hello World"); + fail("Should throw an exception"); + } catch (CamelExecutionException e) { + assertIsInstanceOf(DirectVmConsumerNotAvailableException.class, e.getCause()); + } + } + + public void testInOut() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct-vm:start").to("direct-vm:foo"); + } + }); + + context.start(); + + try { + template.requestBody("direct-vm:start", "Hello World"); + fail("Should throw an exception"); + } catch (CamelExecutionException e) { + assertIsInstanceOf(DirectVmConsumerNotAvailableException.class, e.getCause()); + } + } + + @Test + public void testFailIfNoConsumerFalse() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct-vm:start").to("direct-vm:foo?failIfNoConsumers=false"); + } + }); + + context.start(); + + try { + template.sendBody("direct-vm:start", "Hello World"); + } catch (CamelExecutionException e) { + fail("Should not throw an exception"); + } + } + + @Test + public void testFailIfNoConsumersAfterConsumersLeave() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct-vm:foo").routeId("stopThisRoute").to("mock:foo"); + } + }); + + context.start(); + + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + + template.sendBody("direct-vm:foo", "Hello World"); + + assertMockEndpointsSatisfied(); + + context.stopRoute("stopThisRoute"); + TimeUnit.MILLISECONDS.sleep(100); + try { + template.sendBody("direct-vm:foo", "Hello World"); + fail("Should throw an exception"); + } catch (CamelExecutionException e) { + assertIsInstanceOf(DirectVmConsumerNotAvailableException.class, e.getCause()); + } + } + + @Test + public void testFailIfNoConsumersWithValidConsumer() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct-vm:in").to("direct-vm:foo"); + from("direct-vm:foo").to("mock:foo"); + } + }); + + context.start(); + + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + + template.sendBody("direct-vm:in", "Hello World"); + + assertMockEndpointsSatisfied(); + + } + + @Test + public void testFailIfNoConsumersFalseWithPipeline() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct-vm:in").to("direct-vm:foo?failIfNoConsumers=false").to("direct-vm:bar"); + from("direct-vm:bar").to("mock:foo"); + } + }); + + context.start(); + + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + + template.sendBody("direct-vm:in", "Hello World"); + + assertMockEndpointsSatisfied(); + + } + + @Test + public void testConfigOnAConsumer() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct-vm:foo?failIfNoConsumers=false").to("log:test"); + } + }); + + context.start(); + } + +}
