This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit ac875ba4e3e81c3f849b77617b98805b336f0a99 Author: Claus Ibsen <[email protected]> AuthorDate: Mon Jan 27 12:59:32 2020 +0100 CAMEL-14354: camel-core optimize --- .../engine/DefaultAsyncProcessorAwaitManager.java | 22 ++++++++----- .../camel/processor/DelayProcessorSupport.java | 22 +++++++++---- .../java/org/apache/camel/processor/Enricher.java | 8 +++-- .../apache/camel/processor/FinallyProcessor.java | 4 ++- .../org/apache/camel/processor/LoopProcessor.java | 10 ++++-- .../apache/camel/processor/MulticastProcessor.java | 2 +- .../org/apache/camel/processor/RoutingSlip.java | 24 +++++++++++---- .../java/org/apache/camel/processor/Throttler.java | 36 ++++++++++++++++------ .../org/apache/camel/processor/TryProcessor.java | 10 ++++-- .../processor/aggregate/AggregateProcessor.java | 4 ++- .../loadbalancer/FailOverLoadBalancer.java | 8 +++-- 11 files changed, 109 insertions(+), 41 deletions(-) diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java index fade8a4..3db01a4 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java @@ -96,20 +96,26 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements } } while (reactiveExecutor.executeFromQueue()); - LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}", - exchange.getExchangeId(), exchange); + if (LOG.isTraceEnabled()) { + LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}", + exchange.getExchangeId(), exchange); + } try { if (statistics.isStatisticsEnabled()) { blockedCounter.incrementAndGet(); } inflight.put(exchange, new AwaitThreadEntry(Thread.currentThread(), exchange, latch)); latch.await(); - LOG.trace("Asynchronous callback received, will continue routing exchangeId: {} -> {}", - exchange.getExchangeId(), exchange); + if (LOG.isTraceEnabled()) { + LOG.trace("Asynchronous callback received, will continue routing exchangeId: {} -> {}", + exchange.getExchangeId(), exchange); + } } catch (InterruptedException e) { - LOG.trace("Interrupted while waiting for callback, will continue routing exchangeId: {} -> {}", - exchange.getExchangeId(), exchange); + if (LOG.isTraceEnabled()) { + LOG.trace("Interrupted while waiting for callback, will continue routing exchangeId: {} -> {}", + exchange.getExchangeId(), exchange); + } exchange.setException(e); } finally { AwaitThread thread = inflight.remove(exchange); @@ -134,7 +140,9 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements } public void countDown(Exchange exchange, CountDownLatch latch) { - LOG.trace("Asynchronous callback received for exchangeId: {}", exchange.getExchangeId()); + if (LOG.isTraceEnabled()) { + LOG.trace("Asynchronous callback received for exchangeId: {}", exchange.getExchangeId()); + } latch.countDown(); } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java b/core/camel-base/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java index 42c6176..e65164a 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java @@ -65,7 +65,9 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor imple // we are running now so decrement the counter delayedCount.decrementAndGet(); - LOG.trace("Delayed task woke up and continues routing for exchangeId: {}", exchange.getExchangeId()); + if (LOG.isTraceEnabled()) { + LOG.trace("Delayed task woke up and continues routing for exchangeId: {}", exchange.getExchangeId()); + } if (!isRunAllowed()) { exchange.setException(new RejectedExecutionException("Run is not allowed")); } @@ -74,7 +76,9 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor imple DelayProcessorSupport.this.processor.process(exchange, new AsyncCallback() { @Override public void done(boolean doneSync) { - LOG.trace("Delayed task done for exchangeId: {}", exchange.getExchangeId()); + if (LOG.isTraceEnabled()) { + LOG.trace("Delayed task done for exchangeId: {}", exchange.getExchangeId()); + } // we must done the callback from this async callback as well, to ensure callback is done correctly // must invoke done on callback with false, as that is what the original caller would // expect as we returned false in the process method @@ -114,8 +118,10 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor imple delayedCount.incrementAndGet(); ProcessCall call = new ProcessCall(exchange, callback); try { - LOG.trace("Scheduling delayed task to run in {} millis for exchangeId: {}", - delay, exchange.getExchangeId()); + if (LOG.isTraceEnabled()) { + LOG.trace("Scheduling delayed task to run in {} millis for exchangeId: {}", + delay, exchange.getExchangeId()); + } executorService.schedule(call, delay, TimeUnit.MILLISECONDS); // tell Camel routing engine we continue routing asynchronous return false; @@ -126,7 +132,9 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor imple if (!isRunAllowed()) { exchange.setException(new RejectedExecutionException()); } else { - LOG.debug("Scheduling rejected task, so letting caller run, delaying at first for {} millis for exchangeId: {}", delay, exchange.getExchangeId()); + if (LOG.isDebugEnabled()) { + LOG.debug("Scheduling rejected task, so letting caller run, delaying at first for {} millis for exchangeId: {}", delay, exchange.getExchangeId()); + } // let caller run by processing try { delay(delay, exchange); @@ -160,7 +168,9 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor imple delay = calculateDelay(exchange); if (delay <= 0) { // no delay then continue routing - LOG.trace("No delay for exchangeId: {}", exchange.getExchangeId()); + if (LOG.isTraceEnabled()) { + LOG.trace("No delay for exchangeId: {}", exchange.getExchangeId()); + } return processor.process(exchange, callback); } } catch (Throwable e) { diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java index a63a747..bbbca1b 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java @@ -254,13 +254,17 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA }); if (!sync) { - LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); + if (LOG.isTraceEnabled()) { + LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); + } // the remainder of the routing slip will be completed async // so we break out now, then the callback will be invoked which then continue routing from where we left here return false; } - LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); + if (LOG.isTraceEnabled()) { + LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); + } if (watch != null) { // emit event that the exchange was sent to the endpoint diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/FinallyProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/FinallyProcessor.java index 7a65995..447dff3 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/FinallyProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/FinallyProcessor.java @@ -115,7 +115,9 @@ public class FinallyProcessor extends DelegateAsyncProcessor implements Traceabl if (!doneSync) { // signal callback to continue routing async ExchangeHelper.prepareOutToIn(exchange); - LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); + if (LOG.isTraceEnabled()) { + LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); + } } } finally { // callback must always be called diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java index a70d550..278a507 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java @@ -129,11 +129,15 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, } else { // we are done so prepare the result ExchangeHelper.copyResults(exchange, current); - LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); + if (LOG.isTraceEnabled()) { + LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); + } callback.done(false); } } catch (Exception e) { - LOG.trace("Processing failed for exchangeId: {} >>> {}", exchange.getExchangeId(), e.getMessage()); + if (LOG.isTraceEnabled()) { + LOG.trace("Processing failed for exchangeId: {} >>> {}", exchange.getExchangeId(), e.getMessage()); + } exchange.setException(e); callback.done(false); } @@ -141,7 +145,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, @Override public String toString() { - return "LoopState[" + exchange.getExchangeId() + "]"; + return "LoopState"; } } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 82bafea..e9bb034 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -302,7 +302,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat @Override public String toString() { - return "MulticastTask[" + original.getExchangeId() + "," + MulticastProcessor.this + "]"; + return "MulticastTask"; } @Override diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java b/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java index eec3a47..7069354 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java @@ -258,13 +258,17 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA current = prepareExchangeForRoutingSlip(current, endpoint); if (!sync) { - LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); + if (LOG.isTraceEnabled()) { + LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); + } // the remainder of the routing slip will be completed async // so we break out now, then the callback will be invoked which then continue routing from where we left here return false; } - LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); + if (LOG.isTraceEnabled()) { + LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); + } // we ignore some kind of exceptions and allow us to continue if (isIgnoreInvalidEndpoints()) { @@ -287,7 +291,9 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA // logging nextExchange as it contains the exchange that might have altered the payload and since // we are logging the completion if will be confusing if we log the original instead // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots - LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), current); + if (LOG.isTraceEnabled()) { + LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), current); + } // copy results back to the original exchange ExchangeHelper.copyResults(exchange, current); @@ -350,7 +356,9 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA final AsyncCallback originalCallback, final RoutingSlipIterator iter) { // this does the actual processing so log at trace level - LOG.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); + if (LOG.isTraceEnabled()) { + LOG.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); + } // routing slip callback which are used when // - routing slip was routed asynchronously @@ -424,7 +432,9 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA current = prepareExchangeForRoutingSlip(current, endpoint1); if (!sync) { - LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId()); + if (LOG.isTraceEnabled()) { + LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId()); + } return; } } @@ -432,7 +442,9 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA // logging nextExchange as it contains the exchange that might have altered the payload and since // we are logging the completion if will be confusing if we log the original instead // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots - LOG.trace("Processing complete for exchangeId: {} >>> {}", original.getExchangeId(), current); + if (LOG.isTraceEnabled()) { + LOG.trace("Processing complete for exchangeId: {} >>> {}", original.getExchangeId(), current); + } // copy results back to the original exchange ExchangeHelper.copyResults(original, current); diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Throttler.java b/core/camel-base/src/main/java/org/apache/camel/processor/Throttler.java index ea5a536..b518a4c 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Throttler.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/Throttler.java @@ -135,7 +135,9 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa } else { // delegate to async pool if (isAsyncDelayed() && !exchange.isTransacted() && state == State.SYNC) { - LOG.debug("Throttle rate exceeded but AsyncDelayed enabled, so queueing for async processing, exchangeId: {}", exchange.getExchangeId()); + if (LOG.isDebugEnabled()) { + LOG.debug("Throttle rate exceeded but AsyncDelayed enabled, so queueing for async processing, exchangeId: {}", exchange.getExchangeId()); + } return processAsynchronously(exchange, callback, throttlingState); } @@ -154,10 +156,14 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa if (state == State.ASYNC) { if (LOG.isTraceEnabled()) { long queuedTime = start - queuedStart; - LOG.trace("Queued for {}ms, Throttled for {}ms, exchangeId: {}", queuedTime, elapsed, exchange.getExchangeId()); + if (LOG.isTraceEnabled()) { + LOG.trace("Queued for {}ms, Throttled for {}ms, exchangeId: {}", queuedTime, elapsed, exchange.getExchangeId()); + } } } else { - LOG.trace("Throttled for {}ms, exchangeId: {}", elapsed, exchange.getExchangeId()); + if (LOG.isTraceEnabled()) { + LOG.trace("Throttled for {}ms, exchangeId: {}", elapsed, exchange.getExchangeId()); + } } } } else { @@ -169,7 +175,9 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa LOG.trace("Queued for {}ms, No throttling applied (throttle cleared while queued), for exchangeId: {}", queuedTime, exchange.getExchangeId()); } } else { - LOG.trace("No throttling applied to exchangeId: {}", exchange.getExchangeId()); + if (LOG.isTraceEnabled()) { + LOG.trace("No throttling applied to exchangeId: {}", exchange.getExchangeId()); + } } } @@ -211,7 +219,9 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa return false; } catch (final RejectedExecutionException e) { if (isCallerRunsWhenRejected()) { - LOG.debug("AsyncExecutor is full, rejected exchange will run in the current thread, exchangeId: {}", exchange.getExchangeId()); + if (LOG.isDebugEnabled()) { + LOG.debug("AsyncExecutor is full, rejected exchange will run in the current thread, exchangeId: {}", exchange.getExchangeId()); + } exchange.setProperty(PROPERTY_EXCHANGE_STATE, State.ASYNC_REJECTED); return process(exchange, callback); } @@ -314,9 +324,13 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa while (delta > 0) { delayQueue.take(); delta--; - LOG.trace("Permit discarded due to throttling rate decrease, triggered by ExchangeId: {}", exchange.getExchangeId()); + if (LOG.isTraceEnabled()) { + LOG.trace("Permit discarded due to throttling rate decrease, triggered by ExchangeId: {}", exchange.getExchangeId()); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Throttle rate decreased from {} to {}, triggered by ExchangeId: {}", throttleRate, newThrottle, exchange.getExchangeId()); } - LOG.debug("Throttle rate decreased from {} to {}, triggered by ExchangeId: {}", throttleRate, newThrottle, exchange.getExchangeId()); // increase } else if (newThrottle > throttleRate) { @@ -325,9 +339,13 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa delayQueue.put(new ThrottlePermit(-1)); } if (throttleRate == 0) { - LOG.debug("Initial throttle rate set to {}, triggered by ExchangeId: {}", newThrottle, exchange.getExchangeId()); + if (LOG.isDebugEnabled()) { + LOG.debug("Initial throttle rate set to {}, triggered by ExchangeId: {}", newThrottle, exchange.getExchangeId()); + } } else { - LOG.debug("Throttle rate increase from {} to {}, triggered by ExchangeId: {}", throttleRate, newThrottle, exchange.getExchangeId()); + if (LOG.isDebugEnabled()) { + LOG.debug("Throttle rate increase from {} to {}, triggered by ExchangeId: {}", throttleRate, newThrottle, exchange.getExchangeId()); + } } } throttleRate = newThrottle; diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java index c13baaa..a556195 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java @@ -101,20 +101,24 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc // process the next processor Processor processor = processors.next(); AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); - LOG.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); + if (LOG.isTraceEnabled()) { + LOG.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); + } async.process(exchange, doneSync -> reactiveExecutor.schedule(this)); } else { ExchangeHelper.prepareOutToIn(exchange); exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK); exchange.setProperty(Exchange.EXCEPTION_HANDLED, lastHandled); - LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); + if (LOG.isTraceEnabled()) { + LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); + } callback.done(false); } } @Override public String toString() { - return "TryState[" + exchange.getExchangeId() + "]"; + return "TryState"; } } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index a8bde2d..503e2c7 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -893,7 +893,9 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat // grab the timeout value long timeout = exchange.hasProperties() ? exchange.getProperty(Exchange.AGGREGATED_TIMEOUT, 0, long.class) : 0; if (timeout > 0) { - LOG.trace("Restoring CompletionTimeout for exchangeId: {} with timeout: {} millis.", exchange.getExchangeId(), timeout); + if (LOG.isTraceEnabled()) { + LOG.trace("Restoring CompletionTimeout for exchangeId: {} with timeout: {} millis.", exchange.getExchangeId(), timeout); + } addExchangeToTimeoutMap(key, exchange, timeout); } } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java index 286ea75..6a8f900 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java @@ -147,7 +147,9 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab } } - LOG.trace("Should failover: {} for exchangeId: {}", answer, exchange.getExchangeId()); + if (LOG.isTraceEnabled()) { + LOG.trace("Should failover: {} for exchangeId: {}", answer, exchange.getExchangeId()); + } return answer; } @@ -201,7 +203,9 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab lastGoodIndex.set(index); // and copy the current result to original so it will contain this result of this eip ExchangeHelper.copyResults(exchange, copy); - LOG.debug("Failover complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); + if (LOG.isDebugEnabled()) { + LOG.debug("Failover complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); + } callback.done(false); return; }
