Repository: camel Updated Branches: refs/heads/master 14265e437 -> 7d0c5685e
CAMEL-11483: Optimise - Recording time taken for each processor should be advice if possible Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2bfd8513 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2bfd8513 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2bfd8513 Branch: refs/heads/master Commit: 2bfd851382c43fd68673343e382498063fa83655 Parents: 14265e4 Author: Claus Ibsen <[email protected]> Authored: Thu Jun 29 18:24:24 2017 +0200 Committer: Claus Ibsen <[email protected]> Committed: Thu Jun 29 18:49:29 2017 +0200 ---------------------------------------------------------------------- .../InstrumentationInterceptStrategy.java | 31 ++++------- .../management/InstrumentationProcessor.java | 54 +++++++++++++++----- .../camel/processor/RedeliveryErrorHandler.java | 18 +++++-- .../processor/interceptor/DefaultChannel.java | 32 ++++++++++-- 4 files changed, 91 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2bfd8513/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java b/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java index 4698c2d..8dcc7d1 100644 --- a/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java @@ -47,34 +47,21 @@ public class InstrumentationInterceptStrategy implements InterceptStrategy { this.wrappedProcessors = wrappedProcessors; } - public Processor wrapProcessorInInterceptors(CamelContext context, ProcessorDefinition<?> definition, - Processor target, Processor nextTarget) throws Exception { - // do not double wrap it - if (target instanceof InstrumentationProcessor) { - return target; - } - - // only wrap a performance counter if we have it registered in JMX by the jmx agent + public PerformanceCounter prepareProcessor(ProcessorDefinition<?> definition, Processor target, InstrumentationProcessor advice) { PerformanceCounter counter = registeredCounters.get(definition); if (counter != null) { - InstrumentationProcessor wrapper = new InstrumentationProcessor(counter); - wrapper.setProcessor(target); - wrapper.setType(definition.getShortName()); - - // add it to the mapping of wrappers so we can later change it to a decorated counter - // that when we register the processor - KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = - new KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>(definition, wrapper); + // add it to the mapping of wrappers so we can later change it to a + // decorated counter when we register the processor + KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = new KeyValueHolder<>(definition, advice); wrappedProcessors.put(target, holder); - return wrapper; } - - return target; + return counter; } - @Override - public String toString() { - return "InstrumentProcessor"; + public Processor wrapProcessorInInterceptors(CamelContext context, ProcessorDefinition<?> definition, + Processor target, Processor nextTarget) throws Exception { + // no longer in use as we have optimised to avoid wrapping unless needed + return target; } } http://git-wip-us.apache.org/repos/asf/camel/blob/2bfd8513/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java b/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java index 5fc9bb5..92a1ae2 100644 --- a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java @@ -18,37 +18,34 @@ package org.apache.camel.management; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; +import org.apache.camel.Ordered; +import org.apache.camel.Processor; import org.apache.camel.api.management.PerformanceCounter; import org.apache.camel.management.mbean.ManagedPerformanceCounter; +import org.apache.camel.processor.CamelInternalProcessorAdvice; import org.apache.camel.processor.DelegateAsyncProcessor; import org.apache.camel.util.StopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * JMX enabled processor that uses the {@link org.apache.camel.management.mbean.ManagedCounter} for instrumenting + * JMX enabled processor or advice that uses the {@link org.apache.camel.management.mbean.ManagedCounter} for instrumenting * processing of exchanges. + * <p/> + * This implementation has been optimised to work in dual mode, either as an advice or as a processor. + * The former is faster and the latter is required when the error handler has been configured with redelivery enabled. * * @version */ -public class InstrumentationProcessor extends DelegateAsyncProcessor { - - // TODO: Would be good to get this as an advice instead +public class InstrumentationProcessor extends DelegateAsyncProcessor implements CamelInternalProcessorAdvice<StopWatch>, Ordered { private static final Logger LOG = LoggerFactory.getLogger(InstrumentationProcessor.class); private PerformanceCounter counter; private String type; - public InstrumentationProcessor() { - } - - public InstrumentationProcessor(PerformanceCounter counter) { - this.counter = counter; - } - - @Override - public String toString() { - return "Instrumentation" + (type != null ? ":" + type : "") + "[" + processor + "]"; + public InstrumentationProcessor(String type, Processor processor) { + super(processor); + this.type = type; } public void setCounter(Object counter) { @@ -119,4 +116,33 @@ public class InstrumentationProcessor extends DelegateAsyncProcessor { public void setType(String type) { this.type = type; } + + @Override + public StopWatch before(Exchange exchange) throws Exception { + // only record time if stats is enabled + StopWatch answer = counter != null && counter.isStatisticsEnabled() ? new StopWatch() : null; + if (answer != null) { + beginTime(exchange); + } + return answer; + } + + @Override + public void after(Exchange exchange, StopWatch watch) throws Exception { + // record end time + if (watch != null) { + recordTime(exchange, watch.taken()); + } + } + + @Override + public String toString() { + return "InstrumentProcessorAdvice"; + } + + @Override + public int getOrder() { + // we want instrumentation before calling the processor (but before tracer/debugger) + return Ordered.LOWEST - 2; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/2bfd8513/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java index 7acf559..65fec63 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java @@ -70,8 +70,8 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme protected final Processor deadLetter; protected final String deadLetterUri; protected final boolean deadLetterHandleNewException; - protected final Processor output; - protected final AsyncProcessor outputAsync; + protected Processor output; + protected AsyncProcessor outputAsync; protected final Processor redeliveryProcessor; protected final RedeliveryPolicy redeliveryPolicy; protected final Predicate retryWhilePolicy; @@ -301,6 +301,18 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme } } + /** + * Allows to change the output of the error handler which are used when optimising the + * JMX instrumentation to use either an advice or wrapped processor when calling a processor. + * The former is faster and therefore preferred, however if the error handler supports + * redelivery we need fine grained instrumentation which then must be wrapped and therefore + * need to change the output on the error handler. + */ + public void changeOutput(Processor output) { + this.output = output; + this.outputAsync = AsyncProcessorConverterHelper.convert(output); + } + public boolean supportTransacted() { return false; } @@ -1403,7 +1415,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme * @return <tt>true</tt> if redelivery is possible, <tt>false</tt> otherwise * @throws Exception can be thrown */ - private boolean determineIfRedeliveryIsEnabled() throws Exception { + public boolean determineIfRedeliveryIsEnabled() throws Exception { // determine if redeliver is enabled either on error handler if (getRedeliveryPolicy().getMaximumRedeliveries() != 0) { // must check for != 0 as (-1 means redeliver forever) http://git-wip-us.apache.org/repos/asf/camel/blob/2bfd8513/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java index 296397b..5e347ac 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java +++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java @@ -27,6 +27,8 @@ import org.apache.camel.CamelContextAware; import org.apache.camel.Channel; import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.management.InstrumentationInterceptStrategy; +import org.apache.camel.management.InstrumentationProcessor; import org.apache.camel.model.ModelChannel; import org.apache.camel.model.OnCompletionDefinition; import org.apache.camel.model.OnExceptionDefinition; @@ -36,6 +38,7 @@ import org.apache.camel.model.RouteDefinition; import org.apache.camel.model.RouteDefinitionHelper; import org.apache.camel.processor.CamelInternalProcessor; import org.apache.camel.processor.InterceptorToAsyncProcessorBridge; +import org.apache.camel.processor.RedeliveryErrorHandler; import org.apache.camel.processor.WrapProcessor; import org.apache.camel.spi.InterceptStrategy; import org.apache.camel.spi.MessageHistoryFactory; @@ -69,6 +72,7 @@ public class DefaultChannel extends CamelInternalProcessor implements ModelChann private Processor output; private ProcessorDefinition<?> definition; private ProcessorDefinition<?> childDefinition; + private InstrumentationProcessor instrumentationProcessor; private CamelContext camelContext; private RouteContext routeContext; @@ -211,11 +215,13 @@ public class DefaultChannel extends CamelInternalProcessor implements ModelChann // force the creation of an id RouteDefinitionHelper.forceAssignIds(routeContext.getCamelContext(), definition); - // first wrap the output with the managed strategy if any + // setup instrumentation processor for management (jmx) + // this is later used in postInitChannel as we need to setup the error handler later as well InterceptStrategy managed = routeContext.getManagedInterceptStrategy(); - if (managed != null) { - next = target == nextProcessor ? null : nextProcessor; - target = managed.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, next); + if (managed != null && managed instanceof InstrumentationInterceptStrategy) { + InstrumentationInterceptStrategy iis = (InstrumentationInterceptStrategy) managed; + instrumentationProcessor = new InstrumentationProcessor(targetOutputDef.getShortName(), target); + iis.prepareProcessor(targetOutputDef, target, instrumentationProcessor); } // then wrap the output with the backlog and tracer (backlog first, as we do not want regular tracer to tracer the backlog) @@ -317,7 +323,23 @@ public class DefaultChannel extends CamelInternalProcessor implements ModelChann @Override public void postInitChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception { - // noop + // if jmx was enabled for the processor then either add as advice or wrap and change the processor + // on the error handler. See more details in the class javadoc of InstrumentationProcessor + if (instrumentationProcessor != null) { + boolean redeliveryPossible = false; + if (errorHandler instanceof RedeliveryErrorHandler) { + redeliveryPossible = ((RedeliveryErrorHandler) errorHandler).determineIfRedeliveryIsEnabled(); + if (redeliveryPossible) { + // okay we can redeliver then we need to change the output in the error handler + // to use us which we then wrap the call so we can capture before/after for redeliveries as well + ((RedeliveryErrorHandler) errorHandler).changeOutput(instrumentationProcessor); + } + } + if (!redeliveryPossible) { + // optimise to use advice as we cannot redeliver + addAdvice(instrumentationProcessor); + } + } } private InterceptStrategy getOrCreateTracer() {
