Author: davsclaus
Date: Fri Apr 24 11:04:48 2009
New Revision: 768258
URL: http://svn.apache.org/viewvc?rev=768258&view=rev
Log:
CAMEL-1562: Routes are now also JMX instrumented. But this time they do not
alter the route at all. So the runtime route model is the same whether JMX is
enabled or not.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationLifecycleStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/NodeFactory.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationCustomMBeanTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationDisableTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
Fri Apr 24 11:04:48 2009
@@ -51,6 +51,7 @@
if (counter != null) {
InstrumentationProcessor wrapper = new
InstrumentationProcessor(counter);
wrapper.setProcessor(target);
+ wrapper.setType(processorDefinition.getShortName());
// remove to not double wrap it
registeredCounters.remove(processorDefinition);
return wrapper;
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationLifecycleStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationLifecycleStrategy.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationLifecycleStrategy.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationLifecycleStrategy.java
Fri Apr 24 11:04:48 2009
@@ -27,6 +27,7 @@
import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.Service;
import org.apache.camel.impl.DefaultCamelContext;
@@ -35,6 +36,7 @@
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spi.ClassResolver;
import org.apache.camel.spi.InstrumentationAgent;
+import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.LifecycleStrategy;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.ObjectHelper;
@@ -53,11 +55,7 @@
private InstrumentationAgent agent;
private CamelNamingStrategy namingStrategy;
private boolean initialized;
-
- // A map (Endpoint -> InstrumentationProcessor) to facilitate
- // adding per-route interceptor and registering ManagedRoute MBean
- private final Map<Endpoint, InstrumentationProcessor> interceptorMap =
- new HashMap<Endpoint, InstrumentationProcessor>();
+ private final Map<Endpoint, InstrumentationProcessor> registeredRoutes =
new HashMap<Endpoint, InstrumentationProcessor>();
public InstrumentationLifecycleStrategy() {
this(new DefaultInstrumentationAgent());
@@ -165,26 +163,22 @@
return;
}
- // TODO: Disabled for now until we find a better strategy for
registering routes in the JMX
- // without altering the route model. The route model should be much
the same as without JMX to avoid
- // a gap that causes pain to get working with and without JMX enabled.
We have seen to many issues with this already.
-/*
for (Route route : routes) {
try {
ManagedRoute mr = new ManagedRoute(route);
// retrieve the per-route intercept for this route
- InstrumentationProcessor interceptor =
interceptorMap.get(route.getEndpoint());
- if (interceptor == null) {
- LOG.warn("Instrumentation processor not found for route
endpoint: " + route.getEndpoint());
+ InstrumentationProcessor processor =
registeredRoutes.get(route.getEndpoint());
+ if (processor == null) {
+ LOG.warn("Route has not been instrumented for endpoint: "
+ route.getEndpoint());
} else {
- interceptor.setCounter(mr);
+ // let the instrumentation use our route counter
+ processor.setCounter(mr);
}
agent.register(mr, getNamingStrategy().getObjectName(mr));
} catch (JMException e) {
LOG.warn("Could not register Route MBean", e);
}
}
-*/
}
public void onServiceAdd(CamelContext context, Service service) {
@@ -223,6 +217,11 @@
// by InstrumentationInterceptStrategy.
RouteDefinition route = routeContext.getRoute();
+ // TODO: This only registers counters for the first outputs in the
route
+ // all the chidren of the outputs is not registered
+ // we should leverge the Channel for this to ensure we register all
processors
+ // in the entire route graph
+
// register all processors
for (ProcessorDefinition processor : route.getOutputs()) {
ObjectName name = null;
@@ -244,53 +243,33 @@
}
// add intercept strategy that executes the JMX instrumentation for
performance metrics
+ // TODO: We could do as below with an inlined implementation instead
of a separate class
routeContext.addInterceptStrategy(new
InstrumentationInterceptStrategy(registeredCounters));
- // Add an InstrumentationProcessor at the beginning of each route and
- // set up the interceptorMap for onRoutesAdd() method to register the
- // ManagedRoute MBeans.
-
- // TODO: Disabled for now until we find a better strategy for
registering routes in the JMX
- // without altering the route model. The route model should be much
the same as without JMX to avoid
- // a gap that causes pain to get working with and without JMX enabled.
We have seen to many issues with this already.
-
-/* RouteDefinition routeType = routeContext.getRoute();
- if (routeType.getInputs() != null && !routeType.getInputs().isEmpty())
{
- if (routeType.getInputs().size() > 1) {
- LOG.warn("Addding InstrumentationProcessor to first input
only.");
- }
-
- Endpoint endpoint = routeType.getInputs().get(0).getEndpoint();
+ // instrument the route endpoint
+ final Endpoint endpoint = routeContext.getEndpoint();
- List<ProcessorDefinition> exceptionHandlers = new
ArrayList<ProcessorDefinition>();
- List<ProcessorDefinition> outputs = new
ArrayList<ProcessorDefinition>();
+ // only needed to register on the first output as all rotues will pass
through this one
+ ProcessorDefinition out = routeContext.getRoute().getOutputs().get(0);
- // separate out the exception handers in the outputs
- for (ProcessorDefinition output : routeType.getOutputs()) {
- if (output instanceof OnExceptionDefinition) {
- exceptionHandlers.add(output);
- } else {
- outputs.add(output);
+ // add an intercept strategy that counts when the route sends to any
of its outputs
+ out.addInterceptStrategy(new InterceptStrategy() {
+ public Processor wrapProcessorInInterceptors(ProcessorDefinition
processorDefinition, Processor target) throws Exception {
+ if (registeredRoutes.containsKey(endpoint)) {
+ // do not double wrap
+ return target;
}
- }
-
- // clearing the outputs
- routeType.clearOutput();
+ InstrumentationProcessor wrapper = new
InstrumentationProcessor(null);
+ wrapper.setType(processorDefinition.getShortName());
+ wrapper.setProcessor(target);
- // add exception handlers as top children
- routeType.getOutputs().addAll(exceptionHandlers);
+ // register our wrapper
+ registeredRoutes.put(endpoint, wrapper);
- // add an interceptor to instrument the route
- InstrumentationProcessor processor = new
InstrumentationProcessor();
- routeType.intercept(processor);
-
- // add the output
- for (ProcessorDefinition processorType : outputs) {
- routeType.addOutput(processorType);
+ return wrapper;
}
+ });
- interceptorMap.put(endpoint, processor);
- }*/
}
public CamelNamingStrategy getNamingStrategy() {
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
Fri Apr 24 11:04:48 2009
@@ -34,6 +34,7 @@
private static final transient Log LOG =
LogFactory.getLog(InstrumentationProcessor.class);
private PerformanceCounter counter;
+ private String type;
public InstrumentationProcessor(PerformanceCounter counter) {
this.counter = counter;
@@ -44,7 +45,7 @@
@Override
public String toString() {
- return "Instrumentation[" + processor + "]";
+ return "Instrumention" + (type != null ? ":" + type : "") + "[" +
processor + "]";
}
public void setCounter(PerformanceCounter counter) {
@@ -102,4 +103,11 @@
}
}
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/NodeFactory.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/NodeFactory.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/model/NodeFactory.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/model/NodeFactory.java
Fri Apr 24 11:04:48 2009
@@ -23,6 +23,7 @@
* @version $Revision$
*/
public class NodeFactory {
+
public FilterDefinition createFilter() {
return new FilterDefinition();
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
Fri Apr 24 11:04:48 2009
@@ -53,6 +53,7 @@
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.Policy;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.TransactedPolicy;
@@ -70,9 +71,10 @@
private static final transient Log LOG =
LogFactory.getLog(ProcessorDefinition.class);
private ErrorHandlerBuilder errorHandlerBuilder;
private NodeFactory nodeFactory;
- private LinkedList<Block> blocks = new LinkedList<Block>();
+ private final LinkedList<Block> blocks = new LinkedList<Block>();
private ProcessorDefinition parent;
private String errorHandlerRef;
+ private final List<InterceptStrategy> interceptStrategies = new
ArrayList<InterceptStrategy>();
// else to use an optional attribute in JAXB2
public abstract List<ProcessorDefinition> getOutputs();
@@ -130,6 +132,7 @@
// add interceptor strategies to the channel
channel.addInterceptStrategies(routeContext.getCamelContext().getInterceptStrategies());
channel.addInterceptStrategies(routeContext.getInterceptStrategies());
+ channel.addInterceptStrategies(this.getInterceptStrategies());
// init the channel
channel.initChannel(this, routeContext);
@@ -2027,6 +2030,15 @@
this.nodeFactory = nodeFactory;
}
+ @XmlTransient
+ public List<InterceptStrategy> getInterceptStrategies() {
+ return interceptStrategies;
+ }
+
+ public void addInterceptStrategy(InterceptStrategy strategy) {
+ this.interceptStrategies.add(strategy);
+ }
+
/**
* Returns a label to describe this node such as the expression if some
kind of expression node
*/
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationCustomMBeanTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationCustomMBeanTest.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationCustomMBeanTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationCustomMBeanTest.java
Fri Apr 24 11:04:48 2009
@@ -68,8 +68,7 @@
resultEndpoint.assertIsSatisfied();
- // TODO: Routes are temporary disabled until the code in
InstrumentationLifecycleStrategy is fixed
- // verifyCounter(mbsc, new ObjectName(domainName + ":type=routes,*"));
+ verifyCounter(mbsc, new ObjectName(domainName + ":type=routes,*"));
verifyCounter(mbsc, new ObjectName(domainName + ":type=processors,*"));
}
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationDisableTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationDisableTest.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationDisableTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationDisableTest.java
Fri Apr 24 11:04:48 2009
@@ -42,7 +42,6 @@
super.tearDown();
}
-
@Override
public void testMBeansRegistered() throws Exception {
if (System.getProperty(JmxSystemPropertyKeys.USE_PLATFORM_MBS) != null
@@ -52,22 +51,17 @@
resolveMandatoryEndpoint("mock:end", MockEndpoint.class);
- Set s = mbsc.queryNames(
- new ObjectName(domainName + ":type=endpoints,*"), null);
+ Set s = mbsc.queryNames(new ObjectName(domainName +
":type=endpoints,*"), null);
assertEquals("Could not find 0 endpoints: " + s, 0, s.size());
- s = mbsc.queryNames(
- new ObjectName(domainName + ":type=contexts,*"), null);
+ s = mbsc.queryNames(new ObjectName(domainName + ":type=contexts,*"),
null);
assertEquals("Could not find 0 context: " + s, 0, s.size());
- s = mbsc.queryNames(
- new ObjectName(domainName + ":type=processors,*"), null);
+ s = mbsc.queryNames(new ObjectName(domainName + ":type=processors,*"),
null);
assertEquals("Could not find 0 processor: " + s, 0, s.size());
- s = mbsc.queryNames(
- new ObjectName(domainName + ":type=routes,*"), null);
+ s = mbsc.queryNames(new ObjectName(domainName + ":type=routes,*"),
null);
assertEquals("Could not find 0 route: " + s, 0, s.size());
-
}
@Override
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java
Fri Apr 24 11:04:48 2009
@@ -35,7 +35,6 @@
* server to conduct the test as connector server is not enabled by default.
*
* @version $Revision$
- *
*/
public class JmxInstrumentationUsingDefaultsTest extends ContextTestSupport {
@@ -60,10 +59,8 @@
s = mbsc.queryNames(new ObjectName(domainName + ":type=processors,*"),
null);
assertEquals("Could not find 1 processor: " + s, 1, s.size());
- // TODO: Routes are temporary disalbed until we get the code in
- // InstrumentationLifecycleStrategy fixed
- //s = mbsc.queryNames(new ObjectName(domainName + ":type=routes,*"),
null);
- //assertEquals("Could not find 1 route: " + s, 1, s.size());
+ s = mbsc.queryNames(new ObjectName(domainName + ":type=routes,*"),
null);
+ assertEquals("Could not find 1 route: " + s, 1, s.size());
}
public void testCounters() throws Exception {
@@ -73,8 +70,7 @@
resultEndpoint.assertIsSatisfied();
- // TODO: See above
- //verifyCounter(mbsc, new ObjectName(domainName + ":type=routes,*"));
+ verifyCounter(mbsc, new ObjectName(domainName + ":type=routes,*"));
verifyCounter(mbsc, new ObjectName(domainName + ":type=processors,*"));
}
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java
Fri Apr 24 11:04:48 2009
@@ -60,9 +60,8 @@
s = mbsc.queryNames(new ObjectName(domainName + ":type=processors,*"),
null);
assertEquals("Could not find 2 processor: " + s, 2, s.size());
- // TODO: Routes are temporary disabled until the code in
InstrumentationLifecycleStrategy is fixed
-// s = mbsc.queryNames(new ObjectName(domainName + ":type=routes,*"),
null);
-// assertEquals("Could not find 1 route: " + s, 1, s.size());
+ s = mbsc.queryNames(new ObjectName(domainName + ":type=routes,*"),
null);
+ assertEquals("Could not find 1 route: " + s, 1, s.size());
}
@Override
@@ -73,8 +72,7 @@
resultEndpoint.assertIsSatisfied();
- // TODO: see above
- // verifyCounter(mbsc, new ObjectName(domainName + ":type=routes,*"));
+ verifyCounter(mbsc, new ObjectName(domainName + ":type=routes,*"));
}
}