This is an automated email from the ASF dual-hosted git repository. yaohaishi pushed a commit to branch 2.8.x in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
commit 817d48bf610f4eb812bc6632f96aea95de8a7c62 Author: yhs0092 <[email protected]> AuthorDate: Wed Dec 11 19:52:03 2024 +0800 [SCB-2894] optimize code --- .../invocation/ws/SerialExecutorWrapper.java | 6 ++-- .../invocation/ws/SerialExecutorWrapperTest.java | 10 +++--- .../rest/client/ws/WebSocketTransportClient.java | 4 +-- .../rest/vertx/ws/VertxWebSocketAdaptor.java | 42 ++++++++++++++-------- .../vertx/ws/WebSocketHandshakeServerFilter.java | 2 ++ 5 files changed, 39 insertions(+), 25 deletions(-) diff --git a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapper.java b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapper.java index 95ef34579..576d0fd98 100644 --- a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapper.java +++ b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapper.java @@ -31,7 +31,7 @@ public class SerialExecutorWrapper implements Executor { private static final Logger LOGGER = LoggerFactory.getLogger(SerialExecutorWrapper.class); - private static final int LEAST_QUEUE_CAPACITY = 10; + private static final int LEAST_QUEUE_CAPACITY = 100; private final InvocationType invocationType; @@ -186,7 +186,7 @@ public class SerialExecutorWrapper implements Executor { } private int calculateRealQueueSize() { - return queueCapacity + 50; + return (int) (queueCapacity * 1.2); } private int calculateDrainThreshold() { @@ -194,7 +194,7 @@ public class SerialExecutorWrapper implements Executor { } private int calculateFullThreshold() { - return (int) (queueCapacity * 0.9); + return queueCapacity; } private int correctQueueCapacity(int queueCapacity) { diff --git a/swagger/swagger-invocation/invocation-core/src/test/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapperTest.java b/swagger/swagger-invocation/invocation-core/src/test/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapperTest.java index 7e9ff171f..768ffc170 100644 --- a/swagger/swagger-invocation/invocation-core/src/test/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapperTest.java +++ b/swagger/swagger-invocation/invocation-core/src/test/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapperTest.java @@ -50,10 +50,10 @@ public class SerialExecutorWrapperTest { @Test public void executeSingleWrapper() throws InterruptedException { - // queueCapacity(10) + bufferCapacity(50) + taskInWorking(1) = 61, this is the max task count that not trigger queue full exception - final int taskCount = 61; + // queueCapacity(100*1.2) + taskInWorking(1) = 121, this is the max task count that not trigger queue full exception + final int taskCount = 121; final SerialExecutorWrapper wrapper = new SerialExecutorWrapper(InvocationType.PRODUCER, - "testSerialWrapper", workerPools, 10, 3); + "testSerialWrapper", workerPools, 10, 3); // queueCapacity will be corrected to 100 wrapper.subscribeQueueDrainEvent(() -> { }); final Object lock = new Object(); @@ -88,9 +88,9 @@ public class SerialExecutorWrapperTest { @Test public void executeSingleWrapperQueueFull() throws InterruptedException { - final int taskCount = 62; // taskCount +1 than executeSingleWrapper method + final int taskCount = 122; // taskCount +1 than executeSingleWrapper method final SerialExecutorWrapper wrapper = new SerialExecutorWrapper(InvocationType.PRODUCER, - "testSerialWrapper", workerPools, 10, 3); + "testSerialWrapper", workerPools, 10, 3); // queueCapacity will be corrected to 100 wrapper.subscribeQueueDrainEvent(() -> { }); final Object lock = new Object(); diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketTransportClient.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketTransportClient.java index c606ab42f..33938c888 100644 --- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketTransportClient.java +++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketTransportClient.java @@ -39,7 +39,7 @@ public class WebSocketTransportClient { } public void send(Invocation invocation, AsyncResponse asyncResp) { - final WebSocketClientWithContext webSocketClientWithContext = findHttpClientPool(invocation); + final WebSocketClientWithContext webSocketClientWithContext = findWebSocketPool(invocation); final WebSocketClientInvocation webSocketClientInvocation = new WebSocketClientInvocation( webSocketClientWithContext, httpClientFilters); try { @@ -50,7 +50,7 @@ public class WebSocketTransportClient { } } - protected WebSocketClientWithContext findHttpClientPool(Invocation invocation) { + protected WebSocketClientWithContext findWebSocketPool(Invocation invocation) { String clientName = WebSocketTransportClientOptionsSPI.CLIENT_NAME; return HttpClients.getWebSocketClient(clientName, invocation.isSync(), null); } diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java index 195c59fa7..4bb0a6c8a 100644 --- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java +++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java @@ -52,12 +52,18 @@ public class VertxWebSocketAdaptor implements WebSocketAdapter { private final Executor executor; - private final AbstractBaseWebSocket delegatedWebSocket; + /** + * The WebSocket type provided by user(from business logic) + */ + private final AbstractBaseWebSocket bizWebSocket; + + /** + * The underlying WebSocket type provided by Vert.x which represents the real WebSocket network connection. + */ + private final WebSocketBase vertxWebSocket; private final AtomicBoolean inPauseStatus; - private final WebSocketBase vertxWebSocket; - private final String websocketSessionId; private final InvocationType invocationType; @@ -66,28 +72,34 @@ public class VertxWebSocketAdaptor implements WebSocketAdapter { InvocationType invocationType, String websocketSessionId, Executor workerPool, - AbstractBaseWebSocket delegatedWebSocket, + AbstractBaseWebSocket bizWebSocket, WebSocketBase vertxWebSocket) { Objects.requireNonNull(invocationType, "VertxWebSocketAdaptor invocationType is null"); Objects.requireNonNull(websocketSessionId, "VertxWebSocketAdaptor websocketSessionId is null"); Objects.requireNonNull(workerPool, "VertxWebSocketAdaptor workerPool is null"); - Objects.requireNonNull(delegatedWebSocket, "VertxWebSocketAdaptor delegatedWebSocket is null"); + Objects.requireNonNull(bizWebSocket, "VertxWebSocketAdaptor bizWebSocket is null"); Objects.requireNonNull(vertxWebSocket, "VertxWebSocketAdaptor vertxWebSocket is null"); this.invocationType = invocationType; this.websocketSessionId = websocketSessionId; this.executor = workerPool instanceof ReactiveExecutor ? workerPool // for reactive case, no need to wrap it into a serial queue model : prepareSerialExecutorWrapper(workerPool); - this.delegatedWebSocket = delegatedWebSocket; + this.bizWebSocket = bizWebSocket; this.vertxWebSocket = vertxWebSocket; inPauseStatus = new AtomicBoolean(true); vertxWebSocket.pause(); // make sure the vert.x WebSocket pause status keep consistent with inPauseStatus flag - prepare(); - delegatedWebSocket.setWebSocketAdapter(this); + // make sure the bi-direction message stream is established + linkVertxToBiz(); + linkBizToVertx(bizWebSocket); + // notify that the stream connection is ready to work startWorking(); } + private void linkBizToVertx(AbstractBaseWebSocket bizWebSocket) { + bizWebSocket.setWebSocketAdapter(this); + } + private SerialExecutorWrapper prepareSerialExecutorWrapper(Executor workerPool) { final SerialExecutorWrapper wrapper = new SerialExecutorWrapper( invocationType, @@ -106,7 +118,7 @@ public class VertxWebSocketAdaptor implements WebSocketAdapter { return wrapper; } - private void prepare() { + private void linkVertxToBiz() { linkVertxDrainHandler(); linkVertxTextMessageHandler(); linkVertxBinaryMessageHandler(); @@ -117,11 +129,11 @@ public class VertxWebSocketAdaptor implements WebSocketAdapter { private void linkVertxCloseHandler() { vertxWebSocket.closeHandler(v -> - scheduleTask(() -> delegatedWebSocket.onClose(vertxWebSocket.closeStatusCode(), vertxWebSocket.closeReason()))); + scheduleTask(() -> bizWebSocket.onClose(vertxWebSocket.closeStatusCode(), vertxWebSocket.closeReason()))); } private void linkVertxExceptionHandler() { - vertxWebSocket.exceptionHandler(t -> scheduleTask(() -> delegatedWebSocket.onError(t))); + vertxWebSocket.exceptionHandler(t -> scheduleTask(() -> bizWebSocket.onError(t))); } private void linkVertxFrameHandler() { @@ -133,24 +145,24 @@ public class VertxWebSocketAdaptor implements WebSocketAdapter { vertxWebSocket.binaryMessageHandler(buffer -> { final byte[] bytes = buffer.getBytes(); scheduleTask( - () -> delegatedWebSocket.onMessage(new BinaryBytesWebSocketMessage(bytes))); + () -> bizWebSocket.onMessage(new BinaryBytesWebSocketMessage(bytes))); }); } private void linkVertxTextMessageHandler() { vertxWebSocket.textMessageHandler(s -> scheduleTask( - () -> delegatedWebSocket.onMessage(new TextWebSocketMessage(s)))); + () -> bizWebSocket.onMessage(new TextWebSocketMessage(s)))); } private void linkVertxDrainHandler() { vertxWebSocket.drainHandler(v -> - scheduleTask(delegatedWebSocket::onWriteQueueDrain)); + scheduleTask(bizWebSocket::onWriteQueueDrain)); } private void startWorking() { scheduleTask( - delegatedWebSocket::onConnectionReady); + bizWebSocket::onConnectionReady); } private void scheduleTask(Runnable task) { diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java index 437a0ba23..4abbcf462 100644 --- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java +++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java @@ -80,6 +80,8 @@ public class WebSocketHandshakeServerFilter implements HttpServerFilter { (ServerWebSocket) result, ws)); } + // WebSocket operation may also return an HTTP response, for example, rejecting WebSocket handshake. + // Therefore, we don't throw Exception here, just let it pass and act like REST transport mode. } return HttpServerFilter.super.beforeSendResponseAsync(invocation, responseEx);
