Author: davsclaus
Date: Wed Oct 19 08:56:12 2011
New Revision: 1186037

URL: http://svn.apache.org/viewvc?rev=1186037&view=rev
Log:
CAMEL-4508: Added option sendEmptyMessageWhenIdle to scheduled poll consumer. 
Thanks to Rich Newcomb for the patch.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdleMessageTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1186037&r1=1186036&r2=1186037&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
 Wed Oct 19 08:56:12 2011
@@ -21,6 +21,7 @@ import java.util.concurrent.ScheduledFut
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.PollingConsumerPollingStrategy;
 import org.apache.camel.Processor;
@@ -50,6 +51,7 @@ public abstract class ScheduledPollConsu
     private boolean useFixedDelay = true;
     private PollingConsumerPollStrategy pollStrategy = new 
DefaultPollingConsumerPollStrategy();
     private LoggingLevel runLoggingLevel = LoggingLevel.TRACE;
+    private boolean sendEmptyMessageWhenIdle;
 
     public ScheduledPollConsumer(Endpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -132,6 +134,12 @@ public abstract class ScheduledPollConsu
                     if (begin) {
                         retryCounter++;
                         int polledMessages = poll();
+                        
+                        if (polledMessages == 0 && 
isSendEmptyMessageWhenIdle()) {
+                            // send an "empty" exchange
+                            processEmptyMessage();
+                        }
+                        
                         pollStrategy.commit(this, getEndpoint(), 
polledMessages);
                     } else {
                         LOG.debug("Cannot begin polling as pollStrategy 
returned false: {}", pollStrategy);
@@ -164,6 +172,17 @@ public abstract class ScheduledPollConsu
         // avoid this thread to throw exceptions because the thread pool wont 
re-schedule a new thread
     }
 
+    /**
+     * No messages to poll so send an empty message instead.
+     * <p/>
+     * Invoked from the poll loop when <tt>poll()</tt> returned 0 and
+     * {@link #isSendEmptyMessageWhenIdle()} is <tt>true</tt>. A new exchange is
+     * created from the endpoint and handed to this consumer's processor; no
+     * body is set on the exchange by this method.
+     *
+     * @throws Exception is thrown if an error occurs while processing the empty message.
+     */
+    protected void processEmptyMessage() throws Exception {
+        Exchange exchange = getEndpoint().createExchange();
+        log.debug("Sending empty message as there were no messages from 
polling: {}", this.getEndpoint());
+        getProcessor().process(exchange);
+    }
+
     // Properties
     // 
-------------------------------------------------------------------------
 
@@ -242,6 +261,14 @@ public abstract class ScheduledPollConsu
     public void setStartScheduler(boolean startScheduler) {
         this.startScheduler = startScheduler;
     }
+    /** Sets whether an empty message is sent when a poll returns no messages. */
+    public void setSendEmptyMessageWhenIdle(boolean sendEmptyMessageWhenIdle) {
+        this.sendEmptyMessageWhenIdle = sendEmptyMessageWhenIdle;
+    }
+    /** Returns whether an empty message is sent when a poll returns no messages. */
+    public boolean isSendEmptyMessageWhenIdle() {
+        return sendEmptyMessageWhenIdle;
+    }
 
     // Implementation methods
     // 
-------------------------------------------------------------------------

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java?rev=1186037&r1=1186036&r2=1186037&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
 Wed Oct 19 08:56:12 2011
@@ -95,8 +95,19 @@ public abstract class ScheduledPollEndpo
         Object useFixedDelay = options.remove("useFixedDelay");
         Object pollStrategy = options.remove("pollStrategy");
         Object runLoggingLevel = options.remove("runLoggingLevel");
-        if (initialDelay != null || delay != null || timeUnit != null || 
useFixedDelay != null || pollStrategy != null
-                || runLoggingLevel != null || startScheduler != null) {
+        Object sendEmptyMessageWhenIdle = 
options.remove("sendEmptyMessageWhenIdle");
+        boolean setConsumerProperties = false;
+        
+        // the following is split into two if statements to satisfy the 
checkstyle max complexity constraint
+        if (initialDelay != null || delay != null || timeUnit != null || 
useFixedDelay != null || pollStrategy != null) {
+            setConsumerProperties = true;
+        }
+        if (runLoggingLevel != null || startScheduler != null || 
sendEmptyMessageWhenIdle != null) {
+            setConsumerProperties = true;
+        }
+        
+        if (setConsumerProperties) {
+        
             if (consumerProperties == null) {
                 consumerProperties = new HashMap<String, Object>();
             }
@@ -121,6 +132,10 @@ public abstract class ScheduledPollEndpo
             if (runLoggingLevel != null) {
                 consumerProperties.put("runLoggingLevel", runLoggingLevel);
             }
+            if (sendEmptyMessageWhenIdle != null) {
+                consumerProperties.put("sendEmptyMessageWhenIdle", 
sendEmptyMessageWhenIdle);
+            }
         }
     }
+    
 }

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdleMessageTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdleMessageTest.java?rev=1186037&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdleMessageTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdleMessageTest.java
 Wed Oct 19 08:56:12 2011
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Test to verify that the polling consumer delivers an empty Exchange when the
+ * sendEmptyMessageWhenIdle property is set and a polling event yields no results.
+ * <p/>
+ * The route consumes from the <tt>target/empty</tt> file endpoint with
+ * <tt>delay=50</tt> and <tt>sendEmptyMessageWhenIdle=true</tt>, converts the
+ * body to a String and sends the exchange to <tt>mock:result</tt>.
+ * <p/>
+ * The test method sleeps for 110, then expects at least two exchanges at
+ * <tt>mock:result</tt> and asserts that the In body of the first two is null.
+ * <p/>
+ * NOTE(review): the sleep presumably leaves room for two polls at delay=50,
+ * which makes the test timing dependent - confirm the delay unit and whether
+ * the sleep is needed, given the expectation is set after it.
+ * NOTE(review): assumes <tt>target/empty</tt> contains no files; the test does
+ * not create or clean that directory itself - verify against the test setup.
+ */
+public class FileConsumerIdleMessageTest extends ContextTestSupport {
+    
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                
from("file://target/empty?delay=50&sendEmptyMessageWhenIdle=true").convertBodyTo(String.class).
+                    to("mock:result");
+            }
+        };
+    }
+
+    public void testConsumeIdleMessages() throws Exception {
+        Thread.sleep(110);
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMinimumMessageCount(2);
+        assertMockEndpointsSatisfied();
+        assertTrue(mock.getExchanges().get(0).getIn().getBody() == null);
+        assertTrue(mock.getExchanges().get(1).getIn().getBody() == null);
+    }
+
+}


Reply via email to