This is an automated email from the ASF dual-hosted git repository. yaohaishi pushed a commit to branch 2.8.x in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
commit bb8c6fd88d948e5044523c7b3979a091ff8c5f85 Author: yhs0092 <[email protected]> AuthorDate: Mon Dec 9 21:29:20 2024 +0800 [SCB-2894] support running WebSocket message handling logic serially on the Executors, and fix EdgeService possibly hanging in the WebSocket handshake procedure because the HTTP handshake request was paused too late. --- .../common/rest/AbstractRestInvocation.java | 10 - .../rest/filter/inner/ClientRestArgsFilter.java | 2 +- .../vertx/client/http/HttpClientOptionsSPI.java | 1 - .../foundation/vertx/client/http/HttpClients.java | 3 +- .../vertx/client/ws/WebSocketClientOptionsSPI.java | 2 +- .../invocation/ws/AbstractBaseWebSocket.java | 13 +- .../invocation/ws/SerialExecutorWrapper.java | 223 +++++++++++++++++++++ .../swagger/invocation/ws/WebSocket.java | 2 +- .../swagger/invocation/ws/WebSocketFrame.java | 2 - .../swagger/invocation/ws/WebSocketPipe.java | 68 ++++--- .../invocation/ws/SerialExecutorWrapperTest.java | 132 ++++++++++++ .../rest/client/ws/WebSocketClientInvocation.java | 2 +- .../rest/client/ws/WebSocketTransportClient.java | 3 +- .../transport/rest/vertx/RestServerVerticle.java | 15 ++ .../transport/rest/vertx/TransportConfig.java | 7 + .../rest/vertx/ws/VertxWebSocketAdaptor.java | 100 +++++++-- .../vertx/ws/WebSocketHandshakeServerFilter.java | 7 +- .../ws/WebSocketResponseWrapClientFilter.java | 2 + 18 files changed, 527 insertions(+), 67 deletions(-) diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java index b65fb519f..a9431a990 100644 --- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java +++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java @@ -46,7 +46,6 @@ import org.apache.servicecomb.foundation.common.Holder; import org.apache.servicecomb.foundation.common.utils.JsonUtils; import 
org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx; import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx; -import org.apache.servicecomb.foundation.vertx.http.VertxServerRequestToHttpServletRequest; import org.apache.servicecomb.swagger.invocation.Response; import org.apache.servicecomb.swagger.invocation.exception.InvocationException; import org.apache.servicecomb.swagger.invocation.ws.ServerWebSocket; @@ -187,15 +186,6 @@ public abstract class AbstractRestInvocation { return; } - if (Const.WEBSOCKET.equals(invocation.getProviderTransportName())) { - // pause for WebSocket handshake, waiting for completion of the biz REST operation - if (requestEx instanceof VertxServerRequestToHttpServletRequest) { - ((VertxServerRequestToHttpServletRequest) requestEx).getContext() - .request() - .pause(); - } - } - try { operationMeta.getExecutor().execute(() -> { synchronized (this.requestEx) { diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ClientRestArgsFilter.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ClientRestArgsFilter.java index c06006214..19652e2db 100644 --- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ClientRestArgsFilter.java +++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ClientRestArgsFilter.java @@ -40,7 +40,7 @@ public class ClientRestArgsFilter implements HttpClientFilter { @Override public boolean enabledForTransport(String transport) { - return HttpClientFilter.super.enabledForTransport(transport)|| Const.WEBSOCKET.equals(transport); + return HttpClientFilter.super.enabledForTransport(transport) || Const.WEBSOCKET.equals(transport); } @Override diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/http/HttpClientOptionsSPI.java 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/http/HttpClientOptionsSPI.java index 03d48fe2c..0e684e7c3 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/http/HttpClientOptionsSPI.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/http/HttpClientOptionsSPI.java @@ -24,7 +24,6 @@ import com.netflix.config.ConcurrentCompositeConfiguration; import io.vertx.core.http.HttpClientOptions; import io.vertx.core.http.HttpVersion; -import io.vertx.core.http.WebSocketClientOptions; import io.vertx.core.net.ClientOptionsBase; import io.vertx.core.net.ProxyOptions; diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/http/HttpClients.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/http/HttpClients.java index bc982d264..4449a3d02 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/http/HttpClients.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/http/HttpClients.java @@ -38,7 +38,6 @@ import io.vertx.core.DeploymentOptions; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.core.dns.AddressResolverOptions; -import io.vertx.core.http.WebSocketClient; /** * load and manages a set of HttpClient at boot up. 
@@ -83,7 +82,7 @@ public class HttpClients { clientOptionsList.forEach(option -> VertxUtils.blockCloseVertxByName(option.clientName())); wsClients.clear(); - List<WebSocketClientOptionsSPI> websocketClientOptionsList = + final List<WebSocketClientOptionsSPI> websocketClientOptionsList = SPIServiceUtils.getOrLoadSortedService(WebSocketClientOptionsSPI.class); websocketClientOptionsList.forEach(option -> VertxUtils.blockCloseVertxByName(option.clientName())); } diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/ws/WebSocketClientOptionsSPI.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/ws/WebSocketClientOptionsSPI.java index 95463a3b5..46b1d7bc1 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/ws/WebSocketClientOptionsSPI.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/ws/WebSocketClientOptionsSPI.java @@ -24,7 +24,7 @@ import io.vertx.core.http.HttpVersion; import io.vertx.core.http.WebSocketClientOptions; /** - * WebSocket client options must be set by implementation. + * WebSocket client options must be set by implementations. 
*/ public abstract class WebSocketClientOptionsSPI implements HttpClientOptionsSPI { public static WebSocketClientOptions createWebSocketClientOptions(WebSocketClientOptionsSPI spi) { diff --git a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/AbstractBaseWebSocket.java b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/AbstractBaseWebSocket.java index 2400d833e..704c71e8b 100644 --- a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/AbstractBaseWebSocket.java +++ b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/AbstractBaseWebSocket.java @@ -55,13 +55,24 @@ public abstract class AbstractBaseWebSocket implements WebSocket { webSocketAdapter.resume(); } + /** + * Check whether the message sending queue is full, in which case you should pause your sending action. + * To get to know when to recover sending action, please override the {@link #onWriteQueueDrain()} method + * to subscribe the notification that the message sending queue is ready to accept message again. + * + * @return true if message sending queue is full. + */ @Override public boolean writeQueueFull() { return webSocketAdapter.writeQueueFull(); } + /** + * The callback to notify when the message sending queue is ready to accept sending message/frame again. + * Usually this method is used in conjunction with the {@link #writeQueueFull()}. 
+ */ @Override - public void onDrain() { + public void onWriteQueueDrain() { } public void setWebSocketAdapter(WebSocketAdapter webSocketAdapter) { diff --git a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapper.java b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapper.java new file mode 100644 index 000000000..95ef34579 --- /dev/null +++ b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapper.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.servicecomb.swagger.invocation.ws; + +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.servicecomb.swagger.invocation.InvocationType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SerialExecutorWrapper implements Executor { + + private static final Logger LOGGER = LoggerFactory.getLogger(SerialExecutorWrapper.class); + + private static final int LEAST_QUEUE_CAPACITY = 10; + + private final InvocationType invocationType; + + private final String id; + + private final Queue<Runnable> queue; + + private final Executor workerPool; + + private final int queueCapacity; + + private final int drainThreshold; + + private final int fullThreshold; + + private final AtomicBoolean scheduleFlag; + + private final int maxContinueTimes; + + private QueueDrainSubscriber queueDrainSubscriber; + + private QueueFullSubscriber queueFullSubscriber; + + public SerialExecutorWrapper( + InvocationType invocationType, + String id, Executor workerPool, int queueCapacity, int maxContinueTimes) { + this.invocationType = invocationType; + this.id = id; + this.workerPool = workerPool; + this.queueCapacity = correctQueueCapacity(queueCapacity); + queue = new ArrayBlockingQueue<>(calculateRealQueueSize(), true); + drainThreshold = calculateDrainThreshold(); + fullThreshold = calculateFullThreshold(); + scheduleFlag = new AtomicBoolean(); + this.maxContinueTimes = correctMaxContinueTimes(maxContinueTimes); + } + + /** + * Subscribe queue drain event to resume message queue. + * Use this method in cooperation with {@link #subscribeQueueFullEvent(QueueFullSubscriber)} method. + */ + public void subscribeQueueDrainEvent(QueueDrainSubscriber subscriber) { + if (subscriber != null) { + queueDrainSubscriber = subscriber; + } + } + + /** + * Subscribe queue full event to pause message queue. 
+ * Use this method in cooperation with {@link #subscribeQueueDrainEvent(QueueDrainSubscriber)} method. + */ + public void subscribeQueueFullEvent(QueueFullSubscriber subscriber) { + if (subscriber != null) { + queueFullSubscriber = subscriber; + } + } + + @Override + public void execute(Runnable command) { + Objects.requireNonNull(command, "command must not be null"); + Objects.requireNonNull(queueDrainSubscriber, "queueDrainSubscriber must not be null"); + queue.add(command); + markQueueFull(); + scheduleWorker(); + } + + private void scheduleWorker() { + if (!scheduleFlag.compareAndSet(false, true)) { + // a running task has been set, don't repeat + return; + } + try { + workerPool.execute(() -> { + try { + runTasks(); + } finally { + scheduleFlag.set(false); + if (!queue.isEmpty()) { + scheduleWorker(); + } + } + }); + } catch (Throwable e) { + // in case that the underlying executor queue full, the scheduleFlag should be recovered + scheduleFlag.set(false); + LOGGER.error("[{}]-[{}] failed to execute task in actual thread pool!", invocationType, id, e); + } + } + + private void runTasks() { + int workCount = 1; + while (true) { + final Runnable task = queue.poll(); + if (task == null) { + break; + } + + try { + task.run(); + } catch (Throwable e) { + LOGGER.error("[{}]-[{}] error occurred while executing task[{}]", invocationType, id, task, e); + } + + ++workCount; + if (workCount > maxContinueTimes) { + break; + } + } + + notifyIfQueueDrain(); + } + + private void markQueueFull() { + final QueueFullSubscriber subscriber = queueFullSubscriber; + if (subscriber == null) { + return; + } + + if ((queue.size() < fullThreshold)) { + return; + } + + LOGGER.warn("[{}]-[{}] queue nearly full! 
queue length: {}/{}", + invocationType, id, queue.size(), calculateRealQueueSize()); + try { + subscriber.run(); + } catch (Throwable e) { + LOGGER.error("[{}]-[{}] error occurred while notifying queue full subscriber[{}]", invocationType, id, + subscriber, e); + } + if (queue.size() < drainThreshold) { + // in case that all tasks has been completed and WebSocket is paused, + // and no task to trigger the WebSocket get resumed + execute(this::notifyIfQueueDrain); + } + } + + private void notifyIfQueueDrain() { + final QueueDrainSubscriber subscriber = queueDrainSubscriber; + if (subscriber == null) { + return; + } + if (queue.size() > drainThreshold) { + return; + } + try { + subscriber.run(); + } catch (Throwable e) { + LOGGER.error("[{}]-[{}] error occurred while notifying queue drain subscriber[{}]", invocationType, id, + subscriber, e); + } + } + + private int calculateRealQueueSize() { + return queueCapacity + 50; + } + + private int calculateDrainThreshold() { + return (int) (queueCapacity * 0.25); + } + + private int calculateFullThreshold() { + return (int) (queueCapacity * 0.9); + } + + private int correctQueueCapacity(int queueCapacity) { + if (queueCapacity < LEAST_QUEUE_CAPACITY) { + LOGGER.warn("queue capacity less than 10 does not make sense, adjust to {}", LEAST_QUEUE_CAPACITY); + return LEAST_QUEUE_CAPACITY; + } + return queueCapacity; + } + + private int correctMaxContinueTimes(int maxContinueTimes) { + if (maxContinueTimes < 1) { + LOGGER.warn("maxContinueTimes less than 1 does not make sense, adjust to 1"); + return 1; + } + return maxContinueTimes; + } + + public interface QueueDrainSubscriber { + void run(); + } + + public interface QueueFullSubscriber { + void run(); + } +} diff --git a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocket.java b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocket.java index 
cde202c1d..7cfb666ff 100644 --- a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocket.java +++ b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocket.java @@ -55,5 +55,5 @@ public interface WebSocket { boolean writeQueueFull(); - void onDrain(); + void onWriteQueueDrain(); } diff --git a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketFrame.java b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketFrame.java index e2ca12b52..7b69c5854 100644 --- a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketFrame.java +++ b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketFrame.java @@ -22,6 +22,4 @@ package org.apache.servicecomb.swagger.invocation.ws; * There should be such a concept, but not need to supply such function for now. 
*/ public class WebSocketFrame { - private boolean isFinal; - private WebSocketFrameType type; } diff --git a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketPipe.java b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketPipe.java index c24a96db0..76da45cdf 100644 --- a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketPipe.java +++ b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/ws/WebSocketPipe.java @@ -30,13 +30,20 @@ import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; public class WebSocketPipe { private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketPipe.class); + private final String websocketSessionId; + private final PipeServerWebSocket serverWebSocket; private final PipeClientWebSocket clientWebSocket; + /** + * There is chance that the server and client try to close connection at the same time. + * Use one lock to avoid the client and server websocket deadlock in closing procedure. 
+ */ private final Object statusLock = new Object(); - public WebSocketPipe() { + public WebSocketPipe(String websocketSessionId) { + this.websocketSessionId = websocketSessionId; this.clientWebSocket = new PipeClientWebSocket(); this.serverWebSocket = new PipeServerWebSocket(); this.clientWebSocket.connect(serverWebSocket); @@ -97,32 +104,33 @@ public class WebSocketPipe { peer.sendMessage(message) .whenComplete((v, t) -> { if (t != null) { - LOGGER.error("failed to forward message", t); + LOGGER.error("[{}] failed to forward message", websocketSessionId, t); } }); if (peer.writeQueueFull()) { this.pause(); - LOGGER.debug("pipe paused, direction is server to client"); + LOGGER.debug("[{}] pipe paused, direction is server to client", websocketSessionId); } } @Override public void onError(Throwable t) { - LOGGER.error("websocket error", t); + LOGGER.error("[{}] websocket error", websocketSessionId, t); synchronized (statusLock) { transferStatus(PipeWebSocketStatus.ERROR); - safelyClose(); + safelyClose((short) WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), + WebSocketCloseStatus.INTERNAL_SERVER_ERROR.reasonText()); } } @Override public void onClose(Short closeStatusCode, String closeReason) { transferStatus(PipeWebSocketStatus.CLOSED); - safelyClose(); // should close peer + safelyClose(closeStatusCode, closeReason); // should close peer } @Override - public void onDrain() { + public void onWriteQueueDrain() { peer.resume(); } @@ -132,6 +140,7 @@ public class WebSocketPipe { if (status == PipeWebSocketStatus.CLOSING || status == PipeWebSocketStatus.CLOSED) { return CompletableFuture.completedFuture(null); } + transferStatus(PipeWebSocketStatus.CLOSING); if (status.ordinal() < PipeWebSocketStatus.RUNNING.ordinal()) { transferStatus(PipeWebSocketStatus.CLOSED); @@ -151,7 +160,9 @@ public class WebSocketPipe { private void transferStatus(PipeWebSocketStatus expectedOldStatus, PipeWebSocketStatus newStatus) { synchronized (statusLock) { if (status != 
expectedOldStatus) { - safelyClose(); + safelyClose((short) WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), + WebSocketCloseStatus.INTERNAL_SERVER_ERROR.reasonText()); + LOGGER.error("[{}] illegal state transfer: from {} to {}", websocketSessionId, expectedOldStatus, newStatus); throw new IllegalStateException("Illegal state transfer: [" + expectedOldStatus + "] to [" @@ -162,21 +173,19 @@ public class WebSocketPipe { } } - private void safelyClose() { + private void safelyClose(Short closeStatusCode, String closeReason) { try { - close((short) WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), - WebSocketCloseStatus.INTERNAL_SERVER_ERROR.reasonText()); + close(closeStatusCode, closeReason); } catch (Throwable e) { - LOGGER.error("failed to close pipe server websocket", e); + LOGGER.error("[{}] failed to close pipe server websocket", websocketSessionId, e); } if (peer == null) { return; } try { - peer.close((short) WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), - WebSocketCloseStatus.INTERNAL_SERVER_ERROR.reasonText()); + peer.close(closeStatusCode, closeReason); } catch (Throwable e) { - LOGGER.error("failed to close pipe client websocket", e); + LOGGER.error("[{}] failed to close pipe client websocket", websocketSessionId, e); } } } @@ -215,32 +224,33 @@ public class WebSocketPipe { peer.sendMessage(message) .whenComplete((v, t) -> { if (t != null) { - LOGGER.error("failed to forward message", t); + LOGGER.error("[{}] failed to forward message", websocketSessionId, t); } }); if (peer.writeQueueFull()) { this.pause(); - LOGGER.debug("pipe paused, direction is client to server"); + LOGGER.debug("[{}] pipe paused, direction is client to server", websocketSessionId); } } @Override public void onError(Throwable t) { - LOGGER.error("websocket error", t); + LOGGER.error("[{}] websocket error", websocketSessionId, t); synchronized (statusLock) { transferStatus(PipeWebSocketStatus.ERROR); - safelyClose(); + safelyClose((short) 
WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), + WebSocketCloseStatus.INTERNAL_SERVER_ERROR.reasonText()); } } @Override public void onClose(Short closeStatusCode, String closeReason) { transferStatus(PipeWebSocketStatus.CLOSED); - safelyClose(); // should close peer + safelyClose(closeStatusCode, closeReason); // should close peer } @Override - public void onDrain() { + public void onWriteQueueDrain() { peer.resume(); } @@ -253,7 +263,9 @@ public class WebSocketPipe { private void transferStatus(PipeWebSocketStatus expectedOldStatus, PipeWebSocketStatus newStatus) { synchronized (statusLock) { if (status != expectedOldStatus) { - safelyClose(); + safelyClose((short) WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), + WebSocketCloseStatus.INTERNAL_SERVER_ERROR.reasonText()); + LOGGER.error("[{}] illegal state transfer: from {} to {}", websocketSessionId, expectedOldStatus, newStatus); throw new IllegalStateException("Illegal state transfer: [" + expectedOldStatus + "] to [" @@ -264,21 +276,19 @@ public class WebSocketPipe { } } - private void safelyClose() { + private void safelyClose(Short closeStatusCode, String closeReason) { try { - close((short) WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), - WebSocketCloseStatus.INTERNAL_SERVER_ERROR.reasonText()); + close(closeStatusCode, closeReason); } catch (Throwable e) { - LOGGER.error("failed to close pipe client websocket", e); + LOGGER.error("[{}] failed to close pipe client websocket", websocketSessionId, e); } if (peer == null) { return; } try { - peer.close((short) WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), - WebSocketCloseStatus.INTERNAL_SERVER_ERROR.reasonText()); + peer.close(closeStatusCode, closeReason); } catch (Throwable e) { - LOGGER.error("failed to close pipe server websocket", e); + LOGGER.error("[{}] failed to close pipe server websocket", websocketSessionId, e); } } } diff --git 
a/swagger/swagger-invocation/invocation-core/src/test/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapperTest.java b/swagger/swagger-invocation/invocation-core/src/test/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapperTest.java new file mode 100644 index 000000000..7e9ff171f --- /dev/null +++ b/swagger/swagger-invocation/invocation-core/src/test/java/org/apache/servicecomb/swagger/invocation/ws/SerialExecutorWrapperTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.servicecomb.swagger.invocation.ws; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.servicecomb.swagger.invocation.InvocationType; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * SerialExecutorWrapper UT + */ +public class SerialExecutorWrapperTest { + private ExecutorService workerPools; + + @Before + public void before() { + workerPools = Executors.newFixedThreadPool(20); + } + + @After + public void after() { + workerPools.shutdown(); + } + + @Test + public void executeSingleWrapper() throws InterruptedException { + // queueCapacity(10) + bufferCapacity(50) + taskInWorking(1) = 61, this is the max task count that not trigger queue full exception + final int taskCount = 61; + final SerialExecutorWrapper wrapper = new SerialExecutorWrapper(InvocationType.PRODUCER, + "testSerialWrapper", workerPools, 10, 3); + wrapper.subscribeQueueDrainEvent(() -> { + }); + final Object lock = new Object(); + final List<Integer> resultList = new ArrayList<>(); + final CountDownLatch countDownLatch = new CountDownLatch(taskCount); + synchronized (lock) { + final CountDownLatch continueLoopLatch = new CountDownLatch(1); + wrapper.execute(() -> { + continueLoopLatch.countDown(); + synchronized (lock) { + resultList.add(0); + countDownLatch.countDown(); + } + }); + continueLoopLatch.await(1, TimeUnit.MINUTES); // wait the first task start running to ensure a stable result. 
+ for (int i = 1; i < taskCount; ++i) { + final int index = i; + wrapper.execute(() -> { + resultList.add(index); + countDownLatch.countDown(); + }); + } + MatcherAssert.assertThat(resultList, Matchers.empty()); + } + countDownLatch.await(1, TimeUnit.MINUTES); + MatcherAssert.assertThat(resultList, Matchers.hasSize(taskCount)); + for (int i = 0; i < taskCount; ++i) { + // must execute + MatcherAssert.assertThat(resultList.get(i), Matchers.equalTo(i)); + } + } + + @Test + public void executeSingleWrapperQueueFull() throws InterruptedException { + final int taskCount = 62; // taskCount +1 than executeSingleWrapper method + final SerialExecutorWrapper wrapper = new SerialExecutorWrapper(InvocationType.PRODUCER, + "testSerialWrapper", workerPools, 10, 3); + wrapper.subscribeQueueDrainEvent(() -> { + }); + final Object lock = new Object(); + final AtomicBoolean exceptionFlag = new AtomicBoolean(); + final List<Integer> resultList = new ArrayList<>(); + final CountDownLatch countDownLatch = new CountDownLatch(taskCount - 1); + synchronized (lock) { + final CountDownLatch continueLoopLatch = new CountDownLatch(1); + wrapper.execute(() -> { + continueLoopLatch.countDown(); + synchronized (lock) { + resultList.add(0); + countDownLatch.countDown(); + } + }); + continueLoopLatch.await(1, TimeUnit.MINUTES); // wait the first task start running to ensure a stable result. 
+ for (int i = 1; i < taskCount; ++i) { + final int index = i; + try { + wrapper.execute(() -> { + resultList.add(index); + countDownLatch.countDown(); + }); + } catch (Exception e) { + exceptionFlag.set(true); + MatcherAssert.assertThat(i, Matchers.equalTo(taskCount - 1)); + MatcherAssert.assertThat(e, Matchers.instanceOf(IllegalStateException.class)); + } + } + MatcherAssert.assertThat(resultList, Matchers.empty()); + } + countDownLatch.await(1, TimeUnit.MINUTES); + MatcherAssert.assertThat(exceptionFlag.get(), Matchers.equalTo(true)); + MatcherAssert.assertThat(resultList, Matchers.hasSize(taskCount - 1)); + for (int i = 0; i < taskCount - 1; ++i) { + MatcherAssert.assertThat(resultList.get(i), Matchers.equalTo(i)); + } + } +} diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketClientInvocation.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketClientInvocation.java index 9bac6305e..375110b84 100644 --- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketClientInvocation.java +++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketClientInvocation.java @@ -56,7 +56,7 @@ import io.vertx.core.http.WebSocketConnectOptions; public class WebSocketClientInvocation { - public static final Logger LOGGER = LoggerFactory.getLogger(WebSocketClientInvocation.class); + private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketClientInvocation.class); private static final String[] INTERNAL_HEADERS = new String[] { Const.CSE_CONTEXT, diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketTransportClient.java 
b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketTransportClient.java index a31d9a1d3..c606ab42f 100644 --- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketTransportClient.java +++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/ws/WebSocketTransportClient.java @@ -41,8 +41,7 @@ public class WebSocketTransportClient { public void send(Invocation invocation, AsyncResponse asyncResp) { final WebSocketClientWithContext webSocketClientWithContext = findHttpClientPool(invocation); final WebSocketClientInvocation webSocketClientInvocation = new WebSocketClientInvocation( - webSocketClientWithContext, - httpClientFilters); + webSocketClientWithContext, httpClientFilters); try { webSocketClientInvocation.invoke(invocation, asyncResp); } catch (Throwable e) { diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java index 59cb6cd37..9edf23126 100644 --- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java +++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java @@ -57,6 +57,7 @@ import io.vertx.core.http.Http2Settings; import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerResponse; import io.vertx.core.net.impl.ConnectionBase; import io.vertx.ext.web.Router; @@ -88,6 +89,7 @@ public class RestServerVerticle extends AbstractVerticle { return; } Router mainRouter 
= Router.router(vertx); + mountWebSocketPauseHandler(mainRouter); mountAccessLogHandler(mainRouter); mountCorsHandler(mainRouter); initDispatcher(mainRouter); @@ -163,6 +165,18 @@ public class RestServerVerticle extends AbstractVerticle { .failureHandler(failureHandler); } + private void mountWebSocketPauseHandler(Router mainRouter) { + mainRouter.route().handler(context -> { + final HttpServerRequest request = context.request(); + if (request.headers().contains( + io.vertx.core.http.HttpHeaders.UPGRADE, io.vertx.core.http.HttpHeaders.WEBSOCKET, true)) { + // pause for websocket, to avoid missing end event, which may cause vert.x HttpServerRequest toWebSocket never complete + request.pause(); + } + context.next(); + }); + } + private void mountAccessLogHandler(Router mainRouter) { if (!AccessLogConfig.INSTANCE.isServerLogEnabled()) { return; @@ -267,6 +281,7 @@ public class RestServerVerticle extends AbstractVerticle { serverOptions.setDecompressionSupported(TransportConfig.getDecompressionSupported()); serverOptions.setDecoderInitialBufferSize(TransportConfig.getDecoderInitialBufferSize()); serverOptions.setMaxInitialLineLength(TransportConfig.getMaxInitialLineLength()); + serverOptions.setLogActivity(TransportConfig.enableLogActivity()); // WebSocket config start serverOptions.setMaxWebSocketFrameSize(TransportConfig.getMaxWebSocketFrameSize()); diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/TransportConfig.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/TransportConfig.java index dd922fbc2..b52505168 100644 --- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/TransportConfig.java +++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/TransportConfig.java @@ -182,6 +182,13 @@ public final class TransportConfig { 
.get(); } + public static boolean enableLogActivity() { + return DynamicPropertyFactory.getInstance() + .getBooleanProperty("servicecomb.rest.server.enableLogActivity", + false) + .get(); + } + public static boolean isCorsEnabled() { return DynamicPropertyFactory.getInstance() .getBooleanProperty(SERVICECOMB_CORS_CONFIG_BASE + ".enabled", false) diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java index 696db5af7..195c59fa7 100644 --- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java +++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/VertxWebSocketAdaptor.java @@ -20,13 +20,21 @@ package org.apache.servicecomb.transport.rest.vertx.ws; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.servicecomb.core.executor.ReactiveExecutor; +import org.apache.servicecomb.swagger.invocation.InvocationType; import org.apache.servicecomb.swagger.invocation.ws.AbstractBaseWebSocket; import org.apache.servicecomb.swagger.invocation.ws.BinaryBytesWebSocketMessage; +import org.apache.servicecomb.swagger.invocation.ws.SerialExecutorWrapper; import org.apache.servicecomb.swagger.invocation.ws.TextWebSocketMessage; import org.apache.servicecomb.swagger.invocation.ws.WebSocketAdapter; import org.apache.servicecomb.swagger.invocation.ws.WebSocketFrame; import org.apache.servicecomb.swagger.invocation.ws.WebSocketMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.netflix.config.DynamicPropertyFactory; import io.vertx.core.buffer.Buffer; import 
io.vertx.core.http.WebSocketBase; @@ -35,27 +43,70 @@ import io.vertx.core.http.WebSocketBase; * VertxWebSocketAdaptor */ public class VertxWebSocketAdaptor implements WebSocketAdapter { + + private static final Logger LOGGER = LoggerFactory.getLogger(VertxWebSocketAdaptor.class); + + private static final int DEFAULT_ADAPTER_QUEUE_MAX_SIZE = 100; + + private static final int DEFAULT_ADAPTER_QUEUE_MAX_CONTINUE_TIMES = 10; + private final Executor executor; private final AbstractBaseWebSocket delegatedWebSocket; + private final AtomicBoolean inPauseStatus; + private final WebSocketBase vertxWebSocket; - public VertxWebSocketAdaptor(Executor executor, AbstractBaseWebSocket delegatedWebSocket, + private final String websocketSessionId; + + private final InvocationType invocationType; + + public VertxWebSocketAdaptor( + InvocationType invocationType, + String websocketSessionId, + Executor workerPool, + AbstractBaseWebSocket delegatedWebSocket, WebSocketBase vertxWebSocket) { - Objects.requireNonNull(executor, "VertxWebSocketAdaptor executor is null"); + Objects.requireNonNull(invocationType, "VertxWebSocketAdaptor invocationType is null"); + Objects.requireNonNull(websocketSessionId, "VertxWebSocketAdaptor websocketSessionId is null"); + Objects.requireNonNull(workerPool, "VertxWebSocketAdaptor workerPool is null"); Objects.requireNonNull(delegatedWebSocket, "VertxWebSocketAdaptor delegatedWebSocket is null"); Objects.requireNonNull(vertxWebSocket, "VertxWebSocketAdaptor vertxWebSocket is null"); - this.executor = executor; + this.invocationType = invocationType; + this.websocketSessionId = websocketSessionId; + this.executor = workerPool instanceof ReactiveExecutor ? 
+ workerPool // for reactive case, no need to wrap it into a serial queue model + : prepareSerialExecutorWrapper(workerPool); this.delegatedWebSocket = delegatedWebSocket; this.vertxWebSocket = vertxWebSocket; + inPauseStatus = new AtomicBoolean(true); + vertxWebSocket.pause(); // make sure the vert.x WebSocket pause status keep consistent with inPauseStatus flag - link(); + prepare(); delegatedWebSocket.setWebSocketAdapter(this); startWorking(); } - private void link() { + private SerialExecutorWrapper prepareSerialExecutorWrapper(Executor workerPool) { + final SerialExecutorWrapper wrapper = new SerialExecutorWrapper( + invocationType, + websocketSessionId, + workerPool, + DynamicPropertyFactory.getInstance() + .getIntProperty("servicecomb.websocket.adapter.queue.maxSize", + DEFAULT_ADAPTER_QUEUE_MAX_SIZE) + .get(), + DynamicPropertyFactory.getInstance() + .getIntProperty("servicecomb.websocket.adapter.queue.maxContinueTimes", + DEFAULT_ADAPTER_QUEUE_MAX_CONTINUE_TIMES) + .get()); + wrapper.subscribeQueueDrainEvent(this::resume); + wrapper.subscribeQueueFullEvent(this::pause); + return wrapper; + } + + private void prepare() { linkVertxDrainHandler(); linkVertxTextMessageHandler(); linkVertxBinaryMessageHandler(); @@ -66,12 +117,11 @@ public class VertxWebSocketAdaptor implements WebSocketAdapter { private void linkVertxCloseHandler() { vertxWebSocket.closeHandler(v -> - executor.execute( - () -> delegatedWebSocket.onClose(vertxWebSocket.closeStatusCode(), vertxWebSocket.closeReason()))); + scheduleTask(() -> delegatedWebSocket.onClose(vertxWebSocket.closeStatusCode(), vertxWebSocket.closeReason()))); } private void linkVertxExceptionHandler() { - vertxWebSocket.exceptionHandler(t -> executor.execute(() -> delegatedWebSocket.onError(t))); + vertxWebSocket.exceptionHandler(t -> scheduleTask(() -> delegatedWebSocket.onError(t))); } private void linkVertxFrameHandler() { @@ -82,27 +132,35 @@ public class VertxWebSocketAdaptor implements WebSocketAdapter { private 
void linkVertxBinaryMessageHandler() { vertxWebSocket.binaryMessageHandler(buffer -> { final byte[] bytes = buffer.getBytes(); - executor.execute( + scheduleTask( () -> delegatedWebSocket.onMessage(new BinaryBytesWebSocketMessage(bytes))); }); } private void linkVertxTextMessageHandler() { vertxWebSocket.textMessageHandler(s -> - executor.execute( + scheduleTask( () -> delegatedWebSocket.onMessage(new TextWebSocketMessage(s)))); } private void linkVertxDrainHandler() { vertxWebSocket.drainHandler(v -> - executor.execute(delegatedWebSocket::onDrain)); + scheduleTask(delegatedWebSocket::onWriteQueueDrain)); } private void startWorking() { - executor.execute( + scheduleTask( delegatedWebSocket::onConnectionReady); } + private void scheduleTask(Runnable task) { + try { + executor.execute(task); + } catch (Throwable e) { + LOGGER.error("[{}]-[{}] error occurs in scheduleTask", invocationType, websocketSessionId, e); + } + } + @Override public CompletableFuture<Void> sendMessage(WebSocketMessage<?> message) { if (message instanceof TextWebSocketMessage) { @@ -136,12 +194,26 @@ public class VertxWebSocketAdaptor implements WebSocketAdapter { @Override public void pause() { - vertxWebSocket.pause(); + if (!inPauseStatus.compareAndSet(false, true)) { + return; + } + LOGGER.info("[{}]-[{}] pause websocket", invocationType, websocketSessionId); + synchronized (this) { + vertxWebSocket.pause(); + inPauseStatus.set(true); + } } @Override public void resume() { - vertxWebSocket.resume(); + if (!inPauseStatus.compareAndSet(true, false)) { + return; + } + LOGGER.info("[{}]-[{}] resume websocket", invocationType, websocketSessionId); + synchronized (this) { + vertxWebSocket.resume(); + inPauseStatus.set(false); + } } @Override diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java 
b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java index afa887e76..437a0ba23 100644 --- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java +++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketHandshakeServerFilter.java @@ -52,7 +52,7 @@ public class WebSocketHandshakeServerFilter implements HttpServerFilter { if (invocation.isEdge() && (invocation.getLocalContext(WEBSOCKET_PIPE_CONTEXT_KEY) == null)) { // prepare pipe for edge forward websocket scene - final WebSocketPipe webSocketPipe = new WebSocketPipe(); + final WebSocketPipe webSocketPipe = new WebSocketPipe(invocation.getTraceId()); invocation.addLocalContext(WEBSOCKET_PIPE_CONTEXT_KEY, webSocketPipe); invocation.addLocalContext(ClientWebSocketArgumentMapper.SCB_CLIENT_WEBSOCKET_LOCAL_CONTEXT_KEY, webSocketPipe.getClientWebSocket()); @@ -73,7 +73,10 @@ public class WebSocketHandshakeServerFilter implements HttpServerFilter { ((VertxServerRequestToHttpServletRequest) invocation.getRequestEx()) .toWebSocket() .whenComplete((ws, t) -> - new VertxWebSocketAdaptor(invocation.getOperationMeta().getExecutor(), + new VertxWebSocketAdaptor( + invocation.getInvocationType(), + invocation.getTraceId(), + invocation.getOperationMeta().getExecutor(), (ServerWebSocket) result, ws)); } diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketResponseWrapClientFilter.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketResponseWrapClientFilter.java index eed2389a7..04fbaec0c 100644 --- 
a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketResponseWrapClientFilter.java +++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/ws/WebSocketResponseWrapClientFilter.java @@ -68,6 +68,8 @@ public class WebSocketResponseWrapClientFilter implements HttpClientFilter { .getOperationMeta() .getExecutor(); final VertxWebSocketAdaptor webSocketAdaptor = new VertxWebSocketAdaptor( + invocation.getInvocationType(), + invocation.getTraceId(), executor, clientWebSocket, webSocketResponseEx.getVertxClientWebSocket());
