Repository: camel Updated Branches: refs/heads/master 1957a8282 -> 0ddf4b1ca
CAMEL-9249: timer - Allow to specify a delay of -1 or something to indicate loop asap forever Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0ddf4b1c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0ddf4b1c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0ddf4b1c Branch: refs/heads/master Commit: 0ddf4b1ca8ca384d498f798e98fd236936f72fd4 Parents: 1957a82 Author: Andrea Cosentino <anco...@gmail.com> Authored: Sat Oct 24 14:25:48 2015 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Sun Oct 25 11:01:38 2015 +0100 ---------------------------------------------------------------------- .../camel/component/timer/TimerConsumer.java | 87 +++++++++++++------- .../camel/component/timer/TimerDelayTest.java | 1 + .../component/timer/TimerNegativeDelayTest.java | 44 ++++++++++ .../TimerNegativeNoRepeatCountDelayTest.java | 57 +++++++++++++ 4 files changed, 159 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/0ddf4b1c/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java b/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java index c11506b..62261cb 100644 --- a/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java @@ -19,6 +19,7 @@ package org.apache.camel.component.timer; import java.util.Date; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; import org.apache.camel.AsyncCallback; @@ -40,6 +41,7 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener { private final TimerEndpoint endpoint; private volatile TimerTask task; private volatile boolean configured; + private ExecutorService executorService; public TimerConsumer(TimerEndpoint endpoint, Processor processor) { super(endpoint, processor); @@ -53,41 +55,60 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener { @Override protected void doStart() throws Exception { - task = new TimerTask() { - // counter - private final AtomicLong counter = new AtomicLong(); - - @Override - public void run() { - if (!isTaskRunAllowed()) { - // do not run timer task as it was not allowed - LOG.debug("Run now allowed for timer: {}", endpoint); - return; - } + if (endpoint.getDelay() >= 0) { + task = new TimerTask() { + // counter + private final AtomicLong counter = new AtomicLong(); - try { - long count = counter.incrementAndGet(); + @Override + public void run() { + if (!isTaskRunAllowed()) { + // do not run timer task as it was not allowed + LOG.debug("Run now allowed for timer: {}", endpoint); + return; + } - boolean fire = endpoint.getRepeatCount() <= 0 || count <= endpoint.getRepeatCount(); - if (fire) { - sendTimerExchange(count); - } else { - // no need to fire anymore as we exceeded repeat count - LOG.debug("Cancelling {} timer as repeat count limit reached after {} counts.", endpoint.getTimerName(), endpoint.getRepeatCount()); - cancel(); + try { + long count = counter.incrementAndGet(); + + boolean fire = endpoint.getRepeatCount() <= 0 || count <= endpoint.getRepeatCount(); + if (fire) { + sendTimerExchange(count); + } else { + // no need to fire anymore as we exceeded repeat + // count + LOG.debug("Cancelling {} timer as repeat count limit reached after {} counts.", endpoint.getTimerName(), endpoint.getRepeatCount()); + cancel(); + } + } catch (Throwable e) { + // catch all to avoid the JVM closing the thread and not + // firing again + LOG.warn("Error processing exchange. This exception will be ignored, to let the timer be able to trigger again.", e); } - } catch (Throwable e) { - // catch all to avoid the JVM closing the thread and not firing again - LOG.warn("Error processing exchange. This exception will be ignored, to let the timer be able to trigger again.", e); } + }; + + // only configure task if CamelContext already started, otherwise + // the StartupListener + // is configuring the task later + if (!configured && endpoint.getCamelContext().getStatus().isStarted()) { + Timer timer = endpoint.getTimer(this); + configureTask(task, timer); } - }; + } else { + // if the delay is negative then we use an ExecutorService and fire messages as soon as possible + executorService = endpoint.getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, endpoint.getEndpointUri()); - // only configure task if CamelContext already started, otherwise the StartupListener - // is configuring the task later - if (!configured && endpoint.getCamelContext().getStatus().isStarted()) { - Timer timer = endpoint.getTimer(this); - configureTask(task, timer); + executorService.execute(new Runnable() { + public void run() { + final AtomicLong counter = new AtomicLong(); + long count = counter.incrementAndGet(); + while ((endpoint.getRepeatCount() <= 0 || count <= endpoint.getRepeatCount()) && isRunAllowed()) { + sendTimerExchange(count); + count = counter.incrementAndGet(); + } + } + }); } } @@ -101,6 +122,12 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener { // remove timer endpoint.removeTimer(this); + + // if executorService is instantiated then we shutdown it + if (executorService != null) { + endpoint.getCamelContext().getExecutorServiceManager().shutdown(executorService); + executorService = null; + } } @Override @@ -108,7 +135,7 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener { if (task != null && !configured) { Timer timer = endpoint.getTimer(this); configureTask(task, timer); - } + } } /** http://git-wip-us.apache.org/repos/asf/camel/blob/0ddf4b1c/camel-core/src/test/java/org/apache/camel/component/timer/TimerDelayTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/timer/TimerDelayTest.java b/camel-core/src/test/java/org/apache/camel/component/timer/TimerDelayTest.java index 6a822d8..9f534be 100644 --- a/camel-core/src/test/java/org/apache/camel/component/timer/TimerDelayTest.java +++ b/camel-core/src/test/java/org/apache/camel/component/timer/TimerDelayTest.java @@ -38,6 +38,7 @@ public class TimerDelayTest extends ContextTestSupport { @Override public void configure() throws Exception { from("timer://foo?delay=500&period=0").to("mock:result"); + } }; } http://git-wip-us.apache.org/repos/asf/camel/blob/0ddf4b1c/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeDelayTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeDelayTest.java b/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeDelayTest.java new file mode 100644 index 0000000..86d4f27 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeDelayTest.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.timer; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * @version + */ +public class TimerNegativeDelayTest extends ContextTestSupport { + + public void testDelay() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(10); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("timer://foo?delay=-1&period=0&repeatCount=10").to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/0ddf4b1c/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeNoRepeatCountDelayTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeNoRepeatCountDelayTest.java b/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeNoRepeatCountDelayTest.java new file mode 100644 index 0000000..be00f13 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeNoRepeatCountDelayTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.timer; + +import java.util.Iterator; +import java.util.List; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * @version + */ +public class TimerNegativeNoRepeatCountDelayTest extends ContextTestSupport { + + public void testNegativeDelay() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + List<Exchange> exchanges = mock.getExchanges(); + + context.stopRoute("routeTest"); + + Iterator<Exchange> iter = exchanges.iterator(); + + while (iter.hasNext()) { + Exchange exchange = (Exchange) iter.next(); + assertEquals("negativeDelay", exchange.getProperty(Exchange.TIMER_NAME)); + assertNotNull(exchange.getProperty(Exchange.TIMER_FIRED_TIME)); + assertNotNull(exchange.getIn().getHeader("firedTime")); + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("timer://negativeDelay?delay=-1").routeId("routeTest").to("mock:result"); + } + }; + } +}