This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 4c4b923 1. Fix websocket tls bug. (#11243)
4c4b923 is described below
commit 4c4b9239f3e7af18e59998ca273cc573e43017c2
Author: Tboy <[email protected]>
AuthorDate: Wed Jul 7 20:32:53 2021 +0800
1. Fix websocket tls bug. (#11243)
2. Add ProxyServiceTlsStarterTest.
3. Refactor ProxyServiceStarter to make it easier to test.
(cherry picked from commit 887d74f902211e39a38257b9f3099d2acd777418)
---
.../pulsar/proxy/server/ProxyServiceStarter.java | 33 ++++++++++++-----
.../proxy/server/ProxyServiceStarterTest.java | 8 +++--
...erTest.java => ProxyServiceTlsStarterTest.java} | 41 ++++++++++++----------
3 files changed, 53 insertions(+), 29 deletions(-)
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 8f868dc..6c151dd 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -28,6 +28,7 @@ import static
org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger;
import com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import
org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
@@ -87,6 +88,10 @@ public class ProxyServiceStarter {
private ProxyConfiguration config;
+ private ProxyService proxyService;
+
+ private WebServer server;
+
public ProxyServiceStarter(String[] args) throws Exception {
try {
@@ -156,17 +161,12 @@ public class ProxyServiceStarter {
AuthenticationService authenticationService = new
AuthenticationService(
PulsarConfigurationLoader.convertFrom(config));
// create proxy service
- ProxyService proxyService = new ProxyService(config,
authenticationService);
+ proxyService = new ProxyService(config, authenticationService);
// create a web-service
- final WebServer server = new WebServer(config, authenticationService);
+ server = new WebServer(config, authenticationService);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- try {
- proxyService.close();
- server.stop();
- } catch (Exception e) {
- log.warn("server couldn't stop gracefully {}", e.getMessage(),
e);
- }
+ close();
}));
proxyService.start();
@@ -195,6 +195,19 @@ public class ProxyServiceStarter {
server.start();
}
+ public void close() {
+ try {
+ if(proxyService != null) {
+ proxyService.close();
+ }
+ if(server != null) {
+ server.stop();
+ }
+ } catch (Exception e) {
+ log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
+ }
+ }
+
public static void addWebServerHandlers(WebServer server,
ProxyConfiguration config,
ProxyService service,
@@ -233,7 +246,9 @@ public class ProxyServiceStarter {
if (config.isWebSocketServiceEnabled()) {
// add WebSocket servlet
// Use local broker address to avoid different IP address when
using a VIP for service discovery
- WebSocketService webSocketService = new
WebSocketService(createClusterData(config),
PulsarConfigurationLoader.convertFrom(config));
+ ServiceConfiguration serviceConfiguration =
PulsarConfigurationLoader.convertFrom(config);
+
serviceConfiguration.setBrokerClientTlsEnabled(config.isTlsEnabledWithBroker());
+ WebSocketService webSocketService = new
WebSocketService(createClusterData(config), serviceConfiguration);
webSocketService.start();
final WebSocketServlet producerWebSocketServlet = new
WebSocketProducerServlet(webSocketService);
server.addServlet(WebSocketProducerServlet.SERVLET_PATH,
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
index 63d7e39..3377ec2 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -48,12 +48,15 @@ import static org.testng.Assert.assertTrue;
public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
+ static final String[] ARGS = new String[]{"-c",
"./src/test/resources/proxy.conf"};
+
+ private ProxyServiceStarter serviceStarter;
+
@Override
@BeforeClass
protected void setup() throws Exception {
internalSetup();
- String[] args = new String[]{"-c", "./src/test/resources/proxy.conf"};
- ProxyServiceStarter serviceStarter = new ProxyServiceStarter(args);
+ serviceStarter = new ProxyServiceStarter(ARGS);
serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl());
serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
serviceStarter.getConfig().setServicePort(Optional.of(11000));
@@ -65,6 +68,7 @@ public class ProxyServiceStarterTest extends
MockedPulsarServiceBaseTest {
@AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
internalCleanup();
+ serviceStarter.close();
}
@Test
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
similarity index 79%
copy from
pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
copy to
pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
index 63d7e39..7e6c0f5 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.proxy.server;
import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -43,46 +42,52 @@ import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
+import static org.apache.pulsar.proxy.server.ProxyServiceStarterTest.ARGS;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
+public class ProxyServiceTlsStarterTest extends MockedPulsarServiceBaseTest {
+
+ private final String TLS_TRUST_CERT_FILE_PATH =
"./src/test/resources/authentication/tls/cacert.pem";
+ private final String TLS_PROXY_CERT_FILE_PATH =
"./src/test/resources/authentication/tls/server-cert.pem";
+ private final String TLS_PROXY_KEY_FILE_PATH =
"./src/test/resources/authentication/tls/server-key.pem";
+ private ProxyServiceStarter serviceStarter;
@Override
@BeforeClass
protected void setup() throws Exception {
internalSetup();
- String[] args = new String[]{"-c", "./src/test/resources/proxy.conf"};
- ProxyServiceStarter serviceStarter = new ProxyServiceStarter(args);
+ serviceStarter = new ProxyServiceStarter(ARGS);
serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+
serviceStarter.getConfig().setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
- serviceStarter.getConfig().setServicePort(Optional.of(11000));
+
serviceStarter.getConfig().setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+ serviceStarter.getConfig().setServicePortTls(Optional.of(11043));
+ serviceStarter.getConfig().setTlsEnabledWithBroker(true);
serviceStarter.getConfig().setWebSocketServiceEnabled(true);
+
serviceStarter.getConfig().setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
+ serviceStarter.getConfig().setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
serviceStarter.start();
}
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+ this.conf.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
+ this.conf.setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
+ }
+
@Override
@AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
internalCleanup();
- }
-
- @Test
- public void testEnableWebSocketServer() throws Exception {
- HttpClient httpClient = new HttpClient();
- WebSocketClient webSocketClient = new WebSocketClient(httpClient);
- webSocketClient.start();
- MyWebSocket myWebSocket = new MyWebSocket();
- String webSocketUri = "ws://localhost:8080/ws/pingpong";
- Future<Session> sessionFuture = webSocketClient.connect(myWebSocket,
URI.create(webSocketUri));
-
sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes()));
- assertTrue(myWebSocket.getResponse().contains("ping"));
+ serviceStarter.close();
}
@Test
public void testProducer() throws Exception {
@Cleanup
- PulsarClient client =
PulsarClient.builder().serviceUrl("pulsar://localhost:11000")
+ PulsarClient client =
PulsarClient.builder().serviceUrl("pulsar+ssl://localhost:11043")
+
.allowTlsInsecureConnection(false).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
.build();
@Cleanup