This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 341ac715aa7a9184988064c6089d89a35df5c07f Author: Tboy <[email protected]> AuthorDate: Wed Jul 7 20:32:53 2021 +0800 1. Fix websocket tls bug. (#11243) 2. Add ProxyServiceTlsStarterTest. 3. Refactor ProxyServiceStarter to be more easy to test. (cherry picked from commit 887d74f902211e39a38257b9f3099d2acd777418) --- .../pulsar/proxy/server/ProxyServiceStarter.java | 33 ++++++++++++----- .../proxy/server/ProxyServiceStarterTest.java | 8 +++-- ...erTest.java => ProxyServiceTlsStarterTest.java} | 41 ++++++++++++---------- 3 files changed, 53 insertions(+), 29 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java index c34f8cc..300091d 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java @@ -28,6 +28,7 @@ import static org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger; import com.google.common.annotations.VisibleForTesting; import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader; @@ -86,6 +87,10 @@ public class ProxyServiceStarter { private ProxyConfiguration config; + private ProxyService proxyService; + + private WebServer server; + public ProxyServiceStarter(String[] args) throws Exception { try { @@ -155,17 +160,12 @@ public class ProxyServiceStarter { AuthenticationService authenticationService = new AuthenticationService( PulsarConfigurationLoader.convertFrom(config)); // create proxy service - ProxyService proxyService = new ProxyService(config, authenticationService); + proxyService = new ProxyService(config, authenticationService); // create a web-service - final WebServer server = new WebServer(config, authenticationService); + server = new WebServer(config, authenticationService); Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - proxyService.close(); - server.stop(); - } catch (Exception e) { - log.warn("server couldn't stop gracefully {}", e.getMessage(), e); - } + close(); })); proxyService.start(); @@ -194,6 +194,19 @@ public class ProxyServiceStarter { server.start(); } + public void close() { + try { + if(proxyService != null) { + proxyService.close(); + } + if(server != null) { + server.stop(); + } + } catch (Exception e) { + log.warn("server couldn't stop gracefully {}", e.getMessage(), e); + } + } + public static void addWebServerHandlers(WebServer server, ProxyConfiguration config, ProxyService service, @@ -232,7 +245,9 @@ public class ProxyServiceStarter { if (config.isWebSocketServiceEnabled()) { // add WebSocket servlet // Use local broker address to avoid different IP address when using a VIP for service discovery - WebSocketService webSocketService = new WebSocketService(createClusterData(config), PulsarConfigurationLoader.convertFrom(config)); + ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(config); + serviceConfiguration.setBrokerClientTlsEnabled(config.isTlsEnabledWithBroker()); + WebSocketService webSocketService = new WebSocketService(createClusterData(config), serviceConfiguration); webSocketService.start(); final WebSocketServlet producerWebSocketServlet = new WebSocketProducerServlet(webSocketService); server.addServlet(WebSocketProducerServlet.SERVLET_PATH, diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java index 63d7e39..3377ec2 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java @@ -48,12 +48,15 @@ import static org.testng.Assert.assertTrue; public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest { + static final String[] ARGS = new String[]{"-c", "./src/test/resources/proxy.conf"}; + + private ProxyServiceStarter serviceStarter; + @Override @BeforeClass protected void setup() throws Exception { internalSetup(); - String[] args = new String[]{"-c", "./src/test/resources/proxy.conf"}; - ProxyServiceStarter serviceStarter = new ProxyServiceStarter(args); + serviceStarter = new ProxyServiceStarter(ARGS); serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl()); serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress()); serviceStarter.getConfig().setServicePort(Optional.of(11000)); @@ -65,6 +68,7 @@ public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest { @AfterClass(alwaysRun = true) protected void cleanup() throws Exception { internalCleanup(); + serviceStarter.close(); } @Test diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java similarity index 79% copy from pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java copy to pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java index 63d7e39..7e6c0f5 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java @@ -20,7 +20,6 @@ package org.apache.pulsar.proxy.server; import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; - import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -43,46 +42,52 @@ import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Future; +import static org.apache.pulsar.proxy.server.ProxyServiceStarterTest.ARGS; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; -public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest { +public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest { + + private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem"; + private final String TLS_PROXY_CERT_FILE_PATH = "./src/test/resources/authentication/tls/server-cert.pem"; + private final String TLS_PROXY_KEY_FILE_PATH = "./src/test/resources/authentication/tls/server-key.pem"; + private ProxyServiceStarter serviceStarter; @Override @BeforeClass protected void setup() throws Exception { internalSetup(); - String[] args = new String[]{"-c", "./src/test/resources/proxy.conf"}; - ProxyServiceStarter serviceStarter = new ProxyServiceStarter(args); + serviceStarter = new ProxyServiceStarter(ARGS); serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl()); + serviceStarter.getConfig().setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls()); serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress()); - serviceStarter.getConfig().setServicePort(Optional.of(11000)); + serviceStarter.getConfig().setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); + serviceStarter.getConfig().setServicePortTls(Optional.of(11043)); + serviceStarter.getConfig().setTlsEnabledWithBroker(true); serviceStarter.getConfig().setWebSocketServiceEnabled(true); + serviceStarter.getConfig().setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH); + serviceStarter.getConfig().setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH); serviceStarter.start(); } + protected void doInitConf() throws Exception { + super.doInitConf(); + this.conf.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH); + this.conf.setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH); + } + @Override @AfterClass(alwaysRun = true) protected void cleanup() throws Exception { internalCleanup(); - } - - @Test - public void testEnableWebSocketServer() throws Exception { - HttpClient httpClient = new HttpClient(); - WebSocketClient webSocketClient = new WebSocketClient(httpClient); - webSocketClient.start(); - MyWebSocket myWebSocket = new MyWebSocket(); - String webSocketUri = "ws://localhost:8080/ws/pingpong"; - Future<Session> sessionFuture = webSocketClient.connect(myWebSocket, URI.create(webSocketUri)); - sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes())); - assertTrue(myWebSocket.getResponse().contains("ping")); + serviceStarter.close(); } @Test public void testProducer() throws Exception { @Cleanup - PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:11000") + PulsarClient client = PulsarClient.builder().serviceUrl("pulsar+ssl://localhost:11043") + .allowTlsInsecureConnection(false).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH) .build(); @Cleanup
