This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new e82d926daa4 CAMEL-20267: use a monotonic time source for the Throttler and SamplingThrottler (#12512)
e82d926daa4 is described below

commit e82d926daa45565cafeb0c3b34c379d1194142f7
Author: Otavio Rodolfo Piske <orpi...@users.noreply.github.com>
AuthorDate: Wed Dec 20 16:24:44 2023 -0300

    CAMEL-20267: use a monotonic time source for the Throttler and SamplingThrottler (#12512)
---
 .../apache/camel/processor/SamplingThrottler.java  |   3 +-
 .../java/org/apache/camel/processor/Throttler.java | 172 ++++++++++++---------
 2 files changed, 98 insertions(+), 77 deletions(-)

diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SamplingThrottler.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SamplingThrottler.java
index e0c9d647c82..3d03b83ed71 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SamplingThrottler.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SamplingThrottler.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.processor;
 
+import java.time.Duration;
 import java.util.Locale;
 import java.util.concurrent.TimeUnit;
 
@@ -131,7 +132,7 @@ public class SamplingThrottler extends 
AsyncProcessorSupport implements Traceabl
                     doSend = true;
                 }
             } else {
-                long now = System.currentTimeMillis();
+                long now = Duration.ofNanos(System.nanoTime()).toMillis();
                 if (now >= timeOfLastExchange + periodInMillis) {
                     doSend = true;
                     if (LOG.isTraceEnabled()) {
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java
index a521808b218..edaf2f49ccd 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.processor;
 
+import java.time.Duration;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
@@ -59,7 +60,7 @@ public class Throttler extends AsyncProcessorSupport 
implements Traceable, IdAwa
 
     private static final String DEFAULT_KEY = "CamelThrottlerDefaultKey";
 
-    private static final String PROPERTY_EXCHANGE_QUEUED_TIMESTAMP = 
"CamelThrottlerExchangeQueuedTimestamp";
+    private static final String PROPERTY_EXCHANGE_QUEUED_TIME = 
"CamelThrottlerExchangeQueuedTime";
     private static final String PROPERTY_EXCHANGE_STATE = 
"CamelThrottlerExchangeState";
     private static final long CLEAN_PERIOD = 1000L * 10;
 
@@ -98,8 +99,8 @@ public class Throttler extends AsyncProcessorSupport 
implements Traceable, IdAwa
     public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
         long queuedStart = 0;
         if (LOG.isTraceEnabled()) {
-            queuedStart = 
exchange.getProperty(PROPERTY_EXCHANGE_QUEUED_TIMESTAMP, 0L, Long.class);
-            exchange.removeProperty(PROPERTY_EXCHANGE_QUEUED_TIMESTAMP);
+            queuedStart = exchange.getProperty(PROPERTY_EXCHANGE_QUEUED_TIME, 
0L, Long.class);
+            exchange.removeProperty(PROPERTY_EXCHANGE_QUEUED_TIME);
         }
         State state = exchange.getProperty(PROPERTY_EXCHANGE_STATE, 
State.SYNC, State.class);
         exchange.removeProperty(PROPERTY_EXCHANGE_STATE);
@@ -110,87 +111,106 @@ public class Throttler extends AsyncProcessorSupport 
implements Traceable, IdAwa
                 throw new RejectedExecutionException("Run is not allowed");
             }
 
-            String key = DEFAULT_KEY;
-            if (correlationExpression != null) {
-                key = correlationExpression.evaluate(exchange, String.class);
-            }
-            ThrottlingState throttlingState = states.computeIfAbsent(key, 
ThrottlingState::new);
-            
throttlingState.calculateAndSetMaxConcurrentRequestsExpression(exchange);
-
-            if (!throttlingState.tryAcquire(exchange)) {
-                if (isRejectExecution()) {
-                    throw new ThrottlerRejectedExecutionException(
-                            "Exceeded the max throttle rate of " + 
throttlingState.getThrottleRate());
-                } else {
-                    // delegate to async pool
-                    if (isAsyncDelayed() && !exchange.isTransacted() && state 
== State.SYNC) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug(
-                                    "Throttle rate exceeded but AsyncDelayed 
enabled, so queueing for async processing, exchangeId: {}",
-                                    exchange.getExchangeId());
-                        }
-                        return processAsynchronously(exchange, callback, 
throttlingState);
-                    }
+            return doProcess(exchange, callback, state, queuedStart, doneSync);
 
-                    // block waiting for a permit
-                    long start = 0;
-                    long elapsed = 0;
-                    if (LOG.isTraceEnabled()) {
-                        start = System.currentTimeMillis();
-                    }
-                    throttlingState.acquire(exchange);
-                    if (LOG.isTraceEnabled()) {
-                        elapsed = System.currentTimeMillis() - start;
-                    }
-                    if (state == State.ASYNC) {
-                        if (LOG.isTraceEnabled()) {
-                            long queuedTime = start - queuedStart;
-                            if (LOG.isTraceEnabled()) {
-                                LOG.trace("Queued for {}ms, Throttled for 
{}ms, exchangeId: {}", queuedTime, elapsed,
-                                        exchange.getExchangeId());
-                            }
-                        }
-                    } else {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Throttled for {}ms, exchangeId: {}", 
elapsed, exchange.getExchangeId());
-                        }
+        } catch (final InterruptedException e) {
+            return handleInterrupt(exchange, callback, e, doneSync);
+        } catch (final Exception t) {
+            return handleException(exchange, callback, t, doneSync);
+        }
+    }
+
+    private boolean doProcess(Exchange exchange, AsyncCallback callback, State 
state, long queuedStart, boolean doneSync)
+            throws Exception {
+        String key = DEFAULT_KEY;
+        if (correlationExpression != null) {
+            key = correlationExpression.evaluate(exchange, String.class);
+        }
+        ThrottlingState throttlingState = states.computeIfAbsent(key, 
ThrottlingState::new);
+        
throttlingState.calculateAndSetMaxConcurrentRequestsExpression(exchange);
+
+        if (!throttlingState.tryAcquire(exchange)) {
+            if (isRejectExecution()) {
+                throw new ThrottlerRejectedExecutionException(
+                        "Exceeded the max throttle rate of " + 
throttlingState.getThrottleRate());
+            } else {
+                // delegate to async pool
+                if (isAsyncDelayed() && !exchange.isTransacted() && state == 
State.SYNC) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(
+                                "Throttle rate exceeded but AsyncDelayed 
enabled, so queueing for async processing, exchangeId: {}",
+                                exchange.getExchangeId());
                     }
+                    return processAsynchronously(exchange, callback, 
throttlingState);
+                }
+
+                doThrottle(exchange, throttlingState, state, queuedStart);
+            }
+        } else {
+            // permit acquired
+            if (state == State.ASYNC) {
+                if (LOG.isTraceEnabled()) {
+                    long queuedTime = Duration.ofNanos(System.nanoTime() - 
queuedStart).toMillis();
+                    LOG.trace("Queued for {}ms, No throttling applied 
(throttle cleared while queued), for exchangeId: {}",
+                            queuedTime, exchange.getExchangeId());
                 }
             } else {
-                // permit acquired
-                if (state == State.ASYNC) {
-                    if (LOG.isTraceEnabled()) {
-                        long queuedTime = System.currentTimeMillis() - 
queuedStart;
-                        LOG.trace("Queued for {}ms, No throttling applied 
(throttle cleared while queued), for exchangeId: {}",
-                                queuedTime, exchange.getExchangeId());
-                    }
-                } else {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("No throttling applied to exchangeId: {}", 
exchange.getExchangeId());
-                    }
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("No throttling applied to exchangeId: {}", 
exchange.getExchangeId());
                 }
             }
+        }
 
-            callback.done(doneSync);
-            return doneSync;
+        callback.done(doneSync);
+        return doneSync;
+    }
 
-        } catch (final InterruptedException e) {
-            // determine if we can still run, or the camel context is forcing 
a shutdown
-            boolean forceShutdown = 
exchange.getContext().getShutdownStrategy().isForceShutdown();
-            if (forceShutdown) {
-                String msg = "Run not allowed as ShutdownStrategy is forcing 
shutting down, will reject executing exchange: "
-                             + exchange;
-                LOG.debug(msg);
-                exchange.setException(new RejectedExecutionException(msg, e));
-            } else {
-                exchange.setException(e);
+    private static boolean handleException(Exchange exchange, AsyncCallback 
callback, Exception t, boolean doneSync) {
+        exchange.setException(t);
+        callback.done(doneSync);
+        return doneSync;
+    }
+
+    private static boolean handleInterrupt(
+            Exchange exchange, AsyncCallback callback, InterruptedException e, 
boolean doneSync) {
+        // determine if we can still run, or the camel context is forcing a 
shutdown
+        boolean forceShutdown = 
exchange.getContext().getShutdownStrategy().isForceShutdown();
+        if (forceShutdown) {
+            String msg = "Run not allowed as ShutdownStrategy is forcing 
shutting down, will reject executing exchange: "
+                         + exchange;
+            LOG.debug(msg);
+            exchange.setException(new RejectedExecutionException(msg, e));
+        } else {
+            exchange.setException(e);
+        }
+        callback.done(doneSync);
+        return doneSync;
+    }
+
+    private static void doThrottle(Exchange exchange, ThrottlingState 
throttlingState, State state, long queuedStart)
+            throws InterruptedException {
+        // block waiting for a permit
+        long start = 0;
+        long elapsed = 0;
+        if (LOG.isTraceEnabled()) {
+            start = System.nanoTime();
+        }
+        throttlingState.acquire(exchange);
+        if (LOG.isTraceEnabled()) {
+            elapsed = System.nanoTime() - start;
+        }
+        if (state == State.ASYNC) {
+            if (LOG.isTraceEnabled()) {
+                long queuedTime = start - queuedStart;
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Queued for {}ms, Throttled for {}ms, 
exchangeId: {}", queuedTime, elapsed,
+                            exchange.getExchangeId());
+                }
+            }
+        } else {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Throttled for {}ms, exchangeId: {}", elapsed, 
exchange.getExchangeId());
             }
-            callback.done(doneSync);
-            return doneSync;
-        } catch (final Exception t) {
-            exchange.setException(t);
-            callback.done(doneSync);
-            return doneSync;
         }
     }
 
@@ -203,7 +223,7 @@ public class Throttler extends AsyncProcessorSupport 
implements Traceable, IdAwa
             final Exchange exchange, final AsyncCallback callback, 
ThrottlingState throttlingState) {
         try {
             if (LOG.isTraceEnabled()) {
-                exchange.setProperty(PROPERTY_EXCHANGE_QUEUED_TIMESTAMP, 
System.currentTimeMillis());
+                exchange.setProperty(PROPERTY_EXCHANGE_QUEUED_TIME, 
System.nanoTime());
             }
             exchange.setProperty(PROPERTY_EXCHANGE_STATE, State.ASYNC);
             asyncExecutor.submit(() -> process(exchange, callback));

Reply via email to