This is an automated email from the ASF dual-hosted git repository. yaohaishi pushed a commit to branch 2.8.x in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
commit 6873ee5992ff2211a1e0efa33dd30377a914ecbf Author: yhs0092 <[email protected]> AuthorDate: Fri Mar 21 20:53:12 2025 +0800 [SCB-2897] add accesslog and requestlog for WebSocket --- .../accessLog/ws/WebSocketAccessLogConfig.java | 53 +++++++ .../accessLog/ws/WebSocketAccessLogGenerator.java | 87 +++++++++++ .../ws/WebSocketAccessLogInitializer.java | 80 ++++++++++ ...rvicecomb.common.accessLog.AccessLogInitializer | 1 + .../core/event/WebSocketActionEvent.java | 173 +++++++++++++++++++++ .../swagger/invocation/ws/WebSocketActionType.java | 37 +++++ .../swagger/invocation/ws/WebSocketPipe.java | 4 + .../rest/vertx/ws/VertxWebSocketAdaptor.java | 134 ++++++++++++---- .../vertx/ws/WebSocketHandshakeServerFilter.java | 5 +- .../ws/WebSocketResponseWrapClientFilter.java | 5 +- 10 files changed, 547 insertions(+), 32 deletions(-) diff --git a/common/common-access-log/src/main/java/org/apache/servicecomb/common/accessLog/ws/WebSocketAccessLogConfig.java b/common/common-access-log/src/main/java/org/apache/servicecomb/common/accessLog/ws/WebSocketAccessLogConfig.java new file mode 100644 index 000000000..107c1e1c0 --- /dev/null +++ b/common/common-access-log/src/main/java/org/apache/servicecomb/common/accessLog/ws/WebSocketAccessLogConfig.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.common.accessLog.ws; + +import com.netflix.config.DynamicPropertyFactory; + +public class WebSocketAccessLogConfig { + private static final String BASE = "servicecomb.accesslog.ws."; + + private static final String SERVER_BASE = BASE + "server."; + + private static final String CLIENT_BASE = BASE + "client."; + + private static final String SERVER_LOG_ENABLED = SERVER_BASE + "enabled"; + + private static final String CLIENT_LOG_ENABLED = CLIENT_BASE + "enabled"; + + private boolean serverLogEnabled; + + private boolean clientLogEnabled; + + public static final WebSocketAccessLogConfig INSTANCE = new WebSocketAccessLogConfig(); + + private WebSocketAccessLogConfig() { + clientLogEnabled = DynamicPropertyFactory + .getInstance().getBooleanProperty(CLIENT_LOG_ENABLED, false).get(); + serverLogEnabled = DynamicPropertyFactory + .getInstance().getBooleanProperty(SERVER_LOG_ENABLED, false).get(); + } + + public boolean isServerLogEnabled() { + return serverLogEnabled; + } + + public boolean isClientLogEnabled() { + return clientLogEnabled; + } +} diff --git a/common/common-access-log/src/main/java/org/apache/servicecomb/common/accessLog/ws/WebSocketAccessLogGenerator.java b/common/common-access-log/src/main/java/org/apache/servicecomb/common/accessLog/ws/WebSocketAccessLogGenerator.java new file mode 100644 index 000000000..5772c7937 --- /dev/null +++ b/common/common-access-log/src/main/java/org/apache/servicecomb/common/accessLog/ws/WebSocketAccessLogGenerator.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.common.accessLog.ws; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Locale; +import java.util.TimeZone; + +import org.apache.servicecomb.core.event.WebSocketActionEvent; + +/** + * Similar to {@link org.apache.servicecomb.common.accessLog.core.AccessLogGenerator}, + * this is an access log generator for WebSocket protocol. + */ +public class WebSocketAccessLogGenerator { + + public static final String DEFAULT_DATETIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"; + + public static final Locale DEFAULT_LOCALE = Locale.US; + + public static final TimeZone TIME_ZONE = TimeZone.getDefault(); + + private final ThreadLocal<SimpleDateFormat> datetimeFormatHolder = new ThreadLocal<>(); + + public String generateServerLog(WebSocketActionEvent actionEvent) { + return generateLog(actionEvent); + } + + public String generateClientLog(WebSocketActionEvent actionEvent) { + return generateLog(actionEvent); + } + + private String generateLog(WebSocketActionEvent actionEvent) { + return actionEvent.getInvocationType() + + "|" + + actionEvent.getOperationMeta().getMicroserviceQualifiedName() + + "|" + + formatTimestampToDateTimeStr(actionEvent.getActionStartTimestamp()) + + "|" + + actionEvent.getTraceId() + + "|" + + actionEvent.getConnectionId() + + "|" + + actionEvent.getActionType() + + "|" + + (actionEvent.getActionStartTimestamp() - 
actionEvent.getScheduleStartTimestamp()) + + "|" + + (actionEvent.getActionEndTimestamp() - actionEvent.getActionStartTimestamp()) + + "|" + + actionEvent.getHandleThreadName() + + "|" + + actionEvent.getDataSize(); + } + + private String formatTimestampToDateTimeStr(long timestamp) { + return getDatetimeFormat() + .format(new Date(timestamp)); + } + + private SimpleDateFormat getDatetimeFormat() { + SimpleDateFormat dateFormat = datetimeFormatHolder.get(); + if (null == dateFormat) { + dateFormat = new SimpleDateFormat(DEFAULT_DATETIME_PATTERN, DEFAULT_LOCALE); + dateFormat.setTimeZone(TIME_ZONE); + + datetimeFormatHolder.set(dateFormat); + } + + return dateFormat; + } +} diff --git a/common/common-access-log/src/main/java/org/apache/servicecomb/common/accessLog/ws/WebSocketAccessLogInitializer.java b/common/common-access-log/src/main/java/org/apache/servicecomb/common/accessLog/ws/WebSocketAccessLogInitializer.java new file mode 100644 index 000000000..2b3fc799a --- /dev/null +++ b/common/common-access-log/src/main/java/org/apache/servicecomb/common/accessLog/ws/WebSocketAccessLogInitializer.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.servicecomb.common.accessLog.ws; + +import org.apache.servicecomb.common.accessLog.AccessLogConfig; +import org.apache.servicecomb.common.accessLog.AccessLogInitializer; +import org.apache.servicecomb.core.event.WebSocketActionEvent; +import org.apache.servicecomb.swagger.invocation.InvocationType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.eventbus.AllowConcurrentEvents; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; + +public class WebSocketAccessLogInitializer implements AccessLogInitializer { + private static final Logger ACCESS_LOG_LOGGER = LoggerFactory.getLogger("ws.accesslog"); + + private static final Logger REQUEST_LOG_LOGGER = LoggerFactory.getLogger("ws.requestlog"); + + private WebSocketAccessLogGenerator accessLogGenerator; + + private boolean clientLogEnabled; + + private boolean serverLogEnabled; + + @Override + public void init(EventBus eventBus, AccessLogConfig accessLogConfig) { + WebSocketAccessLogConfig config = WebSocketAccessLogConfig.INSTANCE; + clientLogEnabled = config.isClientLogEnabled(); + serverLogEnabled = config.isServerLogEnabled(); + if (clientLogEnabled || serverLogEnabled) { + accessLogGenerator = new WebSocketAccessLogGenerator(); + eventBus.register(this); + } + } + + @Subscribe + @AllowConcurrentEvents + public void onRequestReceived(WebSocketActionEvent actionEvent) { + if (actionEvent == null) { + return; + } + InvocationType invocationType = actionEvent.getInvocationType(); + if (invocationType == null) { + return; + } + + switch (invocationType) { + case CONSUMER: + if (clientLogEnabled) { + REQUEST_LOG_LOGGER.info(accessLogGenerator.generateClientLog(actionEvent)); + } + break; + case PRODUCER: { + if (serverLogEnabled) { + ACCESS_LOG_LOGGER.info(accessLogGenerator.generateServerLog(actionEvent)); + } + break; + } + default: + throw new IllegalStateException("unexpected websocket invocation type: " + 
invocationType); + } + } +} diff --git a/common/common-access-log/src/main/resources/META-INF/services/org.apache.servicecomb.common.accessLog.AccessLogInitializer b/common/common-access-log/src/main/resources/META-INF/services/org.apache.servicecomb.common.accessLog.AccessLogInitializer index 159849dab..02d24d5af 100644 --- a/common/common-access-log/src/main/resources/META-INF/services/org.apache.servicecomb.common.accessLog.AccessLogInitializer +++ b/common/common-access-log/src/main/resources/META-INF/services/org.apache.servicecomb.common.accessLog.AccessLogInitializer @@ -17,3 +17,4 @@ org.apache.servicecomb.common.accessLog.client.ClientDefaultInitializer org.apache.servicecomb.common.accessLog.server.ServerDefaultInitializer +org.apache.servicecomb.common.accessLog.ws.WebSocketAccessLogInitializer diff --git a/core/src/main/java/org/apache/servicecomb/core/event/WebSocketActionEvent.java b/core/src/main/java/org/apache/servicecomb/core/event/WebSocketActionEvent.java new file mode 100644 index 000000000..7c88a12d5 --- /dev/null +++ b/core/src/main/java/org/apache/servicecomb/core/event/WebSocketActionEvent.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.servicecomb.core.event; + +import org.apache.servicecomb.core.definition.OperationMeta; +import org.apache.servicecomb.swagger.invocation.InvocationType; +import org.apache.servicecomb.swagger.invocation.ws.WebSocket; +import org.apache.servicecomb.swagger.invocation.ws.WebSocketActionType; +import org.apache.servicecomb.swagger.invocation.ws.WebSocketMessage; + +/** + * A websocket action means any notification including websocket message/frame + * that the underlying framework passed to the user extended "WebSocket handler methods" to handle. + * And the "WebSocket handler methods" means the extensible subscriber methods defined in the + * {@link WebSocket}, like {@link WebSocket#onOpen()} and {@link WebSocket#onMessage(WebSocketMessage)}. + */ +public class WebSocketActionEvent { + /** + * Indicates whether this websocket connection is on the consumer side or the producer side. + */ + private InvocationType invocationType; + + private OperationMeta operationMeta; + + private String traceId; + + private String connectionId; + + /** + * See {@link WebSocketActionType}. + */ + private WebSocketActionType actionType; + + /** + * The startup time of this WebSocket connection in the UnixTimestamp format. + * We treat the websocket handshaking success time as startup time. + */ + private long connectionStartTimestamp; + + /** + * The timestamp that the action is scheduled to run in executor. + */ + private long scheduleStartTimestamp; + + /** + * The UnixTimestamp when this websocket action is triggered. + */ + private long actionStartTimestamp; + + private long actionEndTimestamp; + + private String handleThreadName; + + /** + * How many bytes of data are passed to the handle methods to handle. + * Note that some kinds of actions carry no data, like {@link WebSocket#onOpen()}, in which case the dataSize is 0. 
+ */ + private long dataSize; + + public InvocationType getInvocationType() { + return invocationType; + } + + public WebSocketActionEvent setInvocationType(InvocationType invocationType) { + this.invocationType = invocationType; + return this; + } + + public OperationMeta getOperationMeta() { + return operationMeta; + } + + public WebSocketActionEvent setOperationMeta(OperationMeta operationMeta) { + this.operationMeta = operationMeta; + return this; + } + + public String getTraceId() { + return traceId; + } + + public WebSocketActionEvent setTraceId(String traceId) { + this.traceId = traceId; + return this; + } + + public String getConnectionId() { + return connectionId; + } + + public WebSocketActionEvent setConnectionId(String connectionId) { + this.connectionId = connectionId; + return this; + } + + public WebSocketActionType getActionType() { + return actionType; + } + + public WebSocketActionEvent setActionType(WebSocketActionType actionType) { + this.actionType = actionType; + return this; + } + + public long getConnectionStartTimestamp() { + return connectionStartTimestamp; + } + + public WebSocketActionEvent setConnectionStartTimestamp(long connectionStartTimestamp) { + this.connectionStartTimestamp = connectionStartTimestamp; + return this; + } + + public long getScheduleStartTimestamp() { + return scheduleStartTimestamp; + } + + public WebSocketActionEvent setScheduleStartTimestamp(long scheduleStartTimestamp) { + this.scheduleStartTimestamp = scheduleStartTimestamp; + return this; + } + + public long getActionStartTimestamp() { + return actionStartTimestamp; + } + + public WebSocketActionEvent setActionStartTimestamp(long actionStartTimestamp) { + this.actionStartTimestamp = actionStartTimestamp; + return this; + } + + public long getActionEndTimestamp() { + return actionEndTimestamp; + } + + public WebSocketActionEvent setActionEndTimestamp(long actionEndTimestamp) { + this.actionEndTimestamp = actionEndTimestamp; + return this; + } + + public String 
getHandleThreadName() { + return handleThreadName; + } + + public WebSocketActionEvent setHandleThreadName(String handleThreadName) { + this.handleThreadName = handleThreadName; + return this; + } + + public long getDataSize() { + return dataSize; + } + + public WebSocketActionEvent setDataSize(long dataSize) { + this.dataSize = dataSize; + return this; + } +} diff --git a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketActionType.java b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketActionType.java new file mode 100644 index 000000000..07d45bc48 --- /dev/null +++ b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketActionType.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.swagger.invocation.ws; + +/** + * The action types performed by {@link WebSocketAdapter} that the users may be aware of. 
+ */ +public enum WebSocketActionType { + CONNECTION_PREPARE, + ON_OPEN, + ON_MESSAGE_TEXT, + ON_MESSAGE_BINARY, + ON_FRAME, + ON_SEND_QUEUE_DRAIN, + ON_ERROR, + ON_CLOSE, + DO_CLOSE, + DO_PAUSE, + DO_RESUME, + DO_SEND_TEXT, + DO_SEND_BINARY; +} diff --git a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketPipe.java b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketPipe.java index 76da45cdf..1e4d06f1a 100644 --- a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketPipe.java +++ b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketPipe.java @@ -58,6 +58,10 @@ public class WebSocketPipe { return serverWebSocket; } + /** + * Be careful when reordering the existing elements or adding new ones! + * The order of status indicates the lifecycle sequence of a websocket connection. 
+ */ private enum PipeWebSocketStatus { CREATED, PEER_CONNECTED, diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java index 3137e0f03..cdecfe2ad 100644 --- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java +++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java @@ -21,18 +21,25 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import org.apache.servicecomb.core.Invocation; +import org.apache.servicecomb.core.event.WebSocketActionEvent; import org.apache.servicecomb.core.executor.ReactiveExecutor; +import org.apache.servicecomb.core.tracing.BraveTraceIdGenerator; +import org.apache.servicecomb.core.tracing.TraceIdGenerator; +import org.apache.servicecomb.foundation.common.event.EventManager; import org.apache.servicecomb.swagger.invocation.InvocationType; import org.apache.servicecomb.swagger.invocation.ws.AbstractBaseWebSocket; import org.apache.servicecomb.swagger.invocation.ws.BinaryBytesWebSocketMessage; import org.apache.servicecomb.swagger.invocation.ws.SerialExecutorWrapper; import org.apache.servicecomb.swagger.invocation.ws.TextWebSocketMessage; +import org.apache.servicecomb.swagger.invocation.ws.WebSocketActionType; import org.apache.servicecomb.swagger.invocation.ws.WebSocketAdapter; import org.apache.servicecomb.swagger.invocation.ws.WebSocketFrame; import org.apache.servicecomb.swagger.invocation.ws.WebSocketMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.eventbus.EventBus; import com.netflix.config.DynamicPropertyFactory; import 
io.vertx.core.buffer.Buffer; @@ -45,6 +52,8 @@ public class VertxWebSocketAdaptor implements WebSocketAdapter { private static final Logger LOGGER = LoggerFactory.getLogger(VertxWebSocketAdaptor.class); + private static final TraceIdGenerator CONNECTION_ID_GENERATOR = new BraveTraceIdGenerator(); + private static final int DEFAULT_ADAPTER_QUEUE_MAX_SIZE = 100; private static final int DEFAULT_ADAPTER_QUEUE_MAX_CONTINUE_TIMES = 10; @@ -63,30 +72,37 @@ public class VertxWebSocketAdaptor implements WebSocketAdapter { private final Object pauseLock = new Object(); + private final Invocation invocation; + + private final long creationTimestamp; + + private final EventBus eventBus; + private boolean inPauseStatus; - private final String websocketSessionId; + private final String connectionId; private final InvocationType invocationType; public VertxWebSocketAdaptor( - InvocationType invocationType, - String websocketSessionId, - Executor workerPool, + Invocation invocation, + InvocationType invocationType, Executor workerPool, AbstractBaseWebSocket bizWebSocket, WebSocketBase vertxWebSocket) { - Objects.requireNonNull(invocationType, "VertxWebSocketAdaptor invocationType is null"); - Objects.requireNonNull(websocketSessionId, "VertxWebSocketAdaptor websocketSessionId is null"); + Objects.requireNonNull(invocation, "VertxWebSocketAdaptor invocation is null"); Objects.requireNonNull(workerPool, "VertxWebSocketAdaptor workerPool is null"); Objects.requireNonNull(bizWebSocket, "VertxWebSocketAdaptor bizWebSocket is null"); Objects.requireNonNull(vertxWebSocket, "VertxWebSocketAdaptor vertxWebSocket is null"); + creationTimestamp = System.currentTimeMillis(); + this.invocation = invocation; this.invocationType = invocationType; - this.websocketSessionId = websocketSessionId; + this.connectionId = CONNECTION_ID_GENERATOR.generate(); this.executor = workerPool instanceof ReactiveExecutor ? 
workerPool // for reactive case, no need to wrap it into a serial queue model : prepareSerialExecutorWrapper(workerPool); this.bizWebSocket = bizWebSocket; this.vertxWebSocket = vertxWebSocket; + eventBus = EventManager.getEventBus(); inPauseStatus = true; vertxWebSocket.pause(); // make sure the vert.x WebSocket pause status keep consistent with inPauseStatus flag @@ -104,7 +120,7 @@ public class VertxWebSocketAdaptor implements WebSocketAdapter { private SerialExecutorWrapper prepareSerialExecutorWrapper(Executor workerPool) { final SerialExecutorWrapper wrapper = new SerialExecutorWrapper( invocationType, - websocketSessionId, + connectionId, workerPool, DynamicPropertyFactory.getInstance() .getIntProperty("servicecomb.websocket.adapter.queue.maxSize", @@ -130,11 +146,14 @@ public class VertxWebSocketAdaptor implements WebSocketAdapter { private void linkVertxCloseHandler() { vertxWebSocket.closeHandler(v -> - scheduleTask(() -> bizWebSocket.onClose(vertxWebSocket.closeStatusCode(), vertxWebSocket.closeReason()))); + scheduleTask(createWebSocketActionEvent(WebSocketActionType.ON_CLOSE), + () -> bizWebSocket.onClose(vertxWebSocket.closeStatusCode(), vertxWebSocket.closeReason()))); } private void linkVertxExceptionHandler() { - vertxWebSocket.exceptionHandler(t -> scheduleTask(() -> bizWebSocket.onError(t))); + vertxWebSocket.exceptionHandler(t -> scheduleTask( + createWebSocketActionEvent(WebSocketActionType.ON_ERROR), + () -> bizWebSocket.onError(t))); } private void linkVertxFrameHandler() { @@ -145,48 +164,92 @@ public class VertxWebSocketAdaptor implements WebSocketAdapter { private void linkVertxBinaryMessageHandler() { vertxWebSocket.binaryMessageHandler(buffer -> { final byte[] bytes = buffer.getBytes(); - scheduleTask( + scheduleTask(createWebSocketActionEvent(WebSocketActionType.ON_MESSAGE_BINARY) + .setDataSize(buffer.length()), () -> bizWebSocket.onMessage(new BinaryBytesWebSocketMessage(bytes))); }); } private void linkVertxTextMessageHandler() { 
vertxWebSocket.textMessageHandler(s -> - scheduleTask( + scheduleTask(createWebSocketActionEvent(WebSocketActionType.ON_MESSAGE_TEXT) + .setDataSize(s.length()), () -> bizWebSocket.onMessage(new TextWebSocketMessage(s)))); } private void linkVertxDrainHandler() { vertxWebSocket.drainHandler(v -> - scheduleTask(bizWebSocket::onWriteQueueDrain)); + scheduleTask(createWebSocketActionEvent(WebSocketActionType.ON_SEND_QUEUE_DRAIN), + bizWebSocket::onWriteQueueDrain)); } private void startWorking() { + WebSocketActionEvent event = createWebSocketActionEventInSyncMode(WebSocketActionType.CONNECTION_PREPARE); scheduleTask( + createWebSocketActionEvent(WebSocketActionType.ON_OPEN), bizWebSocket::startWorking); + eventBus.post(event.setActionEndTimestamp(System.currentTimeMillis())); + } + + private WebSocketActionEvent createWebSocketActionEvent(WebSocketActionType actionType) { + return new WebSocketActionEvent() + .setActionType(actionType) + .setConnectionStartTimestamp(creationTimestamp) + .setInvocationType(invocationType) + .setHandleThreadName(Thread.currentThread().getName()) + .setOperationMeta(invocation.getOperationMeta()) + .setConnectionId(connectionId) + .setTraceId(invocation.getTraceId()) + .setScheduleStartTimestamp(System.currentTimeMillis()); + } + + private WebSocketActionEvent createWebSocketActionEventInSyncMode(WebSocketActionType actionType) { + return createWebSocketActionEvent(actionType) + .setActionStartTimestamp(System.currentTimeMillis()) + .setHandleThreadName(Thread.currentThread().getName()); } - private void scheduleTask(Runnable task) { + private void scheduleTask(WebSocketActionEvent event, Runnable task) { try { - executor.execute(task); + executor.execute(() -> { + event.setActionStartTimestamp(System.currentTimeMillis()) + .setHandleThreadName(Thread.currentThread().getName()); + try { + task.run(); + } catch (Throwable e) { + LOGGER.error("[{}]-[{}] error occurs while executing task, actionType is {}", + invocationType, connectionId, 
event.getActionType()); + } finally { + eventBus.post( + event.setActionEndTimestamp(System.currentTimeMillis()) + ); + } + }); } catch (Throwable e) { - LOGGER.error("[{}]-[{}] error occurs in scheduleTask", invocationType, websocketSessionId, e); + LOGGER.error("[{}]-[{}] error occurs in scheduleTask", invocationType, connectionId, e); } } @Override public CompletableFuture<Void> sendMessage(WebSocketMessage<?> message) { if (message instanceof TextWebSocketMessage) { - return vertxWebSocket.writeTextMessage(((TextWebSocketMessage) message).getPayload()) - .toCompletionStage() - .toCompletableFuture(); + String payload = ((TextWebSocketMessage) message).getPayload(); + return decorateSenderAction(WebSocketActionType.DO_SEND_TEXT, + payload.length(), + vertxWebSocket.writeTextMessage( + payload) + .toCompletionStage() + .toCompletableFuture()); } if (message instanceof BinaryBytesWebSocketMessage) { - return vertxWebSocket.writeBinaryMessage(Buffer.buffer( - ((BinaryBytesWebSocketMessage) message).getPayload())) - .toCompletionStage() - .toCompletableFuture(); + byte[] payload = ((BinaryBytesWebSocketMessage) message).getPayload(); + return decorateSenderAction(WebSocketActionType.DO_SEND_BINARY, + payload.length, + vertxWebSocket.writeBinaryMessage(Buffer.buffer( + payload)) + .toCompletionStage() + .toCompletableFuture()); } throw new IllegalStateException("impossible case, unrecognized WebSocketMessage type!"); @@ -200,37 +263,52 @@ public class VertxWebSocketAdaptor implements WebSocketAdapter { @Override public CompletableFuture<Void> close(short statusCode, String reason) { - return vertxWebSocket.close(statusCode, reason) - .toCompletionStage() - .toCompletableFuture(); + return decorateSenderAction(WebSocketActionType.DO_CLOSE, 0, + vertxWebSocket.close(statusCode, reason) + .toCompletionStage() + .toCompletableFuture()); } @Override public void pause() { + WebSocketActionEvent event = createWebSocketActionEventInSyncMode(WebSocketActionType.DO_PAUSE); 
synchronized (pauseLock) { if (inPauseStatus) { return; } vertxWebSocket.pause(); inPauseStatus = true; - LOGGER.info("[{}]-[{}] pause websocket", invocationType, websocketSessionId); + LOGGER.info("[{}]-[{}] pause websocket", invocationType, connectionId); } + eventBus.post(event.setActionEndTimestamp(System.currentTimeMillis())); } @Override public void resume() { + WebSocketActionEvent event = createWebSocketActionEventInSyncMode(WebSocketActionType.DO_RESUME); synchronized (pauseLock) { if (!inPauseStatus) { return; } vertxWebSocket.resume(); inPauseStatus = false; - LOGGER.info("[{}]-[{}] resume websocket", invocationType, websocketSessionId); + LOGGER.info("[{}]-[{}] resume websocket", invocationType, connectionId); } + eventBus.post(event.setActionEndTimestamp(System.currentTimeMillis())); } @Override public boolean writeQueueFull() { return vertxWebSocket.writeQueueFull(); } + + private <T> CompletableFuture<T> decorateSenderAction(WebSocketActionType actionType, + int dataSize, + CompletableFuture<T> actionFuture) { + WebSocketActionEvent event = createWebSocketActionEventInSyncMode(actionType).setDataSize(dataSize); + actionFuture + .whenComplete((v, t) -> + eventBus.post(event.setActionEndTimestamp(System.currentTimeMillis()))); + return actionFuture; + } } diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java index 4abbcf462..3c4bbcee0 100644 --- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java +++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java @@ -26,6 +26,7 @@ import org.apache.servicecomb.core.Invocation; import 
org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx; import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx; import org.apache.servicecomb.foundation.vertx.http.VertxServerRequestToHttpServletRequest; +import org.apache.servicecomb.swagger.invocation.InvocationType; import org.apache.servicecomb.swagger.invocation.Response; import org.apache.servicecomb.swagger.invocation.arguments.consumer.ClientWebSocketArgumentMapper; import org.apache.servicecomb.swagger.invocation.ws.ServerWebSocket; @@ -74,8 +75,8 @@ public class WebSocketHandshakeServerFilter implements HttpServerFilter { .toWebSocket() .whenComplete((ws, t) -> new VertxWebSocketAdaptor( - invocation.getInvocationType(), - invocation.getTraceId(), + invocation, + InvocationType.PRODUCER, // must set it manually, or it's not correct in edge situation invocation.getOperationMeta().getExecutor(), (ServerWebSocket) result, ws)); diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketResponseWrapClientFilter.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketResponseWrapClientFilter.java index 04fbaec0c..928513077 100644 --- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketResponseWrapClientFilter.java +++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketResponseWrapClientFilter.java @@ -25,6 +25,7 @@ import org.apache.servicecomb.core.Invocation; import org.apache.servicecomb.core.exception.ExceptionCodes; import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx; import org.apache.servicecomb.foundation.vertx.ws.VertxClientWebSocketResponseToHttpServletResponse; +import org.apache.servicecomb.swagger.invocation.InvocationType; import 
org.apache.servicecomb.swagger.invocation.Response; import org.apache.servicecomb.swagger.invocation.arguments.consumer.ClientWebSocketArgumentMapper; import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData; @@ -68,8 +69,8 @@ public class WebSocketResponseWrapClientFilter implements HttpClientFilter { .getOperationMeta() .getExecutor(); final VertxWebSocketAdaptor webSocketAdaptor = new VertxWebSocketAdaptor( - invocation.getInvocationType(), - invocation.getTraceId(), + invocation, + InvocationType.CONSUMER, // must set it manually, or it's not correct in edge situation executor, clientWebSocket, webSocketResponseEx.getVertxClientWebSocket());
