Repository: camel
Updated Branches:
  refs/heads/master 14265e437 -> 7d0c5685e


CAMEL-11483: Optimise - Recording time taken for each processor should be 
advice if possible


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2bfd8513
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2bfd8513
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2bfd8513

Branch: refs/heads/master
Commit: 2bfd851382c43fd68673343e382498063fa83655
Parents: 14265e4
Author: Claus Ibsen <[email protected]>
Authored: Thu Jun 29 18:24:24 2017 +0200
Committer: Claus Ibsen <[email protected]>
Committed: Thu Jun 29 18:49:29 2017 +0200

----------------------------------------------------------------------
 .../InstrumentationInterceptStrategy.java       | 31 ++++-------
 .../management/InstrumentationProcessor.java    | 54 +++++++++++++++-----
 .../camel/processor/RedeliveryErrorHandler.java | 18 +++++--
 .../processor/interceptor/DefaultChannel.java   | 32 ++++++++++--
 4 files changed, 91 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2bfd8513/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
 
b/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
index 4698c2d..8dcc7d1 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
@@ -47,34 +47,21 @@ public class InstrumentationInterceptStrategy implements 
InterceptStrategy {
         this.wrappedProcessors = wrappedProcessors;
     }
 
-    public Processor wrapProcessorInInterceptors(CamelContext context, 
ProcessorDefinition<?> definition,
-                                                 Processor target, Processor 
nextTarget) throws Exception {
-        // do not double wrap it
-        if (target instanceof InstrumentationProcessor) {
-            return target;
-        }
-
-        // only wrap a performance counter if we have it registered in JMX by 
the jmx agent
+    public PerformanceCounter prepareProcessor(ProcessorDefinition<?> 
definition, Processor target, InstrumentationProcessor advice) {
         PerformanceCounter counter = registeredCounters.get(definition);
         if (counter != null) {
-            InstrumentationProcessor wrapper = new 
InstrumentationProcessor(counter);
-            wrapper.setProcessor(target);
-            wrapper.setType(definition.getShortName());
-
-            // add it to the mapping of wrappers so we can later change it to 
a decorated counter
-            // that when we register the processor
-            KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> 
holder =
-                    new KeyValueHolder<ProcessorDefinition<?>, 
InstrumentationProcessor>(definition, wrapper);
+            // add it to the mapping of wrappers so we can later change it to a
+            // decorated counter when we register the processor
+            KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> 
holder = new KeyValueHolder<>(definition, advice);
             wrappedProcessors.put(target, holder);
-            return wrapper;
         }
-
-        return target;
+        return counter;
     }
 
-    @Override
-    public String toString() {
-        return "InstrumentProcessor";
+    public Processor wrapProcessorInInterceptors(CamelContext context, 
ProcessorDefinition<?> definition,
+                                                 Processor target, Processor 
nextTarget) throws Exception {
+        // no longer in use as we have optimised to avoid wrapping unless 
needed
+        return target;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/2bfd8513/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
index 5fc9bb5..92a1ae2 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
@@ -18,37 +18,34 @@ package org.apache.camel.management;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.Ordered;
+import org.apache.camel.Processor;
 import org.apache.camel.api.management.PerformanceCounter;
 import org.apache.camel.management.mbean.ManagedPerformanceCounter;
+import org.apache.camel.processor.CamelInternalProcessorAdvice;
 import org.apache.camel.processor.DelegateAsyncProcessor;
 import org.apache.camel.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * JMX enabled processor that uses the {@link 
org.apache.camel.management.mbean.ManagedCounter} for instrumenting
+ * JMX enabled processor or advice that uses the {@link 
org.apache.camel.management.mbean.ManagedCounter} for instrumenting
  * processing of exchanges.
+ * <p/>
+ * This implementation has been optimised to work in dual mode, either as an 
advice or as a processor.
+ * The former is faster and the latter is required when the error handler has 
been configured with redelivery enabled.
  *
  * @version 
  */
-public class InstrumentationProcessor extends DelegateAsyncProcessor {
-
-    // TODO: Would be good to get this as an advice instead
+public class InstrumentationProcessor extends DelegateAsyncProcessor 
implements CamelInternalProcessorAdvice<StopWatch>, Ordered {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(InstrumentationProcessor.class);
     private PerformanceCounter counter;
     private String type;
 
-    public InstrumentationProcessor() {
-    }
-
-    public InstrumentationProcessor(PerformanceCounter counter) {
-        this.counter = counter;
-    }
-
-    @Override
-    public String toString() {
-        return "Instrumentation" + (type != null ? ":" + type : "") + "[" + 
processor + "]";
+    public InstrumentationProcessor(String type, Processor processor) {
+        super(processor);
+        this.type = type;
     }
 
     public void setCounter(Object counter) {
@@ -119,4 +116,33 @@ public class InstrumentationProcessor extends 
DelegateAsyncProcessor {
     public void setType(String type) {
         this.type = type;
     }
+
+    @Override
+    public StopWatch before(Exchange exchange) throws Exception {
+        // only record time if stats is enabled
+        StopWatch answer = counter != null && counter.isStatisticsEnabled() ? 
new StopWatch() : null;
+        if (answer != null) {
+            beginTime(exchange);
+        }
+        return answer;
+    }
+
+    @Override
+    public void after(Exchange exchange, StopWatch watch) throws Exception {
+        // record end time
+        if (watch != null) {
+            recordTime(exchange, watch.taken());
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "InstrumentProcessorAdvice";
+    }
+
+    @Override
+    public int getOrder() {
+        // we want instrumentation before calling the processor (but before 
tracer/debugger)
+        return Ordered.LOWEST - 2;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/2bfd8513/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
 
b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
index 7acf559..65fec63 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
@@ -70,8 +70,8 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport impleme
     protected final Processor deadLetter;
     protected final String deadLetterUri;
     protected final boolean deadLetterHandleNewException;
-    protected final Processor output;
-    protected final AsyncProcessor outputAsync;
+    protected Processor output;
+    protected AsyncProcessor outputAsync;
     protected final Processor redeliveryProcessor;
     protected final RedeliveryPolicy redeliveryPolicy;
     protected final Predicate retryWhilePolicy;
@@ -301,6 +301,18 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport impleme
         }
     }
 
+    /**
+     * Allows changing the output of the error handler, which is used when 
optimising the
+     * JMX instrumentation to use either an advice or a wrapped processor when 
calling a processor.
+     * The former is faster and therefore preferred; however, if the error 
handler supports
+     * redelivery we need fine grained instrumentation, which must then be 
wrapped, and we therefore
+     * need to change the output on the error handler.
+     */
+    public void changeOutput(Processor output) {
+        this.output = output;
+        this.outputAsync = AsyncProcessorConverterHelper.convert(output);
+    }
+
     public boolean supportTransacted() {
         return false;
     }
@@ -1403,7 +1415,7 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport impleme
      * @return <tt>true</tt> if redelivery is possible, <tt>false</tt> 
otherwise
      * @throws Exception can be thrown
      */
-    private boolean determineIfRedeliveryIsEnabled() throws Exception {
+    public boolean determineIfRedeliveryIsEnabled() throws Exception {
         // determine if redeliver is enabled either on error handler
         if (getRedeliveryPolicy().getMaximumRedeliveries() != 0) {
             // must check for != 0 as (-1 means redeliver forever)

http://git-wip-us.apache.org/repos/asf/camel/blob/2bfd8513/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
 
b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
index 296397b..5e347ac 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
@@ -27,6 +27,8 @@ import org.apache.camel.CamelContextAware;
 import org.apache.camel.Channel;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.management.InstrumentationInterceptStrategy;
+import org.apache.camel.management.InstrumentationProcessor;
 import org.apache.camel.model.ModelChannel;
 import org.apache.camel.model.OnCompletionDefinition;
 import org.apache.camel.model.OnExceptionDefinition;
@@ -36,6 +38,7 @@ import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.model.RouteDefinitionHelper;
 import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.InterceptorToAsyncProcessorBridge;
+import org.apache.camel.processor.RedeliveryErrorHandler;
 import org.apache.camel.processor.WrapProcessor;
 import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.MessageHistoryFactory;
@@ -69,6 +72,7 @@ public class DefaultChannel extends CamelInternalProcessor 
implements ModelChann
     private Processor output;
     private ProcessorDefinition<?> definition;
     private ProcessorDefinition<?> childDefinition;
+    private InstrumentationProcessor instrumentationProcessor;
     private CamelContext camelContext;
     private RouteContext routeContext;
 
@@ -211,11 +215,13 @@ public class DefaultChannel extends 
CamelInternalProcessor implements ModelChann
         // force the creation of an id
         RouteDefinitionHelper.forceAssignIds(routeContext.getCamelContext(), 
definition);
 
-        // first wrap the output with the managed strategy if any
+        // setup instrumentation processor for management (jmx)
+        // this is later used in postInitChannel as we need to setup the error 
handler later as well
         InterceptStrategy managed = routeContext.getManagedInterceptStrategy();
-        if (managed != null) {
-            next = target == nextProcessor ? null : nextProcessor;
-            target = 
managed.wrapProcessorInInterceptors(routeContext.getCamelContext(), 
targetOutputDef, target, next);
+        if (managed != null && managed instanceof 
InstrumentationInterceptStrategy) {
+            InstrumentationInterceptStrategy iis = 
(InstrumentationInterceptStrategy) managed;
+            instrumentationProcessor = new 
InstrumentationProcessor(targetOutputDef.getShortName(), target);
+            iis.prepareProcessor(targetOutputDef, target, 
instrumentationProcessor);
         }
 
         // then wrap the output with the backlog and tracer (backlog first, as 
we do not want regular tracer to trace the backlog)
@@ -317,7 +323,23 @@ public class DefaultChannel extends CamelInternalProcessor 
implements ModelChann
 
     @Override
     public void postInitChannel(ProcessorDefinition<?> outputDefinition, 
RouteContext routeContext) throws Exception {
-        // noop
+        // if jmx was enabled for the processor then either add as advice or 
wrap and change the processor
+        // on the error handler. See more details in the class javadoc of 
InstrumentationProcessor
+        if (instrumentationProcessor != null) {
+            boolean redeliveryPossible = false;
+            if (errorHandler instanceof RedeliveryErrorHandler) {
+                redeliveryPossible = ((RedeliveryErrorHandler) 
errorHandler).determineIfRedeliveryIsEnabled();
+                if (redeliveryPossible) {
+                    // okay we can redeliver, so we need to change the output 
in the error handler
+                    // to use us, which then wraps the call so we can capture 
before/after for redeliveries as well
+                    ((RedeliveryErrorHandler) 
errorHandler).changeOutput(instrumentationProcessor);
+                }
+            }
+            if (!redeliveryPossible) {
+                // optimise to use advice as we cannot redeliver
+                addAdvice(instrumentationProcessor);
+            }
+        }
     }
 
     private InterceptStrategy getOrCreateTracer() {

Reply via email to