This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 272725a  [client] Issue 3218: Support specifying multiple hosts in 
pulsar service url and web url (#3249)
272725a is described below

commit 272725a9ca4d0334df54aff4b751105949d2c253
Author: Sijie Guo <[email protected]>
AuthorDate: Sun Jan 13 10:17:16 2019 +0900

    [client] Issue 3218: Support specifying multiple hosts in pulsar service 
url and web url (#3249)
    
    *Motivation*
    
    Sometimes people doesn't have DNS for brokers. It might be good for people 
to specify a list of brokers as bootstrap brokers.
    
    *Changes*
    
    This PR introduce `ServiceURI` and `ServiceNameResolver` for supporting 
specifying multiple hosts in pulsar service url and
    web url.
    
    Master Issue: #3218
---
 .../pulsar/client/api/ServiceUrlProviderTest.java  |  26 ++-
 .../apache/pulsar/client/admin/PulsarAdmin.java    |   6 +-
 .../client/impl/BinaryProtoLookupService.java      |  27 +--
 .../org/apache/pulsar/client/impl/HttpClient.java  |  23 +-
 .../pulsar/client/impl/HttpLookupService.java      |   2 +-
 .../client/impl/PulsarServiceNameResolver.java     | 104 +++++++++
 .../pulsar/client/impl/ServiceNameResolver.java    |  65 ++++++
 .../client/impl/PulsarServiceNameResolverTest.java | 132 ++++++++++++
 .../org/apache/pulsar/common/net/ServiceURI.java   | 205 ++++++++++++++++++
 .../apache/pulsar/common/net/ServiceURITest.java   | 233 +++++++++++++++++++++
 10 files changed, 785 insertions(+), 38 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
index 73a51ad..40ae7f8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.api;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -30,6 +31,7 @@ import org.testng.annotations.Test;
 
 import java.util.concurrent.TimeUnit;
 
+@Slf4j
 public class ServiceUrlProviderTest extends ProducerConsumerBase {
 
     @BeforeClass
@@ -106,13 +108,27 @@ public class ServiceUrlProviderTest extends 
ProducerConsumerBase {
         conf.setWebServicePortTls(PortManager.nextFreePort());
         startBroker();
         PulsarService pulsarService2 = pulsar;
-        System.out.println("Pulsar1=" + pulsarService1.getBrokerServiceUrl() + 
", Pulsar2=" + pulsarService2.getBrokerServiceUrl());
+
+        log.info("Pulsar1 = {}, Pulsar2 = {}", 
pulsarService1.getBrokerServiceUrl(), pulsarService2.getBrokerServiceUrl());
         Assert.assertNotEquals(pulsarService1.getBrokerServiceUrl(), 
pulsarService2.getBrokerServiceUrl());
-        Assert.assertEquals("pulsar://" + 
producer.getClient().getLookup().getServiceUrl(), 
pulsarService1.getBrokerServiceUrl());
-        Assert.assertEquals("pulsar://" + 
consumer.getClient().getLookup().getServiceUrl(), 
pulsarService1.getBrokerServiceUrl());
+
+        log.info("Service url : producer = {}, consumer = {}",
+            producer.getClient().getLookup().getServiceUrl(),
+            consumer.getClient().getLookup().getServiceUrl());
+
+        Assert.assertEquals(producer.getClient().getLookup().getServiceUrl(), 
pulsarService1.getBrokerServiceUrl());
+        Assert.assertEquals(consumer.getClient().getLookup().getServiceUrl(), 
pulsarService1.getBrokerServiceUrl());
+
+        log.info("Changing service url from {} to {}",
+            pulsarService1.getBrokerServiceUrl(),
+            pulsarService2.getBrokerServiceUrl());
+
         
serviceUrlProvider.onServiceUrlChanged(pulsarService2.getBrokerServiceUrl());
-        Assert.assertEquals("pulsar://" + 
producer.getClient().getLookup().getServiceUrl(), 
pulsarService2.getBrokerServiceUrl());
-        Assert.assertEquals("pulsar://" + 
consumer.getClient().getLookup().getServiceUrl(), 
pulsarService2.getBrokerServiceUrl());
+        log.info("Service url changed : producer = {}, consumer = {}",
+            producer.getClient().getLookup().getServiceUrl(),
+            consumer.getClient().getLookup().getServiceUrl());
+        Assert.assertEquals(producer.getClient().getLookup().getServiceUrl(), 
pulsarService2.getBrokerServiceUrl());
+        Assert.assertEquals(consumer.getClient().getLookup().getServiceUrl(), 
pulsarService2.getBrokerServiceUrl());
         producer.close();
         consumer.close();
         client.close();
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index 026deca..c7d7fc5 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -133,8 +133,10 @@ public class PulsarAdmin implements Closeable {
         httpConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 8);
         httpConfig.register(MultiPartFeature.class);
 
-        ClientBuilder clientBuilder = 
ClientBuilder.newBuilder().withConfig(httpConfig)
-                
.register(JacksonConfigurator.class).register(JacksonFeature.class);
+        ClientBuilder clientBuilder = ClientBuilder.newBuilder()
+            .withConfig(httpConfig)
+            .register(JacksonConfigurator.class)
+            .register(JacksonFeature.class);
 
         boolean useTls = false;
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index e373ed8..94773e3 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory;
 public class BinaryProtoLookupService implements LookupService {
 
     private final PulsarClientImpl client;
-    protected volatile InetSocketAddress serviceAddress;
+    private final ServiceNameResolver serviceNameResolver;
     private final boolean useTls;
     private final ExecutorService executor;
 
@@ -60,22 +60,13 @@ public class BinaryProtoLookupService implements 
LookupService {
         this.client = client;
         this.useTls = useTls;
         this.executor = executor;
+        this.serviceNameResolver = new PulsarServiceNameResolver();
         updateServiceUrl(serviceUrl);
     }
 
     @Override
     public void updateServiceUrl(String serviceUrl) throws 
PulsarClientException {
-        URI uri;
-        try {
-            uri = new URI(serviceUrl);
-
-            // Don't attempt to resolve the hostname in DNS at this point. It 
will be done each time when attempting to
-            // connect
-            this.serviceAddress = 
InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
-        } catch (Exception e) {
-            log.error("Invalid service-url {} provided {}", serviceUrl, 
e.getMessage(), e);
-            throw new PulsarClientException.InvalidServiceURL(e);
-        }
+        serviceNameResolver.updateServiceUrl(serviceUrl);
     }
 
     /**
@@ -86,7 +77,7 @@ public class BinaryProtoLookupService implements 
LookupService {
      * @return broker-socket-address that serves given topic
      */
     public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> 
getBroker(TopicName topicName) {
-        return findBroker(serviceAddress, false, topicName);
+        return findBroker(serviceNameResolver.resolveHost(), false, topicName);
     }
 
     /**
@@ -94,7 +85,7 @@ public class BinaryProtoLookupService implements 
LookupService {
      *
      */
     public CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(TopicName topicName) {
-        return getPartitionedTopicMetadata(serviceAddress, topicName);
+        return getPartitionedTopicMetadata(serviceNameResolver.resolveHost(), 
topicName);
     }
 
     private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> 
findBroker(InetSocketAddress socketAddress,
@@ -133,7 +124,7 @@ public class BinaryProtoLookupService implements 
LookupService {
                         // (3) received correct broker to connect
                         if (lookupDataResult.proxyThroughServiceUrl) {
                             // Connect through proxy
-                            
addressFuture.complete(Pair.of(responseBrokerAddress, serviceAddress));
+                            
addressFuture.complete(Pair.of(responseBrokerAddress, socketAddress));
                         } else {
                             // Normal result with direct connection to broker
                             
addressFuture.complete(Pair.of(responseBrokerAddress, responseBrokerAddress));
@@ -192,7 +183,7 @@ public class BinaryProtoLookupService implements 
LookupService {
 
     @Override
     public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName 
topicName) {
-        return 
client.getCnxPool().getConnection(serviceAddress).thenCompose(clientCnx -> {
+        return 
client.getCnxPool().getConnection(serviceNameResolver.resolveHost()).thenCompose(clientCnx
 -> {
             long requestId = client.newRequestId();
             ByteBuf request = Commands.newGetSchema(requestId, 
topicName.toString(), Optional.empty());
 
@@ -201,7 +192,7 @@ public class BinaryProtoLookupService implements 
LookupService {
     }
 
     public String getServiceUrl() {
-        return serviceAddress.toString();
+        return serviceNameResolver.getServiceUrl();
     }
 
     @Override
@@ -212,7 +203,7 @@ public class BinaryProtoLookupService implements 
LookupService {
         Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS,
             opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS,
             0 , TimeUnit.MILLISECONDS);
-        getTopicsUnderNamespace(serviceAddress, namespace, backoff, 
opTimeoutMs, topicsFuture, mode);
+        getTopicsUnderNamespace(serviceNameResolver.resolveHost(), namespace, 
backoff, opTimeoutMs, topicsFuture, mode);
         return topicsFuture;
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 0e94e6b..43e5bd7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -29,7 +29,6 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Map;
 import java.util.Properties;
@@ -60,7 +59,7 @@ public class HttpClient implements Closeable {
     protected final static int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30;
 
     protected final AsyncHttpClient httpClient;
-    protected volatile URL url;
+    protected final ServiceNameResolver serviceNameResolver;
     protected final Authentication authentication;
 
     protected HttpClient(String serviceUrl, Authentication authentication,
@@ -74,7 +73,8 @@ public class HttpClient implements Closeable {
             EventLoopGroup eventLoopGroup, boolean tlsAllowInsecureConnection, 
String tlsTrustCertsFilePath,
             int connectTimeoutInSeconds, int readTimeoutInSeconds) throws 
PulsarClientException {
         this.authentication = authentication;
-        setServiceUrl(serviceUrl);
+        this.serviceNameResolver = new PulsarServiceNameResolver();
+        this.serviceNameResolver.updateServiceUrl(serviceUrl);
 
         DefaultAsyncHttpClientConfig.Builder confBuilder = new 
DefaultAsyncHttpClientConfig.Builder();
         confBuilder.setFollowRedirect(true);
@@ -89,7 +89,7 @@ public class HttpClient implements Closeable {
             }
         });
 
-        if ("https".equals(url.getProtocol())) {
+        if 
("https".equals(serviceNameResolver.getServiceUri().getServiceName())) {
             try {
                 SslContext sslCtx = null;
 
@@ -112,16 +112,15 @@ public class HttpClient implements Closeable {
         AsyncHttpClientConfig config = confBuilder.build();
         httpClient = new DefaultAsyncHttpClient(config);
 
-        log.debug("Using HTTP url: {}", this.url);
+        log.debug("Using HTTP url: {}", serviceUrl);
+    }
+
+    String getServiceUrl() {
+        return this.serviceNameResolver.getServiceUrl();
     }
 
     void setServiceUrl(String serviceUrl) throws PulsarClientException {
-        try {
-            // Ensure trailing "/" on url
-            url = new URL(serviceUrl);
-        } catch (MalformedURLException e) {
-            throw new PulsarClientException.InvalidServiceURL(e);
-        }
+        this.serviceNameResolver.updateServiceUrl(serviceUrl);
     }
 
     @Override
@@ -132,7 +131,7 @@ public class HttpClient implements Closeable {
     public <T> CompletableFuture<T> get(String path, Class<T> clazz) {
         final CompletableFuture<T> future = new CompletableFuture<>();
         try {
-            String requestUrl = new URL(url, path).toString();
+            String requestUrl = new 
URL(serviceNameResolver.resolveHostUri().toURL(), path).toString();
             AuthenticationDataProvider authData = authentication.getAuthData();
             BoundRequestBuilder builder = httpClient.prepareGet(requestUrl);
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 3d7cf0a..955c45e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -105,7 +105,7 @@ class HttpLookupService implements LookupService {
     }
 
     public String getServiceUrl() {
-       return httpClient.url.toString();
+       return httpClient.getServiceUrl();
     }
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
new file mode 100644
index 0000000..3d98489
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import io.netty.util.internal.PlatformDependent;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
+import org.apache.pulsar.common.net.ServiceURI;
+
+/**
+ * The default implementation of {@link ServiceNameResolver}.
+ */
+@Slf4j
+class PulsarServiceNameResolver implements ServiceNameResolver {
+
+    private volatile ServiceURI serviceUri;
+    private volatile String serviceUrl;
+    private volatile List<InetSocketAddress> addressList;
+
+    @Override
+    public InetSocketAddress resolveHost() {
+        List<InetSocketAddress> list = addressList;
+        checkState(
+            list != null, "No service url is provided yet");
+        checkState(
+            !list.isEmpty(), "No hosts found for service url : " + serviceUrl);
+        if (list.size() == 1) {
+            return list.get(0);
+        } else {
+            return list.get(randomIndex(list.size()));
+        }
+    }
+
+    @Override
+    public URI resolveHostUri() {
+        InetSocketAddress host = resolveHost();
+        String hostUrl = serviceUri.getServiceScheme() + "://" + 
host.getHostName() + ":" + host.getPort();
+        return URI.create(hostUrl);
+    }
+
+    @Override
+    public String getServiceUrl() {
+        return serviceUrl;
+    }
+
+    @Override
+    public ServiceURI getServiceUri() {
+        return serviceUri;
+    }
+
+    @Override
+    public void updateServiceUrl(String serviceUrl) throws InvalidServiceURL {
+        ServiceURI uri;
+        try {
+            uri = ServiceURI.create(serviceUrl);
+        } catch (IllegalArgumentException iae) {
+            log.error("Invalid service-url {} provided {}", serviceUrl, 
iae.getMessage(), iae);
+            throw new InvalidServiceURL(iae);
+        }
+
+        String[] hosts = uri.getServiceHosts();
+        List<InetSocketAddress> addresses = new ArrayList<>(hosts.length);
+        for (String host : hosts) {
+            String hostUrl = uri.getServiceScheme() + "://" + host;
+            try {
+                URI hostUri = new URI(hostUrl);
+                
addresses.add(InetSocketAddress.createUnresolved(hostUri.getHost(), 
hostUri.getPort()));
+            } catch (URISyntaxException e) {
+                log.error("Invalid host provided {}", hostUrl, e.getMessage(), 
e);
+                throw new InvalidServiceURL(e);
+            }
+        }
+        this.addressList = addresses;
+        this.serviceUrl = serviceUrl;
+        this.serviceUri = uri;
+    }
+
+    private static int randomIndex(int numAddresses) {
+        return numAddresses == 1 ? 0 : 
PlatformDependent.threadLocalRandom().nextInt(numAddresses);
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java
new file mode 100644
index 0000000..4264b61
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
+import org.apache.pulsar.common.net.ServiceURI;
+
+/**
+ * A service name resolver to resolve real socket address.
+ */
+public interface ServiceNameResolver {
+
+    /**
+     * Resolve pulsar service url.
+     *
+     * @return resolve the service url to return a socket address
+     */
+    InetSocketAddress resolveHost();
+
+    /**
+     * Resolve pulsar service url
+     * @return
+     */
+    URI resolveHostUri();
+
+    /**
+     * Get service url.
+     *
+     * @return service url
+     */
+    String getServiceUrl();
+
+    /**
+     * Get service uri.
+     *
+     * @return service uri
+     */
+    ServiceURI getServiceUri();
+
+    /**
+     * Update service url.
+     *
+     * @param serviceUrl service url
+     */
+    void updateServiceUrl(String serviceUrl) throws InvalidServiceURL;
+
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarServiceNameResolverTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarServiceNameResolverTest.java
new file mode 100644
index 0000000..878e240
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarServiceNameResolverTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
+import org.apache.pulsar.common.net.ServiceURI;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link PulsarServiceNameResolver}.
+ */
+public class PulsarServiceNameResolverTest {
+
+    private PulsarServiceNameResolver resolver;
+
+    @BeforeMethod
+    public void setup() {
+        this.resolver = new PulsarServiceNameResolver();
+        assertNull(resolver.getServiceUrl());
+        assertNull(resolver.getServiceUri());
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void testResolveBeforeUpdateServiceUrl() {
+        resolver.resolveHost();
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void testResolveUrlBeforeUpdateServiceUrl() throws Exception {
+        resolver.resolveHostUri();
+    }
+
+    @Test
+    public void testUpdateInvalidServiceUrl() {
+        String serviceUrl = "pulsar:///";
+        try {
+            resolver.updateServiceUrl(serviceUrl);
+            fail("Should fail to update service url if service url is 
invalid");
+        } catch (InvalidServiceURL isu) {
+            // expected
+        }
+        assertNull(resolver.getServiceUrl());
+        assertNull(resolver.getServiceUri());
+    }
+
+    @Test
+    public void testSimpleHostUrl() throws Exception {
+        String serviceUrl = "pulsar://host1:6650";
+        resolver.updateServiceUrl(serviceUrl);
+        assertEquals(serviceUrl, resolver.getServiceUrl());
+        assertEquals(ServiceURI.create(serviceUrl), resolver.getServiceUri());
+
+        InetSocketAddress expectedAddress = 
InetSocketAddress.createUnresolved("host1", 6650);
+        assertEquals(expectedAddress, resolver.resolveHost());
+        assertEquals(URI.create(serviceUrl), resolver.resolveHostUri());
+
+        String newServiceUrl = "pulsar://host2:6650";
+        resolver.updateServiceUrl(newServiceUrl);
+        assertEquals(newServiceUrl, resolver.getServiceUrl());
+        assertEquals(ServiceURI.create(newServiceUrl), 
resolver.getServiceUri());
+
+        InetSocketAddress newExpectedAddress = 
InetSocketAddress.createUnresolved("host2", 6650);
+        assertEquals(newExpectedAddress, resolver.resolveHost());
+        assertEquals(URI.create(newServiceUrl), resolver.resolveHostUri());
+    }
+
+    @Test
+    public void testMultipleHostsUrl() throws Exception {
+        String serviceUrl = "pulsar://host1:6650,host2:6650";
+        resolver.updateServiceUrl(serviceUrl);
+        assertEquals(serviceUrl, resolver.getServiceUrl());
+        assertEquals(ServiceURI.create(serviceUrl), resolver.getServiceUri());
+
+        Set<InetSocketAddress> expectedAddresses = new HashSet<>();
+        Set<URI> expectedHostUrls = new HashSet<>();
+        expectedAddresses.add(InetSocketAddress.createUnresolved("host1", 
6650));
+        expectedAddresses.add(InetSocketAddress.createUnresolved("host2", 
6650));
+        expectedHostUrls.add(URI.create("pulsar://host1:6650"));
+        expectedHostUrls.add(URI.create("pulsar://host2:6650"));
+
+        for (int i = 0; i < 10; i++) {
+            assertTrue(expectedAddresses.contains(resolver.resolveHost()));
+            assertTrue(expectedHostUrls.contains(resolver.resolveHostUri()));
+        }
+    }
+
+    @Test
+    public void testMultipleHostsTlsUrl() throws Exception {
+        String serviceUrl = "pulsar+ssl://host1:6651,host2:6651";
+        resolver.updateServiceUrl(serviceUrl);
+        assertEquals(serviceUrl, resolver.getServiceUrl());
+        assertEquals(ServiceURI.create(serviceUrl), resolver.getServiceUri());
+
+        Set<InetSocketAddress> expectedAddresses = new HashSet<>();
+        Set<URI> expectedHostUrls = new HashSet<>();
+        expectedAddresses.add(InetSocketAddress.createUnresolved("host1", 
6651));
+        expectedAddresses.add(InetSocketAddress.createUnresolved("host2", 
6651));
+        expectedHostUrls.add(URI.create("pulsar+ssl://host1:6651"));
+        expectedHostUrls.add(URI.create("pulsar+ssl://host2:6651"));
+
+        for (int i = 0; i < 10; i++) {
+            assertTrue(expectedAddresses.contains(resolver.resolveHost()));
+            assertTrue(expectedHostUrls.contains(resolver.resolveHostUri()));
+        }
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/net/ServiceURI.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/net/ServiceURI.java
new file mode 100644
index 0000000..f562b7d
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/net/ServiceURI.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.net;
+
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import java.net.URI;
+import java.util.List;
+import java.util.stream.Collectors;
+import lombok.AccessLevel;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * ServiceURI represents service uri within pulsar cluster.
+ *
+ * <p>This file is based on
+ * {@link 
https://github.com/apache/bookkeeper/blob/master/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/net/ServiceURI.java}
+ */
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+@Getter
+@EqualsAndHashCode
+public class ServiceURI {
+
+    private static final String BINARY_SERVICE = "pulsar";
+    private static final String HTTP_SERVICE = "http";
+    private static final String HTTPS_SERVICE = "https";
+    private static final String SSL_SERVICE = "ssl";
+
+    private static final int BINARY_PORT = 6650;
+    private static final int BINARY_TLS_PORT = 6651;
+    private static final int HTTP_PORT = 8080;
+    private static final int HTTPS_PORT = 8443;
+
+    /**
+     * Create a service uri instance from a uri string.
+     *
+     * @param uriStr service uri string
+     * @return a service uri instance
+     * @throws NullPointerException if {@code uriStr} is null
+     * @throws IllegalArgumentException if the given string violates 
RFC&nbsp;2396
+     */
+    public static ServiceURI create(String uriStr) {
+        checkNotNull(uriStr, "service uri string is null");
+
+        // a service uri first should be a valid java.net.URI
+        URI uri = URI.create(uriStr);
+
+        return create(uri);
+    }
+
+    /**
+     * Create a service uri instance from a {@link URI} instance.
+     *
+     * @param uri {@link URI} instance
+     * @return a service uri instance
+     * @throws NullPointerException if {@code uriStr} is null
+     * @throws IllegalArgumentException if the given string violates 
RFC&nbsp;2396
+     */
+    public static ServiceURI create(URI uri) {
+        checkNotNull(uri, "service uri instance is null");
+
+        String serviceName;
+        final String[] serviceInfos;
+        String scheme = uri.getScheme();
+        if (null != scheme) {
+            scheme = scheme.toLowerCase();
+            final String serviceSep = "+";
+            String[] schemeParts = StringUtils.split(scheme, serviceSep);
+            serviceName = schemeParts[0];
+            serviceInfos = new String[schemeParts.length - 1];
+            System.arraycopy(schemeParts, 1, serviceInfos, 0, 
serviceInfos.length);
+        } else {
+            serviceName = null;
+            serviceInfos = new String[0];
+        }
+
+        String userAndHostInformation = uri.getAuthority();
+        checkArgument(!Strings.isNullOrEmpty(userAndHostInformation),
+            "authority component is missing in service uri : " + uri);
+
+        String serviceUser;
+        List<String> serviceHosts;
+        int atIndex = userAndHostInformation.indexOf('@');
+        Splitter splitter = Splitter.on(CharMatcher.anyOf(",;"));
+        if (atIndex > 0) {
+            serviceUser = userAndHostInformation.substring(0, atIndex);
+            serviceHosts = 
splitter.splitToList(userAndHostInformation.substring(atIndex + 1));
+        } else {
+            serviceUser = null;
+            serviceHosts = splitter.splitToList(userAndHostInformation);
+        }
+        serviceHosts = serviceHosts
+            .stream()
+            .map(host -> validateHostName(serviceName, serviceInfos, host))
+            .collect(Collectors.toList());
+
+        String servicePath = uri.getPath();
+        checkArgument(null != servicePath,
+            "service path component is missing in service uri : " + uri);
+
+        return new ServiceURI(
+            serviceName,
+            serviceInfos,
+            serviceUser,
+            serviceHosts.toArray(new String[serviceHosts.size()]),
+            servicePath,
+            uri);
+    }
+
+    private static String validateHostName(String serviceName,
+                                           String[] serviceInfos,
+                                           String hostname) {
+        String[] parts = hostname.split(":");
+        if (parts.length >= 3) {
+            throw new IllegalArgumentException("Invalid hostname : " + 
hostname);
+        } else if (parts.length == 2) {
+            try {
+                Integer.parseUnsignedInt(parts[1]);
+            } catch (NumberFormatException nfe) {
+                throw new IllegalArgumentException("Invalid hostname : " + 
hostname);
+            }
+            return hostname;
+        } else if (parts.length == 1) {
+            return hostname + ":" + getServicePort(serviceName, serviceInfos);
+        } else {
+            return hostname;
+        }
+    }
+
+    private final String serviceName;
+    private final String[] serviceInfos;
+    private final String serviceUser;
+    private final String[] serviceHosts;
+    private final String servicePath;
+    private final URI uri;
+
+    public String[] getServiceInfos() {
+        return serviceInfos;
+    }
+
+    public String[] getServiceHosts() {
+        return serviceHosts;
+    }
+
+    public String getServiceScheme() {
+        if (null == serviceName) {
+            return null;
+        } else {
+            if (serviceInfos.length == 0) {
+                return serviceName;
+            } else {
+                return serviceName + "+" + StringUtils.join(serviceInfos, '+');
+            }
+        }
+    }
+
+    private static int getServicePort(String serviceName, String[] 
serviceInfos) {
+        int port;
+        switch (serviceName.toLowerCase()) {
+            case BINARY_SERVICE:
+                if (serviceInfos.length == 0) {
+                    port = BINARY_PORT;
+                } else if (serviceInfos.length == 1 && 
serviceInfos[0].toLowerCase().equals(SSL_SERVICE)) {
+                    port = BINARY_TLS_PORT;
+                } else {
+                    throw new IllegalArgumentException("Invalid pulsar service 
: " + serviceName + "+" + serviceInfos);
+                }
+                break;
+            case HTTP_SERVICE:
+                port = HTTP_PORT;
+                break;
+            case HTTPS_SERVICE:
+                port = HTTPS_PORT;
+                break;
+            default:
+                throw new IllegalArgumentException("Invalid pulsar service : " 
+ serviceName);
+        }
+        return port;
+    }
+
+}
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/net/ServiceURITest.java 
b/pulsar-common/src/test/java/org/apache/pulsar/common/net/ServiceURITest.java
new file mode 100644
index 0000000..99921c6
--- /dev/null
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/net/ServiceURITest.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.net;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
+
+import java.net.URI;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link ServiceURI}.
+ */
+public class ServiceURITest {
+
+    private static void assertServiceUri(
+        String serviceUri,
+        String expectedServiceName,
+        String[] expectedServiceInfo,
+        String expectedServiceUser,
+        String[] expectedServiceHosts,
+        String expectedServicePath) {
+
+        ServiceURI serviceURI = ServiceURI.create(serviceUri);
+
+        assertEquals(expectedServiceName, serviceURI.getServiceName());
+        assertArrayEquals(expectedServiceInfo, serviceURI.getServiceInfos());
+        assertEquals(expectedServiceUser, serviceURI.getServiceUser());
+        assertArrayEquals(expectedServiceHosts, serviceURI.getServiceHosts());
+        assertEquals(expectedServicePath, serviceURI.getServicePath());
+    }
+
+    @Test
+    public void testInvalidServiceUris() {
+        String[] uris = new String[] {
+            "://localhost:6650",                // missing scheme
+            "pulsar:///",                       // missing authority
+            "pulsar://localhost:6650:6651/",    // invalid hostname pair
+            "pulsar://localhost:xyz/",          // invalid port
+            "pulsar://localhost:-6650/",        // negative port
+        };
+
+        for (String uri : uris) {
+            testInvalidServiceUri(uri);
+        }
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testNullServiceUriString() {
+        ServiceURI.create((String) null);
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testNullServiceUriInstance() {
+        ServiceURI.create((URI) null);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testEmptyServiceUriString() {
+        ServiceURI.create("");
+    }
+
+    private void testInvalidServiceUri(String serviceUri) {
+        try {
+            ServiceURI.create(serviceUri);
+            fail("Should fail to parse service uri : " + serviceUri);
+        } catch (IllegalArgumentException iae) {
+            // expected
+        }
+    }
+
+    @Test
+    public void testMissingServiceName() {
+        String serviceUri = "//localhost:6650/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            null, new String[0], null, new String[] { "localhost:6650" }, 
"/path/to/namespace");
+    }
+
+    @Test
+    public void testEmptyPath() {
+        String serviceUri = "pulsar://localhost:6650";
+        assertServiceUri(
+            serviceUri,
+            "pulsar", new String[0], null, new String[] { "localhost:6650" }, 
"");
+    }
+
+    @Test
+    public void testRootPath() {
+        String serviceUri = "pulsar://localhost:6650/";
+        assertServiceUri(
+            serviceUri,
+            "pulsar", new String[0], null, new String[] { "localhost:6650" }, 
"/");
+    }
+
+    @Test
+    public void testUserInfo() {
+        String serviceUri = 
"pulsar://pulsaruser@localhost:6650/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "pulsar",
+            new String[0],
+            "pulsaruser",
+            new String[] { "localhost:6650" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsSemiColon() {
+        String serviceUri = 
"pulsar://host1:6650;host2:6650;host3:6650/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "pulsar",
+            new String[0],
+            null,
+            new String[] { "host1:6650", "host2:6650", "host3:6650" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsComma() {
+        String serviceUri = 
"pulsar://host1:6650,host2:6650,host3:6650/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "pulsar",
+            new String[0],
+            null,
+            new String[] { "host1:6650", "host2:6650", "host3:6650" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsWithoutPulsarPorts() {
+        String serviceUri = "pulsar://host1,host2,host3/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "pulsar",
+            new String[0],
+            null,
+            new String[] { "host1:6650", "host2:6650", "host3:6650" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsWithoutPulsarTlsPorts() {
+        String serviceUri = "pulsar+ssl://host1,host2,host3/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "pulsar",
+            new String[] { "ssl" },
+            null,
+            new String[] { "host1:6651", "host2:6651", "host3:6651" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsWithoutHttpPorts() {
+        String serviceUri = "http://host1,host2,host3/path/to/namespace";;
+        assertServiceUri(
+            serviceUri,
+            "http",
+            new String[0],
+            null,
+            new String[] { "host1:8080", "host2:8080", "host3:8080" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsWithoutHttpsPorts() {
+        String serviceUri = "https://host1,host2,host3/path/to/namespace";;
+        assertServiceUri(
+            serviceUri,
+            "https",
+            new String[0],
+            null,
+            new String[] { "host1:8443", "host2:8443", "host3:8443" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsMixedPorts() {
+        String serviceUri = 
"pulsar://host1:6640,host2:6650,host3:6660/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "pulsar",
+            new String[0],
+            null,
+            new String[] { "host1:6640", "host2:6650", "host3:6660" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsMixed() {
+        String serviceUri = 
"pulsar://host1:6640,host2,host3:6660/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "pulsar",
+            new String[0],
+            null,
+            new String[] { "host1:6640", "host2:6650", "host3:6660" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testUserInfoWithMultipleHosts() {
+        String serviceUri = 
"pulsar://pulsaruser@host1:6650;host2:6650;host3:6650/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "pulsar",
+            new String[0],
+            "pulsaruser",
+            new String[] { "host1:6650", "host2:6650", "host3:6650" },
+            "/path/to/namespace");
+    }
+
+}

Reply via email to