This is an automated email from the ASF dual-hosted git repository.

yaohaishi pushed a commit to branch 2.8.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git

commit 6873ee5992ff2211a1e0efa33dd30377a914ecbf
Author: yhs0092 <[email protected]>
AuthorDate: Fri Mar 21 20:53:12 2025 +0800

    [SCB-2897] add accesslog and requestlog for WebSocket
---
 .../accessLog/ws/WebSocketAccessLogConfig.java     |  53 +++++++
 .../accessLog/ws/WebSocketAccessLogGenerator.java  |  87 +++++++++++
 .../ws/WebSocketAccessLogInitializer.java          |  80 ++++++++++
 ...rvicecomb.common.accessLog.AccessLogInitializer |   1 +
 .../core/event/WebSocketActionEvent.java           | 173 +++++++++++++++++++++
 .../swagger/invocation/ws/WebSocketActionType.java |  37 +++++
 .../swagger/invocation/ws/WebSocketPipe.java       |   4 +
 .../rest/vertx/ws/VertxWebSocketAdaptor.java       | 134 ++++++++++++----
 .../vertx/ws/WebSocketHandshakeServerFilter.java   |   5 +-
 .../ws/WebSocketResponseWrapClientFilter.java      |   5 +-
 10 files changed, 547 insertions(+), 32 deletions(-)

diff --git 
a/common/common-access-log/src/main/java/org/apache/servicecomb/common/accessLog/ws/WebSocketAccessLogConfig.java
 
b/common/common-access-log/src/main/java/org/apache/servicecomb/common/accessLog/ws/WebSocketAccessLogConfig.java
new file mode 100644
index 000000000..107c1e1c0
--- /dev/null
+++ 
b/common/common-access-log/src/main/java/org/apache/servicecomb/common/accessLog/ws/WebSocketAccessLogConfig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.common.accessLog.ws;
+
+import com.netflix.config.DynamicPropertyFactory;
+
+public class WebSocketAccessLogConfig {
+  private static final String BASE = "servicecomb.accesslog.ws.";
+
+  private static final String SERVER_BASE = BASE + "server.";
+
+  private static final String CLIENT_BASE = BASE + "client.";
+
+  private static final String SERVER_LOG_ENABLED = SERVER_BASE + "enabled";
+
+  private static final String CLIENT_LOG_ENABLED = CLIENT_BASE + "enabled";
+
+  private boolean serverLogEnabled;
+
+  private boolean clientLogEnabled;
+
+  public static final WebSocketAccessLogConfig INSTANCE = new 
WebSocketAccessLogConfig();
+
+  private WebSocketAccessLogConfig() {
+    clientLogEnabled = DynamicPropertyFactory
+        .getInstance().getBooleanProperty(CLIENT_LOG_ENABLED, false).get();
+    serverLogEnabled = DynamicPropertyFactory
+        .getInstance().getBooleanProperty(SERVER_LOG_ENABLED, false).get();
+  }
+
+  public boolean isServerLogEnabled() {
+    return serverLogEnabled;
+  }
+
+  public boolean isClientLogEnabled() {
+    return clientLogEnabled;
+  }
+}
diff --git 
a/common/common-access-log/src/main/java/org/apache/servicecomb/common/accessLog/ws/WebSocketAccessLogGenerator.java
 
b/common/common-access-log/src/main/java/org/apache/servicecomb/common/accessLog/ws/WebSocketAccessLogGenerator.java
new file mode 100644
index 000000000..5772c7937
--- /dev/null
+++ 
b/common/common-access-log/src/main/java/org/apache/servicecomb/common/accessLog/ws/WebSocketAccessLogGenerator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.common.accessLog.ws;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Locale;
+import java.util.TimeZone;
+
+import org.apache.servicecomb.core.event.WebSocketActionEvent;
+
+/**
+ * Similar to {@link 
org.apache.servicecomb.common.accessLog.core.AccessLogGenerator},
+ * this is an access log generator for WebSocket protocol.
+ */
+public class WebSocketAccessLogGenerator {
+
+  public static final String DEFAULT_DATETIME_PATTERN = 
"yyyy-MM-dd'T'HH:mm:ss.SSSZ";
+
+  public static final Locale DEFAULT_LOCALE = Locale.US;
+
+  public static final TimeZone TIME_ZONE = TimeZone.getDefault();
+
+  private final ThreadLocal<SimpleDateFormat> datetimeFormatHolder = new 
ThreadLocal<>();
+
+  public String generateServerLog(WebSocketActionEvent actionEvent) {
+    return generateLog(actionEvent);
+  }
+
+  public String generateClientLog(WebSocketActionEvent actionEvent) {
+    return generateLog(actionEvent);
+  }
+
+  private String generateLog(WebSocketActionEvent actionEvent) {
+    return actionEvent.getInvocationType()
+        + "|"
+        + actionEvent.getOperationMeta().getMicroserviceQualifiedName()
+        + "|"
+        + formatTimestampToDateTimeStr(actionEvent.getActionStartTimestamp())
+        + "|"
+        + actionEvent.getTraceId()
+        + "|"
+        + actionEvent.getConnectionId()
+        + "|"
+        + actionEvent.getActionType()
+        + "|"
+        + (actionEvent.getActionStartTimestamp() - 
actionEvent.getScheduleStartTimestamp())
+        + "|"
+        + (actionEvent.getActionEndTimestamp() - 
actionEvent.getActionStartTimestamp())
+        + "|"
+        + actionEvent.getHandleThreadName()
+        + "|"
+        + actionEvent.getDataSize();
+  }
+
+  private String formatTimestampToDateTimeStr(long timestamp) {
+    return getDatetimeFormat()
+        .format(new Date(timestamp));
+  }
+
+  private SimpleDateFormat getDatetimeFormat() {
+    SimpleDateFormat dateFormat = datetimeFormatHolder.get();
+    if (null == dateFormat) {
+      dateFormat = new SimpleDateFormat(DEFAULT_DATETIME_PATTERN, 
DEFAULT_LOCALE);
+      dateFormat.setTimeZone(TIME_ZONE);
+
+      datetimeFormatHolder.set(dateFormat);
+    }
+
+    return dateFormat;
+  }
+}
diff --git 
a/common/common-access-log/src/main/java/org/apache/servicecomb/common/accessLog/ws/WebSocketAccessLogInitializer.java
 
b/common/common-access-log/src/main/java/org/apache/servicecomb/common/accessLog/ws/WebSocketAccessLogInitializer.java
new file mode 100644
index 000000000..2b3fc799a
--- /dev/null
+++ 
b/common/common-access-log/src/main/java/org/apache/servicecomb/common/accessLog/ws/WebSocketAccessLogInitializer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.common.accessLog.ws;
+
+import org.apache.servicecomb.common.accessLog.AccessLogConfig;
+import org.apache.servicecomb.common.accessLog.AccessLogInitializer;
+import org.apache.servicecomb.core.event.WebSocketActionEvent;
+import org.apache.servicecomb.swagger.invocation.InvocationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+
+public class WebSocketAccessLogInitializer implements AccessLogInitializer {
+  private static final Logger ACCESS_LOG_LOGGER = 
LoggerFactory.getLogger("ws.accesslog");
+
+  private static final Logger REQUEST_LOG_LOGGER = 
LoggerFactory.getLogger("ws.requestlog");
+
+  private WebSocketAccessLogGenerator accessLogGenerator;
+
+  private boolean clientLogEnabled;
+
+  private boolean serverLogEnabled;
+
+  @Override
+  public void init(EventBus eventBus, AccessLogConfig accessLogConfig) {
+    WebSocketAccessLogConfig config = WebSocketAccessLogConfig.INSTANCE;
+    clientLogEnabled = config.isClientLogEnabled();
+    serverLogEnabled = config.isServerLogEnabled();
+    if (clientLogEnabled || serverLogEnabled) {
+      accessLogGenerator = new WebSocketAccessLogGenerator();
+      eventBus.register(this);
+    }
+  }
+
+  @Subscribe
+  @AllowConcurrentEvents
+  public void onRequestReceived(WebSocketActionEvent actionEvent) {
+    if (actionEvent == null) {
+      return;
+    }
+    InvocationType invocationType = actionEvent.getInvocationType();
+    if (invocationType == null) {
+      return;
+    }
+
+    switch (invocationType) {
+      case CONSUMER:
+        if (clientLogEnabled) {
+          
REQUEST_LOG_LOGGER.info(accessLogGenerator.generateClientLog(actionEvent));
+        }
+        break;
+      case PRODUCER: {
+        if (serverLogEnabled) {
+          
ACCESS_LOG_LOGGER.info(accessLogGenerator.generateServerLog(actionEvent));
+        }
+        break;
+      }
+      default:
+        throw new IllegalStateException("unexpected websocket invocation type: 
" + invocationType);
+    }
+  }
+}
diff --git 
a/common/common-access-log/src/main/resources/META-INF/services/org.apache.servicecomb.common.accessLog.AccessLogInitializer
 
b/common/common-access-log/src/main/resources/META-INF/services/org.apache.servicecomb.common.accessLog.AccessLogInitializer
index 159849dab..02d24d5af 100644
--- 
a/common/common-access-log/src/main/resources/META-INF/services/org.apache.servicecomb.common.accessLog.AccessLogInitializer
+++ 
b/common/common-access-log/src/main/resources/META-INF/services/org.apache.servicecomb.common.accessLog.AccessLogInitializer
@@ -17,3 +17,4 @@
 
 org.apache.servicecomb.common.accessLog.client.ClientDefaultInitializer
 org.apache.servicecomb.common.accessLog.server.ServerDefaultInitializer
+org.apache.servicecomb.common.accessLog.ws.WebSocketAccessLogInitializer
diff --git 
a/core/src/main/java/org/apache/servicecomb/core/event/WebSocketActionEvent.java
 
b/core/src/main/java/org/apache/servicecomb/core/event/WebSocketActionEvent.java
new file mode 100644
index 000000000..7c88a12d5
--- /dev/null
+++ 
b/core/src/main/java/org/apache/servicecomb/core/event/WebSocketActionEvent.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.core.event;
+
+import org.apache.servicecomb.core.definition.OperationMeta;
+import org.apache.servicecomb.swagger.invocation.InvocationType;
+import org.apache.servicecomb.swagger.invocation.ws.WebSocket;
+import org.apache.servicecomb.swagger.invocation.ws.WebSocketActionType;
+import org.apache.servicecomb.swagger.invocation.ws.WebSocketMessage;
+
+/**
+ * A websocket action is any notification, including a websocket message/frame,
+ * that the underlying framework passes to the user-extended "WebSocket handler methods" to handle.
+ * The "WebSocket handler methods" are the extensible subscriber methods defined in the
+ * {@link WebSocket}, like {@link WebSocket#onOpen()} and {@link WebSocket#onMessage(WebSocketMessage)}.
+ */
+public class WebSocketActionEvent {
+  /**
+   * Indicates whether this websocket connection is on the consumer side or the producer side.
+   */
+  private InvocationType invocationType;
+
+  private OperationMeta operationMeta;
+
+  private String traceId;
+
+  private String connectionId;
+
+  /**
+   * See {@link WebSocketActionType}.
+   */
+  private WebSocketActionType actionType;
+
+  /**
+   * The startup time of this WebSocket connection in the UnixTimestamp format.
+   * We treat the websocket handshaking success time as startup time.
+   */
+  private long connectionStartTimestamp;
+
+  /**
+   * The timestamp at which the action is scheduled to run in the executor.
+   */
+  private long scheduleStartTimestamp;
+
+  /**
+   * The UnixTimestamp when this websocket action is triggered.
+   */
+  private long actionStartTimestamp;
+
+  private long actionEndTimestamp;
+
+  private String handleThreadName;
+
+  /**
+   * How many bytes of data are passed to the handle methods to handle.
+   * Note that some kinds of actions carry no data, like {@link 
WebSocket#onOpen()}, in which case the dataSize is 0.
+   */
+  private long dataSize;
+
+  public InvocationType getInvocationType() {
+    return invocationType;
+  }
+
+  public WebSocketActionEvent setInvocationType(InvocationType invocationType) 
{
+    this.invocationType = invocationType;
+    return this;
+  }
+
+  public OperationMeta getOperationMeta() {
+    return operationMeta;
+  }
+
+  public WebSocketActionEvent setOperationMeta(OperationMeta operationMeta) {
+    this.operationMeta = operationMeta;
+    return this;
+  }
+
+  public String getTraceId() {
+    return traceId;
+  }
+
+  public WebSocketActionEvent setTraceId(String traceId) {
+    this.traceId = traceId;
+    return this;
+  }
+
+  public String getConnectionId() {
+    return connectionId;
+  }
+
+  public WebSocketActionEvent setConnectionId(String connectionId) {
+    this.connectionId = connectionId;
+    return this;
+  }
+
+  public WebSocketActionType getActionType() {
+    return actionType;
+  }
+
+  public WebSocketActionEvent setActionType(WebSocketActionType actionType) {
+    this.actionType = actionType;
+    return this;
+  }
+
+  public long getConnectionStartTimestamp() {
+    return connectionStartTimestamp;
+  }
+
+  public WebSocketActionEvent setConnectionStartTimestamp(long 
connectionStartTimestamp) {
+    this.connectionStartTimestamp = connectionStartTimestamp;
+    return this;
+  }
+
+  public long getScheduleStartTimestamp() {
+    return scheduleStartTimestamp;
+  }
+
+  public WebSocketActionEvent setScheduleStartTimestamp(long 
scheduleStartTimestamp) {
+    this.scheduleStartTimestamp = scheduleStartTimestamp;
+    return this;
+  }
+
+  public long getActionStartTimestamp() {
+    return actionStartTimestamp;
+  }
+
+  public WebSocketActionEvent setActionStartTimestamp(long 
actionStartTimestamp) {
+    this.actionStartTimestamp = actionStartTimestamp;
+    return this;
+  }
+
+  public long getActionEndTimestamp() {
+    return actionEndTimestamp;
+  }
+
+  public WebSocketActionEvent setActionEndTimestamp(long actionEndTimestamp) {
+    this.actionEndTimestamp = actionEndTimestamp;
+    return this;
+  }
+
+  public String getHandleThreadName() {
+    return handleThreadName;
+  }
+
+  public WebSocketActionEvent setHandleThreadName(String handleThreadName) {
+    this.handleThreadName = handleThreadName;
+    return this;
+  }
+
+  public long getDataSize() {
+    return dataSize;
+  }
+
+  public WebSocketActionEvent setDataSize(long dataSize) {
+    this.dataSize = dataSize;
+    return this;
+  }
+}
diff --git 
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketActionType.java
 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketActionType.java
new file mode 100644
index 000000000..07d45bc48
--- /dev/null
+++ 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketActionType.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.swagger.invocation.ws;
+
+/**
+ * The action types performed by {@link WebSocketAdapter} that the users may 
be aware of.
+ */
+public enum WebSocketActionType {
+  CONNECTION_PREPARE,
+  ON_OPEN,
+  ON_MESSAGE_TEXT,
+  ON_MESSAGE_BINARY,
+  ON_FRAME,
+  ON_SEND_QUEUE_DRAIN,
+  ON_ERROR,
+  ON_CLOSE,
+  DO_CLOSE,
+  DO_PAUSE,
+  DO_RESUME,
+  DO_SEND_TEXT,
+  DO_SEND_BINARY;
+}
diff --git 
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketPipe.java
 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketPipe.java
index 76da45cdf..1e4d06f1a 100644
--- 
a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketPipe.java
+++ 
b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketPipe.java
@@ -58,6 +58,10 @@ public class WebSocketPipe {
     return serverWebSocket;
   }
 
+  /**
+   * Be careful when reordering the existing elements or adding a new element!
+   * The order of the statuses indicates the lifecycle sequence of a websocket connection.
+   */
   private enum PipeWebSocketStatus {
     CREATED,
     PEER_CONNECTED,
diff --git 
a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java
 
b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java
index 3137e0f03..cdecfe2ad 100644
--- 
a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java
+++ 
b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java
@@ -21,18 +21,25 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.event.WebSocketActionEvent;
 import org.apache.servicecomb.core.executor.ReactiveExecutor;
+import org.apache.servicecomb.core.tracing.BraveTraceIdGenerator;
+import org.apache.servicecomb.core.tracing.TraceIdGenerator;
+import org.apache.servicecomb.foundation.common.event.EventManager;
 import org.apache.servicecomb.swagger.invocation.InvocationType;
 import org.apache.servicecomb.swagger.invocation.ws.AbstractBaseWebSocket;
 import 
org.apache.servicecomb.swagger.invocation.ws.BinaryBytesWebSocketMessage;
 import org.apache.servicecomb.swagger.invocation.ws.SerialExecutorWrapper;
 import org.apache.servicecomb.swagger.invocation.ws.TextWebSocketMessage;
+import org.apache.servicecomb.swagger.invocation.ws.WebSocketActionType;
 import org.apache.servicecomb.swagger.invocation.ws.WebSocketAdapter;
 import org.apache.servicecomb.swagger.invocation.ws.WebSocketFrame;
 import org.apache.servicecomb.swagger.invocation.ws.WebSocketMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.eventbus.EventBus;
 import com.netflix.config.DynamicPropertyFactory;
 
 import io.vertx.core.buffer.Buffer;
@@ -45,6 +52,8 @@ public class VertxWebSocketAdaptor implements 
WebSocketAdapter {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(VertxWebSocketAdaptor.class);
 
+  private static final TraceIdGenerator CONNECTION_ID_GENERATOR = new 
BraveTraceIdGenerator();
+
   private static final int DEFAULT_ADAPTER_QUEUE_MAX_SIZE = 100;
 
   private static final int DEFAULT_ADAPTER_QUEUE_MAX_CONTINUE_TIMES = 10;
@@ -63,30 +72,37 @@ public class VertxWebSocketAdaptor implements 
WebSocketAdapter {
 
   private final Object pauseLock = new Object();
 
+  private final Invocation invocation;
+
+  private final long creationTimestamp;
+
+  private final EventBus eventBus;
+
   private boolean inPauseStatus;
 
-  private final String websocketSessionId;
+  private final String connectionId;
 
   private final InvocationType invocationType;
 
   public VertxWebSocketAdaptor(
-      InvocationType invocationType,
-      String websocketSessionId,
-      Executor workerPool,
+      Invocation invocation,
+      InvocationType invocationType, Executor workerPool,
       AbstractBaseWebSocket bizWebSocket,
       WebSocketBase vertxWebSocket) {
-    Objects.requireNonNull(invocationType, "VertxWebSocketAdaptor 
invocationType is null");
-    Objects.requireNonNull(websocketSessionId, "VertxWebSocketAdaptor 
websocketSessionId is null");
+    Objects.requireNonNull(invocation, "VertxWebSocketAdaptor invocation is 
null");
     Objects.requireNonNull(workerPool, "VertxWebSocketAdaptor workerPool is 
null");
     Objects.requireNonNull(bizWebSocket, "VertxWebSocketAdaptor bizWebSocket 
is null");
     Objects.requireNonNull(vertxWebSocket, "VertxWebSocketAdaptor 
vertxWebSocket is null");
+    creationTimestamp = System.currentTimeMillis();
+    this.invocation = invocation;
     this.invocationType = invocationType;
-    this.websocketSessionId = websocketSessionId;
+    this.connectionId = CONNECTION_ID_GENERATOR.generate();
     this.executor = workerPool instanceof ReactiveExecutor ?
         workerPool // for reactive case, no need to wrap it into a serial 
queue model
         : prepareSerialExecutorWrapper(workerPool);
     this.bizWebSocket = bizWebSocket;
     this.vertxWebSocket = vertxWebSocket;
+    eventBus = EventManager.getEventBus();
     inPauseStatus = true;
     vertxWebSocket.pause(); // make sure the vert.x WebSocket pause status 
keep consistent with inPauseStatus flag
 
@@ -104,7 +120,7 @@ public class VertxWebSocketAdaptor implements 
WebSocketAdapter {
   private SerialExecutorWrapper prepareSerialExecutorWrapper(Executor 
workerPool) {
     final SerialExecutorWrapper wrapper = new SerialExecutorWrapper(
         invocationType,
-        websocketSessionId,
+        connectionId,
         workerPool,
         DynamicPropertyFactory.getInstance()
             .getIntProperty("servicecomb.websocket.adapter.queue.maxSize",
@@ -130,11 +146,14 @@ public class VertxWebSocketAdaptor implements 
WebSocketAdapter {
 
   private void linkVertxCloseHandler() {
     vertxWebSocket.closeHandler(v ->
-        scheduleTask(() -> 
bizWebSocket.onClose(vertxWebSocket.closeStatusCode(), 
vertxWebSocket.closeReason())));
+        scheduleTask(createWebSocketActionEvent(WebSocketActionType.ON_CLOSE),
+            () -> bizWebSocket.onClose(vertxWebSocket.closeStatusCode(), 
vertxWebSocket.closeReason())));
   }
 
   private void linkVertxExceptionHandler() {
-    vertxWebSocket.exceptionHandler(t -> scheduleTask(() -> 
bizWebSocket.onError(t)));
+    vertxWebSocket.exceptionHandler(t -> scheduleTask(
+        createWebSocketActionEvent(WebSocketActionType.ON_ERROR),
+        () -> bizWebSocket.onError(t)));
   }
 
   private void linkVertxFrameHandler() {
@@ -145,48 +164,92 @@ public class VertxWebSocketAdaptor implements 
WebSocketAdapter {
   private void linkVertxBinaryMessageHandler() {
     vertxWebSocket.binaryMessageHandler(buffer -> {
       final byte[] bytes = buffer.getBytes();
-      scheduleTask(
+      
scheduleTask(createWebSocketActionEvent(WebSocketActionType.ON_MESSAGE_BINARY)
+              .setDataSize(buffer.length()),
           () -> bizWebSocket.onMessage(new 
BinaryBytesWebSocketMessage(bytes)));
     });
   }
 
   private void linkVertxTextMessageHandler() {
     vertxWebSocket.textMessageHandler(s ->
-        scheduleTask(
+        
scheduleTask(createWebSocketActionEvent(WebSocketActionType.ON_MESSAGE_TEXT)
+                .setDataSize(s.length()),
             () -> bizWebSocket.onMessage(new TextWebSocketMessage(s))));
   }
 
   private void linkVertxDrainHandler() {
     vertxWebSocket.drainHandler(v ->
-        scheduleTask(bizWebSocket::onWriteQueueDrain));
+        
scheduleTask(createWebSocketActionEvent(WebSocketActionType.ON_SEND_QUEUE_DRAIN),
+            bizWebSocket::onWriteQueueDrain));
   }
 
   private void startWorking() {
+    WebSocketActionEvent event = 
createWebSocketActionEventInSyncMode(WebSocketActionType.CONNECTION_PREPARE);
     scheduleTask(
+        createWebSocketActionEvent(WebSocketActionType.ON_OPEN),
         bizWebSocket::startWorking);
+    eventBus.post(event.setActionEndTimestamp(System.currentTimeMillis()));
+  }
+
+  private WebSocketActionEvent createWebSocketActionEvent(WebSocketActionType 
actionType) {
+    return new WebSocketActionEvent()
+        .setActionType(actionType)
+        .setConnectionStartTimestamp(creationTimestamp)
+        .setInvocationType(invocationType)
+        .setHandleThreadName(Thread.currentThread().getName())
+        .setOperationMeta(invocation.getOperationMeta())
+        .setConnectionId(connectionId)
+        .setTraceId(invocation.getTraceId())
+        .setScheduleStartTimestamp(System.currentTimeMillis());
+  }
+
+  private WebSocketActionEvent 
createWebSocketActionEventInSyncMode(WebSocketActionType actionType) {
+    return createWebSocketActionEvent(actionType)
+        .setActionStartTimestamp(System.currentTimeMillis())
+        .setHandleThreadName(Thread.currentThread().getName());
   }
 
-  private void scheduleTask(Runnable task) {
+  private void scheduleTask(WebSocketActionEvent event, Runnable task) {
     try {
-      executor.execute(task);
+      executor.execute(() -> {
+        event.setActionStartTimestamp(System.currentTimeMillis())
+            .setHandleThreadName(Thread.currentThread().getName());
+        try {
+          task.run();
+        } catch (Throwable e) { // pass e as the trailing argument so SLF4J logs the stack trace
+          LOGGER.error("[{}]-[{}] error occurs while executing task, actionType is {}",
+              invocationType, connectionId, event.getActionType(), e);
+        } finally {
+          eventBus.post(
+              event.setActionEndTimestamp(System.currentTimeMillis())
+          );
+        }
+      });
     } catch (Throwable e) {
-      LOGGER.error("[{}]-[{}] error occurs in scheduleTask", invocationType, websocketSessionId, e);
+      LOGGER.error("[{}]-[{}] error occurs in scheduleTask", invocationType, connectionId, e);
     }
   }
 
   @Override
   public CompletableFuture<Void> sendMessage(WebSocketMessage<?> message) {
     if (message instanceof TextWebSocketMessage) {
-      return vertxWebSocket.writeTextMessage(((TextWebSocketMessage) 
message).getPayload())
-          .toCompletionStage()
-          .toCompletableFuture();
+      String payload = ((TextWebSocketMessage) message).getPayload();
+      return decorateSenderAction(WebSocketActionType.DO_SEND_TEXT,
+          payload.length(),
+          vertxWebSocket.writeTextMessage(
+                  payload)
+              .toCompletionStage()
+              .toCompletableFuture());
     }
 
     if (message instanceof BinaryBytesWebSocketMessage) {
-      return vertxWebSocket.writeBinaryMessage(Buffer.buffer(
-              ((BinaryBytesWebSocketMessage) message).getPayload()))
-          .toCompletionStage()
-          .toCompletableFuture();
+      byte[] payload = ((BinaryBytesWebSocketMessage) message).getPayload();
+      return decorateSenderAction(WebSocketActionType.DO_SEND_BINARY,
+          payload.length,
+          vertxWebSocket.writeBinaryMessage(Buffer.buffer(
+                  payload))
+              .toCompletionStage()
+              .toCompletableFuture());
     }
 
     throw new IllegalStateException("impossible case, unrecognized 
WebSocketMessage type!");
@@ -200,37 +263,52 @@ public class VertxWebSocketAdaptor implements 
WebSocketAdapter {
 
   @Override
   public CompletableFuture<Void> close(short statusCode, String reason) {
-    return vertxWebSocket.close(statusCode, reason)
-        .toCompletionStage()
-        .toCompletableFuture();
+    return decorateSenderAction(WebSocketActionType.DO_CLOSE, 0,
+        vertxWebSocket.close(statusCode, reason)
+            .toCompletionStage()
+            .toCompletableFuture());
   }
 
   @Override
   public void pause() {
+    WebSocketActionEvent event = 
createWebSocketActionEventInSyncMode(WebSocketActionType.DO_PAUSE);
     synchronized (pauseLock) {
       if (inPauseStatus) {
         return;
       }
       vertxWebSocket.pause();
       inPauseStatus = true;
-      LOGGER.info("[{}]-[{}] pause websocket", invocationType, 
websocketSessionId);
+      LOGGER.info("[{}]-[{}] pause websocket", invocationType, connectionId);
     }
+    eventBus.post(event.setActionEndTimestamp(System.currentTimeMillis()));
   }
 
   @Override
   public void resume() {
+    WebSocketActionEvent event = 
createWebSocketActionEventInSyncMode(WebSocketActionType.DO_RESUME);
     synchronized (pauseLock) {
       if (!inPauseStatus) {
         return;
       }
       vertxWebSocket.resume();
       inPauseStatus = false;
-      LOGGER.info("[{}]-[{}] resume websocket", invocationType, 
websocketSessionId);
+      LOGGER.info("[{}]-[{}] resume websocket", invocationType, connectionId);
     }
+    eventBus.post(event.setActionEndTimestamp(System.currentTimeMillis()));
   }
 
   @Override
   public boolean writeQueueFull() {
     return vertxWebSocket.writeQueueFull();
   }
+
+  private <T> CompletableFuture<T> decorateSenderAction(WebSocketActionType 
actionType,
+      int dataSize,
+      CompletableFuture<T> actionFuture) {
+    WebSocketActionEvent event = 
createWebSocketActionEventInSyncMode(actionType).setDataSize(dataSize);
+    actionFuture
+        .whenComplete((v, t) ->
+            
eventBus.post(event.setActionEndTimestamp(System.currentTimeMillis())));
+    return actionFuture;
+  }
 }
diff --git 
a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java
 
b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java
index 4abbcf462..3c4bbcee0 100644
--- 
a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java
+++ 
b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java
@@ -26,6 +26,7 @@ import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
 import 
org.apache.servicecomb.foundation.vertx.http.VertxServerRequestToHttpServletRequest;
+import org.apache.servicecomb.swagger.invocation.InvocationType;
 import org.apache.servicecomb.swagger.invocation.Response;
 import 
org.apache.servicecomb.swagger.invocation.arguments.consumer.ClientWebSocketArgumentMapper;
 import org.apache.servicecomb.swagger.invocation.ws.ServerWebSocket;
@@ -74,8 +75,8 @@ public class WebSocketHandshakeServerFilter implements 
HttpServerFilter {
             .toWebSocket()
             .whenComplete((ws, t) ->
                 new VertxWebSocketAdaptor(
-                    invocation.getInvocationType(),
-                    invocation.getTraceId(),
+                    invocation,
+                    InvocationType.PRODUCER, // must set it manually, or it's 
not correct in edge situation
                     invocation.getOperationMeta().getExecutor(),
                     (ServerWebSocket) result,
                     ws));
diff --git 
a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketResponseWrapClientFilter.java
 
b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketResponseWrapClientFilter.java
index 04fbaec0c..928513077 100644
--- 
a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketResponseWrapClientFilter.java
+++ 
b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketResponseWrapClientFilter.java
@@ -25,6 +25,7 @@ import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.core.exception.ExceptionCodes;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
 import 
org.apache.servicecomb.foundation.vertx.ws.VertxClientWebSocketResponseToHttpServletResponse;
+import org.apache.servicecomb.swagger.invocation.InvocationType;
 import org.apache.servicecomb.swagger.invocation.Response;
 import 
org.apache.servicecomb.swagger.invocation.arguments.consumer.ClientWebSocketArgumentMapper;
 import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
@@ -68,8 +69,8 @@ public class WebSocketResponseWrapClientFilter implements 
HttpClientFilter {
         .getOperationMeta()
         .getExecutor();
     final VertxWebSocketAdaptor webSocketAdaptor = new VertxWebSocketAdaptor(
-        invocation.getInvocationType(),
-        invocation.getTraceId(),
+        invocation,
+        InvocationType.CONSUMER, // must set it manually, or it's not correct 
in edge situation
         executor,
         clientWebSocket,
         webSocketResponseEx.getVertxClientWebSocket());


Reply via email to