This is an automated email from the ASF dual-hosted git repository.

yaohaishi pushed a commit to branch 2.8.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git

commit bb8c6fd88d948e5044523c7b3979a091ff8c5f85
Author: yhs0092 <[email protected]>
AuthorDate: Mon Dec 9 21:29:20 2024 +0800

    [SCB-2894] support WebSocket message handling logic run on the Executors 
serially
    
    and fix EdgeService may be hanged in WebSocket handshake procedure
    because it's too late to pause http handshake request.
---
 .../common/rest/AbstractRestInvocation.java        |  10 -
 .../rest/filter/inner/ClientRestArgsFilter.java    |   2 +-
 .../vertx/client/http/HttpClientOptionsSPI.java    |   1 -
 .../foundation/vertx/client/http/HttpClients.java  |   3 +-
 .../vertx/client/ws/WebSocketClientOptionsSPI.java |   2 +-
 .../invocation/ws/AbstractBaseWebSocket.java       |  13 +-
 .../invocation/ws/SerialExecutorWrapper.java       | 223 +++++++++++++++++++++
 .../swagger/invocation/ws/WebSocket.java           |   2 +-
 .../swagger/invocation/ws/WebSocketFrame.java      |   2 -
 .../swagger/invocation/ws/WebSocketPipe.java       |  68 ++++---
 .../invocation/ws/SerialExecutorWrapperTest.java   | 132 ++++++++++++
 .../rest/client/ws/WebSocketClientInvocation.java  |   2 +-
 .../rest/client/ws/WebSocketTransportClient.java   |   3 +-
 .../transport/rest/vertx/RestServerVerticle.java   |  15 ++
 .../transport/rest/vertx/TransportConfig.java      |   7 +
 .../rest/vertx/ws/VertxWebSocketAdaptor.java       | 100 +++++++--
 .../vertx/ws/WebSocketHandshakeServerFilter.java   |   7 +-
 .../ws/WebSocketResponseWrapClientFilter.java      |   2 +
 18 files changed, 527 insertions(+), 67 deletions(-)

diff --git 
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
 
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
index b65fb519f..a9431a990 100644
--- 
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
+++ 
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
@@ -46,7 +46,6 @@ import org.apache.servicecomb.foundation.common.Holder;
 import org.apache.servicecomb.foundation.common.utils.JsonUtils;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
-import 
org.apache.servicecomb.foundation.vertx.http.VertxServerRequestToHttpServletRequest;
 import org.apache.servicecomb.swagger.invocation.Response;
 import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 import org.apache.servicecomb.swagger.invocation.ws.ServerWebSocket;
@@ -187,15 +186,6 @@ public abstract class AbstractRestInvocation {
       return;
     }
 
-    if (Const.WEBSOCKET.equals(invocation.getProviderTransportName())) {
-      // pause for WebSocket handshake, waiting for completion of the biz REST 
operation
-      if (requestEx instanceof VertxServerRequestToHttpServletRequest) {
-        ((VertxServerRequestToHttpServletRequest) requestEx).getContext()
-            .request()
-            .pause();
-      }
-    }
-
     try {
       operationMeta.getExecutor().execute(() -> {
         synchronized (this.requestEx) {
diff --git 
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ClientRestArgsFilter.java
 
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ClientRestArgsFilter.java
index c06006214..19652e2db 100644
--- 
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ClientRestArgsFilter.java
+++ 
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ClientRestArgsFilter.java
@@ -40,7 +40,7 @@ public class ClientRestArgsFilter implements HttpClientFilter 
{
 
   @Override
   public boolean enabledForTransport(String transport) {
-    return HttpClientFilter.super.enabledForTransport(transport)|| 
Const.WEBSOCKET.equals(transport);
+    return HttpClientFilter.super.enabledForTransport(transport) || 
Const.WEBSOCKET.equals(transport);
   }
 
   @Override
diff --git 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/http/HttpClientOptionsSPI.java
 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/http/HttpClientOptionsSPI.java
index 03d48fe2c..0e684e7c3 100644
--- 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/http/HttpClientOptionsSPI.java
+++ 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/http/HttpClientOptionsSPI.java
@@ -24,7 +24,6 @@ import com.netflix.config.ConcurrentCompositeConfiguration;
 
 import io.vertx.core.http.HttpClientOptions;
 import io.vertx.core.http.HttpVersion;
-import io.vertx.core.http.WebSocketClientOptions;
 import io.vertx.core.net.ClientOptionsBase;
 import io.vertx.core.net.ProxyOptions;
 
diff --git 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/http/HttpClients.java
 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/http/HttpClients.java
index bc982d264..4449a3d02 100644
--- 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/http/HttpClients.java
+++ 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/http/HttpClients.java
@@ -38,7 +38,6 @@ import io.vertx.core.DeploymentOptions;
 import io.vertx.core.Vertx;
 import io.vertx.core.VertxOptions;
 import io.vertx.core.dns.AddressResolverOptions;
-import io.vertx.core.http.WebSocketClient;
 
 /**
  *  load and manages a set of HttpClient at boot up.
@@ -83,7 +82,7 @@ public class HttpClients {
     clientOptionsList.forEach(option -> 
VertxUtils.blockCloseVertxByName(option.clientName()));
 
     wsClients.clear();
-    List<WebSocketClientOptionsSPI> websocketClientOptionsList =
+    final List<WebSocketClientOptionsSPI> websocketClientOptionsList =
         
SPIServiceUtils.getOrLoadSortedService(WebSocketClientOptionsSPI.class);
     websocketClientOptionsList.forEach(option -> 
VertxUtils.blockCloseVertxByName(option.clientName()));
   }
diff --git 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/ws/WebSocketClientOptionsSPI.java
 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/ws/WebSocketClientOptionsSPI.java
index 95463a3b5..46b1d7bc1 100644
--- 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/ws/WebSocketClientOptionsSPI.java
+++ 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/ws/WebSocketClientOptionsSPI.java
@@ -24,7 +24,7 @@ import io.vertx.core.http.HttpVersion;
 import io.vertx.core.http.WebSocketClientOptions;
 
 /**
- * WebSocket client options must be set by implementation.
+ * WebSocket client options must be set by implementations.
  */
 public abstract class WebSocketClientOptionsSPI implements 
HttpClientOptionsSPI {
   public static WebSocketClientOptions 
createWebSocketClientOptions(WebSocketClientOptionsSPI spi) {
diff --git 
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/AbstractBaseWebSocket.java
 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/AbstractBaseWebSocket.java
index 2400d833e..704c71e8b 100644
--- 
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/AbstractBaseWebSocket.java
+++ 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/AbstractBaseWebSocket.java
@@ -55,13 +55,24 @@ public abstract class AbstractBaseWebSocket implements 
WebSocket {
     webSocketAdapter.resume();
   }
 
+  /**
+   * Check whether the message sending queue is full, in which case you should 
pause your sending action.
+   * To get to know when to recover sending action, please override the {@link 
#onWriteQueueDrain()} method
+   * to subscribe the notification that the message sending queue is ready to 
accept message again.
+   *
+   * @return true if message sending queue is full.
+   */
   @Override
   public boolean writeQueueFull() {
     return webSocketAdapter.writeQueueFull();
   }
 
+  /**
+   * The callback to notify when the message sending queue is ready to accept 
sending message/frame again.
+   * Usually this method is used in conjunction with the {@link 
#writeQueueFull()}.
+   */
   @Override
-  public void onDrain() {
+  public void onWriteQueueDrain() {
   }
 
   public void setWebSocketAdapter(WebSocketAdapter webSocketAdapter) {
diff --git 
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapper.java
 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapper.java
new file mode 100644
index 000000000..95ef34579
--- /dev/null
+++ 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapper.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.swagger.invocation.ws;
+
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.servicecomb.swagger.invocation.InvocationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SerialExecutorWrapper implements Executor {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SerialExecutorWrapper.class);
+
+  private static final int LEAST_QUEUE_CAPACITY = 10;
+
+  private final InvocationType invocationType;
+
+  private final String id;
+
+  private final Queue<Runnable> queue;
+
+  private final Executor workerPool;
+
+  private final int queueCapacity;
+
+  private final int drainThreshold;
+
+  private final int fullThreshold;
+
+  private final AtomicBoolean scheduleFlag;
+
+  private final int maxContinueTimes;
+
+  private QueueDrainSubscriber queueDrainSubscriber;
+
+  private QueueFullSubscriber queueFullSubscriber;
+
+  public SerialExecutorWrapper(
+      InvocationType invocationType,
+      String id, Executor workerPool, int queueCapacity, int maxContinueTimes) 
{
+    this.invocationType = invocationType;
+    this.id = id;
+    this.workerPool = workerPool;
+    this.queueCapacity = correctQueueCapacity(queueCapacity);
+    queue = new ArrayBlockingQueue<>(calculateRealQueueSize(), true);
+    drainThreshold = calculateDrainThreshold();
+    fullThreshold = calculateFullThreshold();
+    scheduleFlag = new AtomicBoolean();
+    this.maxContinueTimes = correctMaxContinueTimes(maxContinueTimes);
+  }
+
+  /**
+   * Subscribe queue drain event to resume message queue.
+   * Use this method in cooperation with {@link 
#subscribeQueueFullEvent(QueueFullSubscriber)} method.
+   */
+  public void subscribeQueueDrainEvent(QueueDrainSubscriber subscriber) {
+    if (subscriber != null) {
+      queueDrainSubscriber = subscriber;
+    }
+  }
+
+  /**
+   * Subscribe queue full event to pause message queue.
+   * Use this method in cooperation with {@link 
#subscribeQueueDrainEvent(QueueDrainSubscriber)} method.
+   */
+  public void subscribeQueueFullEvent(QueueFullSubscriber subscriber) {
+    if (subscriber != null) {
+      queueFullSubscriber = subscriber;
+    }
+  }
+
+  @Override
+  public void execute(Runnable command) {
+    Objects.requireNonNull(command, "command must not be null");
+    Objects.requireNonNull(queueDrainSubscriber, "queueDrainSubscriber must 
not be null");
+    queue.add(command);
+    markQueueFull();
+    scheduleWorker();
+  }
+
+  private void scheduleWorker() {
+    if (!scheduleFlag.compareAndSet(false, true)) {
+      // a running task has been set, don't repeat
+      return;
+    }
+    try {
+      workerPool.execute(() -> {
+        try {
+          runTasks();
+        } finally {
+          scheduleFlag.set(false);
+          if (!queue.isEmpty()) {
+            scheduleWorker();
+          }
+        }
+      });
+    } catch (Throwable e) {
+      // in case that the underlying executor queue full, the scheduleFlag 
should be recovered
+      scheduleFlag.set(false);
+      LOGGER.error("[{}]-[{}] failed to execute task in actual thread pool!", 
invocationType, id, e);
+    }
+  }
+
+  private void runTasks() {
+    int workCount = 1;
+    while (true) {
+      final Runnable task = queue.poll();
+      if (task == null) {
+        break;
+      }
+
+      try {
+        task.run();
+      } catch (Throwable e) {
+        LOGGER.error("[{}]-[{}] error occurred while executing task[{}]", 
invocationType, id, task, e);
+      }
+
+      ++workCount;
+      if (workCount > maxContinueTimes) {
+        break;
+      }
+    }
+
+    notifyIfQueueDrain();
+  }
+
+  private void markQueueFull() {
+    final QueueFullSubscriber subscriber = queueFullSubscriber;
+    if (subscriber == null) {
+      return;
+    }
+
+    if ((queue.size() < fullThreshold)) {
+      return;
+    }
+
+    LOGGER.warn("[{}]-[{}] queue nearly full! queue length: {}/{}",
+        invocationType, id, queue.size(), calculateRealQueueSize());
+    try {
+      subscriber.run();
+    } catch (Throwable e) {
+      LOGGER.error("[{}]-[{}] error occurred while notifying queue full 
subscriber[{}]", invocationType, id,
+          subscriber, e);
+    }
+    if (queue.size() < drainThreshold) {
+      // in case that all tasks has been completed and WebSocket is paused,
+      // and no task to trigger the WebSocket get resumed
+      execute(this::notifyIfQueueDrain);
+    }
+  }
+
+  private void notifyIfQueueDrain() {
+    final QueueDrainSubscriber subscriber = queueDrainSubscriber;
+    if (subscriber == null) {
+      return;
+    }
+    if (queue.size() > drainThreshold) {
+      return;
+    }
+    try {
+      subscriber.run();
+    } catch (Throwable e) {
+      LOGGER.error("[{}]-[{}] error occurred while notifying queue drain 
subscriber[{}]", invocationType, id,
+          subscriber, e);
+    }
+  }
+
+  private int calculateRealQueueSize() {
+    return queueCapacity + 50;
+  }
+
+  private int calculateDrainThreshold() {
+    return (int) (queueCapacity * 0.25);
+  }
+
+  private int calculateFullThreshold() {
+    return (int) (queueCapacity * 0.9);
+  }
+
+  private int correctQueueCapacity(int queueCapacity) {
+    if (queueCapacity < LEAST_QUEUE_CAPACITY) {
+      LOGGER.warn("queue capacity less than 10 does not make sense, adjust to 
{}", LEAST_QUEUE_CAPACITY);
+      return LEAST_QUEUE_CAPACITY;
+    }
+    return queueCapacity;
+  }
+
+  private int correctMaxContinueTimes(int maxContinueTimes) {
+    if (maxContinueTimes < 1) {
+      LOGGER.warn("maxContinueTimes less than 1 does not make sense, adjust to 
1");
+      return 1;
+    }
+    return maxContinueTimes;
+  }
+
+  public interface QueueDrainSubscriber {
+    void run();
+  }
+
+  public interface QueueFullSubscriber {
+    void run();
+  }
+}
diff --git 
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocket.java
 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocket.java
index cde202c1d..7cfb666ff 100644
--- 
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocket.java
+++ 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocket.java
@@ -55,5 +55,5 @@ public interface WebSocket {
 
   boolean writeQueueFull();
 
-  void onDrain();
+  void onWriteQueueDrain();
 }
diff --git 
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketFrame.java
 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketFrame.java
index e2ca12b52..7b69c5854 100644
--- 
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketFrame.java
+++ 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketFrame.java
@@ -22,6 +22,4 @@ package org.apache.servicecomb.swagger.invocation.ws;
  * There should be such a concept, but not need to supply such function for 
now.
  */
 public class WebSocketFrame {
-  private boolean isFinal;
-  private WebSocketFrameType type;
 }
diff --git 
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketPipe.java
 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketPipe.java
index c24a96db0..76da45cdf 100644
--- 
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketPipe.java
+++ 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketPipe.java
@@ -30,13 +30,20 @@ import 
io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
 public class WebSocketPipe {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(WebSocketPipe.class);
 
+  private final String websocketSessionId;
+
   private final PipeServerWebSocket serverWebSocket;
 
   private final PipeClientWebSocket clientWebSocket;
 
+  /**
+   * There is chance that the server and client try to close connection at the 
same time.
+   * Use one lock to avoid the client and server websocket deadlock in closing 
procedure.
+   */
   private final Object statusLock = new Object();
 
-  public WebSocketPipe() {
+  public WebSocketPipe(String websocketSessionId) {
+    this.websocketSessionId = websocketSessionId;
     this.clientWebSocket = new PipeClientWebSocket();
     this.serverWebSocket = new PipeServerWebSocket();
     this.clientWebSocket.connect(serverWebSocket);
@@ -97,32 +104,33 @@ public class WebSocketPipe {
       peer.sendMessage(message)
           .whenComplete((v, t) -> {
             if (t != null) {
-              LOGGER.error("failed to forward message", t);
+              LOGGER.error("[{}] failed to forward message", 
websocketSessionId, t);
             }
           });
       if (peer.writeQueueFull()) {
         this.pause();
-        LOGGER.debug("pipe paused, direction is server to client");
+        LOGGER.debug("[{}] pipe paused, direction is server to client", 
websocketSessionId);
       }
     }
 
     @Override
     public void onError(Throwable t) {
-      LOGGER.error("websocket error", t);
+      LOGGER.error("[{}] websocket error", websocketSessionId, t);
       synchronized (statusLock) {
         transferStatus(PipeWebSocketStatus.ERROR);
-        safelyClose();
+        safelyClose((short) WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(),
+            WebSocketCloseStatus.INTERNAL_SERVER_ERROR.reasonText());
       }
     }
 
     @Override
     public void onClose(Short closeStatusCode, String closeReason) {
       transferStatus(PipeWebSocketStatus.CLOSED);
-      safelyClose(); // should close peer
+      safelyClose(closeStatusCode, closeReason); // should close peer
     }
 
     @Override
-    public void onDrain() {
+    public void onWriteQueueDrain() {
       peer.resume();
     }
 
@@ -132,6 +140,7 @@ public class WebSocketPipe {
         if (status == PipeWebSocketStatus.CLOSING || status == 
PipeWebSocketStatus.CLOSED) {
           return CompletableFuture.completedFuture(null);
         }
+
         transferStatus(PipeWebSocketStatus.CLOSING);
         if (status.ordinal() < PipeWebSocketStatus.RUNNING.ordinal()) {
           transferStatus(PipeWebSocketStatus.CLOSED);
@@ -151,7 +160,9 @@ public class WebSocketPipe {
     private void transferStatus(PipeWebSocketStatus expectedOldStatus, 
PipeWebSocketStatus newStatus) {
       synchronized (statusLock) {
         if (status != expectedOldStatus) {
-          safelyClose();
+          safelyClose((short) 
WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(),
+              WebSocketCloseStatus.INTERNAL_SERVER_ERROR.reasonText());
+          LOGGER.error("[{}] illegal state transfer: from {} to {}", 
websocketSessionId, expectedOldStatus, newStatus);
           throw new IllegalStateException("Illegal state transfer: ["
               + expectedOldStatus
               + "] to ["
@@ -162,21 +173,19 @@ public class WebSocketPipe {
       }
     }
 
-    private void safelyClose() {
+    private void safelyClose(Short closeStatusCode, String closeReason) {
       try {
-        close((short) WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(),
-            WebSocketCloseStatus.INTERNAL_SERVER_ERROR.reasonText());
+        close(closeStatusCode, closeReason);
       } catch (Throwable e) {
-        LOGGER.error("failed to close pipe server websocket", e);
+        LOGGER.error("[{}] failed to close pipe server websocket", 
websocketSessionId, e);
       }
       if (peer == null) {
         return;
       }
       try {
-        peer.close((short) WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(),
-            WebSocketCloseStatus.INTERNAL_SERVER_ERROR.reasonText());
+        peer.close(closeStatusCode, closeReason);
       } catch (Throwable e) {
-        LOGGER.error("failed to close pipe client websocket", e);
+        LOGGER.error("[{}] failed to close pipe client websocket", 
websocketSessionId, e);
       }
     }
   }
@@ -215,32 +224,33 @@ public class WebSocketPipe {
       peer.sendMessage(message)
           .whenComplete((v, t) -> {
             if (t != null) {
-              LOGGER.error("failed to forward message", t);
+              LOGGER.error("[{}] failed to forward message", 
websocketSessionId, t);
             }
           });
       if (peer.writeQueueFull()) {
         this.pause();
-        LOGGER.debug("pipe paused, direction is client to server");
+        LOGGER.debug("[{}] pipe paused, direction is client to server", 
websocketSessionId);
       }
     }
 
     @Override
     public void onError(Throwable t) {
-      LOGGER.error("websocket error", t);
+      LOGGER.error("[{}] websocket error", websocketSessionId, t);
       synchronized (statusLock) {
         transferStatus(PipeWebSocketStatus.ERROR);
-        safelyClose();
+        safelyClose((short) WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(),
+            WebSocketCloseStatus.INTERNAL_SERVER_ERROR.reasonText());
       }
     }
 
     @Override
     public void onClose(Short closeStatusCode, String closeReason) {
       transferStatus(PipeWebSocketStatus.CLOSED);
-      safelyClose(); // should close peer
+      safelyClose(closeStatusCode, closeReason); // should close peer
     }
 
     @Override
-    public void onDrain() {
+    public void onWriteQueueDrain() {
       peer.resume();
     }
 
@@ -253,7 +263,9 @@ public class WebSocketPipe {
     private void transferStatus(PipeWebSocketStatus expectedOldStatus, 
PipeWebSocketStatus newStatus) {
       synchronized (statusLock) {
         if (status != expectedOldStatus) {
-          safelyClose();
+          safelyClose((short) 
WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(),
+              WebSocketCloseStatus.INTERNAL_SERVER_ERROR.reasonText());
+          LOGGER.error("[{}] illegal state transfer: from {} to {}", 
websocketSessionId, expectedOldStatus, newStatus);
           throw new IllegalStateException("Illegal state transfer: ["
               + expectedOldStatus
               + "] to ["
@@ -264,21 +276,19 @@ public class WebSocketPipe {
       }
     }
 
-    private void safelyClose() {
+    private void safelyClose(Short closeStatusCode, String closeReason) {
       try {
-        close((short) WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(),
-            WebSocketCloseStatus.INTERNAL_SERVER_ERROR.reasonText());
+        close(closeStatusCode, closeReason);
       } catch (Throwable e) {
-        LOGGER.error("failed to close pipe client websocket", e);
+        LOGGER.error("[{}] failed to close pipe client websocket", 
websocketSessionId, e);
       }
       if (peer == null) {
         return;
       }
       try {
-        peer.close((short) WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(),
-            WebSocketCloseStatus.INTERNAL_SERVER_ERROR.reasonText());
+        peer.close(closeStatusCode, closeReason);
       } catch (Throwable e) {
-        LOGGER.error("failed to close pipe server websocket", e);
+        LOGGER.error("[{}] failed to close pipe server websocket", 
websocketSessionId, e);
       }
     }
   }
diff --git 
a/swagger/swagger-invocation/invocation-core/src/test/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapperTest.java
 
b/swagger/swagger-invocation/invocation-core/src/test/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapperTest.java
new file mode 100644
index 000000000..7e9ff171f
--- /dev/null
+++ 
b/swagger/swagger-invocation/invocation-core/src/test/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapperTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.swagger.invocation.ws;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.servicecomb.swagger.invocation.InvocationType;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * SerialExecutorWrapper UT
+ */
+public class SerialExecutorWrapperTest {
+  private ExecutorService workerPools;
+
+  @Before
+  public void before() {
+    workerPools = Executors.newFixedThreadPool(20);
+  }
+
+  @After
+  public void after() {
+    workerPools.shutdown();
+  }
+
+  @Test
+  public void executeSingleWrapper() throws InterruptedException {
+    // queueCapacity(10) + bufferCapacity(50) + taskInWorking(1) = 61, this is 
the max task count that not trigger queue full exception
+    final int taskCount = 61;
+    final SerialExecutorWrapper wrapper = new 
SerialExecutorWrapper(InvocationType.PRODUCER,
+        "testSerialWrapper", workerPools, 10, 3);
+    wrapper.subscribeQueueDrainEvent(() -> {
+    });
+    final Object lock = new Object();
+    final List<Integer> resultList = new ArrayList<>();
+    final CountDownLatch countDownLatch = new CountDownLatch(taskCount);
+    synchronized (lock) {
+      final CountDownLatch continueLoopLatch = new CountDownLatch(1);
+      wrapper.execute(() -> {
+        continueLoopLatch.countDown();
+        synchronized (lock) {
+          resultList.add(0);
+          countDownLatch.countDown();
+        }
+      });
+      continueLoopLatch.await(1, TimeUnit.MINUTES); // wait the first task 
start running to ensure a stable result.
+      for (int i = 1; i < taskCount; ++i) {
+        final int index = i;
+        wrapper.execute(() -> {
+          resultList.add(index);
+          countDownLatch.countDown();
+        });
+      }
+      MatcherAssert.assertThat(resultList, Matchers.empty());
+    }
+    countDownLatch.await(1, TimeUnit.MINUTES);
+    MatcherAssert.assertThat(resultList, Matchers.hasSize(taskCount));
+    for (int i = 0; i < taskCount; ++i) {
+      // must execute
+      MatcherAssert.assertThat(resultList.get(i), Matchers.equalTo(i));
+    }
+  }
+
+  @Test
+  public void executeSingleWrapperQueueFull() throws InterruptedException {
+    final int taskCount = 62; // taskCount +1 than executeSingleWrapper method
+    final SerialExecutorWrapper wrapper = new 
SerialExecutorWrapper(InvocationType.PRODUCER,
+        "testSerialWrapper", workerPools, 10, 3);
+    wrapper.subscribeQueueDrainEvent(() -> {
+    });
+    final Object lock = new Object();
+    final AtomicBoolean exceptionFlag = new AtomicBoolean();
+    final List<Integer> resultList = new ArrayList<>();
+    final CountDownLatch countDownLatch = new CountDownLatch(taskCount - 1);
+    synchronized (lock) {
+      final CountDownLatch continueLoopLatch = new CountDownLatch(1);
+      wrapper.execute(() -> {
+        continueLoopLatch.countDown();
+        synchronized (lock) {
+          resultList.add(0);
+          countDownLatch.countDown();
+        }
+      });
+      continueLoopLatch.await(1, TimeUnit.MINUTES); // wait the first task 
start running to ensure a stable result.
+      for (int i = 1; i < taskCount; ++i) {
+        final int index = i;
+        try {
+          wrapper.execute(() -> {
+            resultList.add(index);
+            countDownLatch.countDown();
+          });
+        } catch (Exception e) {
+          exceptionFlag.set(true);
+          MatcherAssert.assertThat(i, Matchers.equalTo(taskCount - 1));
+          MatcherAssert.assertThat(e, 
Matchers.instanceOf(IllegalStateException.class));
+        }
+      }
+      MatcherAssert.assertThat(resultList, Matchers.empty());
+    }
+    countDownLatch.await(1, TimeUnit.MINUTES);
+    MatcherAssert.assertThat(exceptionFlag.get(), Matchers.equalTo(true));
+    MatcherAssert.assertThat(resultList, Matchers.hasSize(taskCount - 1));
+    for (int i = 0; i < taskCount - 1; ++i) {
+      MatcherAssert.assertThat(resultList.get(i), Matchers.equalTo(i));
+    }
+  }
+}
diff --git 
a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketClientInvocation.java
 
b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketClientInvocation.java
index 9bac6305e..375110b84 100644
--- 
a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketClientInvocation.java
+++ 
b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketClientInvocation.java
@@ -56,7 +56,7 @@ import io.vertx.core.http.WebSocketConnectOptions;
 
 public class WebSocketClientInvocation {
 
-  public static final Logger LOGGER = 
LoggerFactory.getLogger(WebSocketClientInvocation.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(WebSocketClientInvocation.class);
 
   private static final String[] INTERNAL_HEADERS = new String[] {
       Const.CSE_CONTEXT,
diff --git 
a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketTransportClient.java
 
b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketTransportClient.java
index a31d9a1d3..c606ab42f 100644
--- 
a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketTransportClient.java
+++ 
b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketTransportClient.java
@@ -41,8 +41,7 @@ public class WebSocketTransportClient {
   public void send(Invocation invocation, AsyncResponse asyncResp) {
     final WebSocketClientWithContext webSocketClientWithContext = 
findHttpClientPool(invocation);
     final WebSocketClientInvocation webSocketClientInvocation = new 
WebSocketClientInvocation(
-        webSocketClientWithContext,
-        httpClientFilters);
+        webSocketClientWithContext, httpClientFilters);
     try {
       webSocketClientInvocation.invoke(invocation, asyncResp);
     } catch (Throwable e) {
diff --git 
a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java
 
b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java
index 59cb6cd37..9edf23126 100644
--- 
a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java
+++ 
b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java
@@ -57,6 +57,7 @@ import io.vertx.core.http.Http2Settings;
 import io.vertx.core.http.HttpMethod;
 import io.vertx.core.http.HttpServer;
 import io.vertx.core.http.HttpServerOptions;
+import io.vertx.core.http.HttpServerRequest;
 import io.vertx.core.http.HttpServerResponse;
 import io.vertx.core.net.impl.ConnectionBase;
 import io.vertx.ext.web.Router;
@@ -88,6 +89,7 @@ public class RestServerVerticle extends AbstractVerticle {
         return;
       }
       Router mainRouter = Router.router(vertx);
+      mountWebSocketPauseHandler(mainRouter);
       mountAccessLogHandler(mainRouter);
       mountCorsHandler(mainRouter);
       initDispatcher(mainRouter);
@@ -163,6 +165,18 @@ public class RestServerVerticle extends AbstractVerticle {
         .failureHandler(failureHandler);
   }
 
+  private void mountWebSocketPauseHandler(Router mainRouter) {
+    mainRouter.route().handler(context -> {
+      final HttpServerRequest request = context.request();
+      if (request.headers().contains(
+          io.vertx.core.http.HttpHeaders.UPGRADE, 
io.vertx.core.http.HttpHeaders.WEBSOCKET, true)) {
+        // pause for websocket, to avoid missing end event, which may cause 
vert.x HttpServerRequest toWebSocket never complete
+        request.pause();
+      }
+      context.next();
+    });
+  }
+
   private void mountAccessLogHandler(Router mainRouter) {
     if (!AccessLogConfig.INSTANCE.isServerLogEnabled()) {
       return;
@@ -267,6 +281,7 @@ public class RestServerVerticle extends AbstractVerticle {
     
serverOptions.setDecompressionSupported(TransportConfig.getDecompressionSupported());
     
serverOptions.setDecoderInitialBufferSize(TransportConfig.getDecoderInitialBufferSize());
     
serverOptions.setMaxInitialLineLength(TransportConfig.getMaxInitialLineLength());
+    serverOptions.setLogActivity(TransportConfig.enableLogActivity());
 
     // WebSocket config start
     
serverOptions.setMaxWebSocketFrameSize(TransportConfig.getMaxWebSocketFrameSize());
diff --git 
a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/TransportConfig.java
 
b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/TransportConfig.java
index dd922fbc2..b52505168 100644
--- 
a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/TransportConfig.java
+++ 
b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/TransportConfig.java
@@ -182,6 +182,13 @@ public final class TransportConfig {
         .get();
   }
 
+  public static boolean enableLogActivity() {
+    return DynamicPropertyFactory.getInstance()
+        .getBooleanProperty("servicecomb.rest.server.enableLogActivity",
+            false)
+        .get();
+  }
+
   public static boolean isCorsEnabled() {
     return DynamicPropertyFactory.getInstance()
         .getBooleanProperty(SERVICECOMB_CORS_CONFIG_BASE + ".enabled", false)
diff --git 
a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java
 
b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java
index 696db5af7..195c59fa7 100644
--- 
a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java
+++ 
b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java
@@ -20,13 +20,21 @@ package org.apache.servicecomb.transport.rest.vertx.ws;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.servicecomb.core.executor.ReactiveExecutor;
+import org.apache.servicecomb.swagger.invocation.InvocationType;
 import org.apache.servicecomb.swagger.invocation.ws.AbstractBaseWebSocket;
 import 
org.apache.servicecomb.swagger.invocation.ws.BinaryBytesWebSocketMessage;
+import org.apache.servicecomb.swagger.invocation.ws.SerialExecutorWrapper;
 import org.apache.servicecomb.swagger.invocation.ws.TextWebSocketMessage;
 import org.apache.servicecomb.swagger.invocation.ws.WebSocketAdapter;
 import org.apache.servicecomb.swagger.invocation.ws.WebSocketFrame;
 import org.apache.servicecomb.swagger.invocation.ws.WebSocketMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.netflix.config.DynamicPropertyFactory;
 
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.http.WebSocketBase;
@@ -35,27 +43,70 @@ import io.vertx.core.http.WebSocketBase;
  * VertxWebSocketAdaptor
  */
 public class VertxWebSocketAdaptor implements WebSocketAdapter {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(VertxWebSocketAdaptor.class);
+
+  private static final int DEFAULT_ADAPTER_QUEUE_MAX_SIZE = 100;
+
+  private static final int DEFAULT_ADAPTER_QUEUE_MAX_CONTINUE_TIMES = 10;
+
   private final Executor executor;
 
   private final AbstractBaseWebSocket delegatedWebSocket;
 
+  private final AtomicBoolean inPauseStatus;
+
   private final WebSocketBase vertxWebSocket;
 
-  public VertxWebSocketAdaptor(Executor executor, AbstractBaseWebSocket 
delegatedWebSocket,
+  private final String websocketSessionId;
+
+  private final InvocationType invocationType;
+
+  public VertxWebSocketAdaptor(
+      InvocationType invocationType,
+      String websocketSessionId,
+      Executor workerPool,
+      AbstractBaseWebSocket delegatedWebSocket,
       WebSocketBase vertxWebSocket) {
-    Objects.requireNonNull(executor, "VertxWebSocketAdaptor executor is null");
+    Objects.requireNonNull(invocationType, "VertxWebSocketAdaptor 
invocationType is null");
+    Objects.requireNonNull(websocketSessionId, "VertxWebSocketAdaptor 
websocketSessionId is null");
+    Objects.requireNonNull(workerPool, "VertxWebSocketAdaptor workerPool is 
null");
     Objects.requireNonNull(delegatedWebSocket, "VertxWebSocketAdaptor 
delegatedWebSocket is null");
     Objects.requireNonNull(vertxWebSocket, "VertxWebSocketAdaptor 
vertxWebSocket is null");
-    this.executor = executor;
+    this.invocationType = invocationType;
+    this.websocketSessionId = websocketSessionId;
+    this.executor = workerPool instanceof ReactiveExecutor ?
+        workerPool // for reactive case, no need to wrap it into a serial 
queue model
+        : prepareSerialExecutorWrapper(workerPool);
     this.delegatedWebSocket = delegatedWebSocket;
     this.vertxWebSocket = vertxWebSocket;
+    inPauseStatus = new AtomicBoolean(true);
+    vertxWebSocket.pause(); // make sure the vert.x WebSocket pause status 
keep consistent with inPauseStatus flag
 
-    link();
+    prepare();
     delegatedWebSocket.setWebSocketAdapter(this);
     startWorking();
   }
 
-  private void link() {
+  private SerialExecutorWrapper prepareSerialExecutorWrapper(Executor 
workerPool) {
+    final SerialExecutorWrapper wrapper = new SerialExecutorWrapper(
+        invocationType,
+        websocketSessionId,
+        workerPool,
+        DynamicPropertyFactory.getInstance()
+            .getIntProperty("servicecomb.websocket.adapter.queue.maxSize",
+                DEFAULT_ADAPTER_QUEUE_MAX_SIZE)
+            .get(),
+        DynamicPropertyFactory.getInstance()
+            
.getIntProperty("servicecomb.websocket.adapter.queue.maxContinueTimes",
+                DEFAULT_ADAPTER_QUEUE_MAX_CONTINUE_TIMES)
+            .get());
+    wrapper.subscribeQueueDrainEvent(this::resume);
+    wrapper.subscribeQueueFullEvent(this::pause);
+    return wrapper;
+  }
+
+  private void prepare() {
     linkVertxDrainHandler();
     linkVertxTextMessageHandler();
     linkVertxBinaryMessageHandler();
@@ -66,12 +117,11 @@ public class VertxWebSocketAdaptor implements 
WebSocketAdapter {
 
   private void linkVertxCloseHandler() {
     vertxWebSocket.closeHandler(v ->
-        executor.execute(
-            () -> delegatedWebSocket.onClose(vertxWebSocket.closeStatusCode(), 
vertxWebSocket.closeReason())));
+        scheduleTask(() -> 
delegatedWebSocket.onClose(vertxWebSocket.closeStatusCode(), 
vertxWebSocket.closeReason())));
   }
 
   private void linkVertxExceptionHandler() {
-    vertxWebSocket.exceptionHandler(t -> executor.execute(() -> 
delegatedWebSocket.onError(t)));
+    vertxWebSocket.exceptionHandler(t -> scheduleTask(() -> 
delegatedWebSocket.onError(t)));
   }
 
   private void linkVertxFrameHandler() {
@@ -82,27 +132,35 @@ public class VertxWebSocketAdaptor implements 
WebSocketAdapter {
   private void linkVertxBinaryMessageHandler() {
     vertxWebSocket.binaryMessageHandler(buffer -> {
       final byte[] bytes = buffer.getBytes();
-      executor.execute(
+      scheduleTask(
           () -> delegatedWebSocket.onMessage(new 
BinaryBytesWebSocketMessage(bytes)));
     });
   }
 
   private void linkVertxTextMessageHandler() {
     vertxWebSocket.textMessageHandler(s ->
-        executor.execute(
+        scheduleTask(
             () -> delegatedWebSocket.onMessage(new TextWebSocketMessage(s))));
   }
 
   private void linkVertxDrainHandler() {
     vertxWebSocket.drainHandler(v ->
-        executor.execute(delegatedWebSocket::onDrain));
+        scheduleTask(delegatedWebSocket::onWriteQueueDrain));
   }
 
   private void startWorking() {
-    executor.execute(
+    scheduleTask(
         delegatedWebSocket::onConnectionReady);
   }
 
+  private void scheduleTask(Runnable task) {
+    try {
+      executor.execute(task);
+    } catch (Throwable e) {
+      LOGGER.error("[{}]-[{}] error occurs in scheduleTask", invocationType, 
websocketSessionId, e);
+    }
+  }
+
   @Override
   public CompletableFuture<Void> sendMessage(WebSocketMessage<?> message) {
     if (message instanceof TextWebSocketMessage) {
@@ -136,12 +194,26 @@ public class VertxWebSocketAdaptor implements 
WebSocketAdapter {
 
   @Override
   public void pause() {
-    vertxWebSocket.pause();
+    if (!inPauseStatus.compareAndSet(false, true)) {
+      return;
+    }
+    LOGGER.info("[{}]-[{}] pause websocket", invocationType, 
websocketSessionId);
+    synchronized (this) {
+      vertxWebSocket.pause();
+      inPauseStatus.set(true);
+    }
   }
 
   @Override
   public void resume() {
-    vertxWebSocket.resume();
+    if (!inPauseStatus.compareAndSet(true, false)) {
+      return;
+    }
+    LOGGER.info("[{}]-[{}] resume websocket", invocationType, 
websocketSessionId);
+    synchronized (this) {
+      vertxWebSocket.resume();
+      inPauseStatus.set(false);
+    }
   }
 
   @Override
diff --git 
a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java
 
b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java
index afa887e76..437a0ba23 100644
--- 
a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java
+++ 
b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java
@@ -52,7 +52,7 @@ public class WebSocketHandshakeServerFilter implements 
HttpServerFilter {
     if (invocation.isEdge()
         && (invocation.getLocalContext(WEBSOCKET_PIPE_CONTEXT_KEY) == null)) {
       // prepare pipe for edge forward websocket scene
-      final WebSocketPipe webSocketPipe = new WebSocketPipe();
+      final WebSocketPipe webSocketPipe = new 
WebSocketPipe(invocation.getTraceId());
       invocation.addLocalContext(WEBSOCKET_PIPE_CONTEXT_KEY, webSocketPipe);
       
invocation.addLocalContext(ClientWebSocketArgumentMapper.SCB_CLIENT_WEBSOCKET_LOCAL_CONTEXT_KEY,
           webSocketPipe.getClientWebSocket());
@@ -73,7 +73,10 @@ public class WebSocketHandshakeServerFilter implements 
HttpServerFilter {
         ((VertxServerRequestToHttpServletRequest) invocation.getRequestEx())
             .toWebSocket()
             .whenComplete((ws, t) ->
-                new 
VertxWebSocketAdaptor(invocation.getOperationMeta().getExecutor(),
+                new VertxWebSocketAdaptor(
+                    invocation.getInvocationType(),
+                    invocation.getTraceId(),
+                    invocation.getOperationMeta().getExecutor(),
                     (ServerWebSocket) result,
                     ws));
       }
diff --git 
a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketResponseWrapClientFilter.java
 
b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketResponseWrapClientFilter.java
index eed2389a7..04fbaec0c 100644
--- 
a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketResponseWrapClientFilter.java
+++ 
b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketResponseWrapClientFilter.java
@@ -68,6 +68,8 @@ public class WebSocketResponseWrapClientFilter implements 
HttpClientFilter {
         .getOperationMeta()
         .getExecutor();
     final VertxWebSocketAdaptor webSocketAdaptor = new VertxWebSocketAdaptor(
+        invocation.getInvocationType(),
+        invocation.getTraceId(),
         executor,
         clientWebSocket,
         webSocketResponseEx.getVertxClientWebSocket());


Reply via email to