This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 516437e370a [fix][websocket] Fix webSocketPingDurationSeconds config 
(#19256)
516437e370a is described below

commit 516437e370a711d48fe1d444a0c47e64e7cf2f4b
Author: Zixuan Liu <[email protected]>
AuthorDate: Fri Jan 20 10:16:40 2023 +0800

    [fix][websocket] Fix webSocketPingDurationSeconds config (#19256)
    
    Signed-off-by: Zixuan Liu <[email protected]>
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 ++++++
 .../pulsar/proxy/server/ProxyConfiguration.java    |  6 ++++++
 .../pulsar/websocket/AbstractWebSocketHandler.java | 22 +++++++++-------------
 .../apache/pulsar/websocket/WebSocketService.java  |  4 ----
 4 files changed, 21 insertions(+), 17 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2789860b3a2..4a9a6e47bf3 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2613,6 +2613,12 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private int webSocketSessionIdleTimeoutMillis = 300000;
 
+    @FieldContext(
+            category = CATEGORY_WEBSOCKET,
+            doc = "Interval of time to sending the ping to keep alive in 
WebSocket proxy. "
+                    + "This value greater than 0 means enabled")
+    private int webSocketPingDurationSeconds = -1;
+
     @FieldContext(
         category = CATEGORY_WEBSOCKET,
         doc = "The maximum size of a text message during parsing in WebSocket 
proxy."
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index fcee04435fe..a91b6e70f5b 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -811,6 +811,12 @@ public class ProxyConfiguration implements 
PulsarConfiguration {
     )
     private boolean webSocketServiceEnabled = false;
 
+    @FieldContext(
+            category = CATEGORY_WEBSOCKET,
+            doc = "Interval of time to sending the ping to keep alive in 
WebSocket proxy. "
+                    + "This value greater than 0 means enabled")
+    private int webSocketPingDurationSeconds = -1;
+
     @FieldContext(
             category = CATEGORY_WEBSOCKET,
             doc = "Name of the cluster to which this broker belongs to"
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
index e19f1557b15..3eb0a0dfcf8 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
@@ -56,7 +56,6 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ConsumerCommand;
-import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.api.WebSocketAdapter;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
@@ -192,18 +191,15 @@ public abstract class AbstractWebSocketHandler extends 
WebSocketAdapter implemen
     @Override
     public void onWebSocketConnect(Session session) {
         super.onWebSocketConnect(session);
-        WebSocketProxyConfiguration webSocketProxyConfig = 
service.getWebSocketProxyConfig();
-        if (webSocketProxyConfig != null) {
-            int webSocketPingDurationSeconds = 
webSocketProxyConfig.getWebSocketPingDurationSeconds();
-            if (webSocketPingDurationSeconds > 0) {
-                pingFuture = service.getExecutor().scheduleAtFixedRate(() -> {
-                    try {
-                        
session.getRemote().sendPing(ByteBuffer.wrap("PING".getBytes(StandardCharsets.UTF_8)));
-                    } catch (IOException e) {
-                        log.warn("[{}] WebSocket send ping", 
getSession().getRemoteAddress(), e);
-                    }
-                }, webSocketPingDurationSeconds, webSocketPingDurationSeconds, 
TimeUnit.SECONDS);
-            }
+        int webSocketPingDurationSeconds = 
service.getConfig().getWebSocketPingDurationSeconds();
+        if (webSocketPingDurationSeconds > 0) {
+            pingFuture = service.getExecutor().scheduleAtFixedRate(() -> {
+                try {
+                    
session.getRemote().sendPing(ByteBuffer.wrap("PING".getBytes(StandardCharsets.UTF_8)));
+                } catch (IOException e) {
+                    log.warn("[{}] WebSocket send ping", 
getSession().getRemoteAddress(), e);
+                }
+            }, webSocketPingDurationSeconds, webSocketPingDurationSeconds, 
TimeUnit.SECONDS);
         }
         log.info("[{}] New WebSocket session on topic {}", 
session.getRemoteAddress(), topic);
     }
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index a0fe8099d01..9a8653029ce 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -69,9 +69,6 @@ public class WebSocketService implements Closeable {
     private MetadataStoreExtended configMetadataStore;
     private ServiceConfiguration config;
 
-    @Getter
-    private WebSocketProxyConfiguration webSocketProxyConfig;
-
     @Getter
     private Optional<CryptoKeyReader> cryptoKeyReader = Optional.empty();
 
@@ -83,7 +80,6 @@ public class WebSocketService implements Closeable {
 
     public WebSocketService(WebSocketProxyConfiguration config) {
         this(createClusterData(config), 
PulsarConfigurationLoader.convertFrom(config));
-        this.webSocketProxyConfig = config;
     }
 
     public WebSocketService(ClusterData localCluster, ServiceConfiguration 
config) {

Reply via email to