Repository: camel
Updated Branches:
  refs/heads/master 821bddf58 -> 7747c4aec


CAMEL-5911: seda/vm add discardIfNoConsumers option


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7bfdef25
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7bfdef25
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7bfdef25

Branch: refs/heads/master
Commit: 7bfdef2538ccecb46c1769c31afe88dd17249b2b
Parents: 821bddf
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Jul 10 15:45:33 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Jul 10 16:21:54 2015 +0200

----------------------------------------------------------------------
 .../camel/component/seda/SedaComponent.java     |  3 ++
 .../camel/component/seda/SedaEndpoint.java      | 35 +++++++++++++-
 .../camel/component/seda/SedaProducer.java      | 11 ++++-
 .../seda/SedaDiscardIfNoConsumerTest.java       | 49 ++++++++++++++++++++
 .../component/vm/VmDiscardIfNoConsumerTest.java | 49 ++++++++++++++++++++
 5 files changed, 144 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7bfdef25/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java 
b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
index c8084ce..87a0843 100644
--- 
a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
+++ 
b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
@@ -171,6 +171,7 @@ public class SedaComponent extends UriEndpointComponent {
             throw new IllegalArgumentException("The limitConcurrentConsumers 
flag in set to true. ConcurrentConsumers cannot be set at a value greater than "
                     + maxConcurrentConsumers + " was " + consumers);
         }
+
         // Resolve queue reference
         BlockingQueue<Exchange> queue = 
resolveAndRemoveReferenceParameter(parameters, "queue", BlockingQueue.class);
         SedaEndpoint answer;
@@ -183,6 +184,8 @@ public class SedaComponent extends UriEndpointComponent {
             answer = createEndpoint(uri, this, queue, consumers);
         }
         answer.configureProperties(parameters);
+        answer.setConcurrentConsumers(consumers);
+        answer.setLimitConcurrentConsumers(limitConcurrentConsumers);
         return answer;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/7bfdef25/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java 
b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
index e8090a4..1b82935 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
@@ -73,6 +73,8 @@ public class SedaEndpoint extends DefaultEndpoint implements 
BrowsableEndpoint,
 
     @UriParam(label = "consumer", defaultValue = "1")
     private int concurrentConsumers = 1;
+    @UriParam(label = "consumer", defaultValue = "true")
+    private boolean limitConcurrentConsumers = true;
     @UriParam(label = "consumer")
     private boolean multipleConsumers;
     @UriParam(label = "consumer")
@@ -88,6 +90,8 @@ public class SedaEndpoint extends DefaultEndpoint implements 
BrowsableEndpoint,
     private boolean blockWhenFull;
     @UriParam(label = "producer")
     private boolean failIfNoConsumers;
+    @UriParam(label = "producer")
+    private boolean discardIfNoConsumers;
 
     private BlockingQueueFactory<Exchange> queueFactory;
 
@@ -292,6 +296,19 @@ public class SedaEndpoint extends DefaultEndpoint 
implements BrowsableEndpoint,
         return concurrentConsumers;
     }
 
+    @ManagedAttribute
+    public boolean isLimitConcurrentConsumers() {
+        return limitConcurrentConsumers;
+    }
+
+    /**
+     * Whether to limit the number of concurrentConsumers to the maximum of 
500.
+     * By default, an exception will be thrown if an endpoint is configured 
with a greater number. You can disable that check by turning this option off.
+     */
+    public void setLimitConcurrentConsumers(boolean limitConcurrentConsumers) {
+        this.limitConcurrentConsumers = limitConcurrentConsumers;
+    }
+
     public WaitForTaskToComplete getWaitForTaskToComplete() {
         return waitForTaskToComplete;
     }
@@ -326,13 +343,29 @@ public class SedaEndpoint extends DefaultEndpoint 
implements BrowsableEndpoint,
     }
 
     /**
-     * Whether the producer should fail by throwing an exception, when sending 
to a SEDA queue with no active consumers.
+     * Whether the producer should fail by throwing an exception, when sending 
to a queue with no active consumers.
+     * <p/>
+     * Only one of the options <tt>discardIfNoConsumers</tt> and 
<tt>failIfNoConsumers</tt> can be enabled at the same time.
      */
     public void setFailIfNoConsumers(boolean failIfNoConsumers) {
         this.failIfNoConsumers = failIfNoConsumers;
     }
 
     @ManagedAttribute
+    public boolean isDiscardIfNoConsumers() {
+        return discardIfNoConsumers;
+    }
+
+    /**
+     * Whether the producer should discard the message (do not add the message 
to the queue), when sending to a queue with no active consumers.
+     * <p/>
+     * Only one of the options <tt>discardIfNoConsumers</tt> and 
<tt>failIfNoConsumers</tt> can be enabled at the same time.
+     */
+    public void setDiscardIfNoConsumers(boolean discardIfNoConsumers) {
+        this.discardIfNoConsumers = discardIfNoConsumers;
+    }
+
+    @ManagedAttribute
     public boolean isMultipleConsumers() {
         return multipleConsumers;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/7bfdef25/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java 
b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
index 56b83f7..a87ddf3 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
@@ -216,9 +216,16 @@ public class SedaProducer extends DefaultAsyncProducer {
             throw new SedaConsumerNotAvailableException("No queue available on 
endpoint: " + endpoint, exchange);
         }
 
-        if (endpoint.isFailIfNoConsumers() && !queueReference.hasConsumers()) {
-            throw new SedaConsumerNotAvailableException("No consumers 
available on endpoint: " + endpoint, exchange);
+        boolean empty = !queueReference.hasConsumers();
+        if (empty) {
+            if (endpoint.isFailIfNoConsumers()) {
+                throw new SedaConsumerNotAvailableException("No consumers 
available on endpoint: " + endpoint, exchange);
+            } else if (endpoint.isDiscardIfNoConsumers()) {
+                log.debug("Discard message as no active consumers on endpoint: 
" + endpoint);
+                return;
+            }
         }
+
         if (blockWhenFull) {
             try {
                 queue.put(exchange);

http://git-wip-us.apache.org/repos/asf/camel/blob/7bfdef25/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java
 
b/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java
new file mode 100644
index 0000000..630abd4
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/component/seda/SedaDiscardIfNoConsumerTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version 
+ */
+public class SedaDiscardIfNoConsumerTest extends ContextTestSupport {
+
+    public void testDiscard() throws Exception {
+        SedaEndpoint bar = getMandatoryEndpoint("seda:bar", 
SedaEndpoint.class);
+        assertEquals(0, bar.getCurrentQueueSize());
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(0, bar.getCurrentQueueSize());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from("direct:start").to("seda:bar?discardIfNoConsumers=true").to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7bfdef25/camel-core/src/test/java/org/apache/camel/component/vm/VmDiscardIfNoConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/component/vm/VmDiscardIfNoConsumerTest.java
 
b/camel-core/src/test/java/org/apache/camel/component/vm/VmDiscardIfNoConsumerTest.java
new file mode 100644
index 0000000..6581d73
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/component/vm/VmDiscardIfNoConsumerTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vm;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version 
+ */
+public class VmDiscardIfNoConsumerTest extends ContextTestSupport {
+
+    public void testDiscard() throws Exception {
+        VmEndpoint bar = getMandatoryEndpoint("vm:bar", VmEndpoint.class);
+        assertEquals(0, bar.getCurrentQueueSize());
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(0, bar.getCurrentQueueSize());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from("direct:start").to("vm:bar?discardIfNoConsumers=true").to("mock:result");
+            }
+        };
+    }
+}

Reply via email to