Author: davsclaus
Date: Fri Apr 24 11:04:48 2009
New Revision: 768258

URL: http://svn.apache.org/viewvc?rev=768258&view=rev
Log:
CAMEL-1562: Routes are now also JMX instrumented, but this time the 
instrumentation does not alter the route at all, so the runtime route model is 
the same whether JMX is enabled or not.

Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationLifecycleStrategy.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/NodeFactory.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationCustomMBeanTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationDisableTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
 Fri Apr 24 11:04:48 2009
@@ -51,6 +51,7 @@
         if (counter != null) {
             InstrumentationProcessor wrapper = new 
InstrumentationProcessor(counter);
             wrapper.setProcessor(target);
+            wrapper.setType(processorDefinition.getShortName());
             // remove to not double wrap it
             registeredCounters.remove(processorDefinition);
             return wrapper;

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationLifecycleStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationLifecycleStrategy.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationLifecycleStrategy.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationLifecycleStrategy.java
 Fri Apr 24 11:04:48 2009
@@ -27,6 +27,7 @@
 import org.apache.camel.CamelContext;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.Service;
 import org.apache.camel.impl.DefaultCamelContext;
@@ -35,6 +36,7 @@
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.spi.ClassResolver;
 import org.apache.camel.spi.InstrumentationAgent;
+import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.ObjectHelper;
@@ -53,11 +55,7 @@
     private InstrumentationAgent agent;
     private CamelNamingStrategy namingStrategy;
     private boolean initialized;
-
-    // A map (Endpoint -> InstrumentationProcessor) to facilitate
-    // adding per-route interceptor and registering ManagedRoute MBean
-    private final Map<Endpoint, InstrumentationProcessor> interceptorMap =
-        new HashMap<Endpoint, InstrumentationProcessor>();
+    private final Map<Endpoint, InstrumentationProcessor> registeredRoutes = 
new HashMap<Endpoint, InstrumentationProcessor>();
 
     public InstrumentationLifecycleStrategy() {
         this(new DefaultInstrumentationAgent());
@@ -165,26 +163,22 @@
             return;
         }
 
-        // TODO: Disabled for now until we find a better strategy for 
registering routes in the JMX
-        // without altering the route model. The route model should be much 
the same as without JMX to avoid
-        // a gap that causes pain to get working with and without JMX enabled. 
We have seen to many issues with this already.
-/*
         for (Route route : routes) {
             try {
                 ManagedRoute mr = new ManagedRoute(route);
                 // retrieve the per-route intercept for this route
-                InstrumentationProcessor interceptor = 
interceptorMap.get(route.getEndpoint());
-                if (interceptor == null) {
-                    LOG.warn("Instrumentation processor not found for route 
endpoint: " + route.getEndpoint());
+                InstrumentationProcessor processor = 
registeredRoutes.get(route.getEndpoint());
+                if (processor == null) {
+                    LOG.warn("Route has not been instrumented for endpoint: " 
+ route.getEndpoint());
                 } else {
-                    interceptor.setCounter(mr);
+                    // let the instrumentation use our route counter
+                    processor.setCounter(mr);
                 }
                 agent.register(mr, getNamingStrategy().getObjectName(mr));
             } catch (JMException e) {
                 LOG.warn("Could not register Route MBean", e);
             }
         }
-*/
     }
 
     public void onServiceAdd(CamelContext context, Service service) {
@@ -223,6 +217,11 @@
         // by InstrumentationInterceptStrategy.
         RouteDefinition route = routeContext.getRoute();
 
+        // TODO: This only registers counters for the first outputs in the 
route
+        // all the children of the outputs are not registered
+        // we should leverage the Channel for this to ensure we register all 
processors
+        // in the entire route graph
+
         // register all processors
         for (ProcessorDefinition processor : route.getOutputs()) {
             ObjectName name = null;
@@ -244,53 +243,33 @@
         }
 
         // add intercept strategy that executes the JMX instrumentation for 
performance metrics
+        // TODO: We could do as below with an inlined implementation instead 
of a separate class
         routeContext.addInterceptStrategy(new 
InstrumentationInterceptStrategy(registeredCounters));
 
-        // Add an InstrumentationProcessor at the beginning of each route and
-        // set up the interceptorMap for onRoutesAdd() method to register the
-        // ManagedRoute MBeans.
-
-        // TODO: Disabled for now until we find a better strategy for 
registering routes in the JMX
-        // without altering the route model. The route model should be much 
the same as without JMX to avoid
-        // a gap that causes pain to get working with and without JMX enabled. 
We have seen to many issues with this already.
-
-/*        RouteDefinition routeType = routeContext.getRoute();
-        if (routeType.getInputs() != null && !routeType.getInputs().isEmpty()) 
{
-            if (routeType.getInputs().size() > 1) {
-                LOG.warn("Addding InstrumentationProcessor to first input 
only.");
-            }
-
-            Endpoint endpoint  = routeType.getInputs().get(0).getEndpoint();
+        // instrument the route endpoint
+        final Endpoint endpoint = routeContext.getEndpoint();
 
-            List<ProcessorDefinition> exceptionHandlers = new 
ArrayList<ProcessorDefinition>();
-            List<ProcessorDefinition> outputs = new 
ArrayList<ProcessorDefinition>();
+        // only needed to register on the first output as all routes will pass 
through this one
+        ProcessorDefinition out = routeContext.getRoute().getOutputs().get(0);
 
-            // separate out the exception handers in the outputs
-            for (ProcessorDefinition output : routeType.getOutputs()) {
-                if (output instanceof OnExceptionDefinition) {
-                    exceptionHandlers.add(output);
-                } else {
-                    outputs.add(output);
+        // add an intercept strategy that counts when the route sends to any 
of its outputs
+        out.addInterceptStrategy(new InterceptStrategy() {
+            public Processor wrapProcessorInInterceptors(ProcessorDefinition 
processorDefinition, Processor target) throws Exception {
+                if (registeredRoutes.containsKey(endpoint)) {
+                    // do not double wrap
+                    return target;
                 }
-            }
-
-            // clearing the outputs
-            routeType.clearOutput();
+                InstrumentationProcessor wrapper = new 
InstrumentationProcessor(null);
+                wrapper.setType(processorDefinition.getShortName());
+                wrapper.setProcessor(target);
 
-            // add exception handlers as top children
-            routeType.getOutputs().addAll(exceptionHandlers);
+                // register our wrapper
+                registeredRoutes.put(endpoint, wrapper);
 
-            // add an interceptor to instrument the route
-            InstrumentationProcessor processor = new 
InstrumentationProcessor();
-            routeType.intercept(processor);
-
-            // add the output
-            for (ProcessorDefinition processorType : outputs) {
-                routeType.addOutput(processorType);
+                return wrapper;
             }
+        });
 
-            interceptorMap.put(endpoint, processor);
-        }*/
     }
 
     public CamelNamingStrategy getNamingStrategy() {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
 Fri Apr 24 11:04:48 2009
@@ -34,6 +34,7 @@
 
     private static final transient Log LOG = 
LogFactory.getLog(InstrumentationProcessor.class);
     private PerformanceCounter counter;
+    private String type;
 
     public InstrumentationProcessor(PerformanceCounter counter) {
         this.counter = counter;
@@ -44,7 +45,7 @@
     
     @Override
     public String toString() {
-        return "Instrumentation[" + processor + "]";
+        return "Instrumention" + (type != null ? ":" + type : "") + "[" + 
processor + "]";
     }
 
     public void setCounter(PerformanceCounter counter) {
@@ -102,4 +103,11 @@
         }
     }
 
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
 }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/NodeFactory.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/NodeFactory.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/NodeFactory.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/NodeFactory.java 
Fri Apr 24 11:04:48 2009
@@ -23,6 +23,7 @@
  * @version $Revision$
  */
 public class NodeFactory {
+
     public FilterDefinition createFilter() {
         return new FilterDefinition();
     }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
 Fri Apr 24 11:04:48 2009
@@ -53,6 +53,7 @@
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.spi.DataFormat;
 import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.Policy;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.TransactedPolicy;
@@ -70,9 +71,10 @@
     private static final transient Log LOG = 
LogFactory.getLog(ProcessorDefinition.class);
     private ErrorHandlerBuilder errorHandlerBuilder;
     private NodeFactory nodeFactory;
-    private LinkedList<Block> blocks = new LinkedList<Block>();
+    private final LinkedList<Block> blocks = new LinkedList<Block>();
     private ProcessorDefinition parent;
     private String errorHandlerRef;
+    private final List<InterceptStrategy> interceptStrategies = new 
ArrayList<InterceptStrategy>();
 
     // else to use an optional attribute in JAXB2
     public abstract List<ProcessorDefinition> getOutputs();
@@ -130,6 +132,7 @@
         // add interceptor strategies to the channel
         
channel.addInterceptStrategies(routeContext.getCamelContext().getInterceptStrategies());
         channel.addInterceptStrategies(routeContext.getInterceptStrategies());
+        channel.addInterceptStrategies(this.getInterceptStrategies());
 
         // init the channel
         channel.initChannel(this, routeContext);
@@ -2027,6 +2030,15 @@
         this.nodeFactory = nodeFactory;
     }
 
+    @XmlTransient
+    public List<InterceptStrategy> getInterceptStrategies() {
+        return interceptStrategies;
+    }
+
+    public void addInterceptStrategy(InterceptStrategy strategy) {
+        this.interceptStrategies.add(strategy);
+    }
+
     /**
      * Returns a label to describe this node such as the expression if some 
kind of expression node
      */

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationCustomMBeanTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationCustomMBeanTest.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationCustomMBeanTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationCustomMBeanTest.java
 Fri Apr 24 11:04:48 2009
@@ -68,8 +68,7 @@
 
         resultEndpoint.assertIsSatisfied();
 
-        // TODO: Routes are temporary disabled until the code in 
InstrumentationLifecycleStrategy is fixed
-        // verifyCounter(mbsc, new ObjectName(domainName + ":type=routes,*"));
+        verifyCounter(mbsc, new ObjectName(domainName + ":type=routes,*"));
         verifyCounter(mbsc, new ObjectName(domainName + ":type=processors,*"));
     }
 

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationDisableTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationDisableTest.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationDisableTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationDisableTest.java
 Fri Apr 24 11:04:48 2009
@@ -42,7 +42,6 @@
         super.tearDown();
     }
 
-
     @Override
     public void testMBeansRegistered() throws Exception {
         if (System.getProperty(JmxSystemPropertyKeys.USE_PLATFORM_MBS) != null
@@ -52,22 +51,17 @@
 
         resolveMandatoryEndpoint("mock:end", MockEndpoint.class);
 
-        Set s = mbsc.queryNames(
-                new ObjectName(domainName + ":type=endpoints,*"), null);
+        Set s = mbsc.queryNames(new ObjectName(domainName + 
":type=endpoints,*"), null);
         assertEquals("Could not find 0 endpoints: " + s, 0, s.size());
 
-        s = mbsc.queryNames(
-                new ObjectName(domainName + ":type=contexts,*"), null);
+        s = mbsc.queryNames(new ObjectName(domainName + ":type=contexts,*"), 
null);
         assertEquals("Could not find 0 context: " + s, 0, s.size());
 
-        s = mbsc.queryNames(
-                new ObjectName(domainName + ":type=processors,*"), null);
+        s = mbsc.queryNames(new ObjectName(domainName + ":type=processors,*"), 
null);
         assertEquals("Could not find 0 processor: " + s, 0, s.size());
 
-        s = mbsc.queryNames(
-                new ObjectName(domainName + ":type=routes,*"), null);
+        s = mbsc.queryNames(new ObjectName(domainName + ":type=routes,*"), 
null);
         assertEquals("Could not find 0 route: " + s, 0, s.size());
-
     }
 
     @Override

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/JmxInstrumentationUsingDefaultsTest.java
 Fri Apr 24 11:04:48 2009
@@ -35,7 +35,6 @@
  * server to conduct the test as connector server is not enabled by default.
  *
  * @version $Revision$
- *
  */
 public class JmxInstrumentationUsingDefaultsTest extends ContextTestSupport {
 
@@ -60,10 +59,8 @@
         s = mbsc.queryNames(new ObjectName(domainName + ":type=processors,*"), 
null);
         assertEquals("Could not find 1 processor: " + s, 1, s.size());
 
-        // TODO: Routes are temporary disalbed until we get the code in
-        // InstrumentationLifecycleStrategy fixed
-        //s = mbsc.queryNames(new ObjectName(domainName + ":type=routes,*"), 
null);
-        //assertEquals("Could not find 1 route: " + s, 1, s.size());
+        s = mbsc.queryNames(new ObjectName(domainName + ":type=routes,*"), 
null);
+        assertEquals("Could not find 1 route: " + s, 1, s.size());
     }
 
     public void testCounters() throws Exception {
@@ -73,8 +70,7 @@
 
         resultEndpoint.assertIsSatisfied();
 
-        // TODO: See above
-        //verifyCounter(mbsc, new ObjectName(domainName + ":type=routes,*"));
+        verifyCounter(mbsc, new ObjectName(domainName + ":type=routes,*"));
         verifyCounter(mbsc, new ObjectName(domainName + ":type=processors,*"));
     }
 

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java?rev=768258&r1=768257&r2=768258&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/MultiInstanceProcessorTest.java
 Fri Apr 24 11:04:48 2009
@@ -60,9 +60,8 @@
         s = mbsc.queryNames(new ObjectName(domainName + ":type=processors,*"), 
null);
         assertEquals("Could not find 2 processor: " + s, 2, s.size());
 
-        // TODO: Routes are temporary disabled until the code in 
InstrumentationLifecycleStrategy is fixed
-//        s = mbsc.queryNames(new ObjectName(domainName + ":type=routes,*"), 
null);
-//        assertEquals("Could not find 1 route: " + s, 1, s.size());
+        s = mbsc.queryNames(new ObjectName(domainName + ":type=routes,*"), 
null);
+        assertEquals("Could not find 1 route: " + s, 1, s.size());
     }
 
     @Override
@@ -73,8 +72,7 @@
 
         resultEndpoint.assertIsSatisfied();
 
-        // TODO: see above
-        // verifyCounter(mbsc, new ObjectName(domainName + ":type=routes,*"));
+        verifyCounter(mbsc, new ObjectName(domainName + ":type=routes,*"));
     }
 
 }


Reply via email to