This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new eac263e8f2a [fix][ws] Remove unnecessary ping/pong implementation  (#20733)
eac263e8f2a is described below

commit eac263e8f2a93d3b9f707b97c7bbcbc2a826569f
Author: Michael Marshall <[email protected]>
AuthorDate: Thu Jul 6 08:04:29 2023 -0500

    [fix][ws] Remove unnecessary ping/pong implementation  (#20733)
    
    (cherry picked from commit 33044f0e0f38374256d5b89edff6a23d08f4759d)
    (cherry picked from commit 11ee36d0351644a006d2a8639bdcc714fb602358)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  7 ---
 .../pulsar/proxy/server/ProxyServiceStarter.java   |  7 ---
 .../proxy/server/ProxyServiceStarterTest.java      | 12 ----
 .../apache/pulsar/websocket/PingPongHandler.java   | 50 -----------------
 .../pulsar/websocket/WebSocketPingPongServlet.java | 44 ---------------
 .../websocket/service/WebSocketServiceStarter.java |  4 --
 ...ngHandlerTest.java => PingPongSupportTest.java} | 64 ++++++++++++++++++++--
 7 files changed, 58 insertions(+), 130 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 3bead53f106..7dd28df8a64 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -167,7 +167,6 @@ import org.apache.pulsar.packages.management.core.impl.PackagesManagementImpl;
 import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
 import 
org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
 import org.apache.pulsar.websocket.WebSocketConsumerServlet;
-import org.apache.pulsar.websocket.WebSocketPingPongServlet;
 import org.apache.pulsar.websocket.WebSocketProducerServlet;
 import org.apache.pulsar.websocket.WebSocketReaderServlet;
 import org.apache.pulsar.websocket.WebSocketService;
@@ -968,12 +967,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                     new ServletHolder(readerWebSocketServlet), true, 
attributeMap);
             webService.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
                     new ServletHolder(readerWebSocketServlet), true, 
attributeMap);
-
-            final WebSocketServlet pingPongWebSocketServlet = new 
WebSocketPingPongServlet(webSocketService);
-            webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH,
-                    new ServletHolder(pingPongWebSocketServlet), true, 
attributeMap);
-            webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
-                    new ServletHolder(pingPongWebSocketServlet), true, 
attributeMap);
         }
     }
 
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 2959f0015da..d042323c74e 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -51,7 +51,6 @@ import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.util.CmdGenerateDocs;
 import org.apache.pulsar.proxy.stats.ProxyStats;
 import org.apache.pulsar.websocket.WebSocketConsumerServlet;
-import org.apache.pulsar.websocket.WebSocketPingPongServlet;
 import org.apache.pulsar.websocket.WebSocketProducerServlet;
 import org.apache.pulsar.websocket.WebSocketReaderServlet;
 import org.apache.pulsar.websocket.WebSocketService;
@@ -303,12 +302,6 @@ public class ProxyServiceStarter {
                     new ServletHolder(readerWebSocketServlet));
             server.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
                     new ServletHolder(readerWebSocketServlet));
-
-            final WebSocketServlet pingPongWebSocketServlet = new 
WebSocketPingPongServlet(webSocketService);
-            server.addServlet(WebSocketPingPongServlet.SERVLET_PATH,
-                    new ServletHolder(pingPongWebSocketServlet));
-            server.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
-                    new ServletHolder(pingPongWebSocketServlet));
         }
     }
 
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
index 62b65d32e8c..43b06d6311f 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -79,18 +79,6 @@ public class ProxyServiceStarterTest extends 
MockedPulsarServiceBaseTest {
         return String.format("ws://localhost:%d/ws", 
serviceStarter.getServer().getListenPortHTTP().get());
     }
 
-    @Test
-    public void testEnableWebSocketServer() throws Exception {
-        HttpClient httpClient = new HttpClient();
-        WebSocketClient webSocketClient = new WebSocketClient(httpClient);
-        webSocketClient.start();
-        MyWebSocket myWebSocket = new MyWebSocket();
-        String webSocketUri = computeWsBasePath() + "/pingpong";
-        Future<Session> sessionFuture = webSocketClient.connect(myWebSocket, 
URI.create(webSocketUri));
-        System.out.println("uri" + webSocketUri);
-        
sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes()));
-        assertTrue(myWebSocket.getResponse().contains("ping"));
-    }
 
     @Test
     public void testProducer() throws Exception {
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/PingPongHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/PingPongHandler.java
deleted file mode 100644
index 1431241cc8a..00000000000
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/PingPongHandler.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.websocket;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.eclipse.jetty.util.BufferUtil;
-import org.eclipse.jetty.websocket.api.WebSocketAdapter;
-import org.eclipse.jetty.websocket.api.WebSocketPingPongListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PingPongHandler extends WebSocketAdapter implements 
WebSocketPingPongListener {
-
-    private static final Logger log = 
LoggerFactory.getLogger(PingPongHandler.class);
-
-    @Override
-    public void onWebSocketPing(ByteBuffer payload) {
-        try {
-            if (log.isDebugEnabled()) {
-                log.debug("PING: {}", BufferUtil.toDetailString(payload));
-            }
-            getRemote().sendPong(payload);
-        } catch (IOException e) {
-            log.warn("Failed to send pong: {}", e.getMessage());
-        }
-    }
-
-    @Override
-    public void onWebSocketPong(ByteBuffer payload) {
-
-    }
-
-}
\ No newline at end of file
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketPingPongServlet.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketPingPongServlet.java
deleted file mode 100644
index ca7644b8540..00000000000
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketPingPongServlet.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.websocket;
-
-import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
-import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
-
-public class WebSocketPingPongServlet extends WebSocketServlet {
-    private static final long serialVersionUID = 1L;
-
-    public static final String SERVLET_PATH = "/ws/pingpong";
-    public static final String SERVLET_PATH_V2 = "/ws/v2/pingpong";
-
-    private final transient WebSocketService service;
-
-    public WebSocketPingPongServlet(WebSocketService service) {
-        this.service = service;
-    }
-
-    @Override
-    public void configure(WebSocketServletFactory factory) {
-        
factory.getPolicy().setMaxTextMessageSize(service.getConfig().getWebSocketMaxTextFrameSize());
-        if (service.getConfig().getWebSocketSessionIdleTimeoutMillis() > 0) {
-            
factory.getPolicy().setIdleTimeout(service.getConfig().getWebSocketSessionIdleTimeoutMillis());
-        }
-        factory.setCreator((request, response) -> new PingPongHandler());
-    }
-}
\ No newline at end of file
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java
index 63aff925ddc..00f1afcdd30 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java
@@ -28,7 +28,6 @@ import 
org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.configuration.VipStatus;
 import org.apache.pulsar.common.util.CmdGenerateDocs;
 import org.apache.pulsar.websocket.WebSocketConsumerServlet;
-import org.apache.pulsar.websocket.WebSocketPingPongServlet;
 import org.apache.pulsar.websocket.WebSocketProducerServlet;
 import org.apache.pulsar.websocket.WebSocketReaderServlet;
 import org.apache.pulsar.websocket.WebSocketService;
@@ -90,7 +89,6 @@ public class WebSocketServiceStarter {
         proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH, 
new WebSocketProducerServlet(service));
         proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH, 
new WebSocketConsumerServlet(service));
         proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH, 
new WebSocketReaderServlet(service));
-        proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH, 
new WebSocketPingPongServlet(service));
 
         
proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH_V2,
                 new WebSocketProducerServlet(service));
@@ -98,8 +96,6 @@ public class WebSocketServiceStarter {
                 new WebSocketConsumerServlet(service));
         proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
                 new WebSocketReaderServlet(service));
-        
proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
-                new WebSocketPingPongServlet(service));
 
         proxyServer.addRestResources(ADMIN_PATH_V1, 
WebSocketProxyStatsV1.class.getPackage().getName(),
                 ATTRIBUTE_PROXY_SERVICE_NAME, service);
diff --git 
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongHandlerTest.java
 
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java
similarity index 67%
rename from 
pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongHandlerTest.java
rename to 
pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java
index 7d5cc800aa3..e4002394aee 100644
--- 
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongHandlerTest.java
+++ 
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java
@@ -18,13 +18,16 @@
  */
 package org.apache.pulsar.websocket;
 
+import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Future;
+import javax.servlet.http.HttpServletRequest;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.web.WebExecutorThreadPool;
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.server.Server;
@@ -40,11 +43,18 @@ import org.eclipse.jetty.websocket.client.WebSocketClient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertTrue;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-public class PingPongHandlerTest {
+/**
+ * Test to ensure {@link AbstractWebSocketHandler} has ping/pong support
+ */
+public class PingPongSupportTest {
 
     private static Server server;
 
@@ -67,9 +77,9 @@ public class PingPongHandlerTest {
         when(config.getWebSocketMaxTextFrameSize()).thenReturn(1048576);
         when(config.getWebSocketSessionIdleTimeoutMillis()).thenReturn(300000);
 
-        ServletHolder servletHolder = new ServletHolder("ws-events", new 
WebSocketPingPongServlet(service));
+        ServletHolder servletHolder = new ServletHolder("ws-events", new 
GenericWebSocketServlet(service));
         ServletContextHandler context = new 
ServletContextHandler(ServletContextHandler.SESSIONS);
-        context.setContextPath(WebSocketPingPongServlet.SERVLET_PATH);
+        context.setContextPath("/ws");
         context.addServlet(servletHolder, "/*");
         server.setHandler(context);
         try {
@@ -87,18 +97,60 @@ public class PingPongHandlerTest {
         executor.stop();
     }
 
-    @Test
-    public void testPingPong() throws Exception {
+    /**
+     * We test these different endpoints because they are parsed in the AbstractWebSocketHandler. Technically, we are
+     * not testing these implementations, but the ping/pong support is guaranteed as part of the framework.
+     */
+    @DataProvider(name = "endpoint")
+    public static Object[][] cacheEnable() {
+        return new Object[][] { { "producer" }, { "consumer" }, { "reader" } };
+    }
+
+    @Test(dataProvider = "endpoint")
+    public void testPingPong(String endpoint) throws Exception {
         HttpClient httpClient = new HttpClient();
         WebSocketClient webSocketClient = new WebSocketClient(httpClient);
         webSocketClient.start();
         MyWebSocket myWebSocket = new MyWebSocket();
-        String webSocketUri = "ws://localhost:8080/ws/pingpong";
+        String webSocketUri = "ws://localhost:8080/ws/v2/" + endpoint + 
"/persistent/my-property/my-ns/my-topic";
         Future<Session> sessionFuture = webSocketClient.connect(myWebSocket, 
URI.create(webSocketUri));
         
sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("test".getBytes()));
         assertTrue(myWebSocket.getResponse().contains("test"));
     }
 
+    public static class GenericWebSocketHandler extends 
AbstractWebSocketHandler {
+
+        public GenericWebSocketHandler(WebSocketService service, 
HttpServletRequest request, ServletUpgradeResponse response) {
+            super(service, request, response);
+        }
+
+        @Override
+        protected Boolean isAuthorized(String authRole, 
AuthenticationDataSource authenticationData) throws Exception {
+            return true;
+        }
+
+        @Override
+        public void close() throws IOException {
+
+        }
+    }
+
+    public static class GenericWebSocketServlet extends WebSocketServlet {
+
+        private static final long serialVersionUID = 1L;
+        private final WebSocketService service;
+
+        public GenericWebSocketServlet(WebSocketService service) {
+            this.service = service;
+        }
+
+        @Override
+        public void configure(WebSocketServletFactory factory) {
+            factory.setCreator((request, response) ->
+                    new GenericWebSocketHandler(service, 
request.getHttpServletRequest(), response));
+        }
+    }
+
     @WebSocket
     public static class MyWebSocket extends WebSocketAdapter implements 
WebSocketPingPongListener {
 

Reply via email to