This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 17c05b3076d [fix][websocket][branch-2.9] Fix webSocketPingDurationSeconds config (#19293)
17c05b3076d is described below
commit 17c05b3076d2c6bb0aa7a637062a04163d6cdcad
Author: Zixuan Liu <[email protected]>
AuthorDate: Sat Jan 28 12:08:40 2023 +0800
[fix][websocket][branch-2.9] Fix webSocketPingDurationSeconds config (#19293)
Signed-off-by: Zixuan Liu <[email protected]>
---
.../apache/pulsar/broker/ServiceConfiguration.java | 6 +++
.../pulsar/proxy/server/ProxyConfiguration.java | 6 +++
.../pulsar/websocket/AbstractWebSocketHandler.java | 40 +++++++--------
.../apache/pulsar/websocket/WebSocketService.java | 4 --
.../websocket/AbstractWebSocketHandlerTest.java | 57 +++++++++++++++++++++-
5 files changed, 85 insertions(+), 28 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index b57b69e2030..f2155b4431c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2108,6 +2108,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int webSocketSessionIdleTimeoutMillis = 300000;
+ @FieldContext(
+ category = CATEGORY_WEBSOCKET,
+ doc = "Interval of time to sending the ping to keep alive in WebSocket proxy. "
+ + "This value greater than 0 means enabled")
+ private int webSocketPingDurationSeconds = -1;
+
@FieldContext(
category = CATEGORY_WEBSOCKET,
doc = "The maximum size of a text message during parsing in WebSocket proxy."
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index d616bbd1ce4..e75db662a75 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -664,6 +664,12 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
private boolean webSocketServiceEnabled = false;
+ @FieldContext(
+ category = CATEGORY_WEBSOCKET,
+ doc = "Interval of time to sending the ping to keep alive in WebSocket proxy. "
+ + "This value greater than 0 means enabled")
+ private int webSocketPingDurationSeconds = -1;
+
@FieldContext(
category = CATEGORY_WEBSOCKET,
doc = "Name of the cluster to which this broker belongs to"
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
index 335d5d2e010..abf8d28c171 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
@@ -18,12 +18,20 @@
*/
package org.apache.pulsar.websocket;
+import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
+import java.io.Closeable;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -43,23 +51,12 @@ import org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedExcepti
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.Codec;
-import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
public abstract class AbstractWebSocketHandler extends WebSocketAdapter implements Closeable {
protected final WebSocketService service;
@@ -167,18 +164,15 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
@Override
public void onWebSocketConnect(Session session) {
super.onWebSocketConnect(session);
- WebSocketProxyConfiguration webSocketProxyConfig = service.getWebSocketProxyConfig();
- if (webSocketProxyConfig != null) {
- int webSocketPingDurationSeconds = webSocketProxyConfig.getWebSocketPingDurationSeconds();
- if (webSocketPingDurationSeconds > 0) {
- pingFuture = service.getExecutor().scheduleAtFixedRate(() -> {
- try {
- session.getRemote().sendPing(ByteBuffer.wrap("PING".getBytes(StandardCharsets.UTF_8)));
- } catch (IOException e) {
- log.warn("[{}] WebSocket send ping", getSession().getRemoteAddress(), e);
- }
- }, webSocketPingDurationSeconds, webSocketPingDurationSeconds, TimeUnit.SECONDS);
- }
+ int webSocketPingDurationSeconds = service.getConfig().getWebSocketPingDurationSeconds();
+ if (webSocketPingDurationSeconds > 0) {
+ pingFuture = service.getExecutor().scheduleAtFixedRate(() -> {
+ try {
+ session.getRemote().sendPing(ByteBuffer.wrap("PING".getBytes(StandardCharsets.UTF_8)));
+ } catch (IOException e) {
+ log.warn("[{}] WebSocket send ping", getSession().getRemoteAddress(), e);
+ }
+ }, webSocketPingDurationSeconds, webSocketPingDurationSeconds, TimeUnit.SECONDS);
}
log.info("[{}] New WebSocket session on topic {}", session.getRemoteAddress(), topic);
}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index fe0eb01125c..ad30d9dfabb 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -70,9 +70,6 @@ public class WebSocketService implements Closeable {
private MetadataStoreExtended configMetadataStore;
private ServiceConfiguration config;
- @Getter
- private WebSocketProxyConfiguration webSocketProxyConfig;
-
private ClusterData localCluster;
private final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ProducerHandler>> topicProducerMap;
private final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ConsumerHandler>> topicConsumerMap;
@@ -81,7 +78,6 @@ public class WebSocketService implements Closeable {
public WebSocketService(WebSocketProxyConfiguration config) {
this(createClusterData(config), PulsarConfigurationLoader.convertFrom(config));
- this.webSocketProxyConfig = config;
}
public WebSocketService(ClusterData localCluster, ServiceConfiguration config) {
diff --git
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
index 782e05ea625..286e055fc18 100644
---
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
+++
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.io.IOException;
import java.net.URLEncoder;
@@ -31,6 +32,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
@@ -49,10 +51,13 @@ import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.mockito.Mock;
import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
public class AbstractWebSocketHandlerTest {
@@ -377,4 +382,54 @@ public class AbstractWebSocketHandlerTest {
assertEquals(conf.getDeadLetterPolicy().getDeadLetterTopic(), "dead-letter-topic");
assertEquals(conf.getDeadLetterPolicy().getMaxRedeliverCount(), 3);
}
+
+ @Test
+ public void testPingFuture() {
+ WebSocketProxyConfiguration webSocketProxyConfiguration = new WebSocketProxyConfiguration();
+ webSocketProxyConfiguration.setWebSocketPingDurationSeconds(5);
+
+ WebSocketService webSocketService = new WebSocketService(webSocketProxyConfiguration);
+
+ HttpServletRequest httpServletRequest = mock(HttpServletRequest.class);
+ String consumerV2 = "/ws/v2/consumer/persistent/my-property/my-ns/my-topic/my-subscription";
+ Map<String, String[]> queryParams = new HashMap<String, String>(){{
+ put("ackTimeoutMillis", "1001");
+ put("subscriptionType", "Key_Shared");
+ put("subscriptionMode", "NonDurable");
+ put("receiverQueueSize", "999");
+ put("consumerName", "my-consumer");
+ put("priorityLevel", "1");
+ put("maxRedeliverCount", "5");
+ }}.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> new String[]{ entry.getValue() }));
+
+ when(httpServletRequest.getRequestURI()).thenReturn(consumerV2);
+ when(httpServletRequest.getParameterMap()).thenReturn(queryParams);
+
+ MockedServletUpgradeResponse response = new MockedServletUpgradeResponse(null);
+ AbstractWebSocketHandler webSocketHandler = new WebSocketHandlerImpl(webSocketService, httpServletRequest, response);
+
+ Session session = mock(Session.class);
+ RemoteEndpoint remoteEndpoint = mock(RemoteEndpoint.class);
+ when(session.getRemote()).thenReturn(remoteEndpoint);
+
+ // onWebSocketClose
+ webSocketHandler.onWebSocketConnect(session);
+
+ ScheduledFuture<?> pingFuture = webSocketHandler.getPingFuture();
+ assertNotNull(pingFuture);
+ assertFalse(pingFuture.isDone());
+
+ webSocketHandler.onWebSocketClose(HttpStatus.INTERNAL_SERVER_ERROR_500, "INTERNAL_SERVER_ERROR_500");
+ assertTrue(pingFuture.isDone());
+
+ // onWebSocketError
+ webSocketHandler.onWebSocketConnect(session);
+
+ pingFuture = webSocketHandler.getPingFuture();
+ assertNotNull(pingFuture);
+ assertFalse(pingFuture.isDone());
+
+ webSocketHandler.onWebSocketError(new RuntimeException("INTERNAL_SERVER_ERROR_500"));
+ assertTrue(pingFuture.isDone());
+ }
}