Updated Branches: refs/heads/master 011002fd6 -> 0cad912e0
Seda consumer should validate that thet are all have same multiple consumers option as they cannot have different values. This is per queue. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ad43a48a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ad43a48a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ad43a48a Branch: refs/heads/master Commit: ad43a48a65a42847dc76b6f3a8e78f9387ed6b72 Parents: de9de10 Author: Claus Ibsen <[email protected]> Authored: Wed Jun 12 16:00:29 2013 -0400 Committer: Claus Ibsen <[email protected]> Committed: Wed Jun 12 16:00:29 2013 -0400 ---------------------------------------------------------------------- .../camel/component/seda/SedaComponent.java | 22 +++++- .../camel/component/seda/SedaEndpoint.java | 13 +++- ...edaQueueMultipleConsumersDifferenceTest.java | 70 ++++++++++++++++++++ 3 files changed, 102 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ad43a48a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java index 2578da9..77a8177 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java @@ -61,7 +61,15 @@ public class SedaComponent extends UriEndpointComponent { return defaultConcurrentConsumers; } + /** + * @deprecated use {@link #getOrCreateQueue(String, Integer, Boolean)} + */ + @Deprecated public synchronized QueueReference getOrCreateQueue(String uri, Integer size) { + return getOrCreateQueue(uri, size, null); + } + + public synchronized QueueReference getOrCreateQueue(String uri, Integer size, Boolean multipleConsumers) { String key = getQueueKey(uri); QueueReference ref = getQueues().get(key); @@ -97,7 +105,7 @@ public class SedaComponent extends UriEndpointComponent { log.debug("Created queue {} with size {}", key, size); // create and add a new reference queue - ref = new QueueReference(queue, size); + ref = new QueueReference(queue, size, multipleConsumers); ref.addReference(); getQueues().put(key, ref); @@ -108,6 +116,10 @@ public class SedaComponent extends UriEndpointComponent { return queues; } + public QueueReference getQueueReference(String key) { + return queues.get(key); + } + @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { int consumers = getAndRemoveParameter(parameters, "concurrentConsumers", Integer.class, defaultConcurrentConsumers); @@ -165,10 +177,12 @@ public class SedaComponent extends UriEndpointComponent { private final BlockingQueue<Exchange> queue; private volatile int count; private Integer size; + private Boolean multipleConsumers; - private QueueReference(BlockingQueue<Exchange> queue, Integer size) { + private QueueReference(BlockingQueue<Exchange> queue, Integer size, Boolean multipleConsumers) { this.queue = queue; this.size = size; + this.multipleConsumers = multipleConsumers; } void addReference() { @@ -195,6 +209,10 @@ public class SedaComponent extends UriEndpointComponent { return size; } + public Boolean getMultipleConsumers() { + return multipleConsumers; + } + /** * Gets the queue */ http://git-wip-us.apache.org/repos/asf/camel/blob/ad43a48a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java index 56a5d4e..656736c 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java @@ -106,6 +106,17 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, } public Consumer createConsumer(Processor processor) throws Exception { + if (getComponent() != null) { + // all consumers must match having the same multipleConsumers options + String key = getComponent().getQueueKey(getEndpointUri()); + SedaComponent.QueueReference ref = getComponent().getQueueReference(key); + if (ref != null && ref.getMultipleConsumers() != isMultipleConsumers()) { + // there is already a multiple consumers, so make sure they matches + throw new IllegalArgumentException("Cannot use existing queue " + key + " as the existing queue multiple consumers " + + ref.getMultipleConsumers() + " does not match given multiple consumers " + multipleConsumers); + } + } + Consumer answer = new SedaConsumer(this, processor); configureConsumer(answer); return answer; @@ -119,7 +130,7 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, if (getComponent() != null) { // use null to indicate default size (= use what the existing queue has been configured with) Integer size = getSize() == Integer.MAX_VALUE ? null : getSize(); - SedaComponent.QueueReference ref = getComponent().getOrCreateQueue(getEndpointUri(), size); + SedaComponent.QueueReference ref = getComponent().getOrCreateQueue(getEndpointUri(), size, isMultipleConsumers()); queue = ref.getQueue(); String key = getComponent().getQueueKey(getEndpointUri()); LOG.info("Endpoint {} is using shared queue: {} with size: {}", new Object[]{this, key, ref.getSize() != null ? ref.getSize() : Integer.MAX_VALUE}); http://git-wip-us.apache.org/repos/asf/camel/blob/ad43a48a/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueMultipleConsumersDifferenceTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueMultipleConsumersDifferenceTest.java b/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueMultipleConsumersDifferenceTest.java new file mode 100644 index 0000000..98856df --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueMultipleConsumersDifferenceTest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.seda; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.ResolveEndpointFailedException; +import org.apache.camel.builder.RouteBuilder; + +/** + * + */ +public class SameSedaQueueMultipleConsumersDifferenceTest extends ContextTestSupport { + + public void testSameOptions() throws Exception { + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:bar").expectedBodiesReceived("Hello World"); + + template.sendBody("seda:foo?multipleConsumers=true", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + public void testSameOptionsProducerStillOkay() throws Exception { + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:bar").expectedBodiesReceived("Hello World"); + + template.sendBody("seda:foo", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + public void testAddConsumer() throws Exception { + try { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:foo").routeId("fail").to("mock:fail"); + } + }); + fail("Should have thrown exception"); + } catch (IllegalArgumentException e) { + assertEquals("Cannot use existing queue seda://foo as the existing queue multiple consumers true does not match given multiple consumers false", e.getMessage()); + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:foo?multipleConsumers=true").routeId("foo").to("mock:foo"); + from("seda:foo?multipleConsumers=true").routeId("bar").to("mock:bar"); + } + }; + } +}
