[
https://issues.apache.org/jira/browse/SCB-1056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711228#comment-16711228
]
ASF GitHub Bot commented on SCB-1056:
-------------------------------------
liubao68 closed pull request #1026: [SCB-1056] put provider QPS flow control in
front, for highway transport
URL: https://github.com/apache/servicecomb-java-chassis/pull/1026
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
index 20e8e0828..32bfb67b9 100644
---
a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
+++
b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
@@ -20,12 +20,14 @@
import java.util.Map;
import javax.ws.rs.core.Response.Status;
+import javax.xml.ws.Holder;
import org.apache.servicecomb.codec.protobuf.definition.OperationProtobuf;
import org.apache.servicecomb.codec.protobuf.definition.ProtobufManager;
import org.apache.servicecomb.codec.protobuf.utils.WrapSchema;
import org.apache.servicecomb.core.Const;
import org.apache.servicecomb.core.Endpoint;
+import org.apache.servicecomb.core.Handler;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.SCBEngine;
import org.apache.servicecomb.core.definition.MicroserviceMeta;
@@ -181,9 +183,38 @@ public void execute() {
null);
invocation.onStart(null, start);
invocation.getInvocationStageTrace().startSchedule();
- operationMeta.getExecutor().execute(() -> runInExecutor());
+
+ // copied from HighwayCodec#decodeRequest()
+ // for temporary qps enhance purpose, we'll remove it when handler
mechanism is refactored
+ invocation.mergeContext(header.getContext());
+
+ Holder<Boolean> qpsFlowControlReject =
checkQpsFlowControl(operationMeta);
+ if (qpsFlowControlReject.value) {
+ return;
+ }
+
+ operationMeta.getExecutor().execute(this::runInExecutor);
} catch (IllegalStateException e) {
sendResponse(header.getContext(), Response.providerFailResp(e));
}
}
+
+ private Holder<Boolean> checkQpsFlowControl(OperationMeta operationMeta) {
+ Holder<Boolean> qpsFlowControlReject = new Holder<>(false);
+ @SuppressWarnings("deprecation")
+ Handler providerQpsFlowControlHandler =
operationMeta.getProviderQpsFlowControlHandler();
+ if (null != providerQpsFlowControlHandler) {
+ try {
+ providerQpsFlowControlHandler.handle(invocation, response -> {
+ qpsFlowControlReject.value = true;
+ sendResponse(header.getContext(), response);
+ });
+ } catch (Exception e) {
+ LOGGER.error("failed to execute ProviderQpsFlowControlHandler", e);
+ qpsFlowControlReject.value = true;
+ sendResponse(header.getContext(), Response.providerFailResp(e));
+ }
+ }
+ return qpsFlowControlReject;
+ }
}
diff --git
a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java
b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java
index a9932e2a1..ad3feddc8 100644
---
a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java
+++
b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java
@@ -37,7 +37,6 @@
import org.apache.servicecomb.serviceregistry.ServiceRegistry;
import org.apache.servicecomb.serviceregistry.registry.ServiceRegistryFactory;
import org.apache.servicecomb.swagger.invocation.Response;
-import org.apache.servicecomb.swagger.invocation.context.InvocationContext;
import org.apache.servicecomb.transport.highway.message.RequestHeader;
import org.apache.servicecomb.transport.highway.message.ResponseHeader;
import org.junit.After;
@@ -81,7 +80,7 @@ public static void setupClass() {
}
@Before
- public void setUp() throws Exception {
+ public void setUp() {
ServiceRegistry serviceRegistry = ServiceRegistryFactory.createLocal();
serviceRegistry.init();
RegistryUtils.setServiceRegistry(serviceRegistry);
@@ -106,7 +105,7 @@ public void setUp() throws Exception {
}
@After
- public void tearDown() throws Exception {
+ public void tearDown() {
header = null;
diff --git
a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java
b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java
index 7381dee3e..b4e202b95 100644
---
a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java
+++
b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java
@@ -17,8 +17,13 @@
package org.apache.servicecomb.transport.highway;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
import javax.xml.ws.Holder;
+import org.apache.servicecomb.core.Handler;
import org.apache.servicecomb.core.SCBEngine;
import org.apache.servicecomb.core.SCBStatus;
import org.apache.servicecomb.core.definition.OperationMeta;
@@ -28,7 +33,11 @@
import org.apache.servicecomb.core.executor.ReactiveExecutor;
import org.apache.servicecomb.core.unittest.UnitTestMeta;
import org.apache.servicecomb.foundation.common.event.EventManager;
+import org.apache.servicecomb.foundation.test.scaffolding.config.ArchaiusUtils;
import org.apache.servicecomb.foundation.vertx.tcp.TcpConnection;
+import org.apache.servicecomb.swagger.invocation.context.HttpStatus;
+import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
+import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
import org.apache.servicecomb.transport.common.MockUtil;
import org.apache.servicecomb.transport.highway.message.RequestHeader;
import org.junit.AfterClass;
@@ -44,6 +53,7 @@
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
+import mockit.Deencapsulation;
import mockit.Mock;
import mockit.MockUp;
@@ -54,7 +64,7 @@ public int add(int x, int y) {
}
}
- private UnitTestMeta unitTestMeta;
+ private static UnitTestMeta unitTestMeta;
private ByteBuf netSocketBuffer;
@@ -70,6 +80,7 @@ public int add(int x, int y) {
public static void classSetup() {
EventManager.eventBus = new EventBus();
SCBEngine.getInstance().setStatus(SCBStatus.UP);
+ unitTestMeta = new UnitTestMeta();
new MockUp<System>() {
@Mock
@@ -87,7 +98,6 @@ public static void classTeardown() {
@Before
public void setup() {
- unitTestMeta = new UnitTestMeta();
socketAddress = new MockUp<SocketAddress>() {
@Mock
public String host() {
@@ -153,22 +163,55 @@ public void onFinish(InvocationFinishEvent event) {
requestHeader.setDestMicroservice(schemaMeta.getMicroserviceName());
requestHeader.setSchemaId(schemaMeta.getSchemaId());
requestHeader.setOperationName(operationMeta.getOperationId());
- Assert.assertTrue(highwayServerInvoke.init(connection, 0, requestHeader,
null));
+ assertTrue(highwayServerInvoke.init(connection, 0, requestHeader, null));
// exe失败
MockUtil.getInstance().decodeRequestSucc = false;
highwayServerInvoke.execute();
EventManager.unregister(subscriber);
- Assert.assertEquals(true,
Buffer.buffer(netSocketBuffer).toString().startsWith("CSE.TCP"));
- Assert.assertSame(highwayServerInvoke.invocation,
startHolder.value.getInvocation());
- Assert.assertSame(highwayServerInvoke.invocation,
finishHolder.value.getInvocation());
-
Assert.assertTrue(highwayServerInvoke.invocation.getInvocationStageTrace().getStartExecution()
!= 0);
- Assert.assertEquals(1,
highwayServerInvoke.invocation.getInvocationStageTrace().getStart());
- Assert.assertEquals(1,
highwayServerInvoke.invocation.getInvocationStageTrace().getStartHandlersRequest());
- Assert.assertEquals(1,
highwayServerInvoke.invocation.getInvocationStageTrace().getFinishHandlersResponse());
- Assert.assertEquals(1,
highwayServerInvoke.invocation.getInvocationStageTrace().getStartSchedule());
- Assert.assertEquals(1,
highwayServerInvoke.invocation.getInvocationStageTrace().getStartHandlersRequest());
- Assert.assertEquals(1,
highwayServerInvoke.invocation.getInvocationStageTrace().getFinishHandlersResponse());
+
assertTrue(Buffer.buffer(netSocketBuffer).toString().startsWith("CSE.TCP"));
+ assertSame(highwayServerInvoke.invocation,
startHolder.value.getInvocation());
+ assertSame(highwayServerInvoke.invocation,
finishHolder.value.getInvocation());
+
assertTrue(highwayServerInvoke.invocation.getInvocationStageTrace().getStartExecution()
!= 0);
+ assertEquals(1,
highwayServerInvoke.invocation.getInvocationStageTrace().getStart());
+ assertEquals(1,
highwayServerInvoke.invocation.getInvocationStageTrace().getStartHandlersRequest());
+ assertEquals(1,
highwayServerInvoke.invocation.getInvocationStageTrace().getFinishHandlersResponse());
+ assertEquals(1,
highwayServerInvoke.invocation.getInvocationStageTrace().getStartSchedule());
+ assertEquals(1,
highwayServerInvoke.invocation.getInvocationStageTrace().getStartHandlersRequest());
+ assertEquals(1,
highwayServerInvoke.invocation.getInvocationStageTrace().getFinishHandlersResponse());
+ }
+
+ @Test
+ public void testFlowControlQps() {
+ MockUtil.getInstance().mockHighwayCodec();
+ SchemaMeta schemaMeta = unitTestMeta.getOrCreateSchemaMeta(Impl.class);
+ OperationMeta operationMeta = schemaMeta.ensureFindOperation("add");
+ operationMeta.setExecutor(new ReactiveExecutor());
+ Deencapsulation.setField(operationMeta,
"providerQpsFlowControlHandlerSearched", true);
+ Deencapsulation.setField(operationMeta, "providerQpsFlowControlHandler",
+ (Handler) (invocation, asyncResp) -> asyncResp.producerFail(new
InvocationException(
+ new HttpStatus(429, "Too Many Requests"),
+ new CommonExceptionData("rejected by qps flowcontrol"))));
+
+ RequestHeader requestHeader = MockUtil.getInstance().requestHeader;
+ requestHeader.setDestMicroservice(schemaMeta.getMicroserviceName());
+ requestHeader.setSchemaId(schemaMeta.getSchemaId());
+ requestHeader.setOperationName(operationMeta.getOperationId());
+
+ HighwayServerInvoke highwayServerInvoke = new HighwayServerInvoke();
+
+ assertTrue(highwayServerInvoke.init(connection, 0, requestHeader, null));
+ MockUtil.getInstance().decodeRequestSucc = true;
+
+ ArchaiusUtils.setProperty("servicecomb.flowcontrol.Provider.qps.enabled",
"true");
+
ArchaiusUtils.setProperty("servicecomb.flowcontrol.Provider.qps.global.limit",
"1");
+
+ highwayServerInvoke.execute();
+ String bodyString = Buffer.buffer(netSocketBuffer).toString();
+ assertTrue(bodyString.contains("Too Many Requests"));
+ assertTrue(bodyString.contains("rejected by qps flowcontrol"));
+
+ ArchaiusUtils.resetConfig();
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Put provider QPS flow control in front
> --------------------------------------
>
> Key: SCB-1056
> URL: https://issues.apache.org/jira/browse/SCB-1056
> Project: Apache ServiceComb
> Issue Type: Improvement
> Components: Java-Chassis
> Reporter: YaoHaishi
> Assignee: YaoHaishi
> Priority: Major
> Fix For: java-chassis-1.2.0
>
>
> Currently, provider QPS flow control is implemented in
> ProviderQpsFlowControlHandler, which runs in the provider handler chain. As a
> result, the flow control logic takes effect too late, and a lot of CPU time is
> wasted processing requests that should have been rejected earlier.
> Putting the provider QPS flow control logic in front can save these resources.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)