This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 272725a [client] Issue 3218: Support specifying multiple hosts in
pulsar service url and web url (#3249)
272725a is described below
commit 272725a9ca4d0334df54aff4b751105949d2c253
Author: Sijie Guo <[email protected]>
AuthorDate: Sun Jan 13 10:17:16 2019 +0900
[client] Issue 3218: Support specifying multiple hosts in pulsar service
url and web url (#3249)
*Motivation*
Sometimes people don't have DNS set up for their brokers. It would be useful
to let them specify a list of brokers as bootstrap brokers.
*Changes*
This PR introduces `ServiceURI` and `ServiceNameResolver` to support
specifying multiple hosts in the pulsar service url and
web url.
Master Issue: #3218
---
.../pulsar/client/api/ServiceUrlProviderTest.java | 26 ++-
.../apache/pulsar/client/admin/PulsarAdmin.java | 6 +-
.../client/impl/BinaryProtoLookupService.java | 27 +--
.../org/apache/pulsar/client/impl/HttpClient.java | 23 +-
.../pulsar/client/impl/HttpLookupService.java | 2 +-
.../client/impl/PulsarServiceNameResolver.java | 104 +++++++++
.../pulsar/client/impl/ServiceNameResolver.java | 65 ++++++
.../client/impl/PulsarServiceNameResolverTest.java | 132 ++++++++++++
.../org/apache/pulsar/common/net/ServiceURI.java | 205 ++++++++++++++++++
.../apache/pulsar/common/net/ServiceURITest.java | 233 +++++++++++++++++++++
10 files changed, 785 insertions(+), 38 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
index 73a51ad..40ae7f8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -30,6 +31,7 @@ import org.testng.annotations.Test;
import java.util.concurrent.TimeUnit;
+@Slf4j
public class ServiceUrlProviderTest extends ProducerConsumerBase {
@BeforeClass
@@ -106,13 +108,27 @@ public class ServiceUrlProviderTest extends
ProducerConsumerBase {
conf.setWebServicePortTls(PortManager.nextFreePort());
startBroker();
PulsarService pulsarService2 = pulsar;
- System.out.println("Pulsar1=" + pulsarService1.getBrokerServiceUrl() +
", Pulsar2=" + pulsarService2.getBrokerServiceUrl());
+
+ log.info("Pulsar1 = {}, Pulsar2 = {}",
pulsarService1.getBrokerServiceUrl(), pulsarService2.getBrokerServiceUrl());
Assert.assertNotEquals(pulsarService1.getBrokerServiceUrl(),
pulsarService2.getBrokerServiceUrl());
- Assert.assertEquals("pulsar://" +
producer.getClient().getLookup().getServiceUrl(),
pulsarService1.getBrokerServiceUrl());
- Assert.assertEquals("pulsar://" +
consumer.getClient().getLookup().getServiceUrl(),
pulsarService1.getBrokerServiceUrl());
+
+ log.info("Service url : producer = {}, consumer = {}",
+ producer.getClient().getLookup().getServiceUrl(),
+ consumer.getClient().getLookup().getServiceUrl());
+
+ Assert.assertEquals(producer.getClient().getLookup().getServiceUrl(),
pulsarService1.getBrokerServiceUrl());
+ Assert.assertEquals(consumer.getClient().getLookup().getServiceUrl(),
pulsarService1.getBrokerServiceUrl());
+
+ log.info("Changing service url from {} to {}",
+ pulsarService1.getBrokerServiceUrl(),
+ pulsarService2.getBrokerServiceUrl());
+
serviceUrlProvider.onServiceUrlChanged(pulsarService2.getBrokerServiceUrl());
- Assert.assertEquals("pulsar://" +
producer.getClient().getLookup().getServiceUrl(),
pulsarService2.getBrokerServiceUrl());
- Assert.assertEquals("pulsar://" +
consumer.getClient().getLookup().getServiceUrl(),
pulsarService2.getBrokerServiceUrl());
+ log.info("Service url changed : producer = {}, consumer = {}",
+ producer.getClient().getLookup().getServiceUrl(),
+ consumer.getClient().getLookup().getServiceUrl());
+ Assert.assertEquals(producer.getClient().getLookup().getServiceUrl(),
pulsarService2.getBrokerServiceUrl());
+ Assert.assertEquals(consumer.getClient().getLookup().getServiceUrl(),
pulsarService2.getBrokerServiceUrl());
producer.close();
consumer.close();
client.close();
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index 026deca..c7d7fc5 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -133,8 +133,10 @@ public class PulsarAdmin implements Closeable {
httpConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 8);
httpConfig.register(MultiPartFeature.class);
- ClientBuilder clientBuilder =
ClientBuilder.newBuilder().withConfig(httpConfig)
-
.register(JacksonConfigurator.class).register(JacksonFeature.class);
+ ClientBuilder clientBuilder = ClientBuilder.newBuilder()
+ .withConfig(httpConfig)
+ .register(JacksonConfigurator.class)
+ .register(JacksonFeature.class);
boolean useTls = false;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index e373ed8..94773e3 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory;
public class BinaryProtoLookupService implements LookupService {
private final PulsarClientImpl client;
- protected volatile InetSocketAddress serviceAddress;
+ private final ServiceNameResolver serviceNameResolver;
private final boolean useTls;
private final ExecutorService executor;
@@ -60,22 +60,13 @@ public class BinaryProtoLookupService implements
LookupService {
this.client = client;
this.useTls = useTls;
this.executor = executor;
+ this.serviceNameResolver = new PulsarServiceNameResolver();
updateServiceUrl(serviceUrl);
}
@Override
public void updateServiceUrl(String serviceUrl) throws
PulsarClientException {
- URI uri;
- try {
- uri = new URI(serviceUrl);
-
- // Don't attempt to resolve the hostname in DNS at this point. It
will be done each time when attempting to
- // connect
- this.serviceAddress =
InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
- } catch (Exception e) {
- log.error("Invalid service-url {} provided {}", serviceUrl,
e.getMessage(), e);
- throw new PulsarClientException.InvalidServiceURL(e);
- }
+ serviceNameResolver.updateServiceUrl(serviceUrl);
}
/**
@@ -86,7 +77,7 @@ public class BinaryProtoLookupService implements
LookupService {
* @return broker-socket-address that serves given topic
*/
public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>>
getBroker(TopicName topicName) {
- return findBroker(serviceAddress, false, topicName);
+ return findBroker(serviceNameResolver.resolveHost(), false, topicName);
}
/**
@@ -94,7 +85,7 @@ public class BinaryProtoLookupService implements
LookupService {
*
*/
public CompletableFuture<PartitionedTopicMetadata>
getPartitionedTopicMetadata(TopicName topicName) {
- return getPartitionedTopicMetadata(serviceAddress, topicName);
+ return getPartitionedTopicMetadata(serviceNameResolver.resolveHost(),
topicName);
}
private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>>
findBroker(InetSocketAddress socketAddress,
@@ -133,7 +124,7 @@ public class BinaryProtoLookupService implements
LookupService {
// (3) received correct broker to connect
if (lookupDataResult.proxyThroughServiceUrl) {
// Connect through proxy
-
addressFuture.complete(Pair.of(responseBrokerAddress, serviceAddress));
+
addressFuture.complete(Pair.of(responseBrokerAddress, socketAddress));
} else {
// Normal result with direct connection to broker
addressFuture.complete(Pair.of(responseBrokerAddress, responseBrokerAddress));
@@ -192,7 +183,7 @@ public class BinaryProtoLookupService implements
LookupService {
@Override
public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName
topicName) {
- return
client.getCnxPool().getConnection(serviceAddress).thenCompose(clientCnx -> {
+ return
client.getCnxPool().getConnection(serviceNameResolver.resolveHost()).thenCompose(clientCnx
-> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetSchema(requestId,
topicName.toString(), Optional.empty());
@@ -201,7 +192,7 @@ public class BinaryProtoLookupService implements
LookupService {
}
public String getServiceUrl() {
- return serviceAddress.toString();
+ return serviceNameResolver.getServiceUrl();
}
@Override
@@ -212,7 +203,7 @@ public class BinaryProtoLookupService implements
LookupService {
Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS,
opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS,
0 , TimeUnit.MILLISECONDS);
- getTopicsUnderNamespace(serviceAddress, namespace, backoff,
opTimeoutMs, topicsFuture, mode);
+ getTopicsUnderNamespace(serviceNameResolver.resolveHost(), namespace,
backoff, opTimeoutMs, topicsFuture, mode);
return topicsFuture;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 0e94e6b..43e5bd7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -29,7 +29,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
import java.util.Properties;
@@ -60,7 +59,7 @@ public class HttpClient implements Closeable {
protected final static int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30;
protected final AsyncHttpClient httpClient;
- protected volatile URL url;
+ protected final ServiceNameResolver serviceNameResolver;
protected final Authentication authentication;
protected HttpClient(String serviceUrl, Authentication authentication,
@@ -74,7 +73,8 @@ public class HttpClient implements Closeable {
EventLoopGroup eventLoopGroup, boolean tlsAllowInsecureConnection,
String tlsTrustCertsFilePath,
int connectTimeoutInSeconds, int readTimeoutInSeconds) throws
PulsarClientException {
this.authentication = authentication;
- setServiceUrl(serviceUrl);
+ this.serviceNameResolver = new PulsarServiceNameResolver();
+ this.serviceNameResolver.updateServiceUrl(serviceUrl);
DefaultAsyncHttpClientConfig.Builder confBuilder = new
DefaultAsyncHttpClientConfig.Builder();
confBuilder.setFollowRedirect(true);
@@ -89,7 +89,7 @@ public class HttpClient implements Closeable {
}
});
- if ("https".equals(url.getProtocol())) {
+ if
("https".equals(serviceNameResolver.getServiceUri().getServiceName())) {
try {
SslContext sslCtx = null;
@@ -112,16 +112,15 @@ public class HttpClient implements Closeable {
AsyncHttpClientConfig config = confBuilder.build();
httpClient = new DefaultAsyncHttpClient(config);
- log.debug("Using HTTP url: {}", this.url);
+ log.debug("Using HTTP url: {}", serviceUrl);
+ }
+
+ String getServiceUrl() {
+ return this.serviceNameResolver.getServiceUrl();
}
void setServiceUrl(String serviceUrl) throws PulsarClientException {
- try {
- // Ensure trailing "/" on url
- url = new URL(serviceUrl);
- } catch (MalformedURLException e) {
- throw new PulsarClientException.InvalidServiceURL(e);
- }
+ this.serviceNameResolver.updateServiceUrl(serviceUrl);
}
@Override
@@ -132,7 +131,7 @@ public class HttpClient implements Closeable {
public <T> CompletableFuture<T> get(String path, Class<T> clazz) {
final CompletableFuture<T> future = new CompletableFuture<>();
try {
- String requestUrl = new URL(url, path).toString();
+ String requestUrl = new
URL(serviceNameResolver.resolveHostUri().toURL(), path).toString();
AuthenticationDataProvider authData = authentication.getAuthData();
BoundRequestBuilder builder = httpClient.prepareGet(requestUrl);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 3d7cf0a..955c45e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -105,7 +105,7 @@ class HttpLookupService implements LookupService {
}
public String getServiceUrl() {
- return httpClient.url.toString();
+ return httpClient.getServiceUrl();
}
@Override
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
new file mode 100644
index 0000000..3d98489
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import io.netty.util.internal.PlatformDependent;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
+import org.apache.pulsar.common.net.ServiceURI;
+
+/**
+ * The default implementation of {@link ServiceNameResolver}.
+ */
+@Slf4j
+class PulsarServiceNameResolver implements ServiceNameResolver {
+
+ private volatile ServiceURI serviceUri;
+ private volatile String serviceUrl;
+ private volatile List<InetSocketAddress> addressList;
+
+ @Override
+ public InetSocketAddress resolveHost() {
+ List<InetSocketAddress> list = addressList;
+ checkState(
+ list != null, "No service url is provided yet");
+ checkState(
+ !list.isEmpty(), "No hosts found for service url : " + serviceUrl);
+ if (list.size() == 1) {
+ return list.get(0);
+ } else {
+ return list.get(randomIndex(list.size()));
+ }
+ }
+
+ @Override
+ public URI resolveHostUri() {
+ InetSocketAddress host = resolveHost();
+ String hostUrl = serviceUri.getServiceScheme() + "://" +
host.getHostName() + ":" + host.getPort();
+ return URI.create(hostUrl);
+ }
+
+ @Override
+ public String getServiceUrl() {
+ return serviceUrl;
+ }
+
+ @Override
+ public ServiceURI getServiceUri() {
+ return serviceUri;
+ }
+
+ @Override
+ public void updateServiceUrl(String serviceUrl) throws InvalidServiceURL {
+ ServiceURI uri;
+ try {
+ uri = ServiceURI.create(serviceUrl);
+ } catch (IllegalArgumentException iae) {
+ log.error("Invalid service-url {} provided {}", serviceUrl,
iae.getMessage(), iae);
+ throw new InvalidServiceURL(iae);
+ }
+
+ String[] hosts = uri.getServiceHosts();
+ List<InetSocketAddress> addresses = new ArrayList<>(hosts.length);
+ for (String host : hosts) {
+ String hostUrl = uri.getServiceScheme() + "://" + host;
+ try {
+ URI hostUri = new URI(hostUrl);
+
addresses.add(InetSocketAddress.createUnresolved(hostUri.getHost(),
hostUri.getPort()));
+ } catch (URISyntaxException e) {
+ log.error("Invalid host provided {}", hostUrl, e.getMessage(),
e);
+ throw new InvalidServiceURL(e);
+ }
+ }
+ this.addressList = addresses;
+ this.serviceUrl = serviceUrl;
+ this.serviceUri = uri;
+ }
+
+ private static int randomIndex(int numAddresses) {
+ return numAddresses == 1 ? 0 :
PlatformDependent.threadLocalRandom().nextInt(numAddresses);
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java
new file mode 100644
index 0000000..4264b61
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
+import org.apache.pulsar.common.net.ServiceURI;
+
+/**
+ * A service name resolver to resolve real socket address.
+ */
+public interface ServiceNameResolver {
+
+ /**
+ * Resolve pulsar service url.
+ *
+ * @return resolve the service url to return a socket address
+ */
+ InetSocketAddress resolveHost();
+
+ /**
+ * Resolve pulsar service url to the uri of a single host.
+ * @return the uri of a host resolved from the service url
+ */
+ URI resolveHostUri();
+
+ /**
+ * Get service url.
+ *
+ * @return service url
+ */
+ String getServiceUrl();
+
+ /**
+ * Get service uri.
+ *
+ * @return service uri
+ */
+ ServiceURI getServiceUri();
+
+ /**
+ * Update service url.
+ *
+ * @param serviceUrl service url
+ */
+ void updateServiceUrl(String serviceUrl) throws InvalidServiceURL;
+
+}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarServiceNameResolverTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarServiceNameResolverTest.java
new file mode 100644
index 0000000..878e240
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarServiceNameResolverTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
+import org.apache.pulsar.common.net.ServiceURI;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link PulsarServiceNameResolver}.
+ */
+public class PulsarServiceNameResolverTest {
+
+ private PulsarServiceNameResolver resolver;
+
+ @BeforeMethod
+ public void setup() {
+ this.resolver = new PulsarServiceNameResolver();
+ assertNull(resolver.getServiceUrl());
+ assertNull(resolver.getServiceUri());
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class)
+ public void testResolveBeforeUpdateServiceUrl() {
+ resolver.resolveHost();
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class)
+ public void testResolveUrlBeforeUpdateServiceUrl() throws Exception {
+ resolver.resolveHostUri();
+ }
+
+ @Test
+ public void testUpdateInvalidServiceUrl() {
+ String serviceUrl = "pulsar:///";
+ try {
+ resolver.updateServiceUrl(serviceUrl);
+ fail("Should fail to update service url if service url is
invalid");
+ } catch (InvalidServiceURL isu) {
+ // expected
+ }
+ assertNull(resolver.getServiceUrl());
+ assertNull(resolver.getServiceUri());
+ }
+
+ @Test
+ public void testSimpleHostUrl() throws Exception {
+ String serviceUrl = "pulsar://host1:6650";
+ resolver.updateServiceUrl(serviceUrl);
+ assertEquals(serviceUrl, resolver.getServiceUrl());
+ assertEquals(ServiceURI.create(serviceUrl), resolver.getServiceUri());
+
+ InetSocketAddress expectedAddress =
InetSocketAddress.createUnresolved("host1", 6650);
+ assertEquals(expectedAddress, resolver.resolveHost());
+ assertEquals(URI.create(serviceUrl), resolver.resolveHostUri());
+
+ String newServiceUrl = "pulsar://host2:6650";
+ resolver.updateServiceUrl(newServiceUrl);
+ assertEquals(newServiceUrl, resolver.getServiceUrl());
+ assertEquals(ServiceURI.create(newServiceUrl),
resolver.getServiceUri());
+
+ InetSocketAddress newExpectedAddress =
InetSocketAddress.createUnresolved("host2", 6650);
+ assertEquals(newExpectedAddress, resolver.resolveHost());
+ assertEquals(URI.create(newServiceUrl), resolver.resolveHostUri());
+ }
+
+ @Test
+ public void testMultipleHostsUrl() throws Exception {
+ String serviceUrl = "pulsar://host1:6650,host2:6650";
+ resolver.updateServiceUrl(serviceUrl);
+ assertEquals(serviceUrl, resolver.getServiceUrl());
+ assertEquals(ServiceURI.create(serviceUrl), resolver.getServiceUri());
+
+ Set<InetSocketAddress> expectedAddresses = new HashSet<>();
+ Set<URI> expectedHostUrls = new HashSet<>();
+ expectedAddresses.add(InetSocketAddress.createUnresolved("host1",
6650));
+ expectedAddresses.add(InetSocketAddress.createUnresolved("host2",
6650));
+ expectedHostUrls.add(URI.create("pulsar://host1:6650"));
+ expectedHostUrls.add(URI.create("pulsar://host2:6650"));
+
+ for (int i = 0; i < 10; i++) {
+ assertTrue(expectedAddresses.contains(resolver.resolveHost()));
+ assertTrue(expectedHostUrls.contains(resolver.resolveHostUri()));
+ }
+ }
+
+ @Test
+ public void testMultipleHostsTlsUrl() throws Exception {
+ String serviceUrl = "pulsar+ssl://host1:6651,host2:6651";
+ resolver.updateServiceUrl(serviceUrl);
+ assertEquals(serviceUrl, resolver.getServiceUrl());
+ assertEquals(ServiceURI.create(serviceUrl), resolver.getServiceUri());
+
+ Set<InetSocketAddress> expectedAddresses = new HashSet<>();
+ Set<URI> expectedHostUrls = new HashSet<>();
+ expectedAddresses.add(InetSocketAddress.createUnresolved("host1",
6651));
+ expectedAddresses.add(InetSocketAddress.createUnresolved("host2",
6651));
+ expectedHostUrls.add(URI.create("pulsar+ssl://host1:6651"));
+ expectedHostUrls.add(URI.create("pulsar+ssl://host2:6651"));
+
+ for (int i = 0; i < 10; i++) {
+ assertTrue(expectedAddresses.contains(resolver.resolveHost()));
+ assertTrue(expectedHostUrls.contains(resolver.resolveHostUri()));
+ }
+ }
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/net/ServiceURI.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/net/ServiceURI.java
new file mode 100644
index 0000000..f562b7d
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/net/ServiceURI.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.net;
+
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import java.net.URI;
+import java.util.List;
+import java.util.stream.Collectors;
+import lombok.AccessLevel;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * ServiceURI represents service uri within pulsar cluster.
+ *
+ * <p>This file is based on
+ * {@link
https://github.com/apache/bookkeeper/blob/master/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/net/ServiceURI.java}
+ */
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+@Getter
+@EqualsAndHashCode
+public class ServiceURI {
+
+ private static final String BINARY_SERVICE = "pulsar";
+ private static final String HTTP_SERVICE = "http";
+ private static final String HTTPS_SERVICE = "https";
+ private static final String SSL_SERVICE = "ssl";
+
+ private static final int BINARY_PORT = 6650;
+ private static final int BINARY_TLS_PORT = 6651;
+ private static final int HTTP_PORT = 8080;
+ private static final int HTTPS_PORT = 8443;
+
+ /**
+ * Create a service uri instance from a uri string.
+ *
+ * @param uriStr service uri string
+ * @return a service uri instance
+ * @throws NullPointerException if {@code uriStr} is null
+ * @throws IllegalArgumentException if the given string violates
RFC 2396
+ */
+ public static ServiceURI create(String uriStr) {
+ checkNotNull(uriStr, "service uri string is null");
+
+ // a service uri first should be a valid java.net.URI
+ URI uri = URI.create(uriStr);
+
+ return create(uri);
+ }
+
+ /**
+ * Create a service uri instance from a {@link URI} instance.
+ *
+ * @param uri {@link URI} instance
+ * @return a service uri instance
+ * @throws NullPointerException if {@code uri} is null
+ * @throws IllegalArgumentException if the given string violates
RFC 2396
+ */
+ public static ServiceURI create(URI uri) {
+ checkNotNull(uri, "service uri instance is null");
+
+ String serviceName;
+ final String[] serviceInfos;
+ String scheme = uri.getScheme();
+ if (null != scheme) {
+ scheme = scheme.toLowerCase();
+ final String serviceSep = "+";
+ String[] schemeParts = StringUtils.split(scheme, serviceSep);
+ serviceName = schemeParts[0];
+ serviceInfos = new String[schemeParts.length - 1];
+ System.arraycopy(schemeParts, 1, serviceInfos, 0,
serviceInfos.length);
+ } else {
+ serviceName = null;
+ serviceInfos = new String[0];
+ }
+
+ String userAndHostInformation = uri.getAuthority();
+ checkArgument(!Strings.isNullOrEmpty(userAndHostInformation),
+ "authority component is missing in service uri : " + uri);
+
+ String serviceUser;
+ List<String> serviceHosts;
+ int atIndex = userAndHostInformation.indexOf('@');
+ Splitter splitter = Splitter.on(CharMatcher.anyOf(",;"));
+ if (atIndex > 0) {
+ serviceUser = userAndHostInformation.substring(0, atIndex);
+ serviceHosts =
splitter.splitToList(userAndHostInformation.substring(atIndex + 1));
+ } else {
+ serviceUser = null;
+ serviceHosts = splitter.splitToList(userAndHostInformation);
+ }
+ serviceHosts = serviceHosts
+ .stream()
+ .map(host -> validateHostName(serviceName, serviceInfos, host))
+ .collect(Collectors.toList());
+
+ String servicePath = uri.getPath();
+ checkArgument(null != servicePath,
+ "service path component is missing in service uri : " + uri);
+
+ return new ServiceURI(
+ serviceName,
+ serviceInfos,
+ serviceUser,
+ serviceHosts.toArray(new String[serviceHosts.size()]),
+ servicePath,
+ uri);
+ }
+
+ private static String validateHostName(String serviceName,
+ String[] serviceInfos,
+ String hostname) {
+ String[] parts = hostname.split(":");
+ if (parts.length >= 3) {
+ throw new IllegalArgumentException("Invalid hostname : " +
hostname);
+ } else if (parts.length == 2) {
+ try {
+ Integer.parseUnsignedInt(parts[1]);
+ } catch (NumberFormatException nfe) {
+ throw new IllegalArgumentException("Invalid hostname : " +
hostname);
+ }
+ return hostname;
+ } else if (parts.length == 1) {
+ return hostname + ":" + getServicePort(serviceName, serviceInfos);
+ } else {
+ return hostname;
+ }
+ }
+
+ private final String serviceName;
+ private final String[] serviceInfos;
+ private final String serviceUser;
+ private final String[] serviceHosts;
+ private final String servicePath;
+ private final URI uri;
+
+ public String[] getServiceInfos() {
+ return serviceInfos;
+ }
+
+ public String[] getServiceHosts() {
+ return serviceHosts;
+ }
+
+ public String getServiceScheme() {
+ if (null == serviceName) {
+ return null;
+ } else {
+ if (serviceInfos.length == 0) {
+ return serviceName;
+ } else {
+ return serviceName + "+" + StringUtils.join(serviceInfos, '+');
+ }
+ }
+ }
+
+ private static int getServicePort(String serviceName, String[]
serviceInfos) {
+ int port;
+ switch (serviceName.toLowerCase()) {
+ case BINARY_SERVICE:
+ if (serviceInfos.length == 0) {
+ port = BINARY_PORT;
+ } else if (serviceInfos.length == 1 &&
serviceInfos[0].toLowerCase().equals(SSL_SERVICE)) {
+ port = BINARY_TLS_PORT;
+ } else {
+ throw new IllegalArgumentException("Invalid pulsar service
: " + serviceName + "+" + serviceInfos);
+ }
+ break;
+ case HTTP_SERVICE:
+ port = HTTP_PORT;
+ break;
+ case HTTPS_SERVICE:
+ port = HTTPS_PORT;
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid pulsar service : "
+ serviceName);
+ }
+ return port;
+ }
+
+}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/net/ServiceURITest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/net/ServiceURITest.java
new file mode 100644
index 0000000..99921c6
--- /dev/null
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/net/ServiceURITest.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.net;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
+
+import java.net.URI;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link ServiceURI}.
+ */
+ public class ServiceURITest {
+
+     /**
+      * Parses {@code serviceUri} with {@link ServiceURI#create(String)} and verifies the
+      * service name, service infos, service user, service hosts and service path of the result.
+      */
+     private static void assertServiceUri(
+         String serviceUri,
+         String expectedServiceName,
+         String[] expectedServiceInfo,
+         String expectedServiceUser,
+         String[] expectedServiceHosts,
+         String expectedServicePath) {
+
+         ServiceURI serviceURI = ServiceURI.create(serviceUri);
+
+         // TestNG's assertEquals takes (actual, expected), whereas ArrayAsserts.assertArrayEquals
+         // keeps the JUnit (expected, actual) order.
+         assertEquals(serviceURI.getServiceName(), expectedServiceName);
+         assertArrayEquals(expectedServiceInfo, serviceURI.getServiceInfos());
+         assertEquals(serviceURI.getServiceUser(), expectedServiceUser);
+         assertArrayEquals(expectedServiceHosts, serviceURI.getServiceHosts());
+         assertEquals(serviceURI.getServicePath(), expectedServicePath);
+     }
+
+     /**
+      * Verifies that creating a {@link ServiceURI} from {@code serviceUri} is rejected with an
+      * {@link IllegalArgumentException}.
+      */
+     private void testInvalidServiceUri(String serviceUri) {
+         try {
+             ServiceURI.create(serviceUri);
+             fail("Should fail to parse service uri : " + serviceUri);
+         } catch (IllegalArgumentException iae) {
+             // expected
+         }
+     }
+
+     @Test
+     public void testInvalidServiceUris() {
+         String[] uris = new String[] {
+             "://localhost:6650", // missing scheme
+             "pulsar:///", // missing authority
+             "pulsar://localhost:6650:6651/", // invalid hostname pair
+             "pulsar://localhost:xyz/", // invalid port
+             "pulsar://localhost:-6650/", // negative port
+         };
+
+         for (String uri : uris) {
+             testInvalidServiceUri(uri);
+         }
+     }
+
+     @Test(expectedExceptions = NullPointerException.class)
+     public void testNullServiceUriString() {
+         ServiceURI.create((String) null);
+     }
+
+     @Test(expectedExceptions = NullPointerException.class)
+     public void testNullServiceUriInstance() {
+         ServiceURI.create((URI) null);
+     }
+
+     @Test(expectedExceptions = IllegalArgumentException.class)
+     public void testEmptyServiceUriString() {
+         ServiceURI.create("");
+     }
+
+     @Test
+     public void testMissingServiceName() {
+         String serviceUri = "//localhost:6650/path/to/namespace";
+         assertServiceUri(
+             serviceUri,
+             null,
+             new String[0],
+             null,
+             new String[] { "localhost:6650" },
+             "/path/to/namespace");
+     }
+
+     @Test
+     public void testEmptyPath() {
+         String serviceUri = "pulsar://localhost:6650";
+         assertServiceUri(
+             serviceUri,
+             "pulsar",
+             new String[0],
+             null,
+             new String[] { "localhost:6650" },
+             "");
+     }
+
+     @Test
+     public void testRootPath() {
+         String serviceUri = "pulsar://localhost:6650/";
+         assertServiceUri(
+             serviceUri,
+             "pulsar",
+             new String[0],
+             null,
+             new String[] { "localhost:6650" },
+             "/");
+     }
+
+     @Test
+     public void testUserInfo() {
+         String serviceUri = "pulsar://pulsaruser@localhost:6650/path/to/namespace";
+         assertServiceUri(
+             serviceUri,
+             "pulsar",
+             new String[0],
+             "pulsaruser",
+             new String[] { "localhost:6650" },
+             "/path/to/namespace");
+     }
+
+     @Test
+     public void testMultipleHostsSemiColon() {
+         String serviceUri = "pulsar://host1:6650;host2:6650;host3:6650/path/to/namespace";
+         assertServiceUri(
+             serviceUri,
+             "pulsar",
+             new String[0],
+             null,
+             new String[] { "host1:6650", "host2:6650", "host3:6650" },
+             "/path/to/namespace");
+     }
+
+     @Test
+     public void testMultipleHostsComma() {
+         String serviceUri = "pulsar://host1:6650,host2:6650,host3:6650/path/to/namespace";
+         assertServiceUri(
+             serviceUri,
+             "pulsar",
+             new String[0],
+             null,
+             new String[] { "host1:6650", "host2:6650", "host3:6650" },
+             "/path/to/namespace");
+     }
+
+     /** Hosts without a port get the default binary port. */
+     @Test
+     public void testMultipleHostsWithoutPulsarPorts() {
+         String serviceUri = "pulsar://host1,host2,host3/path/to/namespace";
+         assertServiceUri(
+             serviceUri,
+             "pulsar",
+             new String[0],
+             null,
+             new String[] { "host1:6650", "host2:6650", "host3:6650" },
+             "/path/to/namespace");
+     }
+
+     /** Hosts without a port get the default binary TLS port when the scheme is "pulsar+ssl". */
+     @Test
+     public void testMultipleHostsWithoutPulsarTlsPorts() {
+         String serviceUri = "pulsar+ssl://host1,host2,host3/path/to/namespace";
+         assertServiceUri(
+             serviceUri,
+             "pulsar",
+             new String[] { "ssl" },
+             null,
+             new String[] { "host1:6651", "host2:6651", "host3:6651" },
+             "/path/to/namespace");
+     }
+
+     /** Hosts without a port get the default HTTP port. */
+     @Test
+     public void testMultipleHostsWithoutHttpPorts() {
+         String serviceUri = "http://host1,host2,host3/path/to/namespace";
+         assertServiceUri(
+             serviceUri,
+             "http",
+             new String[0],
+             null,
+             new String[] { "host1:8080", "host2:8080", "host3:8080" },
+             "/path/to/namespace");
+     }
+
+     /** Hosts without a port get the default HTTPS port. */
+     @Test
+     public void testMultipleHostsWithoutHttpsPorts() {
+         String serviceUri = "https://host1,host2,host3/path/to/namespace";
+         assertServiceUri(
+             serviceUri,
+             "https",
+             new String[0],
+             null,
+             new String[] { "host1:8443", "host2:8443", "host3:8443" },
+             "/path/to/namespace");
+     }
+
+     @Test
+     public void testMultipleHostsMixedPorts() {
+         String serviceUri = "pulsar://host1:6640,host2:6650,host3:6660/path/to/namespace";
+         assertServiceUri(
+             serviceUri,
+             "pulsar",
+             new String[0],
+             null,
+             new String[] { "host1:6640", "host2:6650", "host3:6660" },
+             "/path/to/namespace");
+     }
+
+     /** Explicit ports are kept while hosts without a port get the default binary port. */
+     @Test
+     public void testMultipleHostsMixed() {
+         String serviceUri = "pulsar://host1:6640,host2,host3:6660/path/to/namespace";
+         assertServiceUri(
+             serviceUri,
+             "pulsar",
+             new String[0],
+             null,
+             new String[] { "host1:6640", "host2:6650", "host3:6660" },
+             "/path/to/namespace");
+     }
+
+     @Test
+     public void testUserInfoWithMultipleHosts() {
+         String serviceUri = "pulsar://pulsaruser@host1:6650;host2:6650;host3:6650/path/to/namespace";
+         assertServiceUri(
+             serviceUri,
+             "pulsar",
+             new String[0],
+             "pulsaruser",
+             new String[] { "host1:6650", "host2:6650", "host3:6650" },
+             "/path/to/namespace");
+     }
+
+ }