[ https://issues.apache.org/jira/browse/SCB-1056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708727#comment-16708727 ]
ASF GitHub Bot commented on SCB-1056: ------------------------------------- liubao68 closed pull request #1017: [SCB-1056] put provider flow control logic in front URL: https://github.com/apache/servicecomb-java-chassis/pull/1017 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java index 932aef4c3..d2b08aa05 100644 --- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java +++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java @@ -26,6 +26,7 @@ import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response.Status; +import javax.xml.ws.Holder; import org.apache.commons.lang3.StringUtils; import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor; @@ -36,6 +37,7 @@ import org.apache.servicecomb.common.rest.locator.OperationLocator; import org.apache.servicecomb.common.rest.locator.ServicePathManager; import org.apache.servicecomb.core.Const; +import org.apache.servicecomb.core.Handler; import org.apache.servicecomb.core.Invocation; import org.apache.servicecomb.core.definition.MicroserviceMeta; import org.apache.servicecomb.core.definition.OperationMeta; @@ -126,6 +128,19 @@ protected void scheduleInvocation() { invocation.getInvocationStageTrace().startSchedule(); OperationMeta operationMeta = restOperationMeta.getOperationMeta(); + try { + this.setContext(); + } catch (Exception e) { + LOGGER.error("failed to set invocation context", e); + sendFailResponse(e); + return; + } + + Holder<Boolean> qpsFlowControlReject = checkQpsFlowControl(operationMeta); + if (qpsFlowControlReject.value) { + return; + } + operationMeta.getExecutor().execute(() -> { synchronized (this.requestEx) { try { @@ -150,6 +165,26 @@ protected void scheduleInvocation() { }); } + private Holder<Boolean> checkQpsFlowControl(OperationMeta operationMeta) { + Holder<Boolean> qpsFlowControlReject = new Holder<>(false); + @SuppressWarnings("deprecation") + Handler providerQpsFlowControlHandler = operationMeta.getProviderQpsFlowControlHandler(); + if (null != providerQpsFlowControlHandler) { + try { + providerQpsFlowControlHandler.handle(invocation, response -> { + qpsFlowControlReject.value = true; + produceProcessor = ProduceProcessorManager.JSON_PROCESSOR; + sendResponse(response); + }); + } catch (Exception e) { + LOGGER.error("failed to execute ProviderQpsFlowControlHandler", e); + qpsFlowControlReject.value = true; + sendFailResponse(e); + } + } + return qpsFlowControlReject; + } + private boolean isInQueueTimeout() { return System.nanoTime() - invocation.getInvocationStageTrace().getStart() > CommonRestConfig.getRequestWaitInPoolTimeout() * 1_000_000; @@ -183,7 +218,6 @@ public void invoke() { protected Response prepareInvoke() throws Throwable { this.initProduceProcessor(); - this.setContext(); invocation.getHandlerContext().put(RestConst.REST_REQUEST, requestEx); invocation.getInvocationStageTrace().startServerFiltersRequest(); @@ -201,9 +235,7 @@ protected Response prepareInvoke() throws Throwable { protected void doInvoke() throws Throwable { invocation.getInvocationStageTrace().startHandlersRequest(); - invocation.next(resp -> { - sendResponseQuietly(resp); - }); + invocation.next(resp -> sendResponseQuietly(resp)); } public void sendFailResponse(Throwable throwable) { diff --git a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java index faf37a539..5a8c85962 100644 --- a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java +++ b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java @@ -17,6 +17,8 @@ package org.apache.servicecomb.common.rest; +import static org.junit.Assert.assertEquals; + import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -49,10 +51,11 @@ import org.apache.servicecomb.foundation.common.utils.JsonUtils; import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils; import org.apache.servicecomb.foundation.vertx.http.AbstractHttpServletRequest; +import org.apache.servicecomb.foundation.vertx.http.AbstractHttpServletResponse; import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx; import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx; -import org.apache.servicecomb.swagger.invocation.AsyncResponse; import org.apache.servicecomb.swagger.invocation.Response; +import org.apache.servicecomb.swagger.invocation.context.HttpStatus; import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData; import org.apache.servicecomb.swagger.invocation.exception.InvocationException; import org.apache.servicecomb.swagger.invocation.response.Headers; @@ -188,8 +191,8 @@ public void sendFailResponse(Throwable throwable) { restInvocation.initProduceProcessor(); Assert.fail("must throw exception"); } catch (InvocationException e) { - Assert.assertEquals(Status.NOT_ACCEPTABLE, e.getStatus()); - Assert.assertEquals("Accept null is not supported", ((CommonExceptionData) e.getErrorData()).getMessage()); + assertEquals(Status.NOT_ACCEPTABLE, e.getStatus()); + assertEquals("Accept null is not supported", ((CommonExceptionData) e.getErrorData()).getMessage()); } } @@ -253,7 +256,7 @@ public void setContextNormal() throws Exception { @Test public void getContext() { invocation.addContext("key", "test"); - Assert.assertEquals("test", restInvocation.getContext("key")); + assertEquals("test", restInvocation.getContext("key")); } @Test @@ -401,7 +404,7 @@ public void sendFailResponse(Throwable throwable) { restInvocation.invoke(); - Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartServerFiltersRequest()); + assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartServerFiltersRequest()); } @Test @@ -571,7 +574,7 @@ void setContentType(String type) { initRestInvocation(); restInvocation.sendResponse(response); - Assert.assertEquals(expected, result); + assertEquals(expected, result); } @Test @@ -658,7 +661,7 @@ void addHeader(String name, String value) { restInvocation.sendResponse(response); Assert.fail("must throw exception"); } catch (Error e) { - Assert.assertEquals(headers.getHeaderMap(), resultHeaders.getHeaderMap()); + assertEquals(headers.getHeaderMap(), resultHeaders.getHeaderMap()); } } @@ -693,8 +696,8 @@ void setBodyBuffer(Buffer bodyBuffer) { initRestInvocation(); restInvocation.sendResponse(response); - Assert.assertEquals("\"ok\"", buffer.toString()); - Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getFinishServerFiltersResponse()); + assertEquals("\"ok\"", buffer.toString()); + assertEquals(nanoTime, invocation.getInvocationStageTrace().getFinishServerFiltersResponse()); } @Test @@ -747,7 +750,7 @@ public void beforeSendResponse(Invocation invocation, HttpServletResponseEx resp restInvocation.setHttpServerFilters(httpServerFilters); restInvocation.sendResponse(response); - Assert.assertEquals("\"ok\"-filter", buffer.toString()); + assertEquals("\"ok\"-filter", buffer.toString()); } @Test @@ -905,10 +908,10 @@ protected void runOnExecutor() { EventManager.unregister(subscriber); Assert.assertTrue(result.value); - Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getStart()); - Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartSchedule()); + assertEquals(nanoTime, invocation.getInvocationStageTrace().getStart()); + assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartSchedule()); Assert.assertSame(invocation, eventHolder.value.getInvocation()); - Assert.assertEquals("tid", invocation.getTraceId()); + assertEquals("tid", invocation.getTraceId()); } @Test @@ -936,19 +939,14 @@ public void invoke() { Assert.assertTrue(result.value); Assert.assertSame(invocation, restInvocation.invocation); - Assert.assertEquals(time, invocation.getInvocationStageTrace().getStartExecution()); + assertEquals(time, invocation.getInvocationStageTrace().getStartExecution()); } @Test public void doInvoke(@Mocked Endpoint endpoint, @Mocked OperationMeta operationMeta, @Mocked Object[] swaggerArguments, @Mocked SchemaMeta schemaMeta) throws Throwable { Response response = Response.ok("ok"); - Handler handler = new Handler() { - @Override - public void handle(Invocation invocation, AsyncResponse asyncResp) { - asyncResp.complete(response); - } - }; + Handler handler = (invocation, asyncResp) -> asyncResp.complete(response); List<Handler> handlerChain = Arrays.asList(handler); Deencapsulation.setField(invocation, "handlerList", handlerChain); @@ -964,7 +962,94 @@ protected void sendResponse(Response response) { restInvocation.doInvoke(); Assert.assertSame(response, result.value); - Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartHandlersRequest()); - Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getFinishHandlersResponse()); + assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartHandlersRequest()); + assertEquals(nanoTime, invocation.getInvocationStageTrace().getFinishHandlersResponse()); + } + + @Test + public void scheduleInvocation_invocationContextDeserializeError() { + requestEx = new AbstractHttpServletRequest() { + @Override + public String getHeader(String name) { + return "{\"x-cse-src-microservice\":'source\"}"; + } + }; + Holder<Integer> status = new Holder<>(); + Holder<String> reasonPhrase = new Holder<>(); + Holder<Integer> endCount = new Holder<>(0); + responseEx = new AbstractHttpServletResponse() { + @SuppressWarnings("deprecation") + @Override + public void setStatus(int sc, String sm) { + status.value = sc; + reasonPhrase.value = sm; + } + + @Override + public void flushBuffer() { + endCount.value = endCount.value + 1; + } + + @Override + public void setContentType(String type) { + assertEquals("application/json; charset=utf-8", type); + } + }; + restInvocation.requestEx = requestEx; + restInvocation.responseEx = responseEx; + + restInvocation.scheduleInvocation(); + + assertEquals(Integer.valueOf(590), status.value); + assertEquals("Cse Internal Server Error", reasonPhrase.value); + assertEquals(Integer.valueOf(1), endCount.value); + } + + @SuppressWarnings("deprecation") + @Test + public void scheduleInvocation_flowControlReject() { + new Expectations() { + { + operationMeta.getProviderQpsFlowControlHandler(); + result = (Handler) (invocation, asyncResp) -> asyncResp.producerFail(new InvocationException( + new HttpStatus(429, "Too Many Requests"), + new CommonExceptionData("rejected by qps flowcontrol"))); + } + }; + Holder<Integer> status = new Holder<>(); + Holder<String> reasonPhrase = new Holder<>(); + Holder<Integer> endCount = new Holder<>(0); + Holder<String> responseBody = new Holder<>(); + responseEx = new AbstractHttpServletResponse() { + @SuppressWarnings("deprecation") + @Override + public void setStatus(int sc, String sm) { + status.value = sc; + reasonPhrase.value = sm; + } + + @Override + public void flushBuffer() { + endCount.value = endCount.value + 1; + } + + @Override + public void setContentType(String type) { + assertEquals("application/json; charset=utf-8", type); + } + + @Override + public void setBodyBuffer(Buffer bodyBuffer) { + responseBody.value = bodyBuffer.toString(); + } + }; + setup(); + + restInvocation.scheduleInvocation(); + + assertEquals(Integer.valueOf(429), status.value); + assertEquals("Too Many Requests", reasonPhrase.value); + assertEquals("{\"message\":\"rejected by qps flowcontrol\"}", responseBody.value); + assertEquals(Integer.valueOf(1), endCount.value); } } diff --git a/core/src/main/java/org/apache/servicecomb/core/definition/OperationMeta.java b/core/src/main/java/org/apache/servicecomb/core/definition/OperationMeta.java index 4e3471611..cf908256d 100644 --- a/core/src/main/java/org/apache/servicecomb/core/definition/OperationMeta.java +++ b/core/src/main/java/org/apache/servicecomb/core/definition/OperationMeta.java @@ -18,11 +18,13 @@ package org.apache.servicecomb.core.definition; import java.lang.reflect.Method; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import org.apache.servicecomb.core.Handler; import org.apache.servicecomb.core.executor.ExecutorManager; import org.apache.servicecomb.swagger.invocation.response.ResponseMeta; import org.apache.servicecomb.swagger.invocation.response.ResponsesMeta; @@ -58,6 +60,12 @@ // 为避免每个地方都做复杂的层次管理,直接在这里保存扩展数据 private Map<String, Object> extData = new ConcurrentHashMap<>(); + // providerQpsFlowControlHandler is a temporary filed, only for internal usage + private Handler providerQpsFlowControlHandler; + + // providerQpsFlowControlHandlerSearched is a temporary filed, only for internal usage + private boolean providerQpsFlowControlHandlerSearched; + private String transport = null; public void init(SchemaMeta schemaMeta, Method method, String operationPath, String httpMethod, @@ -158,4 +166,25 @@ public void setExecutor(Executor executor) { public int getParamSize() { return swaggerOperation.getParameters().size(); } + + /** + * Only for JavaChassis internal usage. + */ + @Deprecated + public Handler getProviderQpsFlowControlHandler() { + if (providerQpsFlowControlHandlerSearched) { + return providerQpsFlowControlHandler; + } + + final List<Handler> providerHandlerChain = getSchemaMeta().getProviderHandlerChain(); + for (Handler handler : providerHandlerChain) { + // matching by class name is more or less better than importing an extra maven dependency + if ("org.apache.servicecomb.qps.ProviderQpsFlowControlHandler".equals(handler.getClass().getName())) { + providerQpsFlowControlHandler = handler; + break; + } + } + providerQpsFlowControlHandlerSearched = true; + return providerQpsFlowControlHandler; + } } diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java index 88ee93682..67ade9411 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java @@ -32,21 +32,25 @@ @Override public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception { - if (!Config.INSTANCE.isProviderEnabled()) { + if (invocation.getHandlerIndex() > 0) { + // handlerIndex > 0, which means this handler is executed in handler chain. + // As this flow control logic has been executed in advance, this time it should be ignored. invocation.next(asyncResp); return; } + // The real executing position of this handler is no longer in handler chain, but in AbstractRestInvocation. + // Therefore, the Invocation#next() method should not be called below. + if (!Config.INSTANCE.isProviderEnabled()) { + return; + } + String microserviceName = invocation.getContext(Const.SRC_MICROSERVICE); QpsController qpsController = StringUtils.isEmpty(microserviceName) ? qpsControllerMgr.getGlobalQpsController() : qpsControllerMgr.getOrCreate(microserviceName, invocation); - if (isLimitNewRequest(qpsController, asyncResp)) { - return; - } - - invocation.next(asyncResp); + isLimitNewRequest(qpsController, asyncResp); } private boolean isLimitNewRequest(QpsController qpsController, AsyncResponse asyncResp) { diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java index 88c80749e..5f294baa8 100644 --- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java +++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java @@ -59,15 +59,13 @@ public boolean isLimitNewRequest() { long msNow = System.currentTimeMillis(); //Time jump cause the new request injected if (msNow - msCycleBegin > CYCLE_LENGTH || msNow < msCycleBegin) { - - //no need worry about concurrency porbleam + //no need worry about concurrency problem lastRequestCount = newCount; msCycleBegin = msNow; } // Configuration update and use is at the situation of multi-threaded concurrency // It is possible that operation level updated to null,but schema level or microservice level does not updated - int limitValue = (qpsLimit == null) ? Integer.MAX_VALUE : qpsLimit; return newCount - lastRequestCount >= limitValue; } diff --git a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java index f671eeab0..9f7315524 100644 --- a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java +++ b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java @@ -18,6 +18,8 @@ package org.apache.servicecomb.qps; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.times; import org.apache.servicecomb.core.Const; @@ -59,7 +61,6 @@ public void setUP() { ArchaiusUtils.setProperty(Config.PROVIDER_LIMIT_KEY_PREFIX + "test", 1); } - @After public void afterTest() { ArchaiusUtils.resetConfig(); @@ -71,6 +72,8 @@ public void testGlobalQpsControl(final @Injectable Invocation invocation, final @Injectable AsyncResponse asyncResp) throws Exception { new Expectations() { { + invocation.getHandlerIndex(); + result = 0; invocation.getContext(Const.SRC_MICROSERVICE); result = "test"; invocation.getOperationMeta(); @@ -95,25 +98,42 @@ public void testGlobalQpsControl(final @Injectable Invocation invocation, gHandler.handle(invocation, asyncResp); } - @Test public void testQpsController() { mockUpSystemTime(); QpsController qpsController = new QpsController("abc", 100); - assertEquals(false, qpsController.isLimitNewRequest()); + assertFalse(qpsController.isLimitNewRequest()); qpsController.setQpsLimit(1); - assertEquals(true, qpsController.isLimitNewRequest()); + assertTrue(qpsController.isLimitNewRequest()); } @Test public void testHandleOnSourceMicroserviceNameIsNull() throws Exception { Mockito.when(invocation.getContext(Const.SRC_MICROSERVICE)).thenReturn(null); + // only when handler index <= 0, the qps logic works + Mockito.when(invocation.getHandlerIndex()).thenReturn(0); + ArchaiusUtils.setProperty("servicecomb.flowcontrol.Provider.qps.global.limit", 1); + ProviderQpsFlowControlHandler.qpsControllerMgr + .setGlobalQpsController("servicecomb.flowcontrol.Provider.qps.global.limit"); + + handler.handle(invocation, asyncResp); + handler.handle(invocation, asyncResp); + + // Invocation#getContext(String) is only invoked when the qps logic works + Mockito.verify(invocation, times(2)).getContext(Const.SRC_MICROSERVICE); + Mockito.verify(asyncResp, times(1)).producerFail(Mockito.any(Exception.class)); + } + + @Test + public void testHandleOnSourceOnHandlerIndexIsGreaterThan0() throws Exception { + Mockito.when(invocation.getContext(Const.SRC_MICROSERVICE)).thenReturn(null); + Mockito.when(invocation.getHandlerIndex()).thenReturn(1); handler.handle(invocation, asyncResp); handler.handle(invocation, asyncResp); - Mockito.verify(invocation, times(2)).next(asyncResp); + Mockito.verify(invocation, times(0)).getContext(Mockito.anyString()); } @Test @@ -122,25 +142,19 @@ public void testHandle() throws Exception { OperationMeta mockOperationMeta = QpsControllerManagerTest.getMockOperationMeta("pojo", "server", "opr"); Mockito.when(invocation.getOperationMeta()).thenReturn(mockOperationMeta); Mockito.when(invocation.getSchemaId()).thenReturn("server"); - new MockUp<QpsController>() { - @Mock - public boolean isLimitNewRequest() { - return true; - } - }; new MockUp<QpsControllerManager>() { - @Mock protected QpsController create(String qualifiedNameKey) { - return new QpsController(qualifiedNameKey, 12); + return new QpsController(qualifiedNameKey, 1); } }; + handler.handle(invocation, asyncResp); handler.handle(invocation, asyncResp); ArgumentCaptor<InvocationException> captor = ArgumentCaptor.forClass(InvocationException.class); - Mockito.verify(asyncResp).producerFail(captor.capture()); + Mockito.verify(asyncResp, times(1)).producerFail(captor.capture()); InvocationException invocationException = captor.getValue(); assertEquals(QpsConst.TOO_MANY_REQUESTS_STATUS, invocationException.getStatus()); @@ -155,23 +169,17 @@ public void testHandleIsLimitNewRequestAsFalse() throws Exception { .getMockOperationMeta("pojo", "server", "opr"); Mockito.when(invocation.getOperationMeta()).thenReturn(mockOperationMeta); Mockito.when(invocation.getSchemaId()).thenReturn("server"); - new MockUp<QpsController>() { - @Mock - public boolean isLimitNewRequest() { - return false; - } - }; new MockUp<QpsControllerManager>() { - @Mock protected QpsController create(String qualifiedNameKey) { - return new QpsController(qualifiedNameKey, 12); + return new QpsController(qualifiedNameKey, 1); } }; handler.handle(invocation, asyncResp); - Mockito.verify(invocation).next(asyncResp); + Mockito.verify(invocation, times(0)).next(asyncResp); + Mockito.verify(asyncResp, times(0)).producerFail(Mockito.any(Exception.class)); } private void mockUpSystemTime() { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Put provider QPS flow control in front > -------------------------------------- > > Key: SCB-1056 > URL: https://issues.apache.org/jira/browse/SCB-1056 > Project: Apache ServiceComb > Issue Type: Improvement > Components: Java-Chassis > Reporter: YaoHaishi > Assignee: YaoHaishi > Priority: Major > > Currently provider QPS flow control is in ProviderQpsFlowControlHandler which > works in provider handler chain. As a result, the flow control logic takes > effect too late and much CPU resource is wasted on processing those requests > that should be rejected earlier. > Put the provider QPS flow control logic in front can save the resource. -- This message was sent by Atlassian JIRA (v7.6.3#76005)