This is an automated email from the ASF dual-hosted git repository. onders pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit da2b62d0f166965454fbe348439a296af19ac6be Author: Sezgin <onder.sez...@nokia.com> AuthorDate: Wed Jun 13 17:32:59 2018 +0300 Revert "CAMEL-6840 make it possible grouped throttling" This reverts commit a7a458826dbafe1f155f538cfcbc0957d296fad8. --- camel-core/src/main/docs/eips/throttle-eip.adoc | 3 +- .../apache/camel/model/AggregateDefinition.java | 2 +- .../apache/camel/model/ProcessorDefinition.java | 42 -------- .../org/apache/camel/model/ThrottleDefinition.java | 37 +------ .../java/org/apache/camel/processor/Throttler.java | 107 ++------------------- .../camel/processor/ThrottlingGroupingTest.java | 76 --------------- 6 files changed, 12 insertions(+), 255 deletions(-) diff --git a/camel-core/src/main/docs/eips/throttle-eip.adoc b/camel-core/src/main/docs/eips/throttle-eip.adoc index 7ae5472..71da959 100644 --- a/camel-core/src/main/docs/eips/throttle-eip.adoc +++ b/camel-core/src/main/docs/eips/throttle-eip.adoc @@ -6,7 +6,7 @@ The Throttler Pattern allows you to ensure that a specific endpoint does not get === Options // eip options: START -The Throttle EIP supports 6 options which are listed below: +The Throttle EIP supports 5 options which are listed below: [width="100%",cols="2,5,^1,2",options="header"] |=== @@ -16,7 +16,6 @@ The Throttle EIP supports 6 options which are listed below: | *asyncDelayed* | Enables asynchronous delay which means the thread will not block while delaying. | false | Boolean | *callerRunsWhenRejected* | Whether or not the caller should run the task when it was rejected by the thread pool. Is by default true | true | Boolean | *rejectExecution* | Whether or not throttler throws the ThrottlerRejectedExecutionException when the exchange exceeds the request limit Is by default false | false | Boolean -| *correlationExpression* | The expression used to calculate the correlation key to use for throttle grouping. The Exchange which has the same correlation key is throttled together. | | NamespaceAware Expression |=== // eip options: END diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java index 1aa34c4c..2e60ec3 100644 --- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java @@ -142,7 +142,7 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition this(ExpressionNodeHelper.toExpressionDefinition(expression)); } - private AggregateDefinition(ExpressionDefinition correlationExpression) { + public AggregateDefinition(ExpressionDefinition correlationExpression) { setExpression(correlationExpression); ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition(); diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index e4622e7..005270e 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -2284,48 +2284,6 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> addOutput(answer); return answer; } - - /** - * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a> - * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded, - * or that we don't exceed an agreed SLA with some external service. - * Here another parameter correlationExpressionKey is introduced for the functionality which - * will throttle based on the key expression to group exchanges. This will make key-based throttling - * instead of overall throttling. - * <p/> - * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10 - * will default ensure at most 10 messages per second. - * - * @param maximumRequestCount an expression to calculate the maximum request count - * @param correlationExpressionKey is a correlation key that can throttle by the given key instead of overall throttling - * @return the builder - */ - public ThrottleDefinition throttle(long correlationExpressionKey, Expression maximumRequestCount) { - ThrottleDefinition answer = new ThrottleDefinition(ExpressionBuilder.constantExpression(correlationExpressionKey), maximumRequestCount); - addOutput(answer); - return answer; - } - - /** - * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a> - * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded, - * or that we don't exceed an agreed SLA with some external service. - * Here another parameter correlationExpressionKey is introduced for the functionality which - * will throttle based on the key expression to group exchanges. This will make key-based throttling - * instead of overall throttling. - * <p/> - * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10 - * will default ensure at most 10 messages per second. - * - * @param maximumRequestCount an expression to calculate the maximum request count - * @param correlationExpressionKey is a correlation key as an expression that can throttle by the given key instead of overall throttling - * @return the builder - */ - public ThrottleDefinition throttle(Expression correlationExpressionKey, Expression maximumRequestCount) { - ThrottleDefinition answer = new ThrottleDefinition(correlationExpressionKey, maximumRequestCount); - addOutput(answer); - return answer; - } /** * <a href="http://camel.apache.org/loop.html">Loop EIP:</a> diff --git a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java index 7bd5213..613d2b3 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java @@ -21,7 +21,6 @@ import java.util.concurrent.ScheduledExecutorService; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAttribute; -import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlTransient; @@ -56,9 +55,7 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic private Boolean callerRunsWhenRejected; @XmlAttribute private Boolean rejectExecution; - @XmlElement(name = "correlationExpression") - private ExpressionSubElementDefinition correlationExpression; - + public ThrottleDefinition() { } @@ -66,18 +63,6 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic super(maximumRequestsPerPeriod); } - public ThrottleDefinition(Expression maximumRequestsPerPeriod, Expression correlationExpression) { - this(ExpressionNodeHelper.toExpressionDefinition(maximumRequestsPerPeriod), correlationExpression); - } - - private ThrottleDefinition(ExpressionDefinition maximumRequestsPerPeriod, Expression correlationExpression) { - super(maximumRequestsPerPeriod); - - ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition(); - cor.setExpressionType(ExpressionNodeHelper.toExpressionDefinition(correlationExpression)); - setCorrelationExpression(cor); - } - @Override public String toString() { return "Throttle[" + description() + " -> " + getOutputs() + "]"; @@ -108,14 +93,9 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic if (maxRequestsExpression == null) { throw new IllegalArgumentException("MaxRequestsPerPeriod expression must be provided on " + this); } - - Expression correlation = null; - if (correlationExpression != null) { - correlation = correlationExpression.createExpression(routeContext); - } boolean reject = getRejectExecution() != null && getRejectExecution(); - Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, reject, correlation); + Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, reject); answer.setAsyncDelayed(async); if (getCallerRunsWhenRejected() == null) { @@ -124,7 +104,6 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic } else { answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected()); } - return answer; } @@ -277,16 +256,4 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic public void setRejectExecution(Boolean rejectExecution) { this.rejectExecution = rejectExecution; } - - /** - * The expression used to calculate the correlation key to use for throttle grouping. - * The Exchange which has the same correlation key is throttled together. - */ - public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) { - this.correlationExpression = correlationExpression; - } - - public ExpressionSubElementDefinition getCorrelationExpression() { - return correlationExpression; - } } diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java index 73d53f0..543ec9a 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java @@ -16,11 +16,8 @@ */ package org.apache.camel.processor; -import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -34,11 +31,7 @@ import org.apache.camel.RuntimeExchangeException; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; import org.apache.camel.util.AsyncProcessorHelper; -import org.apache.camel.util.CamelContextHelper; -import org.apache.camel.util.LRUCache; -import org.apache.camel.util.LRUCacheFactory; import org.apache.camel.util.ObjectHelper; -import org.apache.camel.util.ServiceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,14 +61,12 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw private static final String PROPERTY_EXCHANGE_QUEUED_TIMESTAMP = "CamelThrottlerExchangeQueuedTimestamp"; private static final String PROPERTY_EXCHANGE_STATE = "CamelThrottlerExchangeState"; - // (throttling grouping) defaulted as 1 because there will be only one queue which is similar to implementation - // when there is no grouping for throttling - private static final Integer NO_CORRELATION_QUEUE_ID = new Integer(1); private enum State { SYNC, ASYNC, ASYNC_REJECTED } private final Logger log = LoggerFactory.getLogger(Throttler.class); private final CamelContext camelContext; + private final DelayQueue<ThrottlePermit> delayQueue = new DelayQueue<>(); private final ExecutorService asyncExecutor; private final boolean shutdownAsyncExecutor; @@ -86,14 +77,9 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw private boolean rejectExecution; private boolean asyncDelayed; private boolean callerRunsWhenRejected = true; - private Expression correlationExpression; - // below 2 fields added for (throttling grouping) - private Map<Integer, DelayQueue<ThrottlePermit>> delayQueueCache; - private ExecutorService delayQueueCacheExecutorService; - public Throttler(final CamelContext camelContext, final Processor processor, final Expression maxRequestsPerPeriodExpression, final long timePeriodMillis, - final ExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution, Expression correlation) { + final ExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution) { super(processor); this.camelContext = camelContext; this.rejectExecution = rejectExecution; @@ -107,7 +93,6 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw } this.timePeriodMillis = timePeriodMillis; this.asyncExecutor = asyncExecutor; - this.correlationExpression = correlation; } @Override @@ -126,8 +111,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw throw new RejectedExecutionException("Run is not allowed"); } - calculateAndSetMaxRequestsPerPeriod(exchange, doneSync); - DelayQueue<ThrottlePermit> delayQueue = locateDelayQueue(exchange, doneSync); + calculateAndSetMaxRequestsPerPeriod(exchange); ThrottlePermit permit = delayQueue.poll(); if (permit == null) { @@ -151,7 +135,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw if (log.isTraceEnabled()) { elapsed = System.currentTimeMillis() - start; } - enqueuePermit(permit, exchange, doneSync); + enqueuePermit(permit, exchange); if (state == State.ASYNC) { if (log.isTraceEnabled()) { @@ -163,7 +147,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw } } } else { - enqueuePermit(permit, exchange, doneSync); + enqueuePermit(permit, exchange); if (state == State.ASYNC) { if (log.isTraceEnabled()) { @@ -208,34 +192,6 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw } } - private DelayQueue<ThrottlePermit> locateDelayQueue(final Exchange exchange, final boolean doneSync) throws InterruptedException, ExecutionException { - Integer key; - CompletableFuture<DelayQueue<ThrottlePermit>> futureDelayQueue = new CompletableFuture<>(); - - if (correlationExpression != null) { - key = correlationExpression.evaluate(exchange, Integer.class); - } else { - key = NO_CORRELATION_QUEUE_ID; - } - - if (!doneSync) { - delayQueueCacheExecutorService.submit(() -> { - futureDelayQueue.complete(findDelayQueue(key)); - }); - } - - return (!doneSync) ? futureDelayQueue.get() : findDelayQueue(key); - } - - private DelayQueue<ThrottlePermit> findDelayQueue(Integer key) { - DelayQueue<ThrottlePermit> currentDelayQueue = delayQueueCache.get(key); - if (currentDelayQueue == null) { - currentDelayQueue = new DelayQueue<>(); - delayQueueCache.put(key, currentDelayQueue); - } - return currentDelayQueue; - } - /** * Delegate blocking on the DelayQueue to an asyncExecutor. Except if the executor rejects the submission * and isCallerRunsWhenRejected() is enabled, then this method will delegate back to process(), but not @@ -266,12 +222,10 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw /** * Returns a permit to the DelayQueue, first resetting it's delay to be relative to now. - * @throws ExecutionException - * @throws InterruptedException */ - protected void enqueuePermit(final ThrottlePermit permit, final Exchange exchange, final boolean doneSync) throws InterruptedException, ExecutionException { + protected void enqueuePermit(final ThrottlePermit permit, final Exchange exchange) { permit.setDelayMs(getTimePeriodMillis()); - locateDelayQueue(exchange, doneSync).put(permit); + delayQueue.put(permit); // try and incur the least amount of overhead while releasing permits back to the queue if (log.isTraceEnabled()) { log.trace("Permit released, for exchangeId: {}", exchange.getExchangeId()); @@ -281,7 +235,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw /** * Evaluates the maxRequestsPerPeriodExpression and adjusts the throttle rate up or down. */ - protected void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange, final boolean doneSync) throws Exception { + protected void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange) throws Exception { Integer newThrottle = maxRequestsPerPeriodExpression.evaluate(exchange, Integer.class); if (newThrottle != null && newThrottle < 0) { @@ -295,8 +249,6 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw if (newThrottle != null) { if (newThrottle != throttleRate) { - // get the queue from the cache - DelayQueue<ThrottlePermit> delayQueue = locateDelayQueue(exchange, doneSync); // decrease if (throttleRate > newThrottle) { int delta = throttleRate - newThrottle; @@ -327,62 +279,19 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw } } - @SuppressWarnings("unchecked") @Override protected void doStart() throws Exception { if (isAsyncDelayed()) { ObjectHelper.notNull(asyncExecutor, "executorService", this); } - if (camelContext != null) { - int maxSize = CamelContextHelper.getMaximumSimpleCacheSize(camelContext); - if (maxSize > 0) { - delayQueueCache = LRUCacheFactory.newLRUCache(16, maxSize, false); - log.debug("DelayQueues cache size: {}", maxSize); - } else { - delayQueueCache = LRUCacheFactory.newLRUCache(100); - log.debug("Defaulting DelayQueues cache size: {}", 100); - } - } - if (delayQueueCache != null) { - ServiceHelper.startService(delayQueueCache); - } - if (delayQueueCacheExecutorService == null) { - String name = getClass().getSimpleName() + "-DelayQueueLocatorTask"; - delayQueueCacheExecutorService = createDelayQueueCacheExecutorService(name); - } super.doStart(); } - - /** - * Strategy to create the thread pool for locating right DelayQueue from the case as a background task - * - * @param name the suggested name for the background thread - * @return the thread pool - */ - protected synchronized ExecutorService createDelayQueueCacheExecutorService(String name) { - // use a cached thread pool so we each on-the-fly task has a dedicated thread to process completions as they come in - return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name); - } - @SuppressWarnings("rawtypes") @Override protected void doShutdown() throws Exception { if (shutdownAsyncExecutor && asyncExecutor != null) { camelContext.getExecutorServiceManager().shutdownNow(asyncExecutor); } - if (delayQueueCacheExecutorService != null) { - camelContext.getExecutorServiceManager().shutdownNow(delayQueueCacheExecutorService); - } - if (delayQueueCache != null) { - ServiceHelper.stopService(delayQueueCache); - if (log.isDebugEnabled()) { - if (delayQueueCache instanceof LRUCache) { - log.debug("Clearing deleay queues cache[size={}, hits={}, misses={}, evicted={}]", - delayQueueCache.size(), ((LRUCache) delayQueueCache).getHits(), ((LRUCache) delayQueueCache).getMisses(), ((LRUCache) delayQueueCache).getEvicted()); - } - } - delayQueueCache.clear(); - } super.doShutdown(); } diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java deleted file mode 100644 index 01cd378..0000000 --- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.processor; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.camel.ContextTestSupport; -import org.apache.camel.builder.RouteBuilder; - -/** - * @version - */ -public class ThrottlingGroupingTest extends ContextTestSupport { - - public void testGroupingWithSingleConstant() throws Exception { - getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World"); - getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom"); - - template.sendBodyAndHeader("seda:a", "Kaboom", "max", null); - template.sendBodyAndHeader("seda:a", "Hello World", "max", 2); - template.sendBodyAndHeader("seda:a", "Bye World", "max", 2); - - assertMockEndpointsSatisfied(); - } - - public void testGroupingWithDynamicHeaderExpression() throws Exception { - getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World"); - getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom"); - getMockEndpoint("mock:resultdynamic").expectedBodiesReceived("Hello Dynamic World", "Bye Dynamic World"); - - Map<String, Object> headers = new HashMap<String, Object>(); - headers.put("max", null); - - template.sendBodyAndHeaders("seda:a", "Kaboom", headers); - - headers.put("max", "2"); - template.sendBodyAndHeaders("seda:a", "Hello World", headers); - template.sendBodyAndHeaders("seda:b", "Bye World", headers); - - headers.put("key", "1"); - template.sendBodyAndHeaders("seda:c", "Hello Dynamic World", headers); - headers.put("key", "2"); - template.sendBodyAndHeaders("seda:c", "Bye Dynamic World", headers); - - assertMockEndpointsSatisfied(); - } - - @Override - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { - @Override - public void configure() throws Exception { - errorHandler(deadLetterChannel("mock:dead")); - - from("seda:a").throttle(1, header("max")).to("mock:result"); - from("seda:b").throttle(2, header("max")).to("mock:result"); - from("seda:c").throttle(header("key"), header("max")).to("mock:resultdynamic"); - } - }; - } -} -- To stop receiving notification emails like this one, please contact ond...@apache.org.