This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 6c611d5c2aa [improve][websocket][branch-2.11] Add ping support (#19255)
6c611d5c2aa is described below
commit 6c611d5c2aa130b36e7c2fda4567604f9eff016f
Author: Zixuan Liu <[email protected]>
AuthorDate: Wed Jan 18 13:27:51 2023 +0800
[improve][websocket][branch-2.11] Add ping support (#19255)
Signed-off-by: Zixuan Liu <[email protected]>
---
.../apache/pulsar/broker/ServiceConfiguration.java | 6 ++
.../websocket/proxy/ProxyIdleTimeoutTest.java | 105 ++++++++++++++++++++
.../pulsar/websocket/proxy/ProxyPingTest.java | 107 +++++++++++++++++++++
.../pulsar/proxy/server/ProxyConfiguration.java | 6 ++
.../pulsar/websocket/AbstractWebSocketHandler.java | 30 ++++++
.../service/WebSocketProxyConfiguration.java | 3 +
.../websocket/AbstractWebSocketHandlerTest.java | 56 +++++++++++
7 files changed, 313 insertions(+)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 5942ee9a8e9..7e9534bc9bc 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2519,6 +2519,12 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private int webSocketSessionIdleTimeoutMillis = 300000;
+ @FieldContext(
+ category = CATEGORY_WEBSOCKET,
+ doc = "Interval of time to sending the ping to keep alive in
WebSocket proxy. "
+ + "This value greater than 0 means enabled")
+ private int webSocketPingDurationSeconds = -1; // ping interval in seconds; the default -1 is not greater than 0, so pinging stays off unless configured
+
@FieldContext(
category = CATEGORY_WEBSOCKET,
doc = "The maximum size of a text message during parsing in WebSocket
proxy."
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
new file mode 100644
index 00000000000..71f8c0b6d86
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import java.net.URI;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.websocket.WebSocketService;
+import org.apache.pulsar.websocket.service.ProxyServer;
+import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
+import org.apache.pulsar.websocket.service.WebSocketServiceStarter;
+import org.awaitility.Awaitility;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Checks that a WebSocket proxy session left idle is closed once the configured
+ * {@code webSocketSessionIdleTimeoutMillis} (3 seconds here) has passed. No ping interval is set,
+ * so {@code webSocketPingDurationSeconds} keeps its default of -1.
+ */
+@Test(groups = "websocket")
+@Slf4j
+public class ProxyIdleTimeoutTest extends ProducerConsumerBase {
+    protected String methodName;
+    private ProxyServer proxyServer;
+    private WebSocketService service;
+
+    private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
+
+    /**
+     * Runs the base class setup, then starts a WebSocket proxy configured with port 0 (the real
+     * port is read back through {@code getListenPortHTTP()}) and a 3-second session idle timeout.
+     *
+     * @throws Exception if the base setup or the proxy start fails
+     */
+    @BeforeMethod
+    public void setup() throws Exception {
+        conf.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
+        config.setWebServicePort(Optional.of(0));
+        config.setClusterName("test");
+        config.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
+        config.setWebSocketSessionIdleTimeoutMillis(3 * 1000);
+        service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
+        // Hand the spied service a metadata store backed by the mocked ZooKeeper.
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal))
+                .when(service).createMetadataStore(anyString(), anyInt());
+        proxyServer = new ProxyServer(config);
+        WebSocketServiceStarter.start(proxyServer, service);
+        log.info("Proxy Server Started");
+    }
+
+    /**
+     * Releases the base class resources, the WebSocket service and the proxy server. Each step runs
+     * even when an earlier one throws, so a failing cleanup cannot leave the proxy server running.
+     *
+     * @throws Exception the failure raised by a cleanup step, if any
+     */
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        try {
+            super.internalCleanup();
+        } finally {
+            try {
+                if (service != null) {
+                    service.close();
+                }
+            } finally {
+                if (proxyServer != null) {
+                    proxyServer.stop();
+                }
+            }
+        }
+        // Reached only when every step above completed without throwing.
+        log.info("Finished Cleaning Up Test setup");
+    }
+
+    /**
+     * Connects a producer socket, sends nothing, and expects the session to be reported closed.
+     *
+     * @throws Exception if the client cannot be started, connected or stopped
+     */
+    @Test
+    public void testIdleTimeout() throws Exception {
+        String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+                + "/ws/v2/producer/persistent/my-property/my-ns/my-topic1/";
+
+        URI produceUri = URI.create(producerUri);
+        WebSocketClient produceClient = new WebSocketClient();
+        SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+        try {
+            produceClient.start();
+            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
+            assertThat(producerFuture).succeedsWithin(2, SECONDS);
+            Session session = producerFuture.get();
+            // NOTE(review): during(5s) presumably requires the assertion to keep holding for
+            // 5 seconds within Awaitility's default timeout - confirm against the Awaitility docs.
+            Awaitility.await().during(5, SECONDS).untilAsserted(() -> assertThat(session.isOpen()).isFalse());
+        } finally {
+            produceClient.stop();
+        }
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
new file mode 100644
index 00000000000..6e0dfa46cfd
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import java.net.URI;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.websocket.WebSocketService;
+import org.apache.pulsar.websocket.service.ProxyServer;
+import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
+import org.apache.pulsar.websocket.service.WebSocketServiceStarter;
+import org.awaitility.Awaitility;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Checks that a WebSocket proxy session stays open while the proxy sends keep-alive pings: the ping
+ * interval (2 seconds) is shorter than the session idle timeout (3 seconds), and the session is
+ * expected to still be open over a 5-second observation window.
+ */
+@Test(groups = "websocket")
+@Slf4j
+public class ProxyPingTest extends ProducerConsumerBase {
+    protected String methodName;
+
+    private ProxyServer proxyServer;
+    private WebSocketService service;
+
+    private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
+
+    /**
+     * Runs the base class setup, then starts a WebSocket proxy configured with port 0 (the real
+     * port is read back through {@code getListenPortHTTP()}), a 3-second session idle timeout and
+     * a 2-second ping interval.
+     *
+     * @throws Exception if the base setup or the proxy start fails
+     */
+    @BeforeMethod
+    public void setup() throws Exception {
+        conf.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
+        config.setWebServicePort(Optional.of(0));
+        config.setClusterName("test");
+        config.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
+        config.setWebSocketSessionIdleTimeoutMillis(3 * 1000);
+        config.setWebSocketPingDurationSeconds(2);
+        service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
+        // Hand the spied service a metadata store backed by the mocked ZooKeeper.
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal))
+                .when(service).createMetadataStore(anyString(), anyInt());
+        proxyServer = new ProxyServer(config);
+        WebSocketServiceStarter.start(proxyServer, service);
+        log.info("Proxy Server Started");
+    }
+
+    /**
+     * Releases the base class resources, the WebSocket service and the proxy server. Each step runs
+     * even when an earlier one throws, so a failing cleanup cannot leave the proxy server running.
+     *
+     * @throws Exception the failure raised by a cleanup step, if any
+     */
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        try {
+            super.internalCleanup();
+        } finally {
+            try {
+                if (service != null) {
+                    service.close();
+                }
+            } finally {
+                if (proxyServer != null) {
+                    proxyServer.stop();
+                }
+            }
+        }
+        // Reached only when every step above completed without throwing.
+        log.info("Finished Cleaning Up Test setup");
+    }
+
+    /**
+     * Connects a producer socket, sends nothing, and expects the session to be reported open.
+     *
+     * @throws Exception if the client cannot be started, connected or stopped
+     */
+    @Test
+    public void testPing() throws Exception {
+        String producerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get()
+                + "/ws/v2/producer/persistent/my-property/my-ns/my-topic1/";
+
+        URI produceUri = URI.create(producerUri);
+        WebSocketClient produceClient = new WebSocketClient();
+        SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+        try {
+            produceClient.start();
+            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
+            assertThat(producerFuture).succeedsWithin(2, SECONDS);
+            Session session = producerFuture.get();
+            // NOTE(review): during(5s) presumably requires the assertion to keep holding for
+            // 5 seconds, i.e. past the 3-second idle timeout - confirm against the Awaitility docs.
+            Awaitility.await().during(5, SECONDS).untilAsserted(() -> assertThat(session.isOpen()).isTrue());
+        } finally {
+            produceClient.stop();
+        }
+    }
+}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 2555d648000..dc6e04caaf1 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -783,6 +783,12 @@ public class ProxyConfiguration implements
PulsarConfiguration {
)
private boolean webSocketServiceEnabled = false;
+ @FieldContext(
+ category = CATEGORY_WEBSOCKET,
+ doc = "Interval of time to sending the ping to keep alive in
WebSocket proxy. "
+ + "This value greater than 0 means enabled")
+ private int webSocketPingDurationSeconds = -1; // ping interval in seconds; the default -1 is not greater than 0, so pinging stays off unless configured
+
@FieldContext(
category = CATEGORY_WEBSOCKET,
doc = "Name of the cluster to which this broker belongs to"
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
index 1bd0055bd1d..8ac4c2552ec 100644
---
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
+++
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
@@ -19,12 +19,17 @@
package org.apache.pulsar.websocket;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import java.io.Closeable;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang3.StringUtils;
@@ -62,6 +67,8 @@ public abstract class AbstractWebSocketHandler extends
WebSocketAdapter implemen
protected final Map<String, String> queryParams;
private static final String PULSAR_AUTH_METHOD_NAME =
"X-Pulsar-Auth-Method-Name";
+ private ScheduledFuture<?> pingFuture;
+
public AbstractWebSocketHandler(WebSocketService service,
HttpServletRequest request,
ServletUpgradeResponse response) {
@@ -169,9 +176,25 @@ public abstract class AbstractWebSocketHandler extends
WebSocketAdapter implemen
}
}
+    /**
+     * Cancels the scheduled ping task, interrupting it if it is running. Does nothing when no task
+     * was scheduled or the task is already done.
+     */
+    private void closePingFuture() {
+        if (pingFuture == null || pingFuture.isDone()) {
+            return;
+        }
+        pingFuture.cancel(true);
+    }
+
@Override
public void onWebSocketConnect(Session session) {
super.onWebSocketConnect(session);
+ int webSocketPingDurationSeconds =
service.getConfig().getWebSocketPingDurationSeconds();
+ if (webSocketPingDurationSeconds > 0) {
+ pingFuture = service.getExecutor().scheduleAtFixedRate(() -> {
+ try {
+
session.getRemote().sendPing(ByteBuffer.wrap("PING".getBytes(StandardCharsets.UTF_8)));
+ } catch (IOException e) {
+ log.warn("[{}] WebSocket send ping",
getSession().getRemoteAddress(), e);
+ }
+ }, webSocketPingDurationSeconds, webSocketPingDurationSeconds,
TimeUnit.SECONDS);
+ }
log.info("[{}] New WebSocket session on topic {}",
session.getRemoteAddress(), topic);
}
@@ -180,6 +203,7 @@ public abstract class AbstractWebSocketHandler extends
WebSocketAdapter implemen
super.onWebSocketError(cause);
log.info("[{}] WebSocket error on topic {} : {}",
getSession().getRemoteAddress(), topic, cause.getMessage());
try {
+ closePingFuture();
close();
} catch (IOException e) {
log.error("Failed in closing WebSocket session for topic {} with
error: {}", topic, e.getMessage());
@@ -191,6 +215,7 @@ public abstract class AbstractWebSocketHandler extends
WebSocketAdapter implemen
log.info("[{}] Closed WebSocket session on topic {}. status: {} -
reason: {}", getSession().getRemoteAddress(),
topic, statusCode, reason);
try {
+ closePingFuture();
close();
} catch (IOException e) {
log.warn("[{}] Failed to close handler for topic {}. ",
getSession().getRemoteAddress(), topic, e);
@@ -259,6 +284,11 @@ public abstract class AbstractWebSocketHandler extends
WebSocketAdapter implemen
return TopicName.get(domain, namespace, name);
}
+ @VisibleForTesting
+ public ScheduledFuture<?> getPingFuture() { // exposes the ping task handle for tests; it is only assigned in onWebSocketConnect when the ping interval is greater than 0
+ return pingFuture;
+ }
+
protected abstract Boolean isAuthorized(String authRole,
AuthenticationDataSource
authenticationData) throws Exception;
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
index 8fb3ab7ed01..a69c61ae689 100644
---
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
+++
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
@@ -161,6 +161,9 @@ public class WebSocketProxyConfiguration implements
PulsarConfiguration {
@FieldContext(doc = "Timeout of idling WebSocket session (in
milliseconds)")
private int webSocketSessionIdleTimeoutMillis = 300000;
+ @FieldContext(doc = "Interval of time to sending the ping to keep alive.
This value greater than 0 means enabled")
+ private int webSocketPingDurationSeconds = -1; // ping interval in seconds; the default -1 is not greater than 0, so pinging stays off unless configured
+
@FieldContext(doc = "When this parameter is not empty, unauthenticated
users perform as anonymousUserRole")
private String anonymousUserRole = null;
diff --git
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
index 29d8d64159c..cb8a499120d 100644
---
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
+++
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.io.IOException;
import java.net.URLEncoder;
@@ -31,6 +32,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
@@ -49,6 +51,10 @@ import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.mockito.Mock;
import org.testng.annotations.AfterClass;
@@ -380,4 +386,54 @@ public class AbstractWebSocketHandlerTest {
assertEquals(conf.getDeadLetterPolicy().getDeadLetterTopic(),
"dead-letter-topic");
assertEquals(conf.getDeadLetterPolicy().getMaxRedeliverCount(), 3);
}
+
+    /**
+     * With a 5-second ping interval configured, connecting must leave the handler holding a ping
+     * task that is not done, and that task must be done after a close callback as well as after an
+     * error callback.
+     */
+    @Test
+    public void testPingFuture() {
+        WebSocketProxyConfiguration proxyConf = new WebSocketProxyConfiguration();
+        proxyConf.setWebSocketPingDurationSeconds(5);
+        WebSocketService wsService = new WebSocketService(proxyConf);
+
+        // getParameterMap() is stubbed with a Map<String, String[]>, so each value is wrapped in a
+        // single-element array.
+        Map<String, String[]> queryParams = new HashMap<>();
+        queryParams.put("ackTimeoutMillis", new String[]{"1001"});
+        queryParams.put("subscriptionType", new String[]{"Key_Shared"});
+        queryParams.put("subscriptionMode", new String[]{"NonDurable"});
+        queryParams.put("receiverQueueSize", new String[]{"999"});
+        queryParams.put("consumerName", new String[]{"my-consumer"});
+        queryParams.put("priorityLevel", new String[]{"1"});
+        queryParams.put("maxRedeliverCount", new String[]{"5"});
+
+        String consumerV2 = "/ws/v2/consumer/persistent/my-property/my-ns/my-topic/my-subscription";
+        HttpServletRequest request = mock(HttpServletRequest.class);
+        when(request.getRequestURI()).thenReturn(consumerV2);
+        when(request.getParameterMap()).thenReturn(queryParams);
+
+        MockedServletUpgradeResponse upgradeResponse = new MockedServletUpgradeResponse(null);
+        AbstractWebSocketHandler handler = new WebSocketHandlerImpl(wsService, request, upgradeResponse);
+
+        RemoteEndpoint remote = mock(RemoteEndpoint.class);
+        Session session = mock(Session.class);
+        when(session.getRemote()).thenReturn(remote);
+
+        // Connect, then close: the scheduled ping must be done afterwards.
+        handler.onWebSocketConnect(session);
+        ScheduledFuture<?> pingFuture = handler.getPingFuture();
+        assertNotNull(pingFuture);
+        assertFalse(pingFuture.isDone());
+
+        handler.onWebSocketClose(HttpStatus.INTERNAL_SERVER_ERROR_500, "INTERNAL_SERVER_ERROR_500");
+        assertTrue(pingFuture.isDone());
+
+        // Connect again, then raise an error: the newly scheduled ping must be done afterwards.
+        handler.onWebSocketConnect(session);
+        pingFuture = handler.getPingFuture();
+        assertNotNull(pingFuture);
+        assertFalse(pingFuture.isDone());
+
+        handler.onWebSocketError(new RuntimeException("INTERNAL_SERVER_ERROR_500"));
+        assertTrue(pingFuture.isDone());
+    }
}