This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new e82d926daa4 CAMEL-20267: use a monotonic time source for the Throttler and SamplingThrottler (#12512) e82d926daa4 is described below commit e82d926daa45565cafeb0c3b34c379d1194142f7 Author: Otavio Rodolfo Piske <orpi...@users.noreply.github.com> AuthorDate: Wed Dec 20 16:24:44 2023 -0300 CAMEL-20267: use a monotonic time source for the Throttler and SamplingThrottler (#12512) --- .../apache/camel/processor/SamplingThrottler.java | 3 +- .../java/org/apache/camel/processor/Throttler.java | 172 ++++++++++++--------- 2 files changed, 98 insertions(+), 77 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SamplingThrottler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SamplingThrottler.java index e0c9d647c82..3d03b83ed71 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SamplingThrottler.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SamplingThrottler.java @@ -16,6 +16,7 @@ */ package org.apache.camel.processor; +import java.time.Duration; import java.util.Locale; import java.util.concurrent.TimeUnit; @@ -131,7 +132,7 @@ public class SamplingThrottler extends AsyncProcessorSupport implements Traceabl doSend = true; } } else { - long now = System.currentTimeMillis(); + long now = Duration.ofNanos(System.nanoTime()).toMillis(); if (now >= timeOfLastExchange + periodInMillis) { doSend = true; if (LOG.isTraceEnabled()) { diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java index a521808b218..edaf2f49ccd 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java @@ -16,6 +16,7 @@ */ package org.apache.camel.processor; +import 
java.time.Duration; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; @@ -59,7 +60,7 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa private static final String DEFAULT_KEY = "CamelThrottlerDefaultKey"; - private static final String PROPERTY_EXCHANGE_QUEUED_TIMESTAMP = "CamelThrottlerExchangeQueuedTimestamp"; + private static final String PROPERTY_EXCHANGE_QUEUED_TIME = "CamelThrottlerExchangeQueuedTime"; private static final String PROPERTY_EXCHANGE_STATE = "CamelThrottlerExchangeState"; private static final long CLEAN_PERIOD = 1000L * 10; @@ -98,8 +99,8 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa public boolean process(final Exchange exchange, final AsyncCallback callback) { long queuedStart = 0; if (LOG.isTraceEnabled()) { - queuedStart = exchange.getProperty(PROPERTY_EXCHANGE_QUEUED_TIMESTAMP, 0L, Long.class); - exchange.removeProperty(PROPERTY_EXCHANGE_QUEUED_TIMESTAMP); + queuedStart = exchange.getProperty(PROPERTY_EXCHANGE_QUEUED_TIME, 0L, Long.class); + exchange.removeProperty(PROPERTY_EXCHANGE_QUEUED_TIME); } State state = exchange.getProperty(PROPERTY_EXCHANGE_STATE, State.SYNC, State.class); exchange.removeProperty(PROPERTY_EXCHANGE_STATE); @@ -110,87 +111,106 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa throw new RejectedExecutionException("Run is not allowed"); } - String key = DEFAULT_KEY; - if (correlationExpression != null) { - key = correlationExpression.evaluate(exchange, String.class); - } - ThrottlingState throttlingState = states.computeIfAbsent(key, ThrottlingState::new); - throttlingState.calculateAndSetMaxConcurrentRequestsExpression(exchange); - - if (!throttlingState.tryAcquire(exchange)) { - if (isRejectExecution()) { - throw new ThrottlerRejectedExecutionException( - "Exceeded the max throttle rate of " + throttlingState.getThrottleRate()); - } else { 
- // delegate to async pool - if (isAsyncDelayed() && !exchange.isTransacted() && state == State.SYNC) { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Throttle rate exceeded but AsyncDelayed enabled, so queueing for async processing, exchangeId: {}", - exchange.getExchangeId()); - } - return processAsynchronously(exchange, callback, throttlingState); - } + return doProcess(exchange, callback, state, queuedStart, doneSync); - // block waiting for a permit - long start = 0; - long elapsed = 0; - if (LOG.isTraceEnabled()) { - start = System.currentTimeMillis(); - } - throttlingState.acquire(exchange); - if (LOG.isTraceEnabled()) { - elapsed = System.currentTimeMillis() - start; - } - if (state == State.ASYNC) { - if (LOG.isTraceEnabled()) { - long queuedTime = start - queuedStart; - if (LOG.isTraceEnabled()) { - LOG.trace("Queued for {}ms, Throttled for {}ms, exchangeId: {}", queuedTime, elapsed, - exchange.getExchangeId()); - } - } - } else { - if (LOG.isTraceEnabled()) { - LOG.trace("Throttled for {}ms, exchangeId: {}", elapsed, exchange.getExchangeId()); - } + } catch (final InterruptedException e) { + return handleInterrupt(exchange, callback, e, doneSync); + } catch (final Exception t) { + return handleException(exchange, callback, t, doneSync); + } + } + + private boolean doProcess(Exchange exchange, AsyncCallback callback, State state, long queuedStart, boolean doneSync) + throws Exception { + String key = DEFAULT_KEY; + if (correlationExpression != null) { + key = correlationExpression.evaluate(exchange, String.class); + } + ThrottlingState throttlingState = states.computeIfAbsent(key, ThrottlingState::new); + throttlingState.calculateAndSetMaxConcurrentRequestsExpression(exchange); + + if (!throttlingState.tryAcquire(exchange)) { + if (isRejectExecution()) { + throw new ThrottlerRejectedExecutionException( + "Exceeded the max throttle rate of " + throttlingState.getThrottleRate()); + } else { + // delegate to async pool + if (isAsyncDelayed() && 
!exchange.isTransacted() && state == State.SYNC) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Throttle rate exceeded but AsyncDelayed enabled, so queueing for async processing, exchangeId: {}", + exchange.getExchangeId()); } + return processAsynchronously(exchange, callback, throttlingState); + } + + doThrottle(exchange, throttlingState, state, queuedStart); + } + } else { + // permit acquired + if (state == State.ASYNC) { + if (LOG.isTraceEnabled()) { + long queuedTime = Duration.ofNanos(System.nanoTime() - queuedStart).toMillis(); + LOG.trace("Queued for {}ms, No throttling applied (throttle cleared while queued), for exchangeId: {}", + queuedTime, exchange.getExchangeId()); } } else { - // permit acquired - if (state == State.ASYNC) { - if (LOG.isTraceEnabled()) { - long queuedTime = System.currentTimeMillis() - queuedStart; - LOG.trace("Queued for {}ms, No throttling applied (throttle cleared while queued), for exchangeId: {}", - queuedTime, exchange.getExchangeId()); - } - } else { - if (LOG.isTraceEnabled()) { - LOG.trace("No throttling applied to exchangeId: {}", exchange.getExchangeId()); - } + if (LOG.isTraceEnabled()) { + LOG.trace("No throttling applied to exchangeId: {}", exchange.getExchangeId()); } } + } - callback.done(doneSync); - return doneSync; + callback.done(doneSync); + return doneSync; + } - } catch (final InterruptedException e) { - // determine if we can still run, or the camel context is forcing a shutdown - boolean forceShutdown = exchange.getContext().getShutdownStrategy().isForceShutdown(); - if (forceShutdown) { - String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: " - + exchange; - LOG.debug(msg); - exchange.setException(new RejectedExecutionException(msg, e)); - } else { - exchange.setException(e); + private static boolean handleException(Exchange exchange, AsyncCallback callback, Exception t, boolean doneSync) { + exchange.setException(t); + callback.done(doneSync); + return 
doneSync; + } + + private static boolean handleInterrupt( + Exchange exchange, AsyncCallback callback, InterruptedException e, boolean doneSync) { + // determine if we can still run, or the camel context is forcing a shutdown + boolean forceShutdown = exchange.getContext().getShutdownStrategy().isForceShutdown(); + if (forceShutdown) { + String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: " + + exchange; + LOG.debug(msg); + exchange.setException(new RejectedExecutionException(msg, e)); + } else { + exchange.setException(e); + } + callback.done(doneSync); + return doneSync; + } + + private static void doThrottle(Exchange exchange, ThrottlingState throttlingState, State state, long queuedStart) + throws InterruptedException { + // block waiting for a permit + long start = 0; + long elapsed = 0; + if (LOG.isTraceEnabled()) { + start = System.nanoTime(); + } + throttlingState.acquire(exchange); + if (LOG.isTraceEnabled()) { + elapsed = System.nanoTime() - start; + } + if (state == State.ASYNC) { + if (LOG.isTraceEnabled()) { + long queuedTime = start - queuedStart; + if (LOG.isTraceEnabled()) { + LOG.trace("Queued for {}ms, Throttled for {}ms, exchangeId: {}", queuedTime, elapsed, + exchange.getExchangeId()); + } + } + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("Throttled for {}ms, exchangeId: {}", elapsed, exchange.getExchangeId()); } - callback.done(doneSync); - return doneSync; - } catch (final Exception t) { - exchange.setException(t); - callback.done(doneSync); - return doneSync; } } @@ -203,7 +223,7 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa final Exchange exchange, final AsyncCallback callback, ThrottlingState throttlingState) { try { if (LOG.isTraceEnabled()) { - exchange.setProperty(PROPERTY_EXCHANGE_QUEUED_TIMESTAMP, System.currentTimeMillis()); + exchange.setProperty(PROPERTY_EXCHANGE_QUEUED_TIME, System.nanoTime()); } 
exchange.setProperty(PROPERTY_EXCHANGE_STATE, State.ASYNC); asyncExecutor.submit(() -> process(exchange, callback));