Author: davsclaus
Date: Thu Sep 18 02:15:45 2008
New Revision: 696610
URL: http://svn.apache.org/viewvc?rev=696610&view=rev
Log:
CAMEL-640: Added Delayer (interceptor) to slow processing of messages down so
you can watch what happens nice and slowly.
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java
(with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayInterceptorTest.java
(contents, props changed)
- copied, changed from r696361,
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TraceInterceptorTest.java
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/DelayerInterceptorTest.java
(contents, props changed)
- copied, changed from r696567,
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TraceFormatterTest.java
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/delayerInterceptorTest.xml
- copied, changed from r696567,
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/traceFormatterTest.xml
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/ExchangeFormatter.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Tracer.java
activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=696610&r1=696609&r2=696610&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
Thu Sep 18 02:15:45 2008
@@ -44,10 +44,10 @@
import org.apache.camel.management.JmxSystemPropertyKeys;
import org.apache.camel.model.RouteType;
import org.apache.camel.model.dataformat.DataFormatType;
+import org.apache.camel.processor.interceptor.Delayer;
import org.apache.camel.processor.interceptor.TraceFormatter;
import org.apache.camel.processor.interceptor.Tracer;
import org.apache.camel.spi.ComponentResolver;
-import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.ExchangeConverter;
import org.apache.camel.spi.Injector;
import org.apache.camel.spi.InterceptStrategy;
@@ -93,6 +93,7 @@
private List<RouteType> routeDefinitions = new ArrayList<RouteType>();
private List<InterceptStrategy> interceptStrategies = new
ArrayList<InterceptStrategy>();
private Boolean trace;
+ private Long delay;
private ErrorHandlerBuilder errorHandlerBuilder;
private Map<String, DataFormatType> dataFormats = new HashMap<String,
DataFormatType>();
@@ -504,6 +505,28 @@
this.trace = trace;
}
+ /**
+ * Returns the delay in millis if delaying has been enabled or disabled
via the [EMAIL PROTECTED] #setDelay(Long)} method
+ * or it has not been specified then default to the <b>camel.delay</b>
system property
+ */
+ public long getDelay() {
+ final Long value = getDelaying();
+ if (value != null) {
+ return value;
+ } else {
+ String prop = SystemHelper.getSystemProperty("camel.delay");
+ return prop != null ? Long.getLong(prop) : 0;
+ }
+ }
+
+ public Long getDelaying() {
+ return delay;
+ }
+
+ public void setDelay(Long delay) {
+ this.delay = delay;
+ }
+
public <E extends Exchange> ProducerTemplate<E> createProducerTemplate() {
return new DefaultProducerTemplate<E>(this);
}
@@ -524,15 +547,8 @@
protected void doStart() throws Exception {
if (getTrace()) {
- // lets check if we already have already been configured and if
not add the default
- boolean found = false;
- final List<InterceptStrategy> list = getInterceptStrategies();
- for (InterceptStrategy strategy : list) {
- if (strategy instanceof Tracer) {
- found = true;
- }
- }
- if (!found) {
+ // only add a new tracer if not already configued
+ if (Tracer.getTracer(this) == null) {
Tracer tracer = new Tracer();
// lets see if we have a formatter if so use it
TraceFormatter formatter =
this.getRegistry().lookup("traceFormatter", TraceFormatter.class);
@@ -542,6 +558,14 @@
addInterceptStrategy(tracer);
}
}
+
+ if (getDelay() > 0) {
+ // only add a new delayer if not already configued
+ if (Delayer.getDelayer(this) == null) {
+ addInterceptStrategy(new Delayer(getDelay()));
+ }
+ }
+
lifecycleStrategy.onContextStart(this);
forceLazyInitialization();
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=696610&r1=696609&r2=696610&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
Thu Sep 18 02:15:45 2008
@@ -94,8 +94,8 @@
if (delay <= 0) {
return;
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sleeping for: " + delay + " millis");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Sleeping for: " + delay + " millis");
}
if (isFastStop()) {
stoppedLatch.await(delay, TimeUnit.MILLISECONDS);
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java?rev=696610&view=auto
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java
Thu Sep 18 02:15:45 2008
@@ -0,0 +1,34 @@
+package org.apache.camel.processor.interceptor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.model.ProcessorType;
+import org.apache.camel.processor.DelayProcessorSupport;
+
+/**
+ * An interceptor for delaying routes.
+ */
+public class DelayInterceptor extends DelayProcessorSupport {
+
+ private final ProcessorType node;
+ private Delayer delayer;
+
+ public DelayInterceptor(ProcessorType node, Processor target, Delayer
delayer) {
+ super(target);
+ this.node = node;
+ this.delayer = delayer;
+ }
+
+ @Override
+ public String toString() {
+ return "DelayInterceptor[delay: " + delayer.getDelay() + " on: " +
node + "]";
+ }
+
+ public void delay(Exchange exchange) throws Exception {
+ if (delayer.isEnabled()) {
+ long time = currentSystemTime() + delayer.getDelay();
+ waitUntil(time, exchange);
+ }
+ }
+
+}
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DelayInterceptor.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java?rev=696610&view=auto
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java
Thu Sep 18 02:15:45 2008
@@ -0,0 +1,66 @@
+package org.apache.camel.processor.interceptor;
+
+import java.util.List;
+
+import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Processor;
+import org.apache.camel.model.ProcessorType;
+import org.apache.camel.impl.DefaultCamelContext;
+
+/**
+ * An interceptor strategy for delaying routes.
+ */
+public class Delayer implements InterceptStrategy {
+
+ private boolean enabled = true;
+ private long delay;
+
+ public Delayer() {
+ }
+
+ public Delayer(long delay) {
+ this.delay = delay;
+ }
+
+ /**
+ * A helper method to return the Delayer instance for a given [EMAIL
PROTECTED] org.apache.camel.CamelContext} if one is enabled
+ *
+ * @param context the camel context the delayer is connected to
+ * @return the delayer or null if none can be found
+ */
+ public static DelayInterceptor getDelayer(CamelContext context) {
+ if (context instanceof DefaultCamelContext) {
+ DefaultCamelContext defaultCamelContext = (DefaultCamelContext)
context;
+ List<InterceptStrategy> list =
defaultCamelContext.getInterceptStrategies();
+ for (InterceptStrategy interceptStrategy : list) {
+ if (interceptStrategy instanceof DelayInterceptor) {
+ return (DelayInterceptor)interceptStrategy;
+ }
+ }
+ }
+ return null;
+ }
+
+ public Processor wrapProcessorInInterceptors(ProcessorType processorType,
Processor target)
+ throws Exception {
+ DelayInterceptor delayer = new DelayInterceptor(processorType, target,
this);
+ return delayer;
+ }
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public long getDelay() {
+ return delay;
+ }
+
+ public void setDelay(long delay) {
+ this.delay = delay;
+ }
+}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/ExchangeFormatter.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/ExchangeFormatter.java?rev=696610&r1=696609&r2=696610&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/ExchangeFormatter.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/ExchangeFormatter.java
Thu Sep 18 02:15:45 2008
@@ -25,6 +25,7 @@
* @version $Revision$
*/
public interface ExchangeFormatter {
+
/**
* Generates a string representation of the exchange
*/
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Tracer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Tracer.java?rev=696610&r1=696609&r2=696610&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Tracer.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Tracer.java
Thu Sep 18 02:15:45 2008
@@ -37,8 +37,8 @@
/**
* A helper method to return the Tracer instance for a given [EMAIL
PROTECTED] CamelContext} if one is enabled
*
- * @param context the camel context the debugger is connected to
- * @return the debugger or null if none can be found
+ * @param context the camel context the tracer is connected to
+ * @return the tracer or null if none can be found
*/
public static Tracer getTracer(CamelContext context) {
if (context instanceof DefaultCamelContext) {
Copied:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayInterceptorTest.java
(from r696361,
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TraceInterceptorTest.java)
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayInterceptorTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayInterceptorTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TraceInterceptorTest.java&r1=696361&r2=696610&rev=696610&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TraceInterceptorTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayInterceptorTest.java
Thu Sep 18 02:15:45 2008
@@ -20,43 +20,50 @@
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.processor.interceptor.Debugger;
-import org.apache.camel.processor.interceptor.Tracer;
+import org.apache.camel.processor.interceptor.Delayer;
/**
+ * Delay interceptor unit test.
+ *
* @version $Revision$
*/
-public class TraceInterceptorTest extends ContextTestSupport {
+public class DelayInterceptorTest extends ContextTestSupport {
- // START SNIPPET: e1
public void testSendingSomeMessages() throws Exception {
- template.sendBodyAndHeader("direct:start", "Hello London", "to",
"James");
- template.sendBodyAndHeader("direct:start", "This is Copenhagen
calling", "from", "Claus");
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < 10; i++) {
+ template.sendBody("direct:start", "Message #" + i);
+ }
+ long delta = System.currentTimeMillis() - start;
+ assertTrue("Should be slower to run: " + delta, delta > 4000);
+ assertTrue("Should not take that long to run: " + delta, delta < 7000);
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ disableJMX();
+ super.setUp();
}
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
+ // START SNIPPET: e1
public void configure() throws Exception {
- // add tracer as an interceptor so it will log the exchange
executions at runtime
- // this can aid us to understand/see how the exchanges is
routed etc.
- getContext().addInterceptStrategy(new Tracer());
+ // add the delay interceptor to delay each step 200 millis
+ getContext().addInterceptStrategy(new Delayer(200));
+
+ // regular routes here
+ // END SNIPPET: e1
from("direct:start").
process(new Processor() {
public void process(Exchange exchange) throws
Exception {
// do nothing
}
-
- @Override
- public String toString() {
- return "MyProcessor";
- }
}).
- to("mock:a").
- to("mock:b");
+ to("mock:result");
}
};
}
- // END SNIPPET: e1
-}
+}
\ No newline at end of file
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayInterceptorTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayInterceptorTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DelayInterceptorTest.java
------------------------------------------------------------------------------
svn:mergeinfo =
Modified:
activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java?rev=696610&r1=696609&r2=696610&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
(original)
+++
activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
Thu Sep 18 02:15:45 2008
@@ -43,6 +43,7 @@
import org.apache.camel.model.RouteType;
import org.apache.camel.model.dataformat.DataFormatsType;
import org.apache.camel.processor.interceptor.Debugger;
+import org.apache.camel.processor.interceptor.Delayer;
import org.apache.camel.processor.interceptor.TraceFormatter;
import org.apache.camel.processor.interceptor.Tracer;
import org.apache.camel.spi.LifecycleStrategy;
@@ -82,6 +83,8 @@
@XmlAttribute(required = false)
private Boolean trace;
@XmlAttribute(required = false)
+ private Long delay;
+ @XmlAttribute(required = false)
private String errorHandlerRef;
@XmlElement(name = "package", required = false)
private String[] packages = {};
@@ -134,12 +137,13 @@
}
public void afterPropertiesSet() throws Exception {
- // lets see if we can find a debugger to add
// TODO there should be a neater way to do this!
+
Debugger debugger = getBeanForType(Debugger.class);
if (debugger != null) {
getContext().addInterceptStrategy(debugger);
}
+
Tracer tracer = getBeanForType(Tracer.class);
if (tracer != null) {
// use formatter if there is a TraceFormatter bean defined
@@ -150,6 +154,11 @@
getContext().addInterceptStrategy(tracer);
}
+ Delayer delayer = getBeanForType(Delayer.class);
+ if (delayer != null) {
+ getContext().addInterceptStrategy(delayer);
+ }
+
// set the lifecycle strategy if defined
LifecycleStrategy lifecycleStrategy =
getBeanForType(LifecycleStrategy.class);
if (lifecycleStrategy != null) {
@@ -408,6 +417,14 @@
this.trace = trace;
}
+ public Long getDelay() {
+ return delay;
+ }
+
+ public void setDelay(Long delay) {
+ this.delay = delay;
+ }
+
public CamelJMXAgentType getCamelJMXAgent() {
return camelJMXAgent;
}
@@ -454,6 +471,9 @@
if (trace != null) {
ctx.setTrace(trace);
}
+ if (delay != null) {
+ ctx.setDelay(delay);
+ }
if (errorHandlerRef != null) {
ErrorHandlerBuilder errorHandlerBuilder = (ErrorHandlerBuilder)
getApplicationContext().getBean(errorHandlerRef, ErrorHandlerBuilder.class);
if (errorHandlerBuilder == null) {
Copied:
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/DelayerInterceptorTest.java
(from r696567,
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TraceFormatterTest.java)
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/DelayerInterceptorTest.java?p2=activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/DelayerInterceptorTest.java&p1=activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TraceFormatterTest.java&r1=696567&r2=696610&rev=696610&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TraceFormatterTest.java
(original)
+++
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/DelayerInterceptorTest.java
Thu Sep 18 02:15:45 2008
@@ -22,22 +22,29 @@
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
- * Unit test for trace formatter configurd in spring XML.
+ * Unit test for delayer interceptor configurd in spring XML.
*/
-public class TraceFormatterTest extends SpringTestSupport {
+public class DelayerInterceptorTest extends SpringTestSupport {
protected AbstractXmlApplicationContext createApplicationContext() {
return new ClassPathXmlApplicationContext(
- "/org/apache/camel/spring/interceptor/traceFormatterTest.xml");
+
"/org/apache/camel/spring/interceptor/delayerInterceptorTest.xml");
}
- public void testTraceFormatter() throws Exception {
+ public void testDelayer() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedMessageCount(1);
+ mock.expectedMessageCount(10);
- template.sendBody("direct:start", "Hello World");
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < 10; i++) {
+ template.sendBody("direct:start", "Message #" + i);
+ }
+ long delta = System.currentTimeMillis() - start;
assertMockEndpointsSatisfied();
+
+ assertTrue("Should be slower to run: " + delta, delta > 4000);
+ assertTrue("Should not take that long to run: " + delta, delta < 7000);
}
-
-}
+
+}
\ No newline at end of file
Propchange:
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/DelayerInterceptorTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/DelayerInterceptorTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange:
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/DelayerInterceptorTest.java
------------------------------------------------------------------------------
svn:mergeinfo =
Copied:
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/delayerInterceptorTest.xml
(from r696567,
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/traceFormatterTest.xml)
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/delayerInterceptorTest.xml?p2=activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/delayerInterceptorTest.xml&p1=activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/traceFormatterTest.xml&r1=696567&r2=696610&rev=696610&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/traceFormatterTest.xml
(original)
+++
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/interceptor/delayerInterceptorTest.xml
Thu Sep 18 02:15:45 2008
@@ -22,19 +22,11 @@
http://activemq.apache.org/camel/schema/spring
http://activemq.apache.org/camel/schema/spring/camel-spring.xsd
">
- <!-- SNIPPET START: e1 -->
- <bean id="traceFormatter"
class="org.apache.camel.processor.interceptor.TraceFormatter">
- <property name="showBody" value="true"/>
- <property name="showBodyType" value="false"/>
- <property name="showBreadCrumb" value="false"/>
- </bean>
-
- <camelContext id="camel" trace="true"
xmlns="http://activemq.apache.org/camel/schema/spring">
+ <camelContext id="camel" delay="200"
xmlns="http://activemq.apache.org/camel/schema/spring">
<route>
<from uri="direct:start"/>
<to uri="mock:result"/>
</route>
</camelContext>
- <!-- SNIPPET END: e1 -->
</beans>