This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 21d384c5c4a [improve][websocket] Add ping support (#19203)
21d384c5c4a is described below

commit 21d384c5c4a615f0485618a0402c025bfa20c8be
Author: Zixuan Liu <[email protected]>
AuthorDate: Tue Jan 17 10:30:49 2023 +0800

    [improve][websocket] Add ping support (#19203)
    
    Signed-off-by: Zixuan Liu <[email protected]>
---
 .../websocket/proxy/ProxyIdleTimeoutTest.java      | 107 ++++++++++++++++++++
 .../pulsar/websocket/proxy/ProxyPingTest.java      | 109 +++++++++++++++++++++
 .../pulsar/websocket/AbstractWebSocketHandler.java |  35 +++++++
 .../apache/pulsar/websocket/WebSocketService.java  |   5 +
 .../service/WebSocketProxyConfiguration.java       |   3 +
 .../websocket/AbstractWebSocketHandlerTest.java    |  57 +++++++++++
 6 files changed, 316 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
new file mode 100644
index 00000000000..ab5a43b115a
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import java.net.URI;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.websocket.WebSocketService;
+import org.apache.pulsar.websocket.service.ProxyServer;
+import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
+import org.apache.pulsar.websocket.service.WebSocketServiceStarter;
+import org.awaitility.Awaitility;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test for the idle timeout of WebSocket proxy sessions.
+ *
+ * <p>Every test method runs against a freshly started {@link ProxyServer} whose
+ * {@code webSocketSessionIdleTimeoutMillis} is 3000. No ping interval is configured in
+ * this class.
+ */
+@Test(groups = "websocket")
+@Slf4j
+public class ProxyIdleTimeoutTest extends ProducerConsumerBase {
+    // Not referenced anywhere in this class.
+    protected String methodName;
+    private ProxyServer proxyServer;
+    private WebSocketService service;
+
+    // Passed to conf.setBacklogQuotaCheckIntervalInSeconds(...) in setup(), so the unit is seconds.
+    private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
+
+    /**
+     * Runs the base-class setup and then starts a WebSocket proxy with a 3-second
+     * session idle timeout.
+     *
+     * @throws Exception if the base setup or the proxy start-up fails
+     */
+    @BeforeMethod
+    public void setup() throws Exception {
+        
conf.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
+        // Port 0: presumably lets the server pick a free port; the test reads the actual
+        // port back through proxyServer.getListenPortHTTP().
+        config.setWebServicePort(Optional.of(0));
+        config.setClusterName("test");
+        config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+        // Session idle timeout of 3 seconds.
+        config.setWebSocketSessionIdleTimeoutMillis(3 * 1000);
+        // Spy on the service so createConfigMetadataStore(...) hands back a ZKMetadataStore
+        // built on the mock ZooKeeper.
+        service = spyWithClassAndConstructorArgs(WebSocketService.class, 
config);
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+                .createConfigMetadataStore(anyString(), anyInt(), 
anyBoolean());
+        proxyServer = new ProxyServer(config);
+        WebSocketServiceStarter.start(proxyServer, service);
+        log.info("Proxy Server Started");
+    }
+
+    /**
+     * Runs the base-class cleanup, then closes the WebSocket service and stops the proxy
+     * server if they were created.
+     *
+     * @throws Exception if any of the shutdown steps fails
+     */
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        if (service != null) {
+            service.close();
+        }
+        if (proxyServer != null) {
+            proxyServer.stop();
+        }
+        log.info("Finished Cleaning Up Test setup");
+    }
+
+    /**
+     * Opens a producer WebSocket session against the proxy and waits until
+     * {@code session.isOpen()} reports {@code false}.
+     *
+     * @throws Exception if the client cannot be started or stopped, or the connect future fails
+     */
+    @Test
+    public void testIdleTimeout() throws Exception {
+        String producerUri = "ws://localhost:" + 
proxyServer.getListenPortHTTP().get() +
+                "/ws/v2/producer/persistent/my-property/my-ns/my-topic1/";
+
+        URI produceUri = URI.create(producerUri);
+        WebSocketClient produceClient = new WebSocketClient();
+        SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+        try {
+            produceClient.start();
+            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            Future<Session> producerFuture = 
produceClient.connect(produceSocket, produceUri, produceRequest);
+            // The WebSocket upgrade has to complete within 2 seconds.
+            assertThat(producerFuture).succeedsWithin(2, SECONDS);
+            Session session = producerFuture.get();
+            // Awaitility during(5, SECONDS): the assertion has to keep holding for 5 seconds,
+            // not merely pass once.
+            Awaitility.await().during(5, SECONDS).untilAsserted(() -> 
assertThat(session.isOpen()).isFalse());
+        } finally {
+            produceClient.stop();
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
new file mode 100644
index 00000000000..8ba92831389
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import java.net.URI;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.websocket.WebSocketService;
+import org.apache.pulsar.websocket.service.ProxyServer;
+import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
+import org.apache.pulsar.websocket.service.WebSocketServiceStarter;
+import org.awaitility.Awaitility;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test for the server-side ping of the WebSocket proxy.
+ *
+ * <p>Every test method runs against a freshly started {@link ProxyServer} configured with
+ * a 3000 ms session idle timeout and a ping interval of 2 seconds.
+ */
+@Test(groups = "websocket")
+@Slf4j
+public class ProxyPingTest extends ProducerConsumerBase {
+    // Not referenced anywhere in this class.
+    protected String methodName;
+
+    private ProxyServer proxyServer;
+    private WebSocketService service;
+
+    // Passed to conf.setBacklogQuotaCheckIntervalInSeconds(...) in setup(), so the unit is seconds.
+    private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
+
+    /**
+     * Runs the base-class setup and then starts a WebSocket proxy with a 3-second
+     * session idle timeout and a 2-second ping interval.
+     *
+     * @throws Exception if the base setup or the proxy start-up fails
+     */
+    @BeforeMethod
+    public void setup() throws Exception {
+        
conf.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
+        // Port 0: presumably lets the server pick a free port; the test reads the actual
+        // port back through proxyServer.getListenPortHTTP().
+        config.setWebServicePort(Optional.of(0));
+        config.setClusterName("test");
+        config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+        // Session idle timeout of 3 seconds.
+        config.setWebSocketSessionIdleTimeoutMillis(3 * 1000);
+        // Ping every 2 seconds, i.e. more often than the idle timeout above.
+        config.setWebSocketPingDurationSeconds(2);
+        // Spy on the service so createConfigMetadataStore(...) hands back a ZKMetadataStore
+        // built on the mock ZooKeeper.
+        service = spyWithClassAndConstructorArgs(WebSocketService.class, 
config);
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+                .createConfigMetadataStore(anyString(), anyInt(), 
anyBoolean());
+        proxyServer = new ProxyServer(config);
+        WebSocketServiceStarter.start(proxyServer, service);
+        log.info("Proxy Server Started");
+    }
+
+    /**
+     * Runs the base-class cleanup, then closes the WebSocket service and stops the proxy
+     * server if they were created.
+     *
+     * @throws Exception if any of the shutdown steps fails
+     */
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        if (service != null) {
+            service.close();
+        }
+        if (proxyServer != null) {
+            proxyServer.stop();
+        }
+        log.info("Finished Cleaning Up Test setup");
+    }
+
+    /**
+     * Opens a producer WebSocket session against the proxy and checks that
+     * {@code session.isOpen()} keeps reporting {@code true}, even though the observation
+     * window (5 seconds) is longer than the configured idle timeout (3 seconds).
+     *
+     * @throws Exception if the client cannot be started or stopped, or the connect future fails
+     */
+    @Test
+    public void testPing() throws Exception {
+        String producerUri = "ws://localhost:" + 
proxyServer.getListenPortHTTP().get() +
+                "/ws/v2/producer/persistent/my-property/my-ns/my-topic1/";
+
+        URI produceUri = URI.create(producerUri);
+        WebSocketClient produceClient = new WebSocketClient();
+        SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+        try {
+            produceClient.start();
+            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            Future<Session> producerFuture = 
produceClient.connect(produceSocket, produceUri, produceRequest);
+            // The WebSocket upgrade has to complete within 2 seconds.
+            assertThat(producerFuture).succeedsWithin(2, SECONDS);
+            Session session = producerFuture.get();
+            // Awaitility during(5, SECONDS): the assertion has to keep holding for 5 seconds,
+            // not merely pass once.
+            Awaitility.await().during(5, SECONDS).untilAsserted(() -> 
assertThat(session.isOpen()).isTrue());
+        } finally {
+            produceClient.stop();
+        }
+    }
+}
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
index 771b325dcbd..e19f1557b15 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
@@ -21,12 +21,17 @@ package org.apache.pulsar.websocket;
 import static com.google.common.base.Preconditions.checkArgument;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Splitter;
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import org.apache.commons.lang3.StringUtils;
@@ -51,6 +56,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ConsumerCommand;
+import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.api.WebSocketAdapter;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
@@ -68,6 +74,8 @@ public abstract class AbstractWebSocketHandler extends 
WebSocketAdapter implemen
     protected final ObjectReader consumerCommandReader =
             
ObjectMapperFactory.getMapper().reader().forType(ConsumerCommand.class);
 
+    /**
+     * Handle of the periodic ping task. It is assigned in {@code onWebSocketConnect} only when a
+     * positive ping interval is configured, so it can be {@code null}.
+     */
+    private ScheduledFuture<?> pingFuture;
+
     public AbstractWebSocketHandler(WebSocketService service,
                                     HttpServletRequest request,
                                     ServletUpgradeResponse response) {
@@ -175,9 +183,28 @@ public abstract class AbstractWebSocketHandler extends 
WebSocketAdapter implemen
         }
     }
 
+    /**
+     * Cancels the scheduled ping task, interrupting it if it is running. Does nothing when no
+     * task was scheduled or the task is already done.
+     */
+    private void closePingFuture() {
+        if (pingFuture == null || pingFuture.isDone()) {
+            return;
+        }
+        pingFuture.cancel(true);
+    }
+
     @Override
     public void onWebSocketConnect(Session session) {
+        // Registers the session with the adapter and, when a positive
+        // webSocketPingDurationSeconds is configured, schedules a periodic "PING" frame on the
+        // service executor. The task handle is stored in pingFuture so that closePingFuture()
+        // can cancel it when the socket closes or errors.
         super.onWebSocketConnect(session);
+        // The proxy configuration is null-checked because the service does not always carry one.
+        WebSocketProxyConfiguration webSocketProxyConfig = service.getWebSocketProxyConfig();
+        if (webSocketProxyConfig != null) {
+            int webSocketPingDurationSeconds = webSocketProxyConfig.getWebSocketPingDurationSeconds();
+            if (webSocketPingDurationSeconds > 0) {
+                // Only IOException is handled below. Per scheduleAtFixedRate, any other exception
+                // escaping the task suppresses all later executions.
+                pingFuture = service.getExecutor().scheduleAtFixedRate(() -> {
+                    try {
+                        session.getRemote().sendPing(ByteBuffer.wrap("PING".getBytes(StandardCharsets.UTF_8)));
+                    } catch (IOException e) {
+                        // Log through the captured session rather than getSession(): the adapter
+                        // may already have cleared its session reference if the socket is closing
+                        // while this task runs, which would turn the log call into an NPE.
+                        log.warn("[{}] Failed to send WebSocket ping", session.getRemoteAddress(), e);
+                    }
+                }, webSocketPingDurationSeconds, webSocketPingDurationSeconds, TimeUnit.SECONDS);
+            }
+        }
         log.info("[{}] New WebSocket session on topic {}", session.getRemoteAddress(), topic);
     }
 
@@ -186,6 +213,7 @@ public abstract class AbstractWebSocketHandler extends 
WebSocketAdapter implemen
         super.onWebSocketError(cause);
         log.info("[{}] WebSocket error on topic {} : {}", 
getSession().getRemoteAddress(), topic, cause.getMessage());
         try {
+            closePingFuture();
             close();
         } catch (IOException e) {
             log.error("Failed in closing WebSocket session for topic {} with 
error: {}", topic, e.getMessage());
@@ -197,6 +225,7 @@ public abstract class AbstractWebSocketHandler extends 
WebSocketAdapter implemen
         log.info("[{}] Closed WebSocket session on topic {}. status: {} - 
reason: {}", getSession().getRemoteAddress(),
                 topic, statusCode, reason);
         try {
+            closePingFuture();
             close();
         } catch (IOException e) {
             log.warn("[{}] Failed to close handler for topic {}. ", 
getSession().getRemoteAddress(), topic, e);
@@ -265,6 +294,12 @@ public abstract class AbstractWebSocketHandler extends 
WebSocketAdapter implemen
         return TopicName.get(domain, namespace, name);
     }
 
+    /**
+     * Exposes the handle of the scheduled ping task so tests can inspect its state.
+     *
+     * @return the current value of {@code pingFuture}; {@code null} if {@code onWebSocketConnect}
+     *         has not scheduled a ping task
+     */
+    @VisibleForTesting
+    public ScheduledFuture<?> getPingFuture() {
+        return pingFuture;
+    }
+
+
     protected abstract Boolean isAuthorized(String authRole,
                                             AuthenticationDataSource 
authenticationData) throws Exception;
 
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 04a9354b7a5..f928d830074 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -68,6 +68,10 @@ public class WebSocketService implements Closeable {
     private PulsarResources pulsarResources;
     private MetadataStoreExtended configMetadataStore;
     private ServiceConfiguration config;
+
+    // Set by the WebSocketService(WebSocketProxyConfiguration) constructor.
+    // AbstractWebSocketHandler#onWebSocketConnect null-checks the getter, so treat the value as
+    // possibly null.
+    @Getter
+    private WebSocketProxyConfiguration webSocketProxyConfig;
+
     @Getter
     private Optional<CryptoKeyReader> cryptoKeyReader = Optional.empty();
 
@@ -79,6 +83,7 @@ public class WebSocketService implements Closeable {
 
     public WebSocketService(WebSocketProxyConfiguration config) {
         this(createClusterData(config), 
PulsarConfigurationLoader.convertFrom(config));
+        // Keep the original proxy configuration next to the converted ServiceConfiguration so
+        // that handlers can read it through getWebSocketProxyConfig() (e.g. the ping interval).
+        this.webSocketProxyConfig = config;
     }
 
     public WebSocketService(ClusterData localCluster, ServiceConfiguration 
config) {
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
index ca827e53c90..7dba9e719ad 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
@@ -161,6 +161,9 @@ public class WebSocketProxyConfiguration implements 
PulsarConfiguration {
     @FieldContext(doc = "Timeout of idling WebSocket session (in 
milliseconds)")
     private int webSocketSessionIdleTimeoutMillis = 300000;
 
+    // Used by AbstractWebSocketHandler#onWebSocketConnect as both the initial delay and the
+    // period (in seconds) of the ping task. With the default of -1 no ping task is scheduled.
+    @FieldContext(doc = "Interval of time to sending the ping to keep alive. 
This value greater than 0 means enabled")
+    private int webSocketPingDurationSeconds = -1;
+
     @FieldContext(doc = "When this parameter is not empty, unauthenticated 
users perform as anonymousUserRole")
     private String anonymousUserRole = null;
 
diff --git 
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
 
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
index 0e07dd0bc00..eec2c3d1baa 100644
--- 
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
+++ 
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import java.io.IOException;
 import java.net.URLEncoder;
@@ -31,6 +32,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.servlet.http.HttpServletRequest;
@@ -49,6 +51,10 @@ import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
 import org.mockito.Mock;
 import org.testng.annotations.AfterClass;
@@ -380,4 +386,55 @@ public class AbstractWebSocketHandlerTest {
         assertEquals(conf.getDeadLetterPolicy().getDeadLetterTopic(), 
"dead-letter-topic");
         assertEquals(conf.getDeadLetterPolicy().getMaxRedeliverCount(), 3);
     }
+
+    /**
+     * Verifies the lifecycle of the ping task created by
+     * {@link AbstractWebSocketHandler#onWebSocketConnect(Session)}. With a ping interval of
+     * 5 seconds configured, connecting must expose a ping future that is not done yet, and both
+     * {@code onWebSocketClose} and {@code onWebSocketError} must leave that future done.
+     */
+    @Test
+    public void testPingFuture() {
+        WebSocketProxyConfiguration webSocketProxyConfiguration = new WebSocketProxyConfiguration();
+        webSocketProxyConfiguration.setWebSocketPingDurationSeconds(5);
+
+        WebSocketService webSocketService = new WebSocketService(webSocketProxyConfiguration);
+
+        HttpServletRequest httpServletRequest = mock(HttpServletRequest.class);
+        String consumerV2 = "/ws/v2/consumer/persistent/my-property/my-ns/my-topic/my-subscription";
+        // Build the servlet-style parameter map directly instead of going through an anonymous
+        // HashMap subclass (double-brace initialization) and a stream re-mapping.
+        Map<String, String[]> queryParams = new HashMap<>();
+        queryParams.put("ackTimeoutMillis", new String[]{"1001"});
+        queryParams.put("subscriptionType", new String[]{"Key_Shared"});
+        queryParams.put("subscriptionMode", new String[]{"NonDurable"});
+        queryParams.put("receiverQueueSize", new String[]{"999"});
+        queryParams.put("consumerName", new String[]{"my-consumer"});
+        queryParams.put("priorityLevel", new String[]{"1"});
+        queryParams.put("maxRedeliverCount", new String[]{"5"});
+
+        when(httpServletRequest.getRequestURI()).thenReturn(consumerV2);
+        when(httpServletRequest.getParameterMap()).thenReturn(queryParams);
+
+        MockedServletUpgradeResponse response = new MockedServletUpgradeResponse(null);
+        AbstractWebSocketHandler webSocketHandler =
+                new WebSocketHandlerImpl(webSocketService, httpServletRequest, response);
+
+        Session session = mock(Session.class);
+        RemoteEndpoint remoteEndpoint = mock(RemoteEndpoint.class);
+        when(session.getRemote()).thenReturn(remoteEndpoint);
+
+        // Scenario 1: onWebSocketClose must finish the ping task.
+        webSocketHandler.onWebSocketConnect(session);
+
+        ScheduledFuture<?> pingFuture = webSocketHandler.getPingFuture();
+        assertNotNull(pingFuture);
+        assertFalse(pingFuture.isDone());
+
+        webSocketHandler.onWebSocketClose(HttpStatus.INTERNAL_SERVER_ERROR_500, "INTERNAL_SERVER_ERROR_500");
+        assertTrue(pingFuture.isDone());
+
+        // Scenario 2: reconnect, then onWebSocketError must finish the newly scheduled ping task.
+        webSocketHandler.onWebSocketConnect(session);
+
+        pingFuture = webSocketHandler.getPingFuture();
+        assertNotNull(pingFuture);
+        assertFalse(pingFuture.isDone());
+
+        webSocketHandler.onWebSocketError(new RuntimeException("INTERNAL_SERVER_ERROR_500"));
+        assertTrue(pingFuture.isDone());
+    }
 }

Reply via email to