This is an automated email from the ASF dual-hosted git repository. liubao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
commit 4aed86d2965dfa5d76b90ce9d5774a13e1d6ee8f Author: GuoYL <[email protected]> AuthorDate: Thu Jul 16 11:34:43 2020 +0800 [SCB-2043] flow control support leak bucket and token bucket --- .../java/org/apache/servicecomb/qps/Config.java | 9 ++ .../qps/ConsumerQpsFlowControlHandler.java | 7 +- .../qps/ProviderQpsFlowControlHandler.java | 15 +- .../servicecomb/qps/QpsControllerManager.java | 144 +++++++++++------- .../org/apache/servicecomb/qps/QpsStrategy.java | 23 +++ .../qps/strategy/AbstractQpsStrategy.java | 66 +++++++++ .../FixedWindowStrategy.java} | 34 ++--- .../qps/strategy/LeakyBucketStrategy.java | 79 ++++++++++ .../qps/strategy/SlidingWindowStrategy.java | 34 +++++ .../servicecomb/qps/strategy/StrategyType.java | 44 ++++++ .../qps/strategy/TokenBucketStrategy.java | 28 ++++ .../servicecomb/qps/QpsControllerManagerTest.java | 162 +++++++++++---------- .../qps/TestConsumerQpsFlowControlHandler.java | 36 ++--- .../qps/TestProviderQpsFlowControlHandler.java | 20 +-- .../apache/servicecomb/qps/TestQpsStrategy.java | 52 +++++++ .../router/custom/RouterInvokeFilter.java | 2 +- 16 files changed, 564 insertions(+), 191 deletions(-) diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/Config.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/Config.java index 2c6d69a..ea8e4a1 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/Config.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/Config.java @@ -26,6 +26,15 @@ import com.netflix.config.DynamicPropertyFactory; public final class Config { private static final Logger LOGGER = LoggerFactory.getLogger(Config.class); + public static final String STRATEGY_KEY_PREFIX = "servicecomb.flowcontrol.strategy"; + + public static final String CONSUMER_BUCKET_KEY_PREFIX = "servicecomb.flowcontrol.Consumer.qps.bucket."; + + public static final String PROVIDER_BUCKET_KEY_PREFIX = "servicecomb.flowcontrol.Provider.qps.bucket."; + + public static final String PROVIDER_BUCKET_KEY_GLOBAL = + "servicecomb.flowcontrol.Provider.qps.global.bucket"; + public static final String CONSUMER_LIMIT_KEY_PREFIX = "servicecomb.flowcontrol.Consumer.qps.limit."; public static final String PROVIDER_LIMIT_KEY_PREFIX = "servicecomb.flowcontrol.Provider.qps.limit."; diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java index bc82c6c..0972433 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java @@ -29,7 +29,8 @@ import org.apache.servicecomb.swagger.invocation.exception.InvocationException; */ public class ConsumerQpsFlowControlHandler implements Handler { static final QpsControllerManager qpsControllerMgr = new QpsControllerManager() - .setConfigKeyPrefix(Config.CONSUMER_LIMIT_KEY_PREFIX); + .setLimitKeyPrefix(Config.CONSUMER_LIMIT_KEY_PREFIX) + .setBucketKeyPrefix(Config.CONSUMER_BUCKET_KEY_PREFIX); @Override public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception { @@ -38,8 +39,8 @@ public class ConsumerQpsFlowControlHandler implements Handler { return; } - QpsController qpsController = qpsControllerMgr.getOrCreate(invocation.getMicroserviceName(), invocation); - if (qpsController.isLimitNewRequest()) { + QpsStrategy qpsStrategy = qpsControllerMgr.getOrCreate(invocation.getMicroserviceName(), invocation); + if (qpsStrategy.isLimitNewRequest()) { // return http status 429 CommonExceptionData errorData = new CommonExceptionData("rejected by qps flowcontrol"); asyncResp.consumerFail( diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java index 67ade94..c5e4445 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java @@ -27,8 +27,9 @@ import org.springframework.util.StringUtils; public class ProviderQpsFlowControlHandler implements Handler { static final QpsControllerManager qpsControllerMgr = new QpsControllerManager() - .setConfigKeyPrefix(Config.PROVIDER_LIMIT_KEY_PREFIX) - .setGlobalQpsController(Config.PROVIDER_LIMIT_KEY_GLOBAL); + .setLimitKeyPrefix(Config.PROVIDER_LIMIT_KEY_PREFIX) + .setBucketKeyPrefix(Config.PROVIDER_BUCKET_KEY_PREFIX) + .setGlobalQpsStrategy(Config.PROVIDER_LIMIT_KEY_GLOBAL, Config.PROVIDER_BUCKET_KEY_GLOBAL); @Override public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception { @@ -46,15 +47,15 @@ public class ProviderQpsFlowControlHandler implements Handler { } String microserviceName = invocation.getContext(Const.SRC_MICROSERVICE); - QpsController qpsController = + QpsStrategy qpsStrategy = StringUtils.isEmpty(microserviceName) - ? qpsControllerMgr.getGlobalQpsController() + ? qpsControllerMgr.getGlobalQpsStrategy() : qpsControllerMgr.getOrCreate(microserviceName, invocation); - isLimitNewRequest(qpsController, asyncResp); + isLimitNewRequest(qpsStrategy, asyncResp); } - private boolean isLimitNewRequest(QpsController qpsController, AsyncResponse asyncResp) { - if (qpsController.isLimitNewRequest()) { + private boolean isLimitNewRequest(QpsStrategy qpsStrategy, AsyncResponse asyncResp) { + if (qpsStrategy.isLimitNewRequest()) { CommonExceptionData errorData = new CommonExceptionData("rejected by qps flowcontrol"); asyncResp.producerFail(new InvocationException(QpsConst.TOO_MANY_REQUESTS_STATUS, errorData)); return true; diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java index d637e1f..4a989e7 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java @@ -22,6 +22,11 @@ import java.util.Map.Entry; import org.apache.servicecomb.core.Invocation; import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx; +import org.apache.servicecomb.qps.strategy.AbstractQpsStrategy; +import org.apache.servicecomb.qps.strategy.FixedWindowStrategy; +import org.apache.servicecomb.qps.strategy.LeakyBucketStrategy; +import org.apache.servicecomb.qps.strategy.StrategyType; +import org.apache.servicecomb.qps.strategy.TokenBucketStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,31 +38,34 @@ public class QpsControllerManager { /** * Describe the relationship between configuration and qpsController. */ - protected final Map<String, QpsController> configQpsControllerMap = new ConcurrentHashMapEx<>(); + protected final Map<String, AbstractQpsStrategy> configQpsControllerMap = new ConcurrentHashMapEx<>(); /** * Describe the relationship between qualifiedKey(format is "microservice.schema.operation") and qpsController. */ - protected final Map<String, QpsController> qualifiedNameControllerMap = new ConcurrentHashMapEx<>(); + protected final Map<String, AbstractQpsStrategy> qualifiedNameControllerMap = new ConcurrentHashMapEx<>(); - protected QpsController globalQpsController; + protected AbstractQpsStrategy globalQpsStrategy; public static final String SEPARATOR = "."; - private String configKeyPrefix; + private String limitKeyPrefix; - public QpsController getOrCreate(String microserviceName, Invocation invocation) { + private String bucketKeyPrefix; + + public QpsStrategy getOrCreate(String microserviceName, Invocation invocation) { return qualifiedNameControllerMap - .computeIfAbsent(microserviceName + SEPARATOR + invocation.getOperationMeta().getSchemaQualifiedName(), key -> { - return create(key, microserviceName, invocation); - }); + .computeIfAbsent( + microserviceName + SEPARATOR + invocation.getOperationMeta().getSchemaQualifiedName(), + key -> create(key, microserviceName, invocation)); } /** * Create relevant qpsLimit dynamicProperty and watch the configuration change. * Search and return a valid qpsController. */ - protected QpsController create(String qualifiedNameKey, String microserviceName, Invocation invocation) { + protected AbstractQpsStrategy create(String qualifiedNameKey, String microserviceName, + Invocation invocation) { // create "microservice" createQpsControllerIfNotExist(microserviceName); // create "microservice.schema" @@ -70,7 +78,7 @@ public class QpsControllerManager { } /** - * <p> Use qualifiedNameKey to search {@link QpsController}. + * <p> Use qualifiedNameKey to search {@link QpsStrategy}. * Firstly try to search "microservice.schema.operation". If no valid result found, then try "microservice.schema", * and then "microservice" or global qpsController(If there is a global qpsController).</p> * <p> This method ensures that there is always an existing qpsController returned, as the relevant qpsController has @@ -79,42 +87,42 @@ public class QpsControllerManager { * @param qualifiedNameKey qualifiedNameKey in {@link #qualifiedNameControllerMap} * @return a qps controller, lower level controllers with valid qpsLimit have priority. */ - protected QpsController searchQpsController(String qualifiedNameKey) { - QpsController qpsController = configQpsControllerMap.get(qualifiedNameKey); - if (isValidQpsController(qpsController)) { - return qpsController; + protected AbstractQpsStrategy searchQpsController(String qualifiedNameKey) { + AbstractQpsStrategy qpsStrategy = configQpsControllerMap.get(qualifiedNameKey); + if (isValidQpsController(qpsStrategy)) { + return qpsStrategy; } int index = qualifiedNameKey.lastIndexOf(SEPARATOR); while (index > 0) { - qpsController = configQpsControllerMap.get(qualifiedNameKey.substring(0, index)); - if (isValidQpsController(qpsController)) { - return qpsController; + qpsStrategy = configQpsControllerMap.get(qualifiedNameKey.substring(0, index)); + if (isValidQpsController(qpsStrategy)) { + return qpsStrategy; } index = qualifiedNameKey.lastIndexOf(SEPARATOR, index - 1); } - if (isValidQpsController(qpsController)) { - return qpsController; + if (isValidQpsController(qpsStrategy)) { + return qpsStrategy; } - if (null != globalQpsController) { - return globalQpsController; + if (null != globalQpsStrategy) { + return globalQpsStrategy; } // if null is returned, maybe the operation qps controller is not initiated correctly. // getOrCreateQpsController() should be invoked before. - return qpsController; + return qpsStrategy; } - private boolean keyMatch(String configKey, Entry<String, QpsController> controllerEntry) { + private boolean keyMatch(String configKey, Entry<String, AbstractQpsStrategy> controllerEntry) { return controllerEntry.getKey().equals(configKey) || controllerEntry.getKey().startsWith(configKey + SEPARATOR); } - private boolean isValidQpsController(QpsController qpsController) { - return null != qpsController && null != qpsController.getQpsLimit(); + private boolean isValidQpsController(AbstractQpsStrategy qpsStrategy) { + return null != qpsStrategy && null != qpsStrategy.getQpsLimit(); } private void createQpsControllerIfNotExist(String configKey) { @@ -123,52 +131,86 @@ public class QpsControllerManager { } LOGGER.info("Create qpsController, configKey = [{}]", configKey); - DynamicProperty property = getDynamicProperty(configKey); - QpsController qpsController = new QpsController(configKey, property.getInteger()); - - configQpsControllerMap.put(configKey, qpsController); - - property.addCallback(() -> { - qpsController.setQpsLimit(property.getInteger()); - LOGGER.info("Qps limit updated, configKey = [{}], value = [{}]", configKey, property.getString()); + DynamicProperty limitProperty = DynamicProperty.getInstance(limitKeyPrefix + configKey); + DynamicProperty bucketProperty = DynamicProperty.getInstance(bucketKeyPrefix + configKey); + AbstractQpsStrategy qpsStrategy = chooseStrategy(configKey, limitProperty.getLong(), + bucketProperty.getLong()); + + limitProperty.addCallback(() -> { + qpsStrategy.setQpsLimit(limitProperty.getLong()); + LOGGER.info("Qps limit updated, configKey = [{}], value = [{}]", configKey, + limitProperty.getString()); updateObjMap(configKey); }); + bucketProperty.addCallback(() -> { + qpsStrategy.setBucketLimit(bucketProperty.getLong()); + LOGGER.info("bucket limit updated, configKey = [{}], value = [{}]", configKey, + bucketProperty.getString()); + updateObjMap(configKey); + }); + + configQpsControllerMap.put(configKey, qpsStrategy); } protected void updateObjMap(String configKey) { - for (Entry<String, QpsController> controllerEntry : qualifiedNameControllerMap.entrySet()) { + for (Entry<String, AbstractQpsStrategy> controllerEntry : qualifiedNameControllerMap + .entrySet()) { if (keyMatch(configKey, controllerEntry)) { - QpsController qpsController = searchQpsController(controllerEntry.getKey()); - controllerEntry.setValue(qpsController); + AbstractQpsStrategy qpsStrategy = searchQpsController(controllerEntry.getKey()); + controllerEntry.setValue(qpsStrategy); LOGGER.info("QpsController updated, operationId = [{}], configKey = [{}], qpsLimit = [{}]", - controllerEntry.getKey(), qpsController.getKey(), qpsController.getQpsLimit()); + controllerEntry.getKey(), qpsStrategy.getKey(), qpsStrategy.getQpsLimit()); } } } - public QpsControllerManager setConfigKeyPrefix(String configKeyPrefix) { - this.configKeyPrefix = configKeyPrefix; + public QpsControllerManager setLimitKeyPrefix(String limitKeyPrefix) { + this.limitKeyPrefix = limitKeyPrefix; return this; } - public QpsControllerManager setGlobalQpsController(String globalConfigKey) { - DynamicProperty globalQpsProperty = DynamicProperty.getInstance(globalConfigKey); - QpsController qpsController = new QpsController(globalConfigKey, globalQpsProperty.getInteger()); + public QpsControllerManager setBucketKeyPrefix(String bucketKeyPrefix) { + this.bucketKeyPrefix = bucketKeyPrefix; + return this; + } - globalQpsProperty.addCallback(() -> { - qpsController.setQpsLimit(globalQpsProperty.getInteger()); - LOGGER.info("Global qps limit update, value = [{}]", globalQpsProperty.getInteger()); + public QpsControllerManager setGlobalQpsStrategy(String globalLimitKey, String globalBucketKey) { + DynamicProperty globalLimitProperty = DynamicProperty.getInstance(globalLimitKey); + DynamicProperty globalBucketProperty = DynamicProperty.getInstance(globalBucketKey); + globalQpsStrategy = chooseStrategy(globalLimitKey, globalLimitProperty.getLong(), + globalBucketProperty.getLong()); + globalLimitProperty.addCallback(() -> { + globalQpsStrategy.setQpsLimit(globalLimitProperty.getLong()); + LOGGER.info("Global qps limit update, value = [{}]", globalLimitProperty.getInteger()); + }); + globalBucketProperty.addCallback(() -> { + globalQpsStrategy.setBucketLimit(globalBucketProperty.getLong()); + LOGGER.info("Global bucket limit update, value = [{}]", globalBucketProperty.getInteger()); }); - - this.globalQpsController = qpsController; return this; } - public QpsController getGlobalQpsController() { - return globalQpsController; + private AbstractQpsStrategy chooseStrategy(String globalConfigKey, Long limit, Long bucket) { + String strategyKey = DynamicProperty.getInstance(Config.STRATEGY_KEY_PREFIX).getString(); + AbstractQpsStrategy strategy; + switch (StrategyType.parseStrategyType(strategyKey)) { + case FixedWindow: + strategy = new FixedWindowStrategy(globalConfigKey, limit); + break; + case LeakyBucket: + strategy = new LeakyBucketStrategy(globalConfigKey, limit); + break; + case TokenBucket: + strategy = new TokenBucketStrategy(globalConfigKey, limit, bucket); + break; + case SlidingWindow: + default: + strategy = new FixedWindowStrategy(globalConfigKey, limit); + } + return strategy; } - protected DynamicProperty getDynamicProperty(String configKey) { - return DynamicProperty.getInstance(configKeyPrefix + configKey); + public QpsStrategy getGlobalQpsStrategy() { + return globalQpsStrategy; } } diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsStrategy.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsStrategy.java new file mode 100644 index 0000000..606d7b8 --- /dev/null +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsStrategy.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.qps; + +public interface QpsStrategy { + + boolean isLimitNewRequest(); +} diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/AbstractQpsStrategy.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/AbstractQpsStrategy.java new file mode 100644 index 0000000..9285142 --- /dev/null +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/AbstractQpsStrategy.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.qps.strategy; + +import org.apache.servicecomb.qps.QpsStrategy; + +/** + * @Date 2020/7/14 + **/ +public class AbstractQpsStrategy implements QpsStrategy { + + private Long qpsLimit; + + private Long bucketLimit; + + private String key; + + public AbstractQpsStrategy(Long qpsLimit, String key) { + this.qpsLimit = qpsLimit; + this.key = key; + } + + public Long getBucketLimit() { + return bucketLimit; + } + + public void setBucketLimit(Long bucketLimit) { + this.bucketLimit = bucketLimit; + } + + @Override + public boolean isLimitNewRequest() { + return true; + } + + public void setQpsLimit(Long qpsLimit) { + this.qpsLimit = qpsLimit; + } + + public Long getQpsLimit() { + return qpsLimit; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } +} diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java similarity index 77% rename from handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java rename to handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java index 5f294ba..01b6e31 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java @@ -14,15 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package org.apache.servicecomb.qps; +package org.apache.servicecomb.qps.strategy; import java.util.concurrent.atomic.AtomicLong; -public class QpsController { - private String key; - - private Integer qpsLimit; +/** + * @Date 2020/7/8 + **/ +public class FixedWindowStrategy extends AbstractQpsStrategy { // Interval begin time private volatile long msCycleBegin; @@ -35,26 +34,16 @@ public class QpsController { private static final int CYCLE_LENGTH = 1000; - public QpsController(String key, Integer qpsLimit) { - this.key = key; - this.qpsLimit = qpsLimit; + public FixedWindowStrategy(String key, Long qpsLimit) { + super(qpsLimit, key); this.msCycleBegin = System.currentTimeMillis(); } - public String getKey() { - return key; - } - - public Integer getQpsLimit() { - return qpsLimit; - } - - public void setQpsLimit(Integer qpsLimit) { - this.qpsLimit = qpsLimit; - } - // return true means new request need to be rejected public boolean isLimitNewRequest() { + if (this.getQpsLimit() == null) { + this.setQpsLimit(Long.MAX_VALUE); + } long newCount = requestCount.incrementAndGet(); long msNow = System.currentTimeMillis(); //Time jump cause the new request injected @@ -66,7 +55,6 @@ public class QpsController { // Configuration update and use is at the situation of multi-threaded concurrency // It is possible that operation level updated to null,but schema level or microservice level does not updated - int limitValue = (qpsLimit == null) ? Integer.MAX_VALUE : qpsLimit; - return newCount - lastRequestCount >= limitValue; + return newCount - lastRequestCount >= this.getQpsLimit(); } } diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java new file mode 100644 index 0000000..9be7f2e --- /dev/null +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.qps.strategy; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * leaky bucket algorithm include 2 implementation : + * 1. as a meter : it's same as the token bucket. + * 2. as a queue : the bucket size equal to qpsLimit. + * + * @Date 2020/7/7 + **/ +public class LeakyBucketStrategy extends AbstractQpsStrategy { + + // Request count between Interval begin and now in one interval + private volatile AtomicLong requestCount = new AtomicLong(); + + private volatile long lastTime; + + private long remainder = 0; + + public LeakyBucketStrategy(String key, Long qpsLimit) { + super(qpsLimit, key); + this.setBucketLimit(qpsLimit); + } + + public LeakyBucketStrategy(String key, Long qpsLimit, Long bucketLimit) { + super(qpsLimit, key); + this.setBucketLimit(bucketLimit); + } + + /** + * @return + */ + @Override + public boolean isLimitNewRequest() { + if (this.getQpsLimit() == null) { + this.setQpsLimit(Long.MAX_VALUE); + } + if (this.getBucketLimit() == null) { + this.setBucketLimit( + this.getQpsLimit() <= Long.MAX_VALUE / 2 ? this.getQpsLimit() * 2 : this.getQpsLimit()); + } + long nowTime = System.currentTimeMillis(); + //get the num of te period time + long leakCount = ((nowTime - lastTime + remainder) / 1000) * this.getQpsLimit(); + remainder = (nowTime - lastTime + remainder) % 1000; + // leak the request , if leak + if (requestCount.longValue() > leakCount) { + requestCount.addAndGet(-leakCount); + } else { + requestCount.set(0); + } + lastTime = nowTime; + //compute this time + if (requestCount.longValue() < this.getBucketLimit()) { + requestCount.incrementAndGet(); + return false; + } + return true; + } + +} diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/SlidingWindowStrategy.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/SlidingWindowStrategy.java new file mode 100644 index 0000000..4671ba2 --- /dev/null +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/SlidingWindowStrategy.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.qps.strategy; + + +/** + * @Date 2020/7/8 + **/ +public class SlidingWindowStrategy extends AbstractQpsStrategy { + + public SlidingWindowStrategy(long qpsLimit, String key) { + super(qpsLimit, key); + } + + @Override + public boolean isLimitNewRequest() { + return true; + } +} diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/StrategyType.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/StrategyType.java new file mode 100644 index 0000000..8aa241b --- /dev/null +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/StrategyType.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.qps.strategy; + +import org.apache.commons.lang3.StringUtils; + +/** + * @Author GuoYl123 + * @Date 2020/7/13 + **/ +public enum StrategyType { + TokenBucket, + LeakyBucket, + FixedWindow, + SlidingWindow; + + + public static StrategyType parseStrategyType(String type) { + if (StringUtils.isEmpty(type)) { + return StrategyType.FixedWindow; + } + + try { + return StrategyType.valueOf(type.toUpperCase()); + } catch (IllegalArgumentException e) { + return StrategyType.FixedWindow; + } + } +} diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/TokenBucketStrategy.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/TokenBucketStrategy.java new file mode 100644 index 0000000..582cbe6 --- /dev/null +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/TokenBucketStrategy.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.qps.strategy; + +/** + * @Date 2020/7/16 + **/ +public class TokenBucketStrategy extends LeakyBucketStrategy { + + public TokenBucketStrategy(String key, Long qpsLimit, Long bucketLimit) { + super(key, qpsLimit, bucketLimit); + } +} diff --git a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/QpsControllerManagerTest.java b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/QpsControllerManagerTest.java index 44b86b2..6089eac 100644 --- a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/QpsControllerManagerTest.java +++ b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/QpsControllerManagerTest.java @@ -23,6 +23,7 @@ import org.apache.servicecomb.core.Invocation; import org.apache.servicecomb.core.definition.OperationMeta; import org.apache.servicecomb.core.definition.SchemaMeta; import org.apache.servicecomb.foundation.test.scaffolding.config.ArchaiusUtils; +import org.apache.servicecomb.qps.strategy.AbstractQpsStrategy; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -58,30 +59,30 @@ public class QpsControllerManagerTest { } }; QpsControllerManager testQpsControllerManager = new QpsControllerManager() - .setConfigKeyPrefix(Config.CONSUMER_LIMIT_KEY_PREFIX); + .setLimitKeyPrefix(Config.CONSUMER_LIMIT_KEY_PREFIX); initTestQpsControllerManager(testQpsControllerManager, invocation, operationMeta); // pojo setConfigWithDefaultPrefix("pojo", 100); - QpsController qpsController = testQpsControllerManager.getOrCreate("pojo", invocation); - Assert.assertEquals("pojo", qpsController.getKey()); - Assert.assertTrue(100 == qpsController.getQpsLimit()); - qpsController = testQpsControllerManager.getOrCreate("pojo2", invocation); - Assert.assertEquals("pojo2", qpsController.getKey()); - Assert.assertNull(qpsController.getQpsLimit()); - qpsController = testQpsControllerManager.getOrCreate("poj", invocation); - Assert.assertEquals("poj", qpsController.getKey()); - Assert.assertNull(qpsController.getQpsLimit()); + QpsStrategy qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); + Assert.assertEquals("pojo", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(100 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("pojo2", invocation); + Assert.assertEquals("pojo2", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertNull(((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("poj", invocation); + Assert.assertEquals("poj", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertNull(((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); ArchaiusUtils.setProperty("servicecomb.flowcontrol.Consumer.qps.limit.poj.server", 10000); - qpsController = testQpsControllerManager.getOrCreate("poj", invocation); - Assert.assertEquals("poj.server", qpsController.getKey()); - Assert.assertEquals(qpsController.getQpsLimit(), (Integer) 10000); + qpsStrategy = testQpsControllerManager.getOrCreate("poj", invocation); + Assert.assertEquals("poj.server", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertEquals(((AbstractQpsStrategy) qpsStrategy).getQpsLimit(), (Long) 10000L); ArchaiusUtils.setProperty("servicecomb.flowcontrol.Consumer.qps.limit.poj.server.test", 20000); - qpsController = testQpsControllerManager.getOrCreate("poj", invocation); - Assert.assertEquals("poj.server.test", qpsController.getKey()); - Assert.assertEquals(qpsController.getQpsLimit(), (Integer) 20000); + qpsStrategy = testQpsControllerManager.getOrCreate("poj", invocation); + Assert.assertEquals("poj.server.test", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertEquals(((AbstractQpsStrategy) qpsStrategy).getQpsLimit(), (Long) 20000L); testGetOrCreateCommon(testQpsControllerManager, invocation, operationMeta); } @@ -100,32 +101,32 @@ public class QpsControllerManagerTest { }; QpsControllerManager testQpsControllerManager = new QpsControllerManager() - .setGlobalQpsController(Config.PROVIDER_LIMIT_KEY_GLOBAL) - .setConfigKeyPrefix(Config.CONSUMER_LIMIT_KEY_PREFIX); + .setGlobalQpsStrategy(Config.PROVIDER_LIMIT_KEY_GLOBAL, Config.PROVIDER_BUCKET_KEY_GLOBAL) + .setLimitKeyPrefix(Config.CONSUMER_LIMIT_KEY_PREFIX); // global setConfig(Config.PROVIDER_LIMIT_KEY_GLOBAL, 50); - QpsController qpsController = testQpsControllerManager.getOrCreate("pojo", invocation); - Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, qpsController.getKey()); - Assert.assertTrue(50 == qpsController.getQpsLimit()); - qpsController = testQpsControllerManager.getOrCreate("pojo2", invocation); - Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, qpsController.getKey()); - Assert.assertTrue(50 == qpsController.getQpsLimit()); - qpsController = testQpsControllerManager.getOrCreate("poj", invocation); - Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, qpsController.getKey()); - Assert.assertTrue(50 == qpsController.getQpsLimit()); + QpsStrategy qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(50 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("pojo2", invocation); + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(50 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("poj", invocation); + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(50 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); // pojo setConfigWithDefaultPrefix("pojo", 100); - qpsController = testQpsControllerManager.getOrCreate("pojo", invocation); - Assert.assertEquals("pojo", qpsController.getKey()); - Assert.assertTrue(100 == qpsController.getQpsLimit()); - qpsController = testQpsControllerManager.getOrCreate("pojo2", invocation); - Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, qpsController.getKey()); - Assert.assertTrue(50 == qpsController.getQpsLimit()); - qpsController = testQpsControllerManager.getOrCreate("poj", invocation); - Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, qpsController.getKey()); - Assert.assertTrue(50 == qpsController.getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); + Assert.assertEquals("pojo", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(100 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("pojo2", invocation); + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(50 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("poj", invocation); + Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(50 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); testGetOrCreateCommon(testQpsControllerManager, invocation, operationMeta); } @@ -143,9 +144,9 @@ public class QpsControllerManagerTest { } }; QpsControllerManager qpsControllerManager = new QpsControllerManager(); - QpsController qpsController = qpsControllerManager.getOrCreate("service", invocation); - Assert.assertEquals("service", qpsController.getKey()); - Assert.assertNull(qpsController.getQpsLimit()); + QpsStrategy qpsStrategy = qpsControllerManager.getOrCreate("service", invocation); + Assert.assertEquals("service", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertNull(((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); new Expectations() { { @@ -157,9 +158,9 @@ public class QpsControllerManagerTest { result = "test_schema.test_opr"; } }; - qpsController = qpsControllerManager.getOrCreate("test_service", invocation); - Assert.assertEquals("test_service", qpsController.getKey()); - Assert.assertNull(qpsController.getQpsLimit()); + qpsStrategy = qpsControllerManager.getOrCreate("test_service", invocation); + Assert.assertEquals("test_service", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertNull(((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); new Expectations() { { @@ -171,9 +172,9 @@ public class QpsControllerManagerTest { result = "test-schema.test-opr"; } }; - qpsController = qpsControllerManager.getOrCreate("test-service", invocation); - Assert.assertEquals("test-service", qpsController.getKey()); - Assert.assertNull(qpsController.getQpsLimit()); + qpsStrategy = qpsControllerManager.getOrCreate("test-service", invocation); + Assert.assertEquals("test-service", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertNull(((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); new Expectations() { { @@ -185,9 +186,9 @@ public class QpsControllerManagerTest { result = "schema.opr.tail"; } }; - qpsController = qpsControllerManager.getOrCreate("svc", invocation); - Assert.assertEquals("svc", qpsController.getKey()); - Assert.assertNull(qpsController.getQpsLimit()); + qpsStrategy = qpsControllerManager.getOrCreate("svc", invocation); + Assert.assertEquals("svc", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertNull(((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); new Expectations() { { @@ -199,9 +200,9 @@ public class QpsControllerManagerTest { result = "schema.opr2.tail"; } }; - qpsController = qpsControllerManager.getOrCreate("svc", invocation); - Assert.assertEquals("svc", qpsController.getKey()); - Assert.assertNull(qpsController.getQpsLimit()); + qpsStrategy = qpsControllerManager.getOrCreate("svc", invocation); + Assert.assertEquals("svc", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertNull(((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); } private void testGetOrCreateCommon(QpsControllerManager testQpsControllerManager, Invocation invocation, @@ -215,9 +216,9 @@ public class QpsControllerManagerTest { } }; setConfigWithDefaultPrefix("pojo.server", 200); - QpsController qpsController = testQpsControllerManager.getOrCreate("pojo", invocation); - Assert.assertEquals("pojo.server", qpsController.getKey()); - Assert.assertTrue(200 == qpsController.getQpsLimit()); + QpsStrategy qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); + Assert.assertEquals("pojo.server", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(200 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); new Expectations() { { invocation.getOperationMeta(); @@ -226,9 +227,9 @@ public class QpsControllerManagerTest { result = "server2.test"; } }; - qpsController = testQpsControllerManager.getOrCreate("pojo", invocation); - Assert.assertEquals("pojo", qpsController.getKey()); - Assert.assertTrue(100 == qpsController.getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); + Assert.assertEquals("pojo", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(100 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); new Expectations() { { invocation.getOperationMeta(); @@ -237,9 +238,9 @@ public class QpsControllerManagerTest { result = "serve.test"; } }; - qpsController = testQpsControllerManager.getOrCreate("pojo", invocation); - Assert.assertEquals("pojo", qpsController.getKey()); - Assert.assertTrue(100 == qpsController.getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); + Assert.assertEquals("pojo", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(100 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); // pojo.server.test new Expectations() { @@ -251,9 +252,9 @@ public class QpsControllerManagerTest { } }; setConfigWithDefaultPrefix("pojo.server.test", 300); - qpsController = testQpsControllerManager.getOrCreate("pojo", invocation); - Assert.assertEquals("pojo.server.test", qpsController.getKey()); - Assert.assertTrue(300 == qpsController.getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); + Assert.assertEquals("pojo.server.test", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(300 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); new Expectations() { { invocation.getOperationMeta(); @@ -262,9 +263,9 @@ public class QpsControllerManagerTest { result = "server.test2"; } }; - qpsController = testQpsControllerManager.getOrCreate("pojo", invocation); - Assert.assertEquals("pojo.server", qpsController.getKey()); - Assert.assertTrue(200 == qpsController.getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); + Assert.assertEquals("pojo.server", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(200 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); new Expectations() { { invocation.getOperationMeta(); @@ -274,9 +275,9 @@ public class QpsControllerManagerTest { result = "server.tes"; } }; - qpsController = testQpsControllerManager.getOrCreate("pojo", invocation); - Assert.assertEquals("pojo.server", qpsController.getKey()); - Assert.assertTrue(200 == qpsController.getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); + Assert.assertEquals("pojo.server", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertTrue(200 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); } /** @@ -295,9 +296,9 @@ public class QpsControllerManagerTest { result = "server.test"; } }; - QpsController qpsController = testQpsControllerManager.getOrCreate("pojo", invocation); - Assert.assertEquals("pojo", qpsController.getKey()); - Assert.assertNull(qpsController.getQpsLimit()); + QpsStrategy qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation); + Assert.assertEquals("pojo", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertNull(((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); // pojo.server.test2 new Expectations() { @@ -362,9 +363,9 @@ public class QpsControllerManagerTest { result = "server.test"; } }; - qpsController = testQpsControllerManager.getOrCreate("pojo2", invocation); - Assert.assertEquals("pojo2", qpsController.getKey()); - Assert.assertNull(qpsController.getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("pojo2", invocation); + Assert.assertEquals("pojo2", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertNull(((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); // poj.server.test new Expectations() { @@ -377,9 +378,9 @@ public class QpsControllerManagerTest { result = "server.test"; } }; - qpsController = testQpsControllerManager.getOrCreate("poj", invocation); - Assert.assertEquals("poj", qpsController.getKey()); - Assert.assertNull(qpsController.getQpsLimit()); + qpsStrategy = testQpsControllerManager.getOrCreate("poj", invocation); + Assert.assertEquals("poj", ((AbstractQpsStrategy) qpsStrategy).getKey()); + Assert.assertNull(((AbstractQpsStrategy) qpsStrategy).getQpsLimit()); } @Test @@ -431,9 +432,10 @@ public class QpsControllerManagerTest { } public static void clearState(QpsControllerManager qpsControllerManager) { - Map<String, QpsController> objMap = Deencapsulation.getField(qpsControllerManager, "qualifiedNameControllerMap"); + Map<String, QpsStrategy> objMap = Deencapsulation + .getField(qpsControllerManager, "qualifiedNameControllerMap"); objMap.clear(); - Map<String, QpsController> configQpsControllerMap = Deencapsulation + Map<String, QpsStrategy> configQpsControllerMap = Deencapsulation .getField(qpsControllerManager, "configQpsControllerMap"); configQpsControllerMap.clear(); } diff --git a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConsumerQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConsumerQpsFlowControlHandler.java index b472a85..619ef76 100644 --- a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConsumerQpsFlowControlHandler.java +++ b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConsumerQpsFlowControlHandler.java @@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.servicecomb.core.Invocation; import org.apache.servicecomb.core.definition.OperationMeta; import org.apache.servicecomb.foundation.test.scaffolding.config.ArchaiusUtils; +import org.apache.servicecomb.qps.strategy.AbstractQpsStrategy; +import org.apache.servicecomb.qps.strategy.FixedWindowStrategy; import org.apache.servicecomb.swagger.invocation.AsyncResponse; import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData; import org.apache.servicecomb.swagger.invocation.exception.InvocationException; @@ -68,23 +70,23 @@ public class TestConsumerQpsFlowControlHandler { @Test public void testQpsController() { - QpsController qpsController = new QpsController("abc", 100); - Assert.assertEquals(false, qpsController.isLimitNewRequest()); + AbstractQpsStrategy qpsStrategy = new FixedWindowStrategy("abc", 100L); + Assert.assertEquals(false, qpsStrategy.isLimitNewRequest()); - qpsController.setQpsLimit(1); - Assert.assertEquals(true, qpsController.isLimitNewRequest()); + qpsStrategy.setQpsLimit(1L); + Assert.assertEquals(true, qpsStrategy.isLimitNewRequest()); } @Test public void testHandle() throws Exception { String key = "svc.schema.opr"; - QpsController qpsController = new QpsController("key", 12); + AbstractQpsStrategy qpsStrategy = new FixedWindowStrategy("key", 12L); Mockito.when(invocation.getOperationMeta()).thenReturn(operationMeta); Mockito.when(operationMeta.getSchemaQualifiedName()).thenReturn("schema.opr"); Mockito.when(invocation.getSchemaId()).thenReturn("schema"); Mockito.when(invocation.getMicroserviceName()).thenReturn("svc"); - setQpsController(key, qpsController); - new MockUp<QpsController>() { + setQpsController(key, qpsStrategy); + new MockUp<FixedWindowStrategy>() { @Mock public boolean isLimitNewRequest() { return true; @@ -93,8 +95,8 @@ public class TestConsumerQpsFlowControlHandler { new MockUp<QpsControllerManager>() { @Mock - protected QpsController create(String qualifiedNameKey) { - return qpsController; + protected QpsStrategy create(String qualifiedNameKey) { + return qpsStrategy; } }; @@ -111,14 +113,14 @@ public class TestConsumerQpsFlowControlHandler { @Test public void testHandleIsLimitNewRequestAsFalse() throws Exception { String key = "service.schema.id"; - QpsController qpsController = new QpsController("service", 12); + AbstractQpsStrategy qpsStrategy = new FixedWindowStrategy("service", 12L); Mockito.when(invocation.getMicroserviceName()).thenReturn("service"); Mockito.when(invocation.getOperationMeta()).thenReturn(operationMeta); Mockito.when(operationMeta.getSchemaQualifiedName()).thenReturn("schema.id"); - setQpsController(key, qpsController); + setQpsController(key, qpsStrategy); - new MockUp<QpsController>() { + new MockUp<QpsStrategy>() { @Mock public boolean isLimitNewRequest() { return false; @@ -128,8 +130,8 @@ public class TestConsumerQpsFlowControlHandler { new MockUp<QpsControllerManager>() { @Mock - protected QpsController create(String qualifiedNameKey) { - return qpsController; + protected QpsStrategy create(String qualifiedNameKey) { + return qpsStrategy; } }; handler.handle(invocation, asyncResp); @@ -137,10 +139,10 @@ public class TestConsumerQpsFlowControlHandler { Mockito.verify(invocation).next(asyncResp); } - private void setQpsController(String key, QpsController qpsController) { + private void setQpsController(String key, QpsStrategy qpsStrategy) { QpsControllerManager qpsControllerManager = Deencapsulation.getField(handler, "qpsControllerMgr"); - ConcurrentHashMap<String, QpsController> objMap = Deencapsulation + ConcurrentHashMap<String, QpsStrategy> objMap = Deencapsulation .getField(qpsControllerManager, "qualifiedNameControllerMap"); - objMap.put(key, qpsController); + objMap.put(key, qpsStrategy); } } diff --git a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java index d5a7267..6182007 100644 --- a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java +++ b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java @@ -26,6 +26,8 @@ import org.apache.servicecomb.core.Const; import org.apache.servicecomb.core.Invocation; import org.apache.servicecomb.core.definition.OperationMeta; import org.apache.servicecomb.foundation.test.scaffolding.config.ArchaiusUtils; +import org.apache.servicecomb.qps.strategy.AbstractQpsStrategy; +import org.apache.servicecomb.qps.strategy.FixedWindowStrategy; import org.apache.servicecomb.swagger.invocation.AsyncResponse; import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData; import org.apache.servicecomb.swagger.invocation.exception.InvocationException; @@ -97,11 +99,11 @@ public class TestProviderQpsFlowControlHandler { @Test public void testQpsController() { - QpsController qpsController = new QpsController("abc", 100); - assertFalse(qpsController.isLimitNewRequest()); + AbstractQpsStrategy qpsStrategy = new FixedWindowStrategy("abc", 100L); + assertFalse(qpsStrategy.isLimitNewRequest()); - qpsController.setQpsLimit(1); - assertTrue(qpsController.isLimitNewRequest()); + qpsStrategy.setQpsLimit(1L); + assertTrue(qpsStrategy.isLimitNewRequest()); } @Test @@ -111,7 +113,7 @@ public class TestProviderQpsFlowControlHandler { Mockito.when(invocation.getHandlerIndex()).thenReturn(0); ArchaiusUtils.setProperty("servicecomb.flowcontrol.Provider.qps.global.limit", 1); ProviderQpsFlowControlHandler.qpsControllerMgr - .setGlobalQpsController("servicecomb.flowcontrol.Provider.qps.global.limit"); + .setGlobalQpsStrategy(Config.PROVIDER_LIMIT_KEY_GLOBAL, Config.PROVIDER_BUCKET_KEY_GLOBAL); handler.handle(invocation, asyncResp); handler.handle(invocation, asyncResp); @@ -141,8 +143,8 @@ public class TestProviderQpsFlowControlHandler { new MockUp<QpsControllerManager>() { @Mock - protected QpsController create(String qualifiedNameKey) { - return new QpsController(qualifiedNameKey, 1); + protected QpsStrategy create(String qualifiedNameKey) { + return new FixedWindowStrategy(qualifiedNameKey, 1L); } }; @@ -168,8 +170,8 @@ public class TestProviderQpsFlowControlHandler { new MockUp<QpsControllerManager>() { @Mock - protected QpsController create(String qualifiedNameKey) { - return new QpsController(qualifiedNameKey, 1); + protected QpsStrategy create(String qualifiedNameKey) { + return new FixedWindowStrategy(qualifiedNameKey, 1L); } }; handler.handle(invocation, asyncResp); diff --git a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestQpsStrategy.java b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestQpsStrategy.java new file mode 100644 index 0000000..75beb71 --- /dev/null +++ b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestQpsStrategy.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.qps; + +import org.apache.servicecomb.qps.strategy.AbstractQpsStrategy; +import org.apache.servicecomb.qps.strategy.FixedWindowStrategy; +import org.apache.servicecomb.qps.strategy.LeakyBucketStrategy; +import org.junit.Assert; +import org.junit.Test; + +/** + * @Author GuoYl123 + * @Date 2020/7/16 + **/ +public class TestQpsStrategy { + + @Test + public void testFixedWindowStrategy() { + AbstractQpsStrategy qpsStrategy = new FixedWindowStrategy("abc", 100L); + Assert.assertEquals(false, qpsStrategy.isLimitNewRequest()); + + qpsStrategy.setQpsLimit(1L); + Assert.assertEquals(true, qpsStrategy.isLimitNewRequest()); + } + + + @Test + public void testLeakyBucketStrategy() { + LeakyBucketStrategy qpsStrategy = new LeakyBucketStrategy("abc", 100L); + Assert.assertEquals(false, qpsStrategy.isLimitNewRequest()); + + qpsStrategy.setQpsLimit(1L); + qpsStrategy.setBucketLimit(1L); + Assert.assertEquals(true, qpsStrategy.isLimitNewRequest()); + } + +} diff --git a/handlers/handler-router/src/main/java/org/apache/servicecomb/router/custom/RouterInvokeFilter.java b/handlers/handler-router/src/main/java/org/apache/servicecomb/router/custom/RouterInvokeFilter.java index 97858a9..7e345a9 100644 --- a/handlers/handler-router/src/main/java/org/apache/servicecomb/router/custom/RouterInvokeFilter.java +++ b/handlers/handler-router/src/main/java/org/apache/servicecomb/router/custom/RouterInvokeFilter.java @@ -109,7 +109,7 @@ public class RouterInvokeFilter implements HttpServerFilter { private boolean isHaveHeadersRule() { DynamicStringProperty headerStr = DynamicPropertyFactory.getInstance() .getStringProperty(SERVICECOMB_ROUTER_HEADER, null); - return !StringUtils.isEmpty(headerStr.get()); + return StringUtils.isNotEmpty(headerStr.get()); } private boolean addAllHeaders(String str) {
