This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 894192fb654 [fix][ws] Remove unnecessary ping/pong implementation (#20733)
894192fb654 is described below
commit 894192fb6542e504be43034a3c33e90f9c6e528a
Author: Michael Marshall <[email protected]>
AuthorDate: Thu Jul 6 08:04:29 2023 -0500
[fix][ws] Remove unnecessary ping/pong implementation (#20733)
(cherry picked from commit 33044f0e0f38374256d5b89edff6a23d08f4759d)
---
.../org/apache/pulsar/broker/PulsarService.java | 7 ---
.../pulsar/proxy/server/ProxyServiceStarter.java | 7 ---
.../proxy/server/ProxyServiceStarterTest.java | 12 ----
.../apache/pulsar/websocket/PingPongHandler.java | 50 -----------------
.../pulsar/websocket/WebSocketPingPongServlet.java | 44 ---------------
.../websocket/service/WebSocketServiceStarter.java | 4 --
...ngHandlerTest.java => PingPongSupportTest.java} | 64 ++++++++++++++++++++--
7 files changed, 58 insertions(+), 130 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 62d4634fa2d..cf8ab33fd7e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -173,7 +173,6 @@ import
org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import
org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
-import org.apache.pulsar.websocket.WebSocketPingPongServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
@@ -1072,12 +1071,6 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
new ServletHolder(readerWebSocketServlet), true,
attributeMap);
webService.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
new ServletHolder(readerWebSocketServlet), true,
attributeMap);
-
- final WebSocketServlet pingPongWebSocketServlet = new
WebSocketPingPongServlet(webSocketService);
- webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH,
- new ServletHolder(pingPongWebSocketServlet), true,
attributeMap);
- webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
- new ServletHolder(pingPongWebSocketServlet), true,
attributeMap);
}
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index beee9f1a4f7..46a5efef668 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -51,7 +51,6 @@ import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.apache.pulsar.common.util.ShutdownUtil;
import org.apache.pulsar.proxy.stats.ProxyStats;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
-import org.apache.pulsar.websocket.WebSocketPingPongServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
@@ -315,12 +314,6 @@ public class ProxyServiceStarter {
new ServletHolder(readerWebSocketServlet));
server.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
new ServletHolder(readerWebSocketServlet));
-
- final WebSocketServlet pingPongWebSocketServlet = new
WebSocketPingPongServlet(webSocketService);
- server.addServlet(WebSocketPingPongServlet.SERVLET_PATH,
- new ServletHolder(pingPongWebSocketServlet));
- server.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
- new ServletHolder(pingPongWebSocketServlet));
}
}
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
index 4dcfc170964..def58be6df3 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -76,18 +76,6 @@ public class ProxyServiceStarterTest extends
MockedPulsarServiceBaseTest {
return String.format("ws://localhost:%d/ws",
serviceStarter.getServer().getListenPortHTTP().get());
}
- @Test
- public void testEnableWebSocketServer() throws Exception {
- HttpClient httpClient = new HttpClient();
- WebSocketClient webSocketClient = new WebSocketClient(httpClient);
- webSocketClient.start();
- MyWebSocket myWebSocket = new MyWebSocket();
- String webSocketUri = computeWsBasePath() + "/pingpong";
- Future<Session> sessionFuture = webSocketClient.connect(myWebSocket,
URI.create(webSocketUri));
- System.out.println("uri" + webSocketUri);
- sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes()));
- assertTrue(myWebSocket.getResponse().contains("ping"));
- }
@Test
public void testProducer() throws Exception {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/PingPongHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/PingPongHandler.java
deleted file mode 100644
index 870630abc88..00000000000
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/PingPongHandler.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.websocket;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.eclipse.jetty.util.BufferUtil;
-import org.eclipse.jetty.websocket.api.WebSocketAdapter;
-import org.eclipse.jetty.websocket.api.WebSocketPingPongListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PingPongHandler extends WebSocketAdapter implements
WebSocketPingPongListener {
-
- private static final Logger log =
LoggerFactory.getLogger(PingPongHandler.class);
-
- @Override
- public void onWebSocketPing(ByteBuffer payload) {
- try {
- if (log.isDebugEnabled()) {
- log.debug("PING: {}", BufferUtil.toDetailString(payload));
- }
- getRemote().sendPong(payload);
- } catch (IOException e) {
- log.warn("Failed to send pong: {}", e.getMessage());
- }
- }
-
- @Override
- public void onWebSocketPong(ByteBuffer payload) {
-
- }
-
-}
\ No newline at end of file
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketPingPongServlet.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketPingPongServlet.java
deleted file mode 100644
index cc2d79ee541..00000000000
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketPingPongServlet.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.websocket;
-
-import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
-import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
-
-public class WebSocketPingPongServlet extends WebSocketServlet {
- private static final long serialVersionUID = 1L;
-
- public static final String SERVLET_PATH = "/ws/pingpong";
- public static final String SERVLET_PATH_V2 = "/ws/v2/pingpong";
-
- private final transient WebSocketService service;
-
- public WebSocketPingPongServlet(WebSocketService service) {
- this.service = service;
- }
-
- @Override
- public void configure(WebSocketServletFactory factory) {
- factory.getPolicy().setMaxTextMessageSize(service.getConfig().getWebSocketMaxTextFrameSize());
- if (service.getConfig().getWebSocketSessionIdleTimeoutMillis() > 0) {
- factory.getPolicy().setIdleTimeout(service.getConfig().getWebSocketSessionIdleTimeoutMillis());
- }
- factory.setCreator((request, response) -> new PingPongHandler());
- }
-}
\ No newline at end of file
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java
index fbcecc0642e..6231ef1a2aa 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java
@@ -29,7 +29,6 @@ import org.apache.pulsar.common.configuration.VipStatus;
import org.apache.pulsar.common.util.CmdGenerateDocs;
import org.apache.pulsar.common.util.ShutdownUtil;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
-import org.apache.pulsar.websocket.WebSocketPingPongServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
@@ -91,7 +90,6 @@ public class WebSocketServiceStarter {
proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH,
new WebSocketProducerServlet(service));
proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH,
new WebSocketConsumerServlet(service));
proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH,
new WebSocketReaderServlet(service));
- proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH,
new WebSocketPingPongServlet(service));
proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH_V2,
new WebSocketProducerServlet(service));
@@ -99,8 +97,6 @@ public class WebSocketServiceStarter {
new WebSocketConsumerServlet(service));
proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
new WebSocketReaderServlet(service));
- proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
- new WebSocketPingPongServlet(service));
proxyServer.addRestResource(ADMIN_PATH_V1,
ATTRIBUTE_PROXY_SERVICE_NAME, service, WebSocketProxyStatsV1.class);
proxyServer.addRestResource(ADMIN_PATH_V2,
ATTRIBUTE_PROXY_SERVICE_NAME, service, WebSocketProxyStatsV2.class);
diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongHandlerTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java
similarity index 67%
rename from pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongHandlerTest.java
rename to pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java
index 662009f1aab..8119c2f1f81 100644
--- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongHandlerTest.java
+++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java
@@ -18,13 +18,16 @@
*/
package org.apache.pulsar.websocket;
+import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
+import javax.servlet.http.HttpServletRequest;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.server.Server;
@@ -40,11 +43,18 @@ import org.eclipse.jetty.websocket.client.WebSocketClient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertTrue;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-public class PingPongHandlerTest {
+/**
+ * Test to ensure {@link AbstractWebSocketHandler} has ping/pong support
+ */
+public class PingPongSupportTest {
private static Server server;
@@ -67,9 +77,9 @@ public class PingPongHandlerTest {
when(config.getWebSocketMaxTextFrameSize()).thenReturn(1048576);
when(config.getWebSocketSessionIdleTimeoutMillis()).thenReturn(300000);
- ServletHolder servletHolder = new ServletHolder("ws-events", new
WebSocketPingPongServlet(service));
+ ServletHolder servletHolder = new ServletHolder("ws-events", new
GenericWebSocketServlet(service));
ServletContextHandler context = new
ServletContextHandler(ServletContextHandler.SESSIONS);
- context.setContextPath(WebSocketPingPongServlet.SERVLET_PATH);
+ context.setContextPath("/ws");
context.addServlet(servletHolder, "/*");
server.setHandler(context);
try {
@@ -87,18 +97,60 @@ public class PingPongHandlerTest {
executor.stop();
}
- @Test
- public void testPingPong() throws Exception {
+ /**
+ * We test these different endpoints because they are parsed in the AbstractWebSocketHandler. Technically, we are
+ * not testing these implementations, but the ping/pong support is guaranteed as part of the framework.
+ */
+ @DataProvider(name = "endpoint")
+ public static Object[][] cacheEnable() {
+ return new Object[][] { { "producer" }, { "consumer" }, { "reader" } };
+ }
+
+ @Test(dataProvider = "endpoint")
+ public void testPingPong(String endpoint) throws Exception {
HttpClient httpClient = new HttpClient();
WebSocketClient webSocketClient = new WebSocketClient(httpClient);
webSocketClient.start();
MyWebSocket myWebSocket = new MyWebSocket();
- String webSocketUri = "ws://localhost:8080/ws/pingpong";
+ String webSocketUri = "ws://localhost:8080/ws/v2/" + endpoint +
"/persistent/my-property/my-ns/my-topic";
Future<Session> sessionFuture = webSocketClient.connect(myWebSocket,
URI.create(webSocketUri));
sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("test".getBytes()));
assertTrue(myWebSocket.getResponse().contains("test"));
}
+ public static class GenericWebSocketHandler extends
AbstractWebSocketHandler {
+
+ public GenericWebSocketHandler(WebSocketService service,
HttpServletRequest request, ServletUpgradeResponse response) {
+ super(service, request, response);
+ }
+
+ @Override
+ protected Boolean isAuthorized(String authRole,
AuthenticationDataSource authenticationData) throws Exception {
+ return true;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+ }
+
+ public static class GenericWebSocketServlet extends WebSocketServlet {
+
+ private static final long serialVersionUID = 1L;
+ private final WebSocketService service;
+
+ public GenericWebSocketServlet(WebSocketService service) {
+ this.service = service;
+ }
+
+ @Override
+ public void configure(WebSocketServletFactory factory) {
+ factory.setCreator((request, response) ->
+ new GenericWebSocketHandler(service,
request.getHttpServletRequest(), response));
+ }
+ }
+
@WebSocket
public static class MyWebSocket extends WebSocketAdapter implements
WebSocketPingPongListener {