This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1952f94769d [improve] [proxy] Add a check for brokerServiceURL that
does not support multi uri yet (#21972)
1952f94769d is described below
commit 1952f94769d9dc80908d159be6e6ce1ff48b83fb
Author: fengyubiao <[email protected]>
AuthorDate: Tue Jan 30 19:34:01 2024 +0800
[improve] [proxy] Add a check for brokerServiceURL that does not support
multi uri yet (#21972)
### Motivation
By design, the two settings `brokerServiceURL` and `brokerServiceURLTLS` were
never meant to accept multiple broker addresses; each should instead point to a
single “discovery service provider.” See:
https://github.com/apache/pulsar/pull/1002 and
https://github.com/apache/pulsar/pull/14682
Users get the error below if they set `brokerServiceURL` or `brokerServiceURLTLS` to multiple broker URLs (for example, a comma-separated list):
```
"2024-01-09 00:20:10,261 -0800 [pulsar-proxy-io-4-7] WARN
io.netty.channel.DefaultChannelPipeline - An exceptionCaught() event was fired,
and it reached at the tail of the pipeline. It usually means the last handler
in the pipeline did not handle the exception.
java.lang.IllegalArgumentException: port out of range:-1
at java.net.InetSocketAddress.checkPort(InetSocketAddress.java:143)
~[?:?]
at
java.net.InetSocketAddress.createUnresolved(InetSocketAddress.java:254) ~[?:?]
at
org.apache.pulsar.proxy.server.LookupProxyHandler.getAddr(LookupProxyHandler.java:432)
~[org.apache.pulsar-pulsar-proxy-2.9.0.jar:2.9.0]
at
org.apache.pulsar.proxy.server.LookupProxyHandler.handleGetSchema(LookupProxyHandler.java:357)
~[org.apache.pulsar-pulsar-proxy-2.9.0.jar:2.9.0]
at
org.apache.pulsar.proxy.server.ProxyConnection.handleGetSchema(ProxyConnection.java:463)
~[org.apache.pulsar-pulsar-proxy-2.9.0.jar:2.9.0]
at
org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:326)
~[io.streamnative-pulsar-common-2.9.2.12.jar:2.9.2.12]
at
org.apache.pulsar.proxy.server.ProxyConnection.channelRead(ProxyConnection.java:221)
~[org.apache.pulsar-pulsar-proxy-2.9.0.jar:2.9.0]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
~[io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
~[io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372)
~[io.netty-netty-handler-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1246)
~[io.netty-netty-handler-4.1.74.Final.jar:4.1.74.Final]
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1286)
~[io.netty-netty-handler-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
~[io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
~[io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
~[io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
~[io.netty-netty-transport-classes-epoll-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
~[io.netty-netty-transport-classes-epoll-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
~[io.netty-netty-transport-classes-epoll-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
~[io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
~[io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
~[io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
```
### Modifications
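- Improve the descriptions of the broker and function-worker URL settings in `conf/proxy.conf` and `ProxyConfiguration`
- Add a check in `ProxyServiceStarter` that rejects any of these URL settings whose value contains a comma (i.e. multiple URLs)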
---
conf/proxy.conf | 10 +-
.../pulsar/proxy/server/ProxyConfiguration.java | 20 ++--
.../pulsar/proxy/server/ProxyServiceStarter.java | 24 ++++-
.../proxy/server/ProxyConfigurationTest.java | 119 +++++++++++++++++++++
.../proxy/server/ProxyServiceStarterTest.java | 2 +-
5 files changed, 163 insertions(+), 12 deletions(-)
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 4194bf76219..8285e1cb753 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -28,17 +28,19 @@ metadataStoreUrl=
# The metadata store URL for the configuration data. If empty, we fall back to
use metadataStoreUrl
configurationMetadataStoreUrl=
-# If Service Discovery is Disabled this url should point to the discovery
service provider.
+# If does not set metadataStoreUrl or configurationMetadataStoreUrl, this url
should point to the discovery service
+# provider, and does not support multi urls yet.
# The URL must begin with pulsar:// for plaintext or with pulsar+ssl:// for
TLS.
brokerServiceURL=
brokerServiceURLTLS=
-# These settings are unnecessary if `zookeeperServers` is specified
+# If does not set metadataStoreUrl or configurationMetadataStoreUrl, this url
should point to the discovery service
+# provider, and does not support multi urls yet.
brokerWebServiceURL=
brokerWebServiceURLTLS=
-# If function workers are setup in a separate cluster, configure the following
2 settings
-# to point to the function workers cluster
+# If function workers are setup in a separate cluster, configure the following
2 settings. This url should point to
+# the discovery service provider of the function workers cluster, and does not
support multi urls yet.
functionWorkerWebServiceURL=
functionWorkerWebServiceURLTLS=
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 7178a0ceda4..db2969e3c39 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -173,35 +173,43 @@ public class ProxyConfiguration implements
PulsarConfiguration {
@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
- doc = "The service url points to the broker cluster. URL must have the
pulsar:// prefix."
+ doc = "If does not set metadataStoreUrl or
configurationMetadataStoreUrl, this url should point to the"
+ + " discovery service provider."
+ + " URL must have the pulsar:// prefix. And does not support
multi url yet."
)
private String brokerServiceURL;
@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
- doc = "The tls service url points to the broker cluster. URL must have
the pulsar+ssl:// prefix."
+ doc = "If does not set metadataStoreUrl or
configurationMetadataStoreUrl, this url should point to the"
+ + " discovery service provider."
+ + " URL must have the pulsar+ssl:// prefix. And does not
support multi url yet."
)
private String brokerServiceURLTLS;
@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
- doc = "The web service url points to the broker cluster"
+ doc = "The web service url points to the discovery service provider of
the broker cluster, and does not support"
+ + " multi url yet."
)
private String brokerWebServiceURL;
@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
- doc = "The tls web service url points to the broker cluster"
+ doc = "The tls web service url points to the discovery service
provider of the broker cluster, and does not"
+ + " support multi url yet."
)
private String brokerWebServiceURLTLS;
@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
- doc = "The web service url points to the function worker cluster."
+ doc = "The web service url points to the discovery service provider of
the function worker cluster, and does"
+ + " not support multi url yet."
+ " Only configure it when you setup function workers in a
separate cluster"
)
private String functionWorkerWebServiceURL;
@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
- doc = "The tls web service url points to the function worker cluster."
+ doc = "The tls web service url points to the discovery service
provider of the function worker cluster, and"
+ + " does not support multi url yet."
+ " Only configure it when you setup function workers in a
separate cluster"
)
private String functionWorkerWebServiceURLTLS;
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index aa80b03613b..1a98601f2a9 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -162,11 +162,28 @@ public class ProxyServiceStarter {
if (isNotBlank(config.getBrokerServiceURL())) {
checkArgument(config.getBrokerServiceURL().startsWith("pulsar://"),
"brokerServiceURL must start with pulsar://");
+ ensureUrlNotContainsComma("brokerServiceURL",
config.getBrokerServiceURL());
}
-
if (isNotBlank(config.getBrokerServiceURLTLS())) {
checkArgument(config.getBrokerServiceURLTLS().startsWith("pulsar+ssl://"),
"brokerServiceURLTLS must start with pulsar+ssl://");
+ ensureUrlNotContainsComma("brokerServiceURLTLS",
config.getBrokerServiceURLTLS());
+ }
+
+ if (isNotBlank(config.getBrokerWebServiceURL())) {
+ ensureUrlNotContainsComma("brokerWebServiceURL",
config.getBrokerWebServiceURL());
+ }
+ if (isNotBlank(config.getBrokerWebServiceURLTLS())) {
+ ensureUrlNotContainsComma("brokerWebServiceURLTLS",
config.getBrokerWebServiceURLTLS());
+ }
+
+ if (isNotBlank(config.getFunctionWorkerWebServiceURL())) {
+ ensureUrlNotContainsComma("functionWorkerWebServiceURLTLS",
+ config.getFunctionWorkerWebServiceURL());
+ }
+ if (isNotBlank(config.getFunctionWorkerWebServiceURLTLS())) {
+ ensureUrlNotContainsComma("functionWorkerWebServiceURLTLS",
+ config.getFunctionWorkerWebServiceURLTLS());
}
if ((isBlank(config.getBrokerServiceURL()) &&
isBlank(config.getBrokerServiceURLTLS()))
@@ -187,6 +204,11 @@ public class ProxyServiceStarter {
}
}
+ private void ensureUrlNotContainsComma(String paramName, String
paramValue) {
+ checkArgument(!paramValue.contains(","), paramName + " does not
support multi urls yet,"
+ + " it should point to the discovery service provider.");
+ }
+
public static void main(String[] args) throws Exception {
ProxyServiceStarter serviceStarter = new ProxyServiceStarter(args);
try {
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConfigurationTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConfigurationTest.java
index 97a73c20b60..a9a562e04c8 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConfigurationTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConfigurationTest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.proxy.server;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import org.testng.annotations.Test;
import java.beans.Introspector;
@@ -36,6 +38,8 @@ import java.util.List;
import java.util.Properties;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
@Test(groups = "broker")
public class ProxyConfigurationTest {
@@ -134,4 +138,119 @@ public class ProxyConfigurationTest {
}
}
+ @Test
+ public void testBrokerUrlCheck() throws IOException {
+ ProxyConfiguration configuration = new ProxyConfiguration();
+ // brokerServiceURL must start with pulsar://
+ configuration.setBrokerServiceURL("127.0.0.1:6650");
+ try (MockedStatic<PulsarConfigurationLoader> theMock =
Mockito.mockStatic(PulsarConfigurationLoader.class)) {
+ theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(),
Mockito.any()))
+ .thenReturn(configuration);
+ try {
+ new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
+ fail("brokerServiceURL must start with pulsar://");
+ } catch (Exception ex) {
+ assertTrue(ex.getMessage().contains("brokerServiceURL must
start with pulsar://"));
+ }
+ }
+ configuration.setBrokerServiceURL("pulsar://127.0.0.1:6650");
+
+ // brokerServiceURLTLS must start with pulsar+ssl://
+ configuration.setBrokerServiceURLTLS("pulsar://127.0.0.1:6650");
+ try (MockedStatic<PulsarConfigurationLoader> theMock =
Mockito.mockStatic(PulsarConfigurationLoader.class)) {
+ theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(),
Mockito.any()))
+ .thenReturn(configuration);
+ try {
+ new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
+ fail("brokerServiceURLTLS must start with pulsar+ssl://");
+ } catch (Exception ex) {
+ assertTrue(ex.getMessage().contains("brokerServiceURLTLS must
start with pulsar+ssl://"));
+ }
+ }
+
+ // brokerServiceURL did not support multi urls yet.
+
configuration.setBrokerServiceURL("pulsar://127.0.0.1:6650,pulsar://127.0.0.2:6650");
+ try (MockedStatic<PulsarConfigurationLoader> theMock =
Mockito.mockStatic(PulsarConfigurationLoader.class)) {
+ theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(),
Mockito.any()))
+ .thenReturn(configuration);
+ try {
+ new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
+ fail("brokerServiceURL does not support multi urls yet");
+ } catch (Exception ex) {
+ assertTrue(ex.getMessage().contains("does not support multi
urls yet"));
+ }
+ }
+ configuration.setBrokerServiceURL("pulsar://127.0.0.1:6650");
+
+ // brokerServiceURLTLS did not support multi urls yet.
+
configuration.setBrokerServiceURLTLS("pulsar+ssl://127.0.0.1:6650,pulsar+ssl:127.0.0.2:6650");
+ try (MockedStatic<PulsarConfigurationLoader> theMock =
Mockito.mockStatic(PulsarConfigurationLoader.class)) {
+ theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(),
Mockito.any()))
+ .thenReturn(configuration);
+ try {
+ new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
+ fail("brokerServiceURLTLS does not support multi urls yet");
+ } catch (Exception ex) {
+ assertTrue(ex.getMessage().contains("does not support multi
urls yet"));
+ }
+ }
+ configuration.setBrokerServiceURLTLS("pulsar+ssl://127.0.0.1:6650");
+
+ // brokerWebServiceURL did not support multi urls yet.
+
configuration.setBrokerWebServiceURL("http://127.0.0.1:8080,http://127.0.0.2:8080");
+ try (MockedStatic<PulsarConfigurationLoader> theMock =
Mockito.mockStatic(PulsarConfigurationLoader.class)) {
+ theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(),
Mockito.any()))
+ .thenReturn(configuration);
+ try {
+ new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
+ fail("brokerWebServiceURL does not support multi urls yet");
+ } catch (Exception ex) {
+ assertTrue(ex.getMessage().contains("does not support multi
urls yet"));
+ }
+ }
+ configuration.setBrokerWebServiceURL("http://127.0.0.1:8080");
+
+ // brokerWebServiceURLTLS did not support multi urls yet.
+
configuration.setBrokerWebServiceURLTLS("https://127.0.0.1:443,https://127.0.0.2:443");
+ try (MockedStatic<PulsarConfigurationLoader> theMock =
Mockito.mockStatic(PulsarConfigurationLoader.class)) {
+ theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(),
Mockito.any()))
+ .thenReturn(configuration);
+ try {
+ new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
+ fail("brokerWebServiceURLTLS does not support multi urls yet");
+ } catch (Exception ex) {
+ assertTrue(ex.getMessage().contains("does not support multi
urls yet"));
+ }
+ }
+ configuration.setBrokerWebServiceURLTLS("https://127.0.0.1:443");
+
+ // functionWorkerWebServiceURL did not support multi urls yet.
+
configuration.setFunctionWorkerWebServiceURL("http://127.0.0.1:8080,http://127.0.0.2:8080");
+ try (MockedStatic<PulsarConfigurationLoader> theMock =
Mockito.mockStatic(PulsarConfigurationLoader.class)) {
+ theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(),
Mockito.any()))
+ .thenReturn(configuration);
+ try {
+ new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
+ fail("functionWorkerWebServiceURL does not support multi urls
yet");
+ } catch (Exception ex) {
+ assertTrue(ex.getMessage().contains("does not support multi
urls yet"));
+ }
+ }
+ configuration.setFunctionWorkerWebServiceURL("http://127.0.0.1:8080");
+
+ // functionWorkerWebServiceURLTLS did not support multi urls yet.
+
configuration.setFunctionWorkerWebServiceURLTLS("http://127.0.0.1:443,http://127.0.0.2:443");
+ try (MockedStatic<PulsarConfigurationLoader> theMock =
Mockito.mockStatic(PulsarConfigurationLoader.class)) {
+ theMock.when(PulsarConfigurationLoader.create(Mockito.anyString(),
Mockito.any()))
+ .thenReturn(configuration);
+ try {
+ new ProxyServiceStarter(ProxyServiceStarterTest.ARGS);
+ fail("functionWorkerWebServiceURLTLS does not support multi
urls yet");
+ } catch (Exception ex) {
+ assertTrue(ex.getMessage().contains("does not support multi
urls yet"));
+ }
+ }
+
configuration.setFunctionWorkerWebServiceURLTLS("http://127.0.0.1:443");
+ }
+
}
\ No newline at end of file
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
index 925e8192e14..71b1087ee64 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -45,7 +45,7 @@ import org.testng.annotations.Test;
public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
- static final String[] ARGS = new String[]{"-c",
"./src/test/resources/proxy.conf"};
+ public static final String[] ARGS = new String[]{"-c",
"./src/test/resources/proxy.conf"};
protected ProxyServiceStarter serviceStarter;
protected String serviceUrl;