This is an automated email from the ASF dual-hosted git repository.
liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
The following commit(s) were added to refs/heads/master by this push:
new d6fc0a6bf [SCB-2650][SCB-2290]support instance isolation and instance bulkhead (#3242)
d6fc0a6bf is described below
commit d6fc0a6bf28a28234fb6338ceafbd816c4c383b6
Author: liubao68 <[email protected]>
AuthorDate: Sun Jul 31 10:43:18 2022 +0800
[SCB-2650][SCB-2290]support instance isolation and instance bulkhead (#3242)
---
.../src/main/resources/application.yml | 2 +-
.../src/main/resources/application.yml | 2 +-
.../main/resources/config/base/log4j.properties | 2 +-
.../governance/GovernanceConfiguration.java | 12 +++
.../handler/InstanceBulkheadHandler.java | 88 +++++++++++++++
.../handler/ext/AbstractFailurePredictor.java | 7 +-
.../governance/policy/CircuitBreakerPolicy.java | 18 +++-
.../servicecomb/governance/policy/RetryPolicy.java | 21 ++--
.../properties/InstanceBulkheadProperties.java | 19 ++--
.../governance/AbstractFailurePredictorTest.java | 64 +++++++++++
.../governance/GovernancePropertiesTest.java | 2 +-
.../governance/InstanceBulkheadHandlerTest.java | 119 +++++++++++++++++++++
governance/src/test/resources/application.yaml | 4 +
...r.java => ConsumerInstanceBulkheadHandler.java} | 68 +++---------
....java => ConsumerInstanceIsolationHandler.java} | 72 +++----------
.../governance/ProviderGovernanceHandler.java | 11 +-
.../src/main/resources/config/cse.handler.xml | 6 +-
17 files changed, 371 insertions(+), 146 deletions(-)
diff --git a/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-client/src/main/resources/application.yml b/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-client/src/main/resources/application.yml
index 1eb345f43..c663b708b 100644
--- a/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-client/src/main/resources/application.yml
+++ b/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-client/src/main/resources/application.yml
@@ -37,7 +37,7 @@ servicecomb:
handler:
chain:
Consumer:
- default: governance-consumer,loadbalance
+ default: loadbalance
Provider:
default: governance-provider
diff --git a/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml b/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml
index 7d4118f29..972f3efb2 100644
--- a/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml
+++ b/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml
@@ -33,6 +33,6 @@ servicecomb:
handler:
chain:
Consumer:
- default: governance-consumer,loadbalance
+ default: loadbalance
Provider:
default: governance-provider
\ No newline at end of file
diff --git a/foundations/foundation-common/src/main/resources/config/base/log4j.properties b/foundations/foundation-common/src/main/resources/config/base/log4j.properties
index 83de4e475..0cdb3bfbd 100644
--- a/foundations/foundation-common/src/main/resources/config/base/log4j.properties
+++ b/foundations/foundation-common/src/main/resources/config/base/log4j.properties
@@ -25,7 +25,7 @@ log4j.logger.runLogger=INFO
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d [%p] %m %l%n
+log4j.appender.stdout.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss,SSS/zzz}][%t][%p]%m %l%n
log4j.appender.paas=org.apache.servicecomb.foundation.common.utils.RollingFileAppenderExt
log4j.appender.paas.file=${paas.logs.dir}${paas.logs.file}
diff --git a/governance/src/main/java/org/apache/servicecomb/governance/GovernanceConfiguration.java b/governance/src/main/java/org/apache/servicecomb/governance/GovernanceConfiguration.java
index 03eb52156..9307da436 100644
--- a/governance/src/main/java/org/apache/servicecomb/governance/GovernanceConfiguration.java
+++ b/governance/src/main/java/org/apache/servicecomb/governance/GovernanceConfiguration.java
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.servicecomb.governance.handler.BulkheadHandler;
import org.apache.servicecomb.governance.handler.CircuitBreakerHandler;
import org.apache.servicecomb.governance.handler.FaultInjectionHandler;
+import org.apache.servicecomb.governance.handler.InstanceBulkheadHandler;
import org.apache.servicecomb.governance.handler.InstanceIsolationHandler;
import org.apache.servicecomb.governance.handler.RateLimitingHandler;
import org.apache.servicecomb.governance.handler.RetryHandler;
@@ -38,6 +39,7 @@ import org.apache.servicecomb.governance.marker.operator.SuffixOperator;
import org.apache.servicecomb.governance.properties.BulkheadProperties;
import org.apache.servicecomb.governance.properties.CircuitBreakerProperties;
import org.apache.servicecomb.governance.properties.FaultInjectionProperties;
+import org.apache.servicecomb.governance.properties.InstanceBulkheadProperties;
import org.apache.servicecomb.governance.properties.InstanceIsolationProperties;
import org.apache.servicecomb.governance.properties.MatchProperties;
import org.apache.servicecomb.governance.properties.RateLimitProperties;
@@ -58,6 +60,11 @@ public class GovernanceConfiguration {
return new BulkheadProperties();
}
+ @Bean
+ public InstanceBulkheadProperties instanceBulkheadProperties() {
+ return new InstanceBulkheadProperties();
+ }
+
@Bean
public CircuitBreakerProperties circuitBreakerProperties() {
return new CircuitBreakerProperties();
@@ -94,6 +101,11 @@ public class GovernanceConfiguration {
return new BulkheadHandler(bulkheadProperties);
}
+ @Bean
+ public InstanceBulkheadHandler
instanceBulkheadHandler(InstanceBulkheadProperties instanceBulkheadProperties) {
+ return new InstanceBulkheadHandler(instanceBulkheadProperties);
+ }
+
@Bean
public CircuitBreakerHandler circuitBreakerHandler(CircuitBreakerProperties
circuitBreakerProperties,
AbstractCircuitBreakerExtension circuitBreakerExtension,
diff --git a/governance/src/main/java/org/apache/servicecomb/governance/handler/InstanceBulkheadHandler.java b/governance/src/main/java/org/apache/servicecomb/governance/handler/InstanceBulkheadHandler.java
new file mode 100644
index 000000000..bdfe7d0b6
--- /dev/null
+++ b/governance/src/main/java/org/apache/servicecomb/governance/handler/InstanceBulkheadHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.governance.handler;
+
+import java.time.Duration;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.servicecomb.governance.marker.GovernanceRequest;
+import org.apache.servicecomb.governance.policy.BulkheadPolicy;
+import org.apache.servicecomb.governance.properties.InstanceBulkheadProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.github.resilience4j.bulkhead.Bulkhead;
+import io.github.resilience4j.bulkhead.BulkheadConfig;
+import io.github.resilience4j.bulkhead.BulkheadRegistry;
+
+public class InstanceBulkheadHandler extends
AbstractGovernanceHandler<Bulkhead, BulkheadPolicy> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(InstanceBulkheadHandler.class);
+
+ private final InstanceBulkheadProperties bulkheadProperties;
+
+ public InstanceBulkheadHandler(InstanceBulkheadProperties
bulkheadProperties) {
+ this.bulkheadProperties = bulkheadProperties;
+ }
+
+ @Override
+ protected String createKey(GovernanceRequest governanceRequest,
BulkheadPolicy policy) {
+ return InstanceBulkheadProperties.MATCH_INSTANCE_BULKHEAD_KEY
+ + "." + governanceRequest.getServiceName()
+ + "." + policy.getName()
+ + "." + governanceRequest.getInstanceId();
+ }
+
+ @Override
+ protected void onConfigurationChanged(String key) {
+ if
(key.startsWith(InstanceBulkheadProperties.MATCH_INSTANCE_BULKHEAD_KEY)) {
+ for (String processorKey : processors.keySet()) {
+ if (processorKey.startsWith(key)) {
+ processors.remove(processorKey);
+ }
+ }
+ }
+ }
+
+ @Override
+ public BulkheadPolicy matchPolicy(GovernanceRequest governanceRequest) {
+ if (StringUtils.isEmpty(governanceRequest.getServiceName()) ||
StringUtils.isEmpty(
+ governanceRequest.getInstanceId())) {
+ LOGGER.info("Instance bulkhead is not properly configured, service id or instance id is empty.");
+ return null;
+ }
+ return matchersManager.match(governanceRequest,
bulkheadProperties.getParsedEntity());
+ }
+
+ @Override
+ public Bulkhead createProcessor(String key, GovernanceRequest
governanceRequest, BulkheadPolicy policy) {
+ return getBulkhead(key, policy);
+ }
+
+ private Bulkhead getBulkhead(String key, BulkheadPolicy policy) {
+ LOGGER.info("applying new policy {} for {}", key, policy.toString());
+
+ BulkheadConfig config = BulkheadConfig.custom()
+ .maxConcurrentCalls(policy.getMaxConcurrentCalls())
+ .maxWaitDuration(Duration.parse(policy.getMaxWaitDuration()))
+ .build();
+
+ BulkheadRegistry registry = BulkheadRegistry.of(config);
+
+ return registry.bulkhead(key, config);
+ }
+}
diff --git a/governance/src/main/java/org/apache/servicecomb/governance/handler/ext/AbstractFailurePredictor.java b/governance/src/main/java/org/apache/servicecomb/governance/handler/ext/AbstractFailurePredictor.java
index 8af8ae68f..e7fea24f9 100644
--- a/governance/src/main/java/org/apache/servicecomb/governance/handler/ext/AbstractFailurePredictor.java
+++ b/governance/src/main/java/org/apache/servicecomb/governance/handler/ext/AbstractFailurePredictor.java
@@ -38,12 +38,15 @@ public abstract class AbstractFailurePredictor implements
FailurePredictor {
}
private static boolean statusCodeMatch(String status, String responseStatus)
{
- if (3 != status.length()) {
+ if (status == null) {
+ return false;
+ }
+ if (responseStatus.length() != status.length()) {
return false;
}
char[] statusChar = status.toCharArray();
char[] responseChar = responseStatus.toCharArray();
- return IntStream.range(0, 3).noneMatch(i ->
+ return IntStream.range(0, statusChar.length).noneMatch(i ->
statusChar[i] != responseChar[i] && statusChar[i] != 'x');
}
}
diff --git a/governance/src/main/java/org/apache/servicecomb/governance/policy/CircuitBreakerPolicy.java b/governance/src/main/java/org/apache/servicecomb/governance/policy/CircuitBreakerPolicy.java
index 9996d1b72..1d1bdfeec 100644
--- a/governance/src/main/java/org/apache/servicecomb/governance/policy/CircuitBreakerPolicy.java
+++ b/governance/src/main/java/org/apache/servicecomb/governance/policy/CircuitBreakerPolicy.java
@@ -17,8 +17,9 @@
package org.apache.servicecomb.governance.policy;
import java.time.Duration;
-import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.servicecomb.governance.utils.GovernanceUtils;
@@ -48,6 +49,9 @@ public class CircuitBreakerPolicy extends AbstractPolicy {
public static final String DEFAULT_FAILURE_RESPONSE_STATUS_503 = "503";
+ public static final List<String> DEFAULT_STATUS_LIST =
Arrays.asList(DEFAULT_FAILURE_RESPONSE_STATUS_502,
+ DEFAULT_FAILURE_RESPONSE_STATUS_503);
+
private float failureRateThreshold = DEFAULT_FAILURE_RATE_THRESHOLD;
private float slowCallRateThreshold = DEFAULT_SLOW_CALL_RATE_THRESHOLD;
@@ -65,7 +69,7 @@ public class CircuitBreakerPolicy extends AbstractPolicy {
private String slidingWindowSize = DEFAULT_SLIDING_WINDOW_SIZE;
//status code that need record as a failure
- private List<String> recordFailureStatus = new ArrayList<>();
+ private List<String> recordFailureStatus = DEFAULT_STATUS_LIST;
//force close this circuit breaker. This parameter is not used by circuit
breaker directly
private boolean forceClosed = false;
@@ -193,14 +197,17 @@ public class CircuitBreakerPolicy extends AbstractPolicy {
public List<String> getRecordFailureStatus() {
if (CollectionUtils.isEmpty(this.recordFailureStatus)) {
- this.recordFailureStatus.add(DEFAULT_FAILURE_RESPONSE_STATUS_502);
- this.recordFailureStatus.add(DEFAULT_FAILURE_RESPONSE_STATUS_503);
+ return DEFAULT_STATUS_LIST;
}
return this.recordFailureStatus;
}
public void setRecordFailureStatus(List<String> recordFailureStatus) {
- this.recordFailureStatus = recordFailureStatus;
+ if (recordFailureStatus == null) {
+ return;
+ }
+ this.recordFailureStatus = recordFailureStatus.stream().filter(e ->
!StringUtils.isEmpty(e))
+ .collect(Collectors.toList());
}
public boolean isForceClosed() {
@@ -230,6 +237,7 @@ public class CircuitBreakerPolicy extends AbstractPolicy {
", minimumNumberOfCalls=" + minimumNumberOfCalls +
", slidingWindowType='" + slidingWindowType + '\'' +
", slidingWindowSize=" + slidingWindowSize +
+ ", recordFailureStatus=" + recordFailureStatus +
'}';
}
}
diff --git a/governance/src/main/java/org/apache/servicecomb/governance/policy/RetryPolicy.java b/governance/src/main/java/org/apache/servicecomb/governance/policy/RetryPolicy.java
index 95be9194f..726121a2c 100644
--- a/governance/src/main/java/org/apache/servicecomb/governance/policy/RetryPolicy.java
+++ b/governance/src/main/java/org/apache/servicecomb/governance/policy/RetryPolicy.java
@@ -17,8 +17,9 @@
package org.apache.servicecomb.governance.policy;
import java.time.Duration;
-import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.CollectionUtils;
@@ -27,12 +28,15 @@ public class RetryPolicy extends AbstractPolicy {
public static final int DEFAULT_MAX_ATTEMPTS = 3;
- public static final Duration DEFAULT_WAIT_DURATION = Duration.ofMillis(10);
+ public static final Duration DEFAULT_WAIT_DURATION = Duration.ofMillis(1);
public static final String DEFAULT_RETRY_ON_RESPONSE_STATUS_502 = "502";
public static final String DEFAULT_RETRY_ON_RESPONSE_STATUS_503 = "503";
+ public static final List<String> DEFAULT_STATUS_LIST =
Arrays.asList(DEFAULT_RETRY_ON_RESPONSE_STATUS_502,
+ DEFAULT_RETRY_ON_RESPONSE_STATUS_503);
+
private static final Duration INITIAL_INTERVAL = Duration.ofMillis(1000);
private static final float MULTIPLIER = 2;
@@ -48,7 +52,7 @@ public class RetryPolicy extends AbstractPolicy {
private String waitDuration = DEFAULT_WAIT_DURATION.toString();
//status code that need retry
- private List<String> retryOnResponseStatus = new ArrayList<>();
+ private List<String> retryOnResponseStatus = DEFAULT_STATUS_LIST;
//retry strategy
private String retryStrategy = DEFAULT_RETRY_STRATEGY;
@@ -71,14 +75,17 @@ public class RetryPolicy extends AbstractPolicy {
public List<String> getRetryOnResponseStatus() {
if (CollectionUtils.isEmpty(retryOnResponseStatus)) {
- this.retryOnResponseStatus.add(DEFAULT_RETRY_ON_RESPONSE_STATUS_502);
- this.retryOnResponseStatus.add(DEFAULT_RETRY_ON_RESPONSE_STATUS_503);
+ return DEFAULT_STATUS_LIST;
}
return retryOnResponseStatus;
}
public void setRetryOnResponseStatus(List<String> retryOnResponseStatus) {
- this.retryOnResponseStatus = retryOnResponseStatus;
+ if (retryOnResponseStatus == null) {
+ return;
+ }
+ this.retryOnResponseStatus = retryOnResponseStatus.stream().filter(e ->
!StringUtils.isEmpty(e))
+ .collect(Collectors.toList());
}
public int getMaxAttempts() {
@@ -90,7 +97,7 @@ public class RetryPolicy extends AbstractPolicy {
}
public String getWaitDuration() {
- return Duration.parse(waitDuration).toMillis() < 10 ?
DEFAULT_WAIT_DURATION.toString() : waitDuration;
+ return Duration.parse(waitDuration).toMillis() < 1 ?
DEFAULT_WAIT_DURATION.toString() : waitDuration;
}
public void setWaitDuration(String waitDuration) {
diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerGovernanceHandler.java b/governance/src/main/java/org/apache/servicecomb/governance/properties/InstanceBulkheadProperties.java
similarity index 64%
rename from handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerGovernanceHandler.java
rename to governance/src/main/java/org/apache/servicecomb/governance/properties/InstanceBulkheadProperties.java
index f5cfbf72a..edd4b6045 100644
--- a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerGovernanceHandler.java
+++ b/governance/src/main/java/org/apache/servicecomb/governance/properties/InstanceBulkheadProperties.java
@@ -15,16 +15,19 @@
* limitations under the License.
*/
-package org.apache.servicecomb.handler.governance;
+package org.apache.servicecomb.governance.properties;
-import org.apache.servicecomb.core.Handler;
-import org.apache.servicecomb.core.Invocation;
-import org.apache.servicecomb.swagger.invocation.AsyncResponse;
+import org.apache.servicecomb.governance.policy.BulkheadPolicy;
+
+public class InstanceBulkheadProperties extends
PolicyProperties<BulkheadPolicy> {
+ public static final String MATCH_INSTANCE_BULKHEAD_KEY =
"servicecomb.instanceBulkhead";
+
+ public InstanceBulkheadProperties() {
+ super(MATCH_INSTANCE_BULKHEAD_KEY);
+ }
-public class ConsumerGovernanceHandler implements Handler {
- // an empty implementation, will add possible features in future.
@Override
- public void handle(Invocation invocation, AsyncResponse asyncResp) throws
Exception {
- invocation.next(asyncResp);
+ public Class<BulkheadPolicy> getEntityClass() {
+ return BulkheadPolicy.class;
}
}
diff --git a/governance/src/test/java/org/apache/servicecomb/governance/AbstractFailurePredictorTest.java b/governance/src/test/java/org/apache/servicecomb/governance/AbstractFailurePredictorTest.java
new file mode 100644
index 000000000..9c84287ac
--- /dev/null
+++ b/governance/src/test/java/org/apache/servicecomb/governance/AbstractFailurePredictorTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.governance;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.servicecomb.governance.handler.ext.AbstractFailurePredictor;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class AbstractFailurePredictorTest {
+ class MyAbstractFailurePredictor extends AbstractFailurePredictor {
+ MyAbstractFailurePredictor() {
+ }
+
+ @Override
+ protected String extractStatusCode(Object result) {
+ return (String) result;
+ }
+
+ @Override
+ public boolean isFailedResult(Throwable e) {
+ return super.isFailedResult(e);
+ }
+ }
+
+ @Test
+ public void testCodeMatch() {
+ AbstractFailurePredictor predictor = new MyAbstractFailurePredictor();
+ List<String> statusList = Arrays.asList("500");
+ Assertions.assertTrue(predictor.isFailedResult(statusList, "500"));
+ Assertions.assertFalse(predictor.isFailedResult(statusList, "502"));
+ Assertions.assertFalse(predictor.isFailedResult(statusList, "400"));
+ Assertions.assertFalse(predictor.isFailedResult(statusList, "444"));
+
+ statusList = Arrays.asList("5x0");
+ Assertions.assertTrue(predictor.isFailedResult(statusList, "500"));
+ Assertions.assertFalse(predictor.isFailedResult(statusList, "502"));
+ Assertions.assertFalse(predictor.isFailedResult(statusList, "400"));
+ Assertions.assertFalse(predictor.isFailedResult(statusList, "444"));
+
+ statusList = Arrays.asList(null, "xx", "5x0");
+ Assertions.assertTrue(predictor.isFailedResult(statusList, "500"));
+ Assertions.assertFalse(predictor.isFailedResult(statusList, "502"));
+ Assertions.assertFalse(predictor.isFailedResult(statusList, "400"));
+ Assertions.assertFalse(predictor.isFailedResult(statusList, "444"));
+ }
+}
diff --git a/governance/src/test/java/org/apache/servicecomb/governance/GovernancePropertiesTest.java b/governance/src/test/java/org/apache/servicecomb/governance/GovernancePropertiesTest.java
index bc943a070..e0c55964c 100644
--- a/governance/src/test/java/org/apache/servicecomb/governance/GovernancePropertiesTest.java
+++ b/governance/src/test/java/org/apache/servicecomb/governance/GovernancePropertiesTest.java
@@ -164,7 +164,7 @@ public class GovernancePropertiesTest {
@Test
public void test_all_bean_is_loaded() {
- Assertions.assertEquals(6, propertiesList.size());
+ Assertions.assertEquals(7, propertiesList.size());
}
@Test
diff --git a/governance/src/test/java/org/apache/servicecomb/governance/InstanceBulkheadHandlerTest.java b/governance/src/test/java/org/apache/servicecomb/governance/InstanceBulkheadHandlerTest.java
new file mode 100644
index 000000000..2377aabc3
--- /dev/null
+++ b/governance/src/test/java/org/apache/servicecomb/governance/InstanceBulkheadHandlerTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.governance;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.servicecomb.governance.handler.InstanceBulkheadHandler;
+import org.apache.servicecomb.governance.marker.GovernanceRequest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.ContextConfiguration;
+
+import io.github.resilience4j.bulkhead.Bulkhead;
+import io.github.resilience4j.bulkhead.BulkheadFullException;
+import io.github.resilience4j.decorators.Decorators;
+import io.github.resilience4j.decorators.Decorators.DecorateCheckedSupplier;
+
+@SpringBootTest
+@ContextConfiguration(classes = {GovernanceConfiguration.class,
MockConfiguration.class})
+public class InstanceBulkheadHandlerTest {
+ private InstanceBulkheadHandler instanceBulkheadHandler;
+
+ @Autowired
+ public void setInstanceBulkheadHandler(InstanceBulkheadHandler
instanceBulkheadHandler) {
+ this.instanceBulkheadHandler = instanceBulkheadHandler;
+ }
+
+ @Test
+ public void test_instance_bulkhead_work() throws Throwable {
+
+ // instance1
+ DecorateCheckedSupplier<String> dsInstance1 =
Decorators.ofCheckedSupplier(() -> "wake");
+
+ GovernanceRequest requestInstance1 = new GovernanceRequest();
+ requestInstance1.setInstanceId("instance01");
+ requestInstance1.setServiceName("service01");
+ requestInstance1.setUri("/test");
+
+ Bulkhead bulkheadInstance1 =
instanceBulkheadHandler.getActuator(requestInstance1);
+ dsInstance1.withBulkhead(bulkheadInstance1);
+
+ // instance2
+ DecorateCheckedSupplier<String> dsInstance2 =
Decorators.ofCheckedSupplier(() -> {
+ Thread.sleep(1000);
+ return "sleep";
+ });
+
+ GovernanceRequest requestInstance2 = new GovernanceRequest();
+ requestInstance2.setInstanceId("instance02");
+ requestInstance2.setServiceName("service01");
+ requestInstance2.setUri("/test");
+
+ Bulkhead bulkheadInstance2 =
instanceBulkheadHandler.getActuator(requestInstance2);
+ dsInstance2.withBulkhead(bulkheadInstance2);
+
+ Executor executor = Executors.newFixedThreadPool(4);
+ AtomicInteger wakeCount = new AtomicInteger(0);
+ AtomicInteger sleepCount = new AtomicInteger(0);
+ AtomicInteger errorCount = new AtomicInteger(0);
+ AtomicInteger rejectCount = new AtomicInteger(0);
+ CountDownLatch countDownLatch = new CountDownLatch(100);
+ for (int i = 0; i < 100; i++) {
+ final int num = i;
+ executor.execute(() -> {
+ // 50% for each server
+ if (num % 2 == 0) {
+ runCommand(dsInstance1, wakeCount, sleepCount, errorCount,
rejectCount, countDownLatch);
+ } else {
+ runCommand(dsInstance2, wakeCount, sleepCount, errorCount,
rejectCount, countDownLatch);
+ }
+ });
+ }
+ countDownLatch.await(100, TimeUnit.SECONDS);
+ Assertions.assertEquals(50, wakeCount.get());
+ Assertions.assertEquals(2, sleepCount.get());
+ Assertions.assertEquals(0, errorCount.get());
+ Assertions.assertEquals(48, rejectCount.get());
+ }
+
+ private void runCommand(DecorateCheckedSupplier<String> ds, AtomicInteger
wakeCount, AtomicInteger sleepCount,
+ AtomicInteger errorCount, AtomicInteger rejectCount, CountDownLatch
countDownLatch) {
+ try {
+ String result = ds.get();
+ if ("wake".equals(result)) {
+ wakeCount.incrementAndGet();
+ } else if ("sleep".equals(result)) {
+ sleepCount.incrementAndGet();
+ } else {
+ errorCount.incrementAndGet();
+ }
+ } catch (BulkheadFullException e) {
+ rejectCount.incrementAndGet();
+ } catch (Throwable e) {
+ errorCount.incrementAndGet();
+ }
+ countDownLatch.countDown();
+ }
+}
diff --git a/governance/src/test/resources/application.yaml b/governance/src/test/resources/application.yaml
index e5c0842e6..5a98ad620 100644
--- a/governance/src/test/resources/application.yaml
+++ b/governance/src/test/resources/application.yaml
@@ -122,6 +122,10 @@ servicecomb:
slidingWindowSize: 2
slidingWindowType: COUNT_BASED
waitDurationInOpenState: 1000
+ instanceBulkhead:
+ demo-allOperation: |
+ maxConcurrentCalls: 2
+ maxWaitDuration: 10
faultInjection:
demo-fallback-ThrowException: |
type: abort
diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceBulkheadHandler.java
similarity index 53%
copy from handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
copy to handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceBulkheadHandler.java
index 8ae3df80a..65e43465c 100644
--- a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
+++ b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceBulkheadHandler.java
@@ -25,9 +25,7 @@ import org.apache.servicecomb.core.Handler;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.governance.MatchType;
import org.apache.servicecomb.foundation.common.utils.BeanUtils;
-import org.apache.servicecomb.governance.handler.BulkheadHandler;
-import org.apache.servicecomb.governance.handler.CircuitBreakerHandler;
-import org.apache.servicecomb.governance.handler.RateLimitingHandler;
+import org.apache.servicecomb.governance.handler.InstanceBulkheadHandler;
import org.apache.servicecomb.governance.marker.GovernanceRequest;
import org.apache.servicecomb.swagger.invocation.AsyncResponse;
import org.apache.servicecomb.swagger.invocation.Response;
@@ -38,31 +36,26 @@ import org.slf4j.LoggerFactory;
import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadFullException;
-import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
-import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.decorators.Decorators;
import io.github.resilience4j.decorators.Decorators.DecorateCompletionStage;
-import io.github.resilience4j.ratelimiter.RateLimiter;
-import io.github.resilience4j.ratelimiter.RequestNotPermitted;
-public class ProviderGovernanceHandler implements Handler {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ProviderGovernanceHandler.class);
+public class ConsumerInstanceBulkheadHandler implements Handler {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ConsumerInstanceBulkheadHandler.class);
- private final RateLimitingHandler rateLimitingHandler =
BeanUtils.getBean(RateLimitingHandler.class);
-
- private final CircuitBreakerHandler circuitBreakerHandler =
BeanUtils.getBean(CircuitBreakerHandler.class);
-
- private final BulkheadHandler bulkheadHandler =
BeanUtils.getBean(BulkheadHandler.class);
+ private final InstanceBulkheadHandler instanceBulkheadHandler =
BeanUtils.getBean(InstanceBulkheadHandler.class);
@Override
public void handle(Invocation invocation, AsyncResponse asyncResp) throws
Exception {
-
+ if (invocation.getEndpoint() == null) {
+ invocation.next(asyncResp);
+ return;
+ }
Supplier<CompletionStage<Response>> next =
createBusinessCompletionStageSupplier(invocation);
DecorateCompletionStage<Response> dcs = Decorators.ofCompletionStage(next);
GovernanceRequest request = MatchType.createGovHttpRequest(invocation);
+ request.setServiceName(invocation.getMicroserviceName());
+ request.setInstanceId(invocation.getEndpoint().getMicroserviceInstance().getInstanceId());
- addRateLimiting(dcs, request);
- addCircuitBreaker(dcs, request);
addBulkhead(dcs, request);
dcs.get().whenComplete((r, e) -> {
@@ -71,18 +64,10 @@ public class ProviderGovernanceHandler implements Handler {
return;
}
- if (e instanceof RequestNotPermitted) {
- asyncResp.complete(
- Response.failResp(new InvocationException(429, "rate limited.",
new CommonExceptionData("rate limited."))));
- LOGGER.warn("the request is rate limit by policy : {}",
e.getMessage());
- } else if (e instanceof CallNotPermittedException) {
- asyncResp.complete(
- Response.failResp(new InvocationException(429, "circuitBreaker is
open.",
- new CommonExceptionData("circuitBreaker is open."))));
- LOGGER.warn("circuitBreaker is open by policy : {}", e.getMessage());
- } else if (e instanceof BulkheadFullException) {
+ if (e instanceof BulkheadFullException) {
+ // return 503 so that consumer can retry
asyncResp.complete(
- Response.failResp(new InvocationException(429, "bulkhead is full
and does not permit further calls.",
+ Response.failResp(new InvocationException(503, "bulkhead is full
and does not permit further calls.",
new CommonExceptionData("bulkhead is full and does not permit
further calls."))));
LOGGER.warn("bulkhead is full and does not permit further calls by
policy : {}", e.getMessage());
} else {
@@ -92,40 +77,17 @@ public class ProviderGovernanceHandler implements Handler {
}
private void addBulkhead(DecorateCompletionStage<Response> dcs,
GovernanceRequest request) {
- Bulkhead bulkhead = bulkheadHandler.getActuator(request);
+ Bulkhead bulkhead = instanceBulkheadHandler.getActuator(request);
if (bulkhead != null) {
dcs.withBulkhead(bulkhead);
}
}
- private void addCircuitBreaker(DecorateCompletionStage<Response> dcs,
GovernanceRequest request) {
- CircuitBreaker circuitBreaker = circuitBreakerHandler.getActuator(request);
- if (circuitBreaker != null) {
- dcs.withCircuitBreaker(circuitBreaker);
- }
- }
-
- private void addRateLimiting(DecorateCompletionStage<Response> dcs,
GovernanceRequest request) {
- RateLimiter rateLimiter = rateLimitingHandler.getActuator(request);
- if (rateLimiter != null) {
- dcs.withRateLimiter(rateLimiter);
- }
- }
-
private Supplier<CompletionStage<Response>>
createBusinessCompletionStageSupplier(Invocation invocation) {
return () -> {
CompletableFuture<Response> result = new CompletableFuture<>();
try {
- invocation.next(response -> {
- if (response.isFailed()) {
- // For failed response, create a fail to make circuit breaker work.
- // Users application maybe much complicated than this simple logic,
- // while they need to customize which error will cause circuit
breaker.
- result.completeExceptionally(response.getResult());
- } else {
- result.complete(response);
- }
- });
+ invocation.next(result::complete);
} catch (Exception e) {
result.completeExceptionally(e);
}
diff --git
a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java
similarity index 50%
copy from
handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
copy to
handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java
index 8ae3df80a..73d3dbb1d 100644
---
a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
+++
b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java
@@ -25,9 +25,7 @@ import org.apache.servicecomb.core.Handler;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.governance.MatchType;
import org.apache.servicecomb.foundation.common.utils.BeanUtils;
-import org.apache.servicecomb.governance.handler.BulkheadHandler;
-import org.apache.servicecomb.governance.handler.CircuitBreakerHandler;
-import org.apache.servicecomb.governance.handler.RateLimitingHandler;
+import org.apache.servicecomb.governance.handler.InstanceIsolationHandler;
import org.apache.servicecomb.governance.marker.GovernanceRequest;
import org.apache.servicecomb.swagger.invocation.AsyncResponse;
import org.apache.servicecomb.swagger.invocation.Response;
@@ -36,34 +34,29 @@ import
org.apache.servicecomb.swagger.invocation.exception.InvocationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.github.resilience4j.bulkhead.Bulkhead;
-import io.github.resilience4j.bulkhead.BulkheadFullException;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.decorators.Decorators;
import io.github.resilience4j.decorators.Decorators.DecorateCompletionStage;
-import io.github.resilience4j.ratelimiter.RateLimiter;
-import io.github.resilience4j.ratelimiter.RequestNotPermitted;
-public class ProviderGovernanceHandler implements Handler {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ProviderGovernanceHandler.class);
+public class ConsumerInstanceIsolationHandler implements Handler {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ConsumerInstanceIsolationHandler.class);
- private final RateLimitingHandler rateLimitingHandler =
BeanUtils.getBean(RateLimitingHandler.class);
-
- private final CircuitBreakerHandler circuitBreakerHandler =
BeanUtils.getBean(CircuitBreakerHandler.class);
-
- private final BulkheadHandler bulkheadHandler =
BeanUtils.getBean(BulkheadHandler.class);
+ private final InstanceIsolationHandler instanceIsolationHandler =
BeanUtils.getBean(InstanceIsolationHandler.class);
@Override
public void handle(Invocation invocation, AsyncResponse asyncResp) throws
Exception {
-
+ if (invocation.getEndpoint() == null) {
+ invocation.next(asyncResp);
+ return;
+ }
Supplier<CompletionStage<Response>> next =
createBusinessCompletionStageSupplier(invocation);
DecorateCompletionStage<Response> dcs = Decorators.ofCompletionStage(next);
GovernanceRequest request = MatchType.createGovHttpRequest(invocation);
+ request.setServiceName(invocation.getMicroserviceName());
+
request.setInstanceId(invocation.getEndpoint().getMicroserviceInstance().getInstanceId());
- addRateLimiting(dcs, request);
addCircuitBreaker(dcs, request);
- addBulkhead(dcs, request);
dcs.get().whenComplete((r, e) -> {
if (e == null) {
@@ -71,61 +64,30 @@ public class ProviderGovernanceHandler implements Handler {
return;
}
- if (e instanceof RequestNotPermitted) {
- asyncResp.complete(
- Response.failResp(new InvocationException(429, "rate limited.",
new CommonExceptionData("rate limited."))));
- LOGGER.warn("the request is rate limit by policy : {}",
e.getMessage());
- } else if (e instanceof CallNotPermittedException) {
- asyncResp.complete(
- Response.failResp(new InvocationException(429, "circuitBreaker is
open.",
- new CommonExceptionData("circuitBreaker is open."))));
- LOGGER.warn("circuitBreaker is open by policy : {}", e.getMessage());
- } else if (e instanceof BulkheadFullException) {
+ if (e instanceof CallNotPermittedException) {
+ // return 503 so that consumer can retry
asyncResp.complete(
- Response.failResp(new InvocationException(429, "bulkhead is full
and does not permit further calls.",
- new CommonExceptionData("bulkhead is full and does not permit
further calls."))));
- LOGGER.warn("bulkhead is full and does not permit further calls by
policy : {}", e.getMessage());
+ Response.failResp(new InvocationException(503, "instance isolation
circuitBreaker is open.",
+ new CommonExceptionData("instance isolation circuitBreaker is
open."))));
+ LOGGER.warn("instance isolation circuitBreaker is open by policy :
{}", e.getMessage());
} else {
asyncResp.complete(Response.createProducerFail(e));
}
});
}
- private void addBulkhead(DecorateCompletionStage<Response> dcs,
GovernanceRequest request) {
- Bulkhead bulkhead = bulkheadHandler.getActuator(request);
- if (bulkhead != null) {
- dcs.withBulkhead(bulkhead);
- }
- }
-
private void addCircuitBreaker(DecorateCompletionStage<Response> dcs,
GovernanceRequest request) {
- CircuitBreaker circuitBreaker = circuitBreakerHandler.getActuator(request);
+ CircuitBreaker circuitBreaker =
instanceIsolationHandler.getActuator(request);
if (circuitBreaker != null) {
dcs.withCircuitBreaker(circuitBreaker);
}
}
- private void addRateLimiting(DecorateCompletionStage<Response> dcs,
GovernanceRequest request) {
- RateLimiter rateLimiter = rateLimitingHandler.getActuator(request);
- if (rateLimiter != null) {
- dcs.withRateLimiter(rateLimiter);
- }
- }
-
private Supplier<CompletionStage<Response>>
createBusinessCompletionStageSupplier(Invocation invocation) {
return () -> {
CompletableFuture<Response> result = new CompletableFuture<>();
try {
- invocation.next(response -> {
- if (response.isFailed()) {
- // For failed response, create a fail to make circuit breaker work.
- // Users application maybe much complicated than this simple logic,
- // while they need to customize which error will cause circuit
breaker.
- result.completeExceptionally(response.getResult());
- } else {
- result.complete(response);
- }
- });
+ invocation.next(result::complete);
} catch (Exception e) {
result.completeExceptionally(e);
}
diff --git
a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
index 8ae3df80a..f3142ab77 100644
---
a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
+++
b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
@@ -116,16 +116,7 @@ public class ProviderGovernanceHandler implements Handler {
return () -> {
CompletableFuture<Response> result = new CompletableFuture<>();
try {
- invocation.next(response -> {
- if (response.isFailed()) {
- // For failed response, create a fail to make circuit breaker work.
- // Users application maybe much complicated than this simple logic,
- // while they need to customize which error will cause circuit
breaker.
- result.completeExceptionally(response.getResult());
- } else {
- result.complete(response);
- }
- });
+ invocation.next(result::complete);
} catch (Exception e) {
result.completeExceptionally(e);
}
diff --git
a/handlers/handler-governance/src/main/resources/config/cse.handler.xml
b/handlers/handler-governance/src/main/resources/config/cse.handler.xml
index c7383f0cc..ab5a82c6f 100644
--- a/handlers/handler-governance/src/main/resources/config/cse.handler.xml
+++ b/handlers/handler-governance/src/main/resources/config/cse.handler.xml
@@ -18,6 +18,8 @@
<config>
<handler id="governance-provider"
class="org.apache.servicecomb.handler.governance.ProviderGovernanceHandler"/>
- <handler id="governance-consumer"
-
class="org.apache.servicecomb.handler.governance.ConsumerGovernanceHandler"/>
+ <handler id="instance-bulkhead-consumer"
+
class="org.apache.servicecomb.handler.governance.ConsumerInstanceBulkheadHandler"/>
+ <handler id="instance-isolation-consumer"
+
class="org.apache.servicecomb.handler.governance.ConsumerInstanceIsolationHandler"/>
</config>