This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 17c05b3076d [fix][websocket][branch-2.9] Fix webSocketPingDurationSeconds config (#19293)
17c05b3076d is described below

commit 17c05b3076d2c6bb0aa7a637062a04163d6cdcad
Author: Zixuan Liu <[email protected]>
AuthorDate: Sat Jan 28 12:08:40 2023 +0800

    [fix][websocket][branch-2.9] Fix webSocketPingDurationSeconds config (#19293)
    
    Signed-off-by: Zixuan Liu <[email protected]>
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 +++
 .../pulsar/proxy/server/ProxyConfiguration.java    |  6 +++
 .../pulsar/websocket/AbstractWebSocketHandler.java | 40 +++++++--------
 .../apache/pulsar/websocket/WebSocketService.java  |  4 --
 .../websocket/AbstractWebSocketHandlerTest.java    | 57 +++++++++++++++++++++-
 5 files changed, 85 insertions(+), 28 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index b57b69e2030..f2155b4431c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2108,6 +2108,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private int webSocketSessionIdleTimeoutMillis = 300000;
 
+    @FieldContext(
+            category = CATEGORY_WEBSOCKET,
+            doc = "Interval of time to sending the ping to keep alive in 
WebSocket proxy. "
+                    + "This value greater than 0 means enabled")
+    private int webSocketPingDurationSeconds = -1;
+
     @FieldContext(
         category = CATEGORY_WEBSOCKET,
         doc = "The maximum size of a text message during parsing in WebSocket 
proxy."
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index d616bbd1ce4..e75db662a75 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -664,6 +664,12 @@ public class ProxyConfiguration implements PulsarConfiguration {
     )
     private boolean webSocketServiceEnabled = false;
 
+    @FieldContext(
+            category = CATEGORY_WEBSOCKET,
+            doc = "Interval of time to sending the ping to keep alive in 
WebSocket proxy. "
+                    + "This value greater than 0 means enabled")
+    private int webSocketPingDurationSeconds = -1;
+
     @FieldContext(
             category = CATEGORY_WEBSOCKET,
             doc = "Name of the cluster to which this broker belongs to"
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
index 335d5d2e010..abf8d28c171 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
@@ -18,12 +18,20 @@
  */
 package org.apache.pulsar.websocket;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Splitter;
+import java.io.Closeable;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -43,23 +51,12 @@ import 
org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedExcepti
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.Codec;
-import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.api.WebSocketAdapter;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
 public abstract class AbstractWebSocketHandler extends WebSocketAdapter 
implements Closeable {
 
     protected final WebSocketService service;
@@ -167,18 +164,15 @@ public abstract class AbstractWebSocketHandler extends 
WebSocketAdapter implemen
     @Override
     public void onWebSocketConnect(Session session) {
         super.onWebSocketConnect(session);
-        WebSocketProxyConfiguration webSocketProxyConfig = 
service.getWebSocketProxyConfig();
-        if (webSocketProxyConfig != null) {
-            int webSocketPingDurationSeconds = 
webSocketProxyConfig.getWebSocketPingDurationSeconds();
-            if (webSocketPingDurationSeconds > 0) {
-                pingFuture = service.getExecutor().scheduleAtFixedRate(() -> {
-                    try {
-                        
session.getRemote().sendPing(ByteBuffer.wrap("PING".getBytes(StandardCharsets.UTF_8)));
-                    } catch (IOException e) {
-                        log.warn("[{}] WebSocket send ping", 
getSession().getRemoteAddress(), e);
-                    }
-                }, webSocketPingDurationSeconds, webSocketPingDurationSeconds, 
TimeUnit.SECONDS);
-            }
+        int webSocketPingDurationSeconds = 
service.getConfig().getWebSocketPingDurationSeconds();
+        if (webSocketPingDurationSeconds > 0) {
+            pingFuture = service.getExecutor().scheduleAtFixedRate(() -> {
+                try {
+                    
session.getRemote().sendPing(ByteBuffer.wrap("PING".getBytes(StandardCharsets.UTF_8)));
+                } catch (IOException e) {
+                    log.warn("[{}] WebSocket send ping", 
getSession().getRemoteAddress(), e);
+                }
+            }, webSocketPingDurationSeconds, webSocketPingDurationSeconds, 
TimeUnit.SECONDS);
         }
         log.info("[{}] New WebSocket session on topic {}", 
session.getRemoteAddress(), topic);
     }
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index fe0eb01125c..ad30d9dfabb 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -70,9 +70,6 @@ public class WebSocketService implements Closeable {
     private MetadataStoreExtended configMetadataStore;
     private ServiceConfiguration config;
 
-    @Getter
-    private WebSocketProxyConfiguration webSocketProxyConfig;
-
     private ClusterData localCluster;
     private final ConcurrentOpenHashMap<String, 
ConcurrentOpenHashSet<ProducerHandler>> topicProducerMap;
     private final ConcurrentOpenHashMap<String, 
ConcurrentOpenHashSet<ConsumerHandler>> topicConsumerMap;
@@ -81,7 +78,6 @@ public class WebSocketService implements Closeable {
 
     public WebSocketService(WebSocketProxyConfiguration config) {
         this(createClusterData(config), 
PulsarConfigurationLoader.convertFrom(config));
-        this.webSocketProxyConfig = config;
     }
 
     public WebSocketService(ClusterData localCluster, ServiceConfiguration 
config) {
diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
index 782e05ea625..286e055fc18 100644
--- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
+++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import java.io.IOException;
 import java.net.URLEncoder;
@@ -31,6 +32,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.servlet.http.HttpServletRequest;
@@ -49,10 +51,13 @@ import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
 import org.mockito.Mock;
 import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
 public class AbstractWebSocketHandlerTest {
@@ -377,4 +382,54 @@ public class AbstractWebSocketHandlerTest {
         assertEquals(conf.getDeadLetterPolicy().getDeadLetterTopic(), 
"dead-letter-topic");
         assertEquals(conf.getDeadLetterPolicy().getMaxRedeliverCount(), 3);
     }
+
+    @Test
+    public void testPingFuture() {
+        WebSocketProxyConfiguration webSocketProxyConfiguration = new 
WebSocketProxyConfiguration();
+        webSocketProxyConfiguration.setWebSocketPingDurationSeconds(5);
+
+        WebSocketService webSocketService = new 
WebSocketService(webSocketProxyConfiguration);
+
+        HttpServletRequest httpServletRequest = mock(HttpServletRequest.class);
+        String consumerV2 = 
"/ws/v2/consumer/persistent/my-property/my-ns/my-topic/my-subscription";
+        Map<String, String[]> queryParams = new HashMap<String, String>(){{
+            put("ackTimeoutMillis", "1001");
+            put("subscriptionType", "Key_Shared");
+            put("subscriptionMode", "NonDurable");
+            put("receiverQueueSize", "999");
+            put("consumerName", "my-consumer");
+            put("priorityLevel", "1");
+            put("maxRedeliverCount", "5");
+        }}.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, 
entry -> new String[]{ entry.getValue() }));
+
+        when(httpServletRequest.getRequestURI()).thenReturn(consumerV2);
+        when(httpServletRequest.getParameterMap()).thenReturn(queryParams);
+
+        MockedServletUpgradeResponse response = new 
MockedServletUpgradeResponse(null);
+        AbstractWebSocketHandler webSocketHandler = new 
WebSocketHandlerImpl(webSocketService, httpServletRequest, response);
+
+        Session session = mock(Session.class);
+        RemoteEndpoint remoteEndpoint = mock(RemoteEndpoint.class);
+        when(session.getRemote()).thenReturn(remoteEndpoint);
+
+        // onWebSocketClose
+        webSocketHandler.onWebSocketConnect(session);
+
+        ScheduledFuture<?> pingFuture = webSocketHandler.getPingFuture();
+        assertNotNull(pingFuture);
+        assertFalse(pingFuture.isDone());
+
+        
webSocketHandler.onWebSocketClose(HttpStatus.INTERNAL_SERVER_ERROR_500, 
"INTERNAL_SERVER_ERROR_500");
+        assertTrue(pingFuture.isDone());
+
+        // onWebSocketError
+        webSocketHandler.onWebSocketConnect(session);
+
+        pingFuture = webSocketHandler.getPingFuture();
+        assertNotNull(pingFuture);
+        assertFalse(pingFuture.isDone());
+
+        webSocketHandler.onWebSocketError(new 
RuntimeException("INTERNAL_SERVER_ERROR_500"));
+        assertTrue(pingFuture.isDone());
+    }
 }

Reply via email to