This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 6c611d5c2aa [improve][websocket][branch-2.11] Add ping support (#19255)
6c611d5c2aa is described below

commit 6c611d5c2aa130b36e7c2fda4567604f9eff016f
Author: Zixuan Liu <[email protected]>
AuthorDate: Wed Jan 18 13:27:51 2023 +0800

    [improve][websocket][branch-2.11] Add ping support (#19255)
    
    Signed-off-by: Zixuan Liu <[email protected]>
---
 .../apache/pulsar/broker/ServiceConfiguration.java |   6 ++
 .../websocket/proxy/ProxyIdleTimeoutTest.java      | 105 ++++++++++++++++++++
 .../pulsar/websocket/proxy/ProxyPingTest.java      | 107 +++++++++++++++++++++
 .../pulsar/proxy/server/ProxyConfiguration.java    |   6 ++
 .../pulsar/websocket/AbstractWebSocketHandler.java |  30 ++++++
 .../service/WebSocketProxyConfiguration.java       |   3 +
 .../websocket/AbstractWebSocketHandlerTest.java    |  56 +++++++++++
 7 files changed, 313 insertions(+)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 5942ee9a8e9..7e9534bc9bc 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2519,6 +2519,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private int webSocketSessionIdleTimeoutMillis = 300000;
 
+    @FieldContext(
+            category = CATEGORY_WEBSOCKET,
+            doc = "Interval of time to sending the ping to keep alive in 
WebSocket proxy. "
+                    + "This value greater than 0 means enabled")
+    private int webSocketPingDurationSeconds = -1;
+
     @FieldContext(
         category = CATEGORY_WEBSOCKET,
         doc = "The maximum size of a text message during parsing in WebSocket 
proxy."
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
new file mode 100644
index 00000000000..71f8c0b6d86
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import java.net.URI;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.websocket.WebSocketService;
+import org.apache.pulsar.websocket.service.ProxyServer;
+import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
+import org.apache.pulsar.websocket.service.WebSocketServiceStarter;
+import org.awaitility.Awaitility;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "websocket")
+@Slf4j
+public class ProxyIdleTimeoutTest extends ProducerConsumerBase {
+    protected String methodName;
+    private ProxyServer proxyServer;
+    private WebSocketService service;
+
+    private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        
conf.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
+        config.setWebServicePort(Optional.of(0));
+        config.setClusterName("test");
+        config.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
+        config.setWebSocketSessionIdleTimeoutMillis(3 * 1000);
+        service = spyWithClassAndConstructorArgs(WebSocketService.class, 
config);
+        doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
 anyInt());
+        proxyServer = new ProxyServer(config);
+        WebSocketServiceStarter.start(proxyServer, service);
+        log.info("Proxy Server Started");
+    }
+
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        if (service != null) {
+            service.close();
+        }
+        if (proxyServer != null) {
+            proxyServer.stop();
+        }
+        log.info("Finished Cleaning Up Test setup");
+    }
+
+    @Test
+    public void testIdleTimeout() throws Exception {
+        String producerUri = "ws://localhost:" + 
proxyServer.getListenPortHTTP().get() +
+                "/ws/v2/producer/persistent/my-property/my-ns/my-topic1/";
+
+        URI produceUri = URI.create(producerUri);
+        WebSocketClient produceClient = new WebSocketClient();
+        SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+        try {
+            produceClient.start();
+            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            Future<Session> producerFuture = 
produceClient.connect(produceSocket, produceUri, produceRequest);
+            assertThat(producerFuture).succeedsWithin(2, SECONDS);
+            Session session = producerFuture.get();
+            Awaitility.await().during(5, SECONDS).untilAsserted(() -> 
assertThat(session.isOpen()).isFalse());
+        } finally {
+            produceClient.stop();
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
new file mode 100644
index 00000000000..6e0dfa46cfd
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import java.net.URI;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.websocket.WebSocketService;
+import org.apache.pulsar.websocket.service.ProxyServer;
+import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
+import org.apache.pulsar.websocket.service.WebSocketServiceStarter;
+import org.awaitility.Awaitility;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "websocket")
+@Slf4j
+public class ProxyPingTest extends ProducerConsumerBase {
+    protected String methodName;
+
+    private ProxyServer proxyServer;
+    private WebSocketService service;
+
+    private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        
conf.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
+        config.setWebServicePort(Optional.of(0));
+        config.setClusterName("test");
+        config.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
+        config.setWebSocketSessionIdleTimeoutMillis(3 * 1000);
+        config.setWebSocketPingDurationSeconds(2);
+        service = spyWithClassAndConstructorArgs(WebSocketService.class, 
config);
+        doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
 anyInt());
+        proxyServer = new ProxyServer(config);
+        WebSocketServiceStarter.start(proxyServer, service);
+        log.info("Proxy Server Started");
+    }
+
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        if (service != null) {
+            service.close();
+        }
+        if (proxyServer != null) {
+            proxyServer.stop();
+        }
+        log.info("Finished Cleaning Up Test setup");
+    }
+
+    @Test
+    public void testPing() throws Exception {
+        String producerUri = "ws://localhost:" + 
proxyServer.getListenPortHTTP().get() +
+                "/ws/v2/producer/persistent/my-property/my-ns/my-topic1/";
+
+        URI produceUri = URI.create(producerUri);
+        WebSocketClient produceClient = new WebSocketClient();
+        SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+        try {
+            produceClient.start();
+            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            Future<Session> producerFuture = 
produceClient.connect(produceSocket, produceUri, produceRequest);
+            assertThat(producerFuture).succeedsWithin(2, SECONDS);
+            Session session = producerFuture.get();
+            Awaitility.await().during(5, SECONDS).untilAsserted(() -> 
assertThat(session.isOpen()).isTrue());
+        } finally {
+            produceClient.stop();
+        }
+    }
+}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 2555d648000..dc6e04caaf1 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -783,6 +783,12 @@ public class ProxyConfiguration implements PulsarConfiguration {
     )
     private boolean webSocketServiceEnabled = false;
 
+    @FieldContext(
+            category = CATEGORY_WEBSOCKET,
+            doc = "Interval of time to sending the ping to keep alive in 
WebSocket proxy. "
+                    + "This value greater than 0 means enabled")
+    private int webSocketPingDurationSeconds = -1;
+
     @FieldContext(
             category = CATEGORY_WEBSOCKET,
             doc = "Name of the cluster to which this broker belongs to"
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
index 1bd0055bd1d..8ac4c2552ec 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
@@ -19,12 +19,17 @@
 package org.apache.pulsar.websocket;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Splitter;
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import org.apache.commons.lang3.StringUtils;
@@ -62,6 +67,8 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
     protected final Map<String, String> queryParams;
     private static final String PULSAR_AUTH_METHOD_NAME = 
"X-Pulsar-Auth-Method-Name";
 
+    private ScheduledFuture<?> pingFuture;
+
     public AbstractWebSocketHandler(WebSocketService service,
                                     HttpServletRequest request,
                                     ServletUpgradeResponse response) {
@@ -169,9 +176,25 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
         }
     }
 
+    private void closePingFuture() {
+        if (pingFuture != null && !pingFuture.isDone()) {
+            pingFuture.cancel(true);
+        }
+    }
+
     @Override
     public void onWebSocketConnect(Session session) {
         super.onWebSocketConnect(session);
+        int webSocketPingDurationSeconds = 
service.getConfig().getWebSocketPingDurationSeconds();
+        if (webSocketPingDurationSeconds > 0) {
+            pingFuture = service.getExecutor().scheduleAtFixedRate(() -> {
+                try {
+                    
session.getRemote().sendPing(ByteBuffer.wrap("PING".getBytes(StandardCharsets.UTF_8)));
+                } catch (IOException e) {
+                    log.warn("[{}] WebSocket send ping", 
getSession().getRemoteAddress(), e);
+                }
+            }, webSocketPingDurationSeconds, webSocketPingDurationSeconds, 
TimeUnit.SECONDS);
+        }
         log.info("[{}] New WebSocket session on topic {}", 
session.getRemoteAddress(), topic);
     }
 
@@ -180,6 +203,7 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
         super.onWebSocketError(cause);
         log.info("[{}] WebSocket error on topic {} : {}", 
getSession().getRemoteAddress(), topic, cause.getMessage());
         try {
+            closePingFuture();
             close();
         } catch (IOException e) {
             log.error("Failed in closing WebSocket session for topic {} with 
error: {}", topic, e.getMessage());
@@ -191,6 +215,7 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
         log.info("[{}] Closed WebSocket session on topic {}. status: {} - 
reason: {}", getSession().getRemoteAddress(),
                 topic, statusCode, reason);
         try {
+            closePingFuture();
             close();
         } catch (IOException e) {
             log.warn("[{}] Failed to close handler for topic {}. ", 
getSession().getRemoteAddress(), topic, e);
@@ -259,6 +284,11 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
         return TopicName.get(domain, namespace, name);
     }
 
+    @VisibleForTesting
+    public ScheduledFuture<?> getPingFuture() {
+        return pingFuture;
+    }
+
     protected abstract Boolean isAuthorized(String authRole,
                                             AuthenticationDataSource 
authenticationData) throws Exception;
 
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
index 8fb3ab7ed01..a69c61ae689 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
@@ -161,6 +161,9 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
     @FieldContext(doc = "Timeout of idling WebSocket session (in 
milliseconds)")
     private int webSocketSessionIdleTimeoutMillis = 300000;
 
+    @FieldContext(doc = "Interval of time to sending the ping to keep alive. 
This value greater than 0 means enabled")
+    private int webSocketPingDurationSeconds = -1;
+
     @FieldContext(doc = "When this parameter is not empty, unauthenticated 
users perform as anonymousUserRole")
     private String anonymousUserRole = null;
 
diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
index 29d8d64159c..cb8a499120d 100644
--- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
+++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import java.io.IOException;
 import java.net.URLEncoder;
@@ -31,6 +32,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.servlet.http.HttpServletRequest;
@@ -49,6 +51,10 @@ import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
 import org.mockito.Mock;
 import org.testng.annotations.AfterClass;
@@ -380,4 +386,54 @@ public class AbstractWebSocketHandlerTest {
         assertEquals(conf.getDeadLetterPolicy().getDeadLetterTopic(), 
"dead-letter-topic");
         assertEquals(conf.getDeadLetterPolicy().getMaxRedeliverCount(), 3);
     }
+
+    @Test
+    public void testPingFuture() {
+        WebSocketProxyConfiguration webSocketProxyConfiguration = new 
WebSocketProxyConfiguration();
+        webSocketProxyConfiguration.setWebSocketPingDurationSeconds(5);
+
+        WebSocketService webSocketService = new 
WebSocketService(webSocketProxyConfiguration);
+
+        HttpServletRequest httpServletRequest = mock(HttpServletRequest.class);
+        String consumerV2 = 
"/ws/v2/consumer/persistent/my-property/my-ns/my-topic/my-subscription";
+        Map<String, String[]> queryParams = new HashMap<String, String>(){{
+            put("ackTimeoutMillis", "1001");
+            put("subscriptionType", "Key_Shared");
+            put("subscriptionMode", "NonDurable");
+            put("receiverQueueSize", "999");
+            put("consumerName", "my-consumer");
+            put("priorityLevel", "1");
+            put("maxRedeliverCount", "5");
+        }}.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, 
entry -> new String[]{ entry.getValue() }));
+
+        when(httpServletRequest.getRequestURI()).thenReturn(consumerV2);
+        when(httpServletRequest.getParameterMap()).thenReturn(queryParams);
+
+        MockedServletUpgradeResponse response = new 
MockedServletUpgradeResponse(null);
+        AbstractWebSocketHandler webSocketHandler = new 
WebSocketHandlerImpl(webSocketService, httpServletRequest, response);
+
+        Session session = mock(Session.class);
+        RemoteEndpoint remoteEndpoint = mock(RemoteEndpoint.class);
+        when(session.getRemote()).thenReturn(remoteEndpoint);
+
+        // onWebSocketClose
+        webSocketHandler.onWebSocketConnect(session);
+
+        ScheduledFuture<?> pingFuture = webSocketHandler.getPingFuture();
+        assertNotNull(pingFuture);
+        assertFalse(pingFuture.isDone());
+
+        
webSocketHandler.onWebSocketClose(HttpStatus.INTERNAL_SERVER_ERROR_500, 
"INTERNAL_SERVER_ERROR_500");
+        assertTrue(pingFuture.isDone());
+
+        // onWebSocketError
+        webSocketHandler.onWebSocketConnect(session);
+
+        pingFuture = webSocketHandler.getPingFuture();
+        assertNotNull(pingFuture);
+        assertFalse(pingFuture.isDone());
+
+        webSocketHandler.onWebSocketError(new 
RuntimeException("INTERNAL_SERVER_ERROR_500"));
+        assertTrue(pingFuture.isDone());
+    }
 }

Reply via email to