This is an automated email from the ASF dual-hosted git repository.

yaohaishi pushed a commit to branch 2.8.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git

commit 817d48bf610f4eb812bc6632f96aea95de8a7c62
Author: yhs0092 <[email protected]>
AuthorDate: Wed Dec 11 19:52:03 2024 +0800

    [SCB-2894] optimize code
---
 .../invocation/ws/SerialExecutorWrapper.java       |  6 ++--
 .../invocation/ws/SerialExecutorWrapperTest.java   | 10 +++---
 .../rest/client/ws/WebSocketTransportClient.java   |  4 +--
 .../rest/vertx/ws/VertxWebSocketAdaptor.java       | 42 ++++++++++++++--------
 .../vertx/ws/WebSocketHandshakeServerFilter.java   |  2 ++
 5 files changed, 39 insertions(+), 25 deletions(-)

diff --git 
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapper.java
 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapper.java
index 95ef34579..576d0fd98 100644
--- 
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapper.java
+++ 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapper.java
@@ -31,7 +31,7 @@ public class SerialExecutorWrapper implements Executor {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SerialExecutorWrapper.class);
 
-  private static final int LEAST_QUEUE_CAPACITY = 10;
+  private static final int LEAST_QUEUE_CAPACITY = 100;
 
   private final InvocationType invocationType;
 
@@ -186,7 +186,7 @@ public class SerialExecutorWrapper implements Executor {
   }
 
   private int calculateRealQueueSize() {
-    return queueCapacity + 50;
+    return (int) (queueCapacity * 1.2);
   }
 
   private int calculateDrainThreshold() {
@@ -194,7 +194,7 @@ public class SerialExecutorWrapper implements Executor {
   }
 
   private int calculateFullThreshold() {
-    return (int) (queueCapacity * 0.9);
+    return queueCapacity;
   }
 
   private int correctQueueCapacity(int queueCapacity) {
diff --git 
a/swagger/swagger-invocation/invocation-core/src/test/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapperTest.java
 
b/swagger/swagger-invocation/invocation-core/src/test/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapperTest.java
index 7e9ff171f..768ffc170 100644
--- 
a/swagger/swagger-invocation/invocation-core/src/test/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapperTest.java
+++ 
b/swagger/swagger-invocation/invocation-core/src/test/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapperTest.java
@@ -50,10 +50,10 @@ public class SerialExecutorWrapperTest {
 
   @Test
   public void executeSingleWrapper() throws InterruptedException {
-    // queueCapacity(10) + bufferCapacity(50) + taskInWorking(1) = 61, this is 
the max task count that not trigger queue full exception
-    final int taskCount = 61;
+    // queueCapacity(100*1.2) + taskInWorking(1) = 121, this is the max task 
count that does not trigger a queue full exception
+    final int taskCount = 121;
     final SerialExecutorWrapper wrapper = new 
SerialExecutorWrapper(InvocationType.PRODUCER,
-        "testSerialWrapper", workerPools, 10, 3);
+        "testSerialWrapper", workerPools, 10, 3); // queueCapacity will be 
corrected to 100
     wrapper.subscribeQueueDrainEvent(() -> {
     });
     final Object lock = new Object();
@@ -88,9 +88,9 @@ public class SerialExecutorWrapperTest {
 
   @Test
   public void executeSingleWrapperQueueFull() throws InterruptedException {
-    final int taskCount = 62; // taskCount +1 than executeSingleWrapper method
+    final int taskCount = 122; // one more task than in the executeSingleWrapper method
     final SerialExecutorWrapper wrapper = new 
SerialExecutorWrapper(InvocationType.PRODUCER,
-        "testSerialWrapper", workerPools, 10, 3);
+        "testSerialWrapper", workerPools, 10, 3); // queueCapacity will be 
corrected to 100
     wrapper.subscribeQueueDrainEvent(() -> {
     });
     final Object lock = new Object();
diff --git 
a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketTransportClient.java
 
b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketTransportClient.java
index c606ab42f..33938c888 100644
--- 
a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketTransportClient.java
+++ 
b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketTransportClient.java
@@ -39,7 +39,7 @@ public class WebSocketTransportClient {
   }
 
   public void send(Invocation invocation, AsyncResponse asyncResp) {
-    final WebSocketClientWithContext webSocketClientWithContext = 
findHttpClientPool(invocation);
+    final WebSocketClientWithContext webSocketClientWithContext = 
findWebSocketPool(invocation);
     final WebSocketClientInvocation webSocketClientInvocation = new 
WebSocketClientInvocation(
         webSocketClientWithContext, httpClientFilters);
     try {
@@ -50,7 +50,7 @@ public class WebSocketTransportClient {
     }
   }
 
-  protected WebSocketClientWithContext findHttpClientPool(Invocation 
invocation) {
+  protected WebSocketClientWithContext findWebSocketPool(Invocation 
invocation) {
     String clientName = WebSocketTransportClientOptionsSPI.CLIENT_NAME;
     return HttpClients.getWebSocketClient(clientName, invocation.isSync(), 
null);
   }
diff --git 
a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java
 
b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java
index 195c59fa7..4bb0a6c8a 100644
--- 
a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java
+++ 
b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java
@@ -52,12 +52,18 @@ public class VertxWebSocketAdaptor implements 
WebSocketAdapter {
 
   private final Executor executor;
 
-  private final AbstractBaseWebSocket delegatedWebSocket;
+  /**
+   * The WebSocket type provided by the user (from business logic)
+   */
+  private final AbstractBaseWebSocket bizWebSocket;
+
+  /**
+   * The underlying WebSocket type provided by Vert.x which represents the 
real WebSocket network connection.
+   */
+  private final WebSocketBase vertxWebSocket;
 
   private final AtomicBoolean inPauseStatus;
 
-  private final WebSocketBase vertxWebSocket;
-
   private final String websocketSessionId;
 
   private final InvocationType invocationType;
@@ -66,28 +72,34 @@ public class VertxWebSocketAdaptor implements 
WebSocketAdapter {
       InvocationType invocationType,
       String websocketSessionId,
       Executor workerPool,
-      AbstractBaseWebSocket delegatedWebSocket,
+      AbstractBaseWebSocket bizWebSocket,
       WebSocketBase vertxWebSocket) {
     Objects.requireNonNull(invocationType, "VertxWebSocketAdaptor 
invocationType is null");
     Objects.requireNonNull(websocketSessionId, "VertxWebSocketAdaptor 
websocketSessionId is null");
     Objects.requireNonNull(workerPool, "VertxWebSocketAdaptor workerPool is 
null");
-    Objects.requireNonNull(delegatedWebSocket, "VertxWebSocketAdaptor 
delegatedWebSocket is null");
+    Objects.requireNonNull(bizWebSocket, "VertxWebSocketAdaptor bizWebSocket 
is null");
     Objects.requireNonNull(vertxWebSocket, "VertxWebSocketAdaptor 
vertxWebSocket is null");
     this.invocationType = invocationType;
     this.websocketSessionId = websocketSessionId;
     this.executor = workerPool instanceof ReactiveExecutor ?
         workerPool // for reactive case, no need to wrap it into a serial 
queue model
         : prepareSerialExecutorWrapper(workerPool);
-    this.delegatedWebSocket = delegatedWebSocket;
+    this.bizWebSocket = bizWebSocket;
     this.vertxWebSocket = vertxWebSocket;
     inPauseStatus = new AtomicBoolean(true);
     vertxWebSocket.pause(); // make sure the vert.x WebSocket pause status 
keep consistent with inPauseStatus flag
 
-    prepare();
-    delegatedWebSocket.setWebSocketAdapter(this);
+    // make sure the bi-directional message stream is established
+    linkVertxToBiz();
+    linkBizToVertx(bizWebSocket);
+    // notify that the stream connection is ready to work
     startWorking();
   }
 
+  private void linkBizToVertx(AbstractBaseWebSocket bizWebSocket) {
+    bizWebSocket.setWebSocketAdapter(this);
+  }
+
   private SerialExecutorWrapper prepareSerialExecutorWrapper(Executor 
workerPool) {
     final SerialExecutorWrapper wrapper = new SerialExecutorWrapper(
         invocationType,
@@ -106,7 +118,7 @@ public class VertxWebSocketAdaptor implements 
WebSocketAdapter {
     return wrapper;
   }
 
-  private void prepare() {
+  private void linkVertxToBiz() {
     linkVertxDrainHandler();
     linkVertxTextMessageHandler();
     linkVertxBinaryMessageHandler();
@@ -117,11 +129,11 @@ public class VertxWebSocketAdaptor implements 
WebSocketAdapter {
 
   private void linkVertxCloseHandler() {
     vertxWebSocket.closeHandler(v ->
-        scheduleTask(() -> 
delegatedWebSocket.onClose(vertxWebSocket.closeStatusCode(), 
vertxWebSocket.closeReason())));
+        scheduleTask(() -> 
bizWebSocket.onClose(vertxWebSocket.closeStatusCode(), 
vertxWebSocket.closeReason())));
   }
 
   private void linkVertxExceptionHandler() {
-    vertxWebSocket.exceptionHandler(t -> scheduleTask(() -> 
delegatedWebSocket.onError(t)));
+    vertxWebSocket.exceptionHandler(t -> scheduleTask(() -> 
bizWebSocket.onError(t)));
   }
 
   private void linkVertxFrameHandler() {
@@ -133,24 +145,24 @@ public class VertxWebSocketAdaptor implements 
WebSocketAdapter {
     vertxWebSocket.binaryMessageHandler(buffer -> {
       final byte[] bytes = buffer.getBytes();
       scheduleTask(
-          () -> delegatedWebSocket.onMessage(new 
BinaryBytesWebSocketMessage(bytes)));
+          () -> bizWebSocket.onMessage(new 
BinaryBytesWebSocketMessage(bytes)));
     });
   }
 
   private void linkVertxTextMessageHandler() {
     vertxWebSocket.textMessageHandler(s ->
         scheduleTask(
-            () -> delegatedWebSocket.onMessage(new TextWebSocketMessage(s))));
+            () -> bizWebSocket.onMessage(new TextWebSocketMessage(s))));
   }
 
   private void linkVertxDrainHandler() {
     vertxWebSocket.drainHandler(v ->
-        scheduleTask(delegatedWebSocket::onWriteQueueDrain));
+        scheduleTask(bizWebSocket::onWriteQueueDrain));
   }
 
   private void startWorking() {
     scheduleTask(
-        delegatedWebSocket::onConnectionReady);
+        bizWebSocket::onConnectionReady);
   }
 
   private void scheduleTask(Runnable task) {
diff --git 
a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java
 
b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java
index 437a0ba23..4abbcf462 100644
--- 
a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java
+++ 
b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java
@@ -80,6 +80,8 @@ public class WebSocketHandshakeServerFilter implements 
HttpServerFilter {
                     (ServerWebSocket) result,
                     ws));
       }
+      // A WebSocket operation may also return an HTTP response, for example, 
when rejecting the WebSocket handshake.
+      // Therefore, we don't throw an exception here; we just let it pass and act 
as in REST transport mode.
     }
 
     return HttpServerFilter.super.beforeSendResponseAsync(invocation, 
responseEx);

Reply via email to