Author: davsclaus
Date: Wed Oct 19 08:56:12 2011
New Revision: 1186037
URL: http://svn.apache.org/viewvc?rev=1186037&view=rev
Log:
CAMEL-4508: Added option sendEmptyMessageWhenIdle to scheduled poll consumer.
Thanks to Rich Newcomb for the patch.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdleMessageTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1186037&r1=1186036&r2=1186037&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Wed Oct 19 08:56:12 2011
@@ -21,6 +21,7 @@ import java.util.concurrent.ScheduledFut
import java.util.concurrent.TimeUnit;
import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.PollingConsumerPollingStrategy;
import org.apache.camel.Processor;
@@ -50,6 +51,7 @@ public abstract class ScheduledPollConsu
private boolean useFixedDelay = true;
private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy();
private LoggingLevel runLoggingLevel = LoggingLevel.TRACE;
+ private boolean sendEmptyMessageWhenIdle;
public ScheduledPollConsumer(Endpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -132,6 +134,12 @@ public abstract class ScheduledPollConsu
if (begin) {
retryCounter++;
int polledMessages = poll();
+
+ if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) {
+ // send an "empty" exchange
+ processEmptyMessage();
+ }
+
pollStrategy.commit(this, getEndpoint(), polledMessages);
} else {
LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy);
@@ -164,6 +172,17 @@ public abstract class ScheduledPollConsu
// avoid this thread to throw exceptions because the thread pool wont re-schedule a new thread
}
+ /**
+ * No messages to poll so send an empty message instead.
+ *
+ * @throws Exception is thrown if error processing the empty message.
+ */
+ protected void processEmptyMessage() throws Exception {
+ Exchange exchange = getEndpoint().createExchange();
+ log.debug("Sending empty message as there were no messages from polling: {}", this.getEndpoint());
+ getProcessor().process(exchange);
+ }
+
// Properties
// -------------------------------------------------------------------------
@@ -242,6 +261,14 @@ public abstract class ScheduledPollConsu
public void setStartScheduler(boolean startScheduler) {
this.startScheduler = startScheduler;
}
+
+ public void setSendEmptyMessageWhenIdle(boolean sendEmptyMessageWhenIdle) {
+ this.sendEmptyMessageWhenIdle = sendEmptyMessageWhenIdle;
+ }
+
+ public boolean isSendEmptyMessageWhenIdle() {
+ return sendEmptyMessageWhenIdle;
+ }
// Implementation methods
// -------------------------------------------------------------------------
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java?rev=1186037&r1=1186036&r2=1186037&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java Wed Oct 19 08:56:12 2011
@@ -95,8 +95,19 @@ public abstract class ScheduledPollEndpo
Object useFixedDelay = options.remove("useFixedDelay");
Object pollStrategy = options.remove("pollStrategy");
Object runLoggingLevel = options.remove("runLoggingLevel");
- if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null || pollStrategy != null
- || runLoggingLevel != null || startScheduler != null) {
+ Object sendEmptyMessageWhenIdle = options.remove("sendEmptyMessageWhenIdle");
+ boolean setConsumerProperties = false;
+
+ // the following is split into two if statements to satisfy the checkstyle max complexity constraint
+ if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null || pollStrategy != null) {
+ setConsumerProperties = true;
+ }
+ if (runLoggingLevel != null || startScheduler != null || sendEmptyMessageWhenIdle != null) {
+ setConsumerProperties = true;
+ }
+
+ if (setConsumerProperties) {
+
if (consumerProperties == null) {
consumerProperties = new HashMap<String, Object>();
}
@@ -121,6 +132,10 @@ public abstract class ScheduledPollEndpo
if (runLoggingLevel != null) {
consumerProperties.put("runLoggingLevel", runLoggingLevel);
}
+ if (sendEmptyMessageWhenIdle != null) {
+ consumerProperties.put("sendEmptyMessageWhenIdle", sendEmptyMessageWhenIdle);
+ }
}
}
+
}
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdleMessageTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdleMessageTest.java?rev=1186037&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdleMessageTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdleMessageTest.java Wed Oct 19 08:56:12 2011
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Test to verify that the polling consumer delivers an empty Exchange when the
+ * sendEmptyMessageWhenIdle property is set and a polling event yields no results.
+ */
+public class FileConsumerIdleMessageTest extends ContextTestSupport {
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from("file://target/empty?delay=50&sendEmptyMessageWhenIdle=true").convertBodyTo(String.class).
+ to("mock:result");
+ }
+ };
+ }
+
+ public void testConsumeIdleMessages() throws Exception {
+ Thread.sleep(110);
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMinimumMessageCount(2);
+ assertMockEndpointsSatisfied();
+ assertTrue(mock.getExchanges().get(0).getIn().getBody() == null);
+ assertTrue(mock.getExchanges().get(1).getIn().getBody() == null);
+ }
+
+}