Repository: camel Updated Branches: refs/heads/master 821bddf58 -> 7747c4aec
CAMEL-5911: seda/vm add discardIfNoConsumers option Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7bfdef25 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7bfdef25 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7bfdef25 Branch: refs/heads/master Commit: 7bfdef2538ccecb46c1769c31afe88dd17249b2b Parents: 821bddf Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Jul 10 15:45:33 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Jul 10 16:21:54 2015 +0200 ---------------------------------------------------------------------- .../camel/component/seda/SedaComponent.java | 3 ++ .../camel/component/seda/SedaEndpoint.java | 35 +++++++++++++- .../camel/component/seda/SedaProducer.java | 11 ++++- .../seda/SedaDiscardIfNoConsumerTest.java | 49 ++++++++++++++++++++ .../component/vm/VmDiscardIfNoConsumerTest.java | 49 ++++++++++++++++++++ 5 files changed, 144 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/7bfdef25/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java index c8084ce..87a0843 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java @@ -171,6 +171,7 @@ public class SedaComponent extends UriEndpointComponent { throw new IllegalArgumentException("The limitConcurrentConsumers flag in set to true. ConcurrentConsumers cannot be set at a value greater than " + maxConcurrentConsumers + " was " + consumers); } + // Resolve queue reference BlockingQueue<Exchange> queue = resolveAndRemoveReferenceParameter(parameters, "queue", BlockingQueue.class); SedaEndpoint answer; @@ -183,6 +184,8 @@ public class SedaComponent extends UriEndpointComponent { answer = createEndpoint(uri, this, queue, consumers); } answer.configureProperties(parameters); + answer.setConcurrentConsumers(consumers); + answer.setLimitConcurrentConsumers(limitConcurrentConsumers); return answer; } http://git-wip-us.apache.org/repos/asf/camel/blob/7bfdef25/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java index e8090a4..1b82935 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java @@ -73,6 +73,8 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, @UriParam(label = "consumer", defaultValue = "1") private int concurrentConsumers = 1; + @UriParam(label = "consumer", defaultValue = "true") + private boolean limitConcurrentConsumers = true; @UriParam(label = "consumer") private boolean multipleConsumers; @UriParam(label = "consumer") @@ -88,6 +90,8 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, private boolean blockWhenFull; @UriParam(label = "producer") private boolean failIfNoConsumers; + @UriParam(label = "producer") + private boolean discardIfNoConsumers; private BlockingQueueFactory<Exchange> queueFactory; @@ -292,6 +296,19 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, return concurrentConsumers; } + @ManagedAttribute + public boolean isLimitConcurrentConsumers() { + return limitConcurrentConsumers; + } + + /** + * Whether to limit the number of concurrentConsumers to the maximum of 500. + * By default, an exception will be thrown if an endpoint is configured with a greater number. You can disable that check by turning this option off. + */ + public void setLimitConcurrentConsumers(boolean limitConcurrentConsumers) { + this.limitConcurrentConsumers = limitConcurrentConsumers; + } + public WaitForTaskToComplete getWaitForTaskToComplete() { return waitForTaskToComplete; } @@ -326,13 +343,29 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, } /** - * Whether the producer should fail by throwing an exception, when sending to a SEDA queue with no active consumers. + * Whether the producer should fail by throwing an exception, when sending to a queue with no active consumers. + * <p/> + * Only one of the options <tt>discardIfNoConsumers</tt> and <tt>failIfNoConsumers</tt> can be enabled at the same time. */ public void setFailIfNoConsumers(boolean failIfNoConsumers) { this.failIfNoConsumers = failIfNoConsumers; } @ManagedAttribute + public boolean isDiscardIfNoConsumers() { + return discardIfNoConsumers; + } + + /** + * Whether the producer should discard the message (do not add the message to the queue), when sending to a queue with no active consumers. + * <p/> + * Only one of the options <tt>discardIfNoConsumers</tt> and <tt>failIfNoConsumers</tt> can be enabled at the same time. + */ + public void setDiscardIfNoConsumers(boolean discardIfNoConsumers) { + this.discardIfNoConsumers = discardIfNoConsumers; + } + + @ManagedAttribute public boolean isMultipleConsumers() { return multipleConsumers; } http://git-wip-us.apache.org/repos/asf/camel/blob/7bfdef25/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java index 56b83f7..a87ddf3 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java @@ -216,9 +216,16 @@ public class SedaProducer extends DefaultAsyncProducer { throw new SedaConsumerNotAvailableException("No queue available on endpoint: " + endpoint, exchange); } - if (endpoint.isFailIfNoConsumers() && !queueReference.hasConsumers()) { - throw new SedaConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange); + boolean empty = !queueReference.hasConsumers(); + if (empty) { + if (endpoint.isFailIfNoConsumers()) { + throw new SedaConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange); + } else if (endpoint.isDiscardIfNoConsumers()) { + log.debug("Discard message as no active consumers on endpoint: " + endpoint); + return; + } } + if (blockWhenFull) { try { queue.put(exchange); http://git-wip-us.apache.org/repos/asf/camel/blob/7bfdef25/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java b/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java new file mode 100644 index 0000000..630abd4 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version + */ +public class SedaDiscardIfNoConsumerTest extends ContextTestSupport { + + public void testDiscard() throws Exception { + SedaEndpoint bar = getMandatoryEndpoint("seda:bar", SedaEndpoint.class); + assertEquals(0, bar.getCurrentQueueSize()); + + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + + assertEquals(0, bar.getCurrentQueueSize()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").to("seda:bar?discardIfNoConsumers=true").to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/7bfdef25/camel-core/src/test/java/org/apache/camel/component/vm/VmDiscardIfNoConsumerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/vm/VmDiscardIfNoConsumerTest.java b/camel-core/src/test/java/org/apache/camel/component/vm/VmDiscardIfNoConsumerTest.java new file mode 100644 index 0000000..6581d73 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/vm/VmDiscardIfNoConsumerTest.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.vm; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version + */ +public class VmDiscardIfNoConsumerTest extends ContextTestSupport { + + public void testDiscard() throws Exception { + VmEndpoint bar = getMandatoryEndpoint("vm:bar", VmEndpoint.class); + assertEquals(0, bar.getCurrentQueueSize()); + + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + + assertEquals(0, bar.getCurrentQueueSize()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").to("vm:bar?discardIfNoConsumers=true").to("mock:result"); + } + }; + } +}