This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 24e097f  [TABLE SERVICE] cleanup : provide unified service uri for 
resolving service endpoints
24e097f is described below

commit 24e097f8d518d574db7aefd04d2d2cbaba8ea425
Author: Sijie Guo <[email protected]>
AuthorDate: Thu May 31 23:24:02 2018 -0700

    [TABLE SERVICE] cleanup : provide unified service uri for resolving service 
endpoints
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    Currently there are multiple settings in `StorageClientSettings` for
    configuring how to resolve service endpoints to find the services.
    This is inconvenient and confusing.
    
    *Solution*
    
    Provide a class `ServiceURI` that parses a service uri to retrieve common
    information, including:
    
    - serviceName
    - serviceInfos
    - serviceUser
    - serviceHosts
    - servicePath
    
    It wraps a java.net.URI and does parsing and validation in the same place.
    It can then be used by the grpc name resolver to resolve service endpoints.
    
    Also update dlog to use it for resolving its namespace uri.
    
    Author: Sijie Guo <[email protected]>
    
    Reviewers: Jia Zhai <None>
    
    This closes #1459 from sijie/name_resolver
---
 .../apache/bookkeeper/common/net/ServiceURI.java   | 235 +++++++++++++++++++++
 .../apache/bookkeeper/common/net/package-info.java |  22 +-
 .../bookkeeper/common/net/ServiceURITest.java      | 221 +++++++++++++++++++
 .../bookkeeper/stream/cli/StreamStorageCli.java    |  13 +-
 .../clients/TestStorageClientBuilder.java          |   4 +-
 .../clients/config/StorageClientSettings.java      |  40 +---
 .../clients/impl/internal/LocationClientImpl.java  |  24 +--
 .../resolver/SimpleStreamResolverFactory.java      |  90 --------
 .../bookkeeper/clients/utils/GrpcChannels.java     |  64 ++++++
 .../clients/config/TestStorageClientSettings.java  |  15 +-
 .../clients/grpc/GrpcClientTestBase.java           |   6 +-
 .../impl/internal/RootRangeClientImplTestBase.java |   7 +-
 .../impl/internal/TestLocationClientImpl.java      |   4 +-
 .../impl/internal/TestMetaRangeClientImpl.java     |   5 +-
 .../resolver/ServiceNameResolverProvider.java      | 164 ++++++++++++++
 ...leNameResolver.java => StaticNameResolver.java} |   4 +-
 .../common/reslover/TestSimpleNameResolver.java    |   6 +-
 .../org/apache/distributedlog/util/DLUtils.java    |  17 +-
 .../namespace/TestNamespaceBuilder.java            |   3 -
 .../bookkeeper/stream/cluster/StreamCluster.java   |  12 +-
 .../integration/stream/StreamClusterTestBase.java  |   7 +-
 21 files changed, 750 insertions(+), 213 deletions(-)

diff --git 
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/net/ServiceURI.java
 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/net/ServiceURI.java
new file mode 100644
index 0000000..289c3f3
--- /dev/null
+++ 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/net/ServiceURI.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.net;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.CharMatcher;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.net.URI;
+import java.util.List;
+import lombok.AccessLevel;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * ServiceURI represents service uri within bookkeeper cluster.
+ *
+ * <h3>Service URI syntax and components</h3>
+ *
+ * <p>At the highest level, a service uri is a {@link java.net.URI} that, in string
+ * form, has the following syntax.
+ *
+ * <blockquote>
+ * [<i>service</i>[<i>service-specific-part</i>]<b>{@code :}</b>[<b>{@code 
//}</b><i>authority</i>][<i>path</i>]
+ * </blockquote>
+ *
+ * <p>where the characters <b>{@code :}</b> and <b>{@code /}</b> stand for 
themselves.
+ *
+ * <p>The service-specific-part of a service URI consists of the backend 
information used for services to use.
+ * It has the syntax as below:
+ *
+ * <blockquote>
+ * [({@code -}|{@code +})][<i>backend-part</i>]
+ * </blockquote>
+ *
+ * <p>where the characters <b>{@code -}</b> and <b>{@code +}</b> stand as a 
separator to separate service type
+ * from service backend information.
+ *
+ * <p>The authority component of a service URI has the same meaning as the 
authority component
+ * in a {@link java.net.URI}. If specified, it should be <i>server-based</i>. 
A server-based
+ * authority parses according to the familiar syntax
+ *
+ * <blockquote>
+ * [<i>user-info</i><b>{@code @}</b>]<i>host</i>[<b>{@code :}</b><i>port</i>]
+ * </blockquote>
+ *
+ * <p>where the characters <b>{@code @}</b> and <b>{@code :}</b> stand for 
themselves.
+ *
+ * <p>The path component of a service URI is itself said to be absolute. It 
typically means which path a service
+ * stores metadata or data.
+ *
+ * <p>All told, then, a service URI instance has the following components:
+ *
+ * <blockquote>
+ * <table summary="Describes the components of a service
+ * URI:service,service-specific-part,authority,user-info,host,port,path">
+ * <tr><td>service</td><td>{@code String}</td></tr>
+ * <tr><td>service-specific-part&nbsp;&nbsp;&nbsp;&nbsp;</td><td>{@code 
String}</td></tr>
+ * <tr><td>authority</td><td>{@code String}</td></tr>
+ * <tr><td>user-info</td><td>{@code String}</td></tr>
+ * <tr><td>host</td><td>{@code String}</td></tr>
+ * <tr><td>port</td><td>{@code int}</td></tr>
+ * <tr><td>path</td><td>{@code String}</td></tr>
+ * </table>
+ * </blockquote>
+ *
+ * <p>Some examples of service URIs are:
+ *
+ * <blockquote>
+ * {@code zk://localhost:2181/cluster1/ledgers} =&gt; ledger service uri using 
default ledger manager<br>
+ * {@code zk+hierarchical://localhost:2181/ledgers} =&gt; ledger service uri 
using hierarchical ledger manager<br>
+ * {@code etcd://localhost/ledgers} =&gt; ledger service uri using etcd as 
metadata store<br>
+ * {@code distributedlog://localhost:2181/distributedlog} =&gt; distributedlog 
namespace<br>
+ * {@code distributedlog-bk://localhost:2181/distributedlog} =&gt; 
distributedlog namespace with bk backend<br>
+ * {@code bk://bookkeeper-cluster/} =&gt; stream storage service uri <br>
+ * {@code host1:port,host2:port} =&gt; a list of hosts as bootstrap hosts to a 
stream storage cluster
+ * </blockquote>
+ *
+ * @since 4.8.0
+ */
+@Public
+@Evolving
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+@Getter
+@EqualsAndHashCode
+public class ServiceURI {
+
+    /**
+     * Service string for ledger service that uses zookeeper as metadata store.
+     */
+    public static final String SERVICE_ZK   = "zk";
+
+    /**
+     * Service string for dlog service.
+     */
+    public static final String SERVICE_DLOG = "distributedlog";
+
+    /**
+     * Service string for bookkeeper service.
+     */
+    public static final String SERVICE_BK = "bk";
+
+    private static final String SERVICE_SEP = "+";
+    private static final String SERVICE_DLOG_SEP = "-";
+
+    /**
+     * Create a service uri instance from a uri string.
+     *
+     * @param uriStr service uri string
+     * @return a service uri instance
+     * @throws NullPointerException if {@code uriStr} is null
+     * @throws IllegalArgumentException if the given string violates 
RFC&nbsp;2396
+     */
+    public static ServiceURI create(String uriStr) {
+        checkNotNull(uriStr, "service uri string is null");
+
+        // a service uri first should be a valid java.net.URI
+        URI uri = URI.create(uriStr);
+
+        return create(uri);
+    }
+
+    /**
+     * Create a service uri instance from a {@link URI} instance.
+     *
+     * @param uri {@link URI} instance
+     * @return a service uri instance
+     * @throws NullPointerException if {@code uri} is null
+     * @throws IllegalArgumentException if the given string violates 
RFC&nbsp;2396
+     */
+    public static ServiceURI create(URI uri) {
+        checkNotNull(uri, "service uri instance is null");
+
+        String serviceName = null;
+        String[] serviceInfos = new String[0];
+        String scheme = uri.getScheme();
+        if (null != scheme) {
+            scheme = scheme.toLowerCase();
+            final String serviceSep;
+            if (scheme.startsWith(SERVICE_DLOG)) {
+                serviceSep = SERVICE_DLOG_SEP;
+            } else {
+                serviceSep = SERVICE_SEP;
+            }
+            String[] schemeParts = StringUtils.split(scheme, serviceSep);
+            serviceName = schemeParts[0];
+            serviceInfos = new String[schemeParts.length - 1];
+            System.arraycopy(schemeParts, 1, serviceInfos, 0, 
serviceInfos.length);
+        }
+
+        String userAndHostInformation = uri.getAuthority();
+        checkArgument(!Strings.isNullOrEmpty(userAndHostInformation),
+            "authority component is missing in service uri : " + uri);
+
+        String serviceUser;
+        List<String> serviceHosts;
+        int atIndex = userAndHostInformation.indexOf('@');
+        Splitter splitter = Splitter.on(CharMatcher.anyOf(",;"));
+        if (atIndex > 0) {
+            serviceUser = userAndHostInformation.substring(0, atIndex);
+            serviceHosts = 
splitter.splitToList(userAndHostInformation.substring(atIndex + 1));
+        } else {
+            serviceUser = null;
+            serviceHosts = splitter.splitToList(userAndHostInformation);
+        }
+        serviceHosts.forEach(host -> validateHostName(host));
+
+        String servicePath = uri.getPath();
+        checkArgument(null != servicePath,
+            "service path component is missing in service uri : " + uri);
+
+        return new ServiceURI(
+            serviceName,
+            serviceInfos,
+            serviceUser,
+            serviceHosts.toArray(new String[serviceHosts.size()]),
+            servicePath,
+            uri);
+    }
+
+    private static void validateHostName(String hostname) {
+        String[] parts = hostname.split(":");
+        if (parts.length >= 3) {
+            throw new IllegalArgumentException("Invalid hostname : " + 
hostname);
+        } else if (parts.length == 2) {
+            try {
+                Integer.parseUnsignedInt(parts[1]);
+            } catch (NumberFormatException nfe) {
+                throw new IllegalArgumentException("Invalid hostname : " + 
hostname);
+            }
+        }
+
+    }
+
+    private final String serviceName;
+    private final String[] serviceInfos;
+    private final String serviceUser;
+    private final String[] serviceHosts;
+    private final String servicePath;
+    private final URI uri;
+
+    @SuppressFBWarnings("EI_EXPOSE_REP")
+    public String[] getServiceInfos() {
+        return serviceInfos;
+    }
+
+    @SuppressFBWarnings("EI_EXPOSE_REP")
+    public String[] getServiceHosts() {
+        return serviceHosts;
+    }
+
+}
diff --git 
a/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/AbstractNameResolverFactory.java
 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/net/package-info.java
similarity index 57%
rename from 
stream/common/src/main/java/org/apache/bookkeeper/common/resolver/AbstractNameResolverFactory.java
rename to 
bookkeeper-common/src/main/java/org/apache/bookkeeper/common/net/package-info.java
index 725b5f7..cb0b13a 100644
--- 
a/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/AbstractNameResolverFactory.java
+++ 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/net/package-info.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,23 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.bookkeeper.common.resolver;
-
-import io.grpc.NameResolver;
-
 /**
- * The abstract bookkeeper name resolver factory.
- *
- * <p>The name resolver is responsible for creating specific name resolver
- * which provides addresses for {@link io.grpc.LoadBalancer}.
+ * Classes for resolving service uris in bookkeeper.
  */
-public abstract class AbstractNameResolverFactory extends NameResolver.Factory 
{
-
-    /**
-     * Gets name of the name resolver factory.
-     *
-     * @return name of the name resolver factory.
-     */
-    public abstract String name();
-
-}
+package org.apache.bookkeeper.common.net;
\ No newline at end of file
diff --git 
a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/net/ServiceURITest.java
 
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/net/ServiceURITest.java
new file mode 100644
index 0000000..9982c43
--- /dev/null
+++ 
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/net/ServiceURITest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.net;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.net.URI;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ServiceURI}.
+ */
+public class ServiceURITest {
+
+    private static void assertServiceUri(
+        String serviceUri,
+        String expectedServiceName,
+        String[] expectedServiceInfo,
+        String expectedServiceUser,
+        String[] expectedServiceHosts,
+        String expectedServicePath) {
+
+        ServiceURI serviceURI = ServiceURI.create(serviceUri);
+
+        assertEquals(expectedServiceName, serviceURI.getServiceName());
+        assertArrayEquals(expectedServiceInfo, serviceURI.getServiceInfos());
+        assertEquals(expectedServiceUser, serviceURI.getServiceUser());
+        assertArrayEquals(expectedServiceHosts, serviceURI.getServiceHosts());
+        assertEquals(expectedServicePath, serviceURI.getServicePath());
+    }
+
+    @Test
+    public void testInvalidServiceUris() {
+        String[] uris = new String[] {
+            "://localhost:2181/path/to/namespace",          // missing scheme
+            "bk:///path/to/namespace",                      // missing 
authority
+            "bk://localhost:2181:3181/path/to/namespace",   // invalid 
hostname pair
+            "bk://localhost:xyz/path/to/namespace",         // invalid port
+            "bk://localhost:-2181/path/to/namespace",       // negative port
+        };
+
+        for (String uri : uris) {
+            testInvalidServiceUri(uri);
+        }
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testNullServiceUriString() {
+        ServiceURI.create((String) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testNullServiceUriInstance() {
+        ServiceURI.create((URI) null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testEmptyServiceUriString() {
+        ServiceURI.create("");
+    }
+
+    private void testInvalidServiceUri(String serviceUri) {
+        try {
+            ServiceURI.create(serviceUri);
+            fail("Should fail to parse service uri : " + serviceUri);
+        } catch (IllegalArgumentException iae) {
+            // expected
+        }
+    }
+
+    @Test
+    public void testMissingServiceName() {
+        String serviceUri = "//localhost:2181/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            null, new String[0], null, new String[] { "localhost:2181" }, 
"/path/to/namespace");
+    }
+
+    @Test
+    public void testEmptyPath() {
+        String serviceUri = "bk://localhost:2181";
+        assertServiceUri(
+            serviceUri,
+            "bk", new String[0], null, new String[] { "localhost:2181" }, "");
+    }
+
+    @Test
+    public void testRootPath() {
+        String serviceUri = "bk://localhost:2181/";
+        assertServiceUri(
+            serviceUri,
+            "bk", new String[0], null, new String[] { "localhost:2181" }, "/");
+    }
+
+    @Test
+    public void testUserInfo() {
+        String serviceUri = "bk://bookkeeper@localhost:2181/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "bk",
+            new String[0],
+            "bookkeeper",
+            new String[] { "localhost:2181" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsSemiColon() {
+        String serviceUri = 
"bk://host1:2181;host2:2181;host3:2181/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "bk",
+            new String[0],
+            null,
+            new String[] { "host1:2181", "host2:2181", "host3:2181" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsComma() {
+        String serviceUri = 
"bk://host1:2181,host2:2181,host3:2181/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "bk",
+            new String[0],
+            null,
+            new String[] { "host1:2181", "host2:2181", "host3:2181" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsWithoutPorts() {
+        String serviceUri = "bk://host1,host2,host3/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "bk",
+            new String[0],
+            null,
+            new String[] { "host1", "host2", "host3" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testMultipleHostsMixed() {
+        String serviceUri = 
"bk://host1:2181,host2,host3:2181/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "bk",
+            new String[0],
+            null,
+            new String[] { "host1:2181", "host2", "host3:2181" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testUserInfoWithMultipleHosts() {
+        String serviceUri = 
"bk://bookkeeper@host1:2181;host2:2181;host3:2181/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "bk",
+            new String[0],
+            "bookkeeper",
+            new String[] { "host1:2181", "host2:2181", "host3:2181" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testServiceInfoPlus() {
+        String serviceUri = "bk+ssl://host:2181/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "bk",
+            new String[] { "ssl" },
+            null,
+            new String[] { "host:2181" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testServiceInfoMinus() {
+        String serviceUri = "bk-ssl://host:2181/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "bk-ssl",
+            new String[0],
+            null,
+            new String[] { "host:2181" },
+            "/path/to/namespace");
+    }
+
+    @Test
+    public void testServiceInfoDlogMinus() {
+        String serviceUri = "distributedlog-bk://host:2181/path/to/namespace";
+        assertServiceUri(
+            serviceUri,
+            "distributedlog",
+            new String[] { "bk" },
+            null,
+            new String[] { "host:2181" },
+            "/path/to/namespace");
+    }
+
+}
diff --git 
a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/StreamStorageCli.java
 
b/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/StreamStorageCli.java
index f0e7974..b1ac239 100644
--- 
a/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/StreamStorageCli.java
+++ 
b/stream/cli/src/main/java/org/apache/bookkeeper/stream/cli/StreamStorageCli.java
@@ -28,12 +28,10 @@ import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
-import org.apache.bookkeeper.clients.utils.NetUtils;
 import org.apache.bookkeeper.stream.cli.commands.CmdBase;
 import org.apache.bookkeeper.stream.cli.commands.CmdNamespace;
 import org.apache.bookkeeper.stream.cli.commands.CmdStream;
 import org.apache.bookkeeper.stream.cli.commands.CmdTable;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
 
 /**
  * Bookie Shell.
@@ -83,8 +81,8 @@ public class StreamStorageCli {
     @Getter(AccessLevel.PACKAGE)
     static class ShellArguments {
 
-        @Parameter(names = { "-s", "--server" }, description = "A storage 
server address")
-        private String endpoint = null;
+        @Parameter(names = { "-u", "--server-uri" }, description = "The 
bookkeeper service uri")
+        private String serviceUri = "bk://localhost:4181";
 
         @Parameter(names = { "-n", "--namespace" }, description = "Namespace")
         private String namespace = "default";
@@ -155,16 +153,15 @@ public class StreamStorageCli {
             return false;
         }
 
-        if (null == shellArgs.endpoint) {
+        if (null == shellArgs.serviceUri) {
             System.err.println("No endpoint is provided");
             commander.usage();
             return false;
         }
 
-        Endpoint endpoint = NetUtils.parseEndpoint(shellArgs.endpoint);
-        settingsBuilder.addEndpoints(endpoint);
+        settingsBuilder.serviceUri(shellArgs.serviceUri);
 
-        log.info("connecting to storage service = {}", endpoint);
+        log.info("connecting to storage service = {}", shellArgs.serviceUri);
 
         if (cmdPos == args.length) {
             commander.usage();
diff --git 
a/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/TestStorageClientBuilder.java
 
b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/TestStorageClientBuilder.java
index c80010a..39bd14a 100644
--- 
a/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/TestStorageClientBuilder.java
+++ 
b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/TestStorageClientBuilder.java
@@ -47,7 +47,9 @@ public class TestStorageClientBuilder {
     @Test(expected = IllegalArgumentException.class)
     public void testBuildClientInvalidNamespaceName() {
         StorageClientBuilder.newBuilder()
-            .withSettings(mock(StorageClientSettings.class))
+            .withSettings(StorageClientSettings.newBuilder()
+                .serviceUri("bk://localhost:4181")
+                .build())
             .withNamespace("invalid-namespace")
             .build();
     }
diff --git 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
index 87768fa..9a0fc6d 100644
--- 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
+++ 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
@@ -18,16 +18,11 @@
 
 package org.apache.bookkeeper.clients.config;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.NameResolver;
-import java.util.List;
 import java.util.Optional;
 import org.apache.bookkeeper.clients.resolver.EndpointResolver;
 import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.net.ServiceURI;
 import org.apache.bookkeeper.common.util.Backoff;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.inferred.freebuilder.FreeBuilder;
 
 /**
@@ -44,18 +39,11 @@ public interface StorageClientSettings {
     int numWorkerThreads();
 
     /**
-     * Returns the name resolver factory used by zstream client.
-     *
-     * @return name resolver factory.
-     */
-    Optional<NameResolver.Factory> nameResolverFactory();
-
-    /**
-     * Returns the endpoints used by the client builder.
+     * Returns the service uri that storage client should talk to.
      *
-     * @return the list of endpoints.
+     * @return service uri
      */
-    List<Endpoint> endpoints();
+    String serviceUri();
 
     /**
      * Return the endpoint resolver for resolving individual endpoints.
@@ -67,13 +55,6 @@ public interface StorageClientSettings {
     EndpointResolver endpointResolver();
 
     /**
-     * Returns the builder to create the managed channel.
-     *
-     * @return
-     */
-    Optional<ManagedChannelBuilder> managedChannelBuilder();
-
-    /**
      * Use of a plaintext connection to the server. By default a secure 
connection mechanism
      * such as TLS will be used.
      *
@@ -114,14 +95,13 @@ public interface StorageClientSettings {
 
         @Override
         public StorageClientSettings build() {
-            checkArgument(
-                nameResolverFactory().isPresent()
-                    || !endpoints().isEmpty()
-                    || managedChannelBuilder().isPresent(),
-                "No name resolver or endpoints or channel builder provided");
-            return super.build();
-        }
+            StorageClientSettings settings = super.build();
+
+            // create a service uri to ensure the service uri is valid
+            ServiceURI.create(serviceUri());
 
+            return settings;
+        }
     }
 
     static Builder newBuilder() {
diff --git 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/LocationClientImpl.java
 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/LocationClientImpl.java
index db1c656..8b589c3 100644
--- 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/LocationClientImpl.java
+++ 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/LocationClientImpl.java
@@ -24,8 +24,6 @@ import static 
org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createGetSto
 
 import com.google.common.annotations.VisibleForTesting;
 import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.NameResolver;
 import io.grpc.Status;
 import io.grpc.StatusException;
 import io.grpc.StatusRuntimeException;
@@ -36,8 +34,8 @@ import java.util.function.Predicate;
 import java.util.stream.Stream;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.clients.impl.internal.api.LocationClient;
-import org.apache.bookkeeper.clients.resolver.SimpleStreamResolverFactory;
 import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.clients.utils.GrpcChannels;
 import org.apache.bookkeeper.clients.utils.GrpcUtils;
 import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -63,28 +61,14 @@ public class LocationClientImpl implements LocationClient {
                               OrderedScheduler scheduler) {
         this.settings = settings;
         this.scheduler = scheduler;
-        ManagedChannelBuilder builder = 
settings.managedChannelBuilder().orElse(
-            ManagedChannelBuilder
-                .forTarget("stream")
-                .nameResolverFactory(getResolver(settings))
-        );
-        if (settings.usePlaintext()) {
-            builder = builder.usePlaintext(true);
-        }
-        this.channel = builder.build();
+        this.channel = GrpcChannels.createChannelBuilder(
+            settings.serviceUri(), settings
+        ).build();
         this.locationService = GrpcUtils.configureGrpcStub(
             StorageContainerServiceGrpc.newFutureStub(channel),
             Optional.empty());
     }
 
-    private NameResolver.Factory getResolver(StorageClientSettings settings) {
-        if (settings.nameResolverFactory().isPresent()) {
-            return settings.nameResolverFactory().get();
-        } else {
-            return SimpleStreamResolverFactory.of(settings.endpoints());
-        }
-    }
-
     private Stream<Long> getDefaultBackoffs() {
         return Backoff.exponential(
             ClientConstants.DEFAULT_BACKOFF_START_MS,
diff --git 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/resolver/SimpleStreamResolverFactory.java
 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/resolver/SimpleStreamResolverFactory.java
deleted file mode 100644
index 512ee06..0000000
--- 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/resolver/SimpleStreamResolverFactory.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.clients.resolver;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.base.Function;
-import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
-import io.grpc.Attributes;
-import io.grpc.NameResolver;
-import java.net.URI;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.bookkeeper.clients.utils.ClientResources;
-import org.apache.bookkeeper.common.resolver.AbstractNameResolverFactory;
-import org.apache.bookkeeper.common.resolver.SimpleNameResolver;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
-
-/**
- * A simple resolver factory that creates simple name resolver.
- */
-public class SimpleStreamResolverFactory extends AbstractNameResolverFactory {
-
-    private static final String SCHEME = "stream";
-    private static final String NAME = "simple";
-
-    public static final SimpleStreamResolverFactory of(List<Endpoint> 
endpoints) {
-        return new SimpleStreamResolverFactory(endpoints);
-    }
-
-    public static final SimpleStreamResolverFactory of(Endpoint... endpoints) {
-        return new SimpleStreamResolverFactory(Lists.newArrayList(endpoints));
-    }
-
-    private final List<URI> endpointURIs;
-
-    private SimpleStreamResolverFactory(List<Endpoint> endpoints) {
-        this.endpointURIs = Lists.transform(
-            endpoints,
-            new Function<Endpoint, URI>() {
-                @Override
-                public URI apply(Endpoint endpoint) {
-                    return URI.create(SCHEME + "://" + endpoint.getHostname() 
+ ":" + endpoint.getPort());
-                }
-            }
-        );
-    }
-
-    @Override
-    public String name() {
-        return NAME;
-    }
-
-    @Nullable
-    @Override
-    public NameResolver newNameResolver(URI targetUri, Attributes params) {
-        if (!Objects.equal(SCHEME, targetUri.getScheme())) {
-            return null;
-        }
-        String targetPath = checkNotNull(targetUri.getPath());
-        checkArgument(targetPath.startsWith("/"),
-            "the path component (%s) of the target (%s) must start with '/'",
-            targetPath, targetUri);
-        String name = targetPath.substring(1);
-        return new SimpleNameResolver(name, 
ClientResources.shared().executor(), this.endpointURIs);
-    }
-
-    @Override
-    public String getDefaultScheme() {
-        return SCHEME;
-    }
-}
diff --git 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/GrpcChannels.java
 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/GrpcChannels.java
new file mode 100644
index 0000000..8588359
--- /dev/null
+++ 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/GrpcChannels.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.utils;
+
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.common.resolver.ServiceNameResolverProvider;
+
+/**
+ * Utils to create grpc channels.
+ */
+@Slf4j
+public final class GrpcChannels {
+
+    private static final String BACKEND_INPROCESS = "inprocess";
+
+    private GrpcChannels() {}
+
+    /**
+     * Create a channel builder from <tt>serviceUri</tt> with client 
<tt>settings</tt>.
+     *
+     * @param serviceUri service uri
+     * @param settings client settings
+     * @return managed channel builder
+     */
+    public static ManagedChannelBuilder createChannelBuilder(String serviceUri,
+                                                             
StorageClientSettings settings) {
+        ServiceURI uri = ServiceURI.create(serviceUri);
+
+        ManagedChannelBuilder builder;
+        if (uri.getServiceInfos().length > 0 && 
uri.getServiceInfos()[0].equals(BACKEND_INPROCESS)) {
+            // this is an inprocess service, so build an inprocess channel.
+            String serviceName = uri.getServiceHosts()[0];
+            builder = 
InProcessChannelBuilder.forName(serviceName).directExecutor();
+        } else {
+            builder = ManagedChannelBuilder.forTarget(serviceUri)
+                .nameResolverFactory(new 
ServiceNameResolverProvider().toFactory());
+        }
+        if (settings.usePlaintext()) {
+            builder = builder.usePlaintext();
+        }
+        return builder;
+    }
+
+}
diff --git 
a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/config/TestStorageClientSettings.java
 
b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/config/TestStorageClientSettings.java
index ae08c83..a4e4294 100644
--- 
a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/config/TestStorageClientSettings.java
+++ 
b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/config/TestStorageClientSettings.java
@@ -23,9 +23,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import com.google.common.collect.Lists;
-import java.util.List;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.junit.Test;
 
 /**
@@ -35,14 +32,10 @@ public class TestStorageClientSettings {
 
     @Test
     public void testDefault() {
-        List<Endpoint> endpoints = Lists.newArrayList(
-            Endpoint.newBuilder()
-                .setHostname("127.0.0.1")
-                .setPort(80)
-                .build());
         StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addAllEndpoints(endpoints)
+            .serviceUri("bk://127.0.0.1:4181/")
             .build();
+        assertEquals("bk://127.0.0.1:4181/", settings.serviceUri());
         assertEquals(Runtime.getRuntime().availableProcessors(), 
settings.numWorkerThreads());
         assertTrue(settings.usePlaintext());
         assertFalse(settings.clientName().isPresent());
@@ -53,8 +46,8 @@ public class TestStorageClientSettings {
         try {
             StorageClientSettings.newBuilder().build();
             fail("Should fail with missing endpoints");
-        } catch (IllegalArgumentException iae) {
-            assertEquals("No name resolver or endpoints or channel builder 
provided", iae.getMessage());
+        } catch (IllegalStateException iae) {
+            assertEquals("Not set: [serviceUri]", iae.getMessage());
         }
     }
 
diff --git 
a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/grpc/GrpcClientTestBase.java
 
b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/grpc/GrpcClientTestBase.java
index 9088fde..f90a9aa 100644
--- 
a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/grpc/GrpcClientTestBase.java
+++ 
b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/grpc/GrpcClientTestBase.java
@@ -50,7 +50,7 @@ public abstract class GrpcClientTestBase {
         .setPort(4181)
         .build();
 
-    protected String serverName = "fake server for " + getClass();
+    protected String serverName;
     protected final MutableHandlerRegistry serviceRegistry = new 
MutableHandlerRegistry();
     protected Server fakeServer;
     protected OrderedScheduler scheduler;
@@ -61,6 +61,7 @@ public abstract class GrpcClientTestBase {
 
     @Before
     public void setUp() throws Exception {
+        serverName = "fake-server";
         fakeServer = InProcessServerBuilder
             .forName(serverName)
             .fallbackHandlerRegistry(serviceRegistry)
@@ -72,8 +73,7 @@ public abstract class GrpcClientTestBase {
             .numThreads(Runtime.getRuntime().availableProcessors())
             .build();
         settings = StorageClientSettings.newBuilder()
-            
.managedChannelBuilder(InProcessChannelBuilder.forName(serverName).directExecutor())
-            .usePlaintext(true)
+            .serviceUri("bk+inprocess://" + serverName)
             .build();
         serverManager = new StorageServerClientManagerImpl(
             settings,
diff --git 
a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplTestBase.java
 
b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplTestBase.java
index a962a21..c61052c 100644
--- 
a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplTestBase.java
+++ 
b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplTestBase.java
@@ -27,6 +27,7 @@ import io.grpc.inprocess.InProcessChannelBuilder;
 import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
 import org.apache.bookkeeper.clients.exceptions.ClientException;
 import org.apache.bookkeeper.clients.exceptions.InvalidNamespaceNameException;
 import org.apache.bookkeeper.clients.exceptions.NamespaceExistsException;
@@ -118,7 +119,7 @@ public abstract class RootRangeClientImplTestBase extends 
GrpcClientTestBase {
 
         RootRangeServiceImplBase rootRangeService = 
createRootRangeServiceForSuccess();
         serviceRegistry.addService(rootRangeService.bindService());
-        StorageServerChannel rsChannel = new StorageServerChannel(
+        @Cleanup StorageServerChannel rsChannel = new StorageServerChannel(
             
InProcessChannelBuilder.forName(serverName).directExecutor().build(),
             Optional.empty());
         serviceFuture.complete(rsChannel);
@@ -138,7 +139,7 @@ public abstract class RootRangeClientImplTestBase extends 
GrpcClientTestBase {
 
         RootRangeServiceImplBase rootRangeService = 
createRootRangeServiceForRequestFailure();
         serviceRegistry.addService(rootRangeService.bindService());
-        StorageServerChannel rsChannel = new StorageServerChannel(
+        @Cleanup StorageServerChannel rsChannel = new StorageServerChannel(
             
InProcessChannelBuilder.forName(serverName).directExecutor().build(),
             Optional.empty());
         serviceFuture.complete(rsChannel);
@@ -158,7 +159,7 @@ public abstract class RootRangeClientImplTestBase extends 
GrpcClientTestBase {
 
         RootRangeServiceImplBase rootRangeService = 
createRootRangeServiceForRpcFailure();
         serviceRegistry.addService(rootRangeService.bindService());
-        StorageServerChannel rsChannel = new StorageServerChannel(
+        @Cleanup StorageServerChannel rsChannel = new StorageServerChannel(
             
InProcessChannelBuilder.forName(serverName).directExecutor().build(),
             Optional.empty());
         serviceFuture.complete(rsChannel);
diff --git 
a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestLocationClientImpl.java
 
b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestLocationClientImpl.java
index c2d4fc2..665b8ac 100644
--- 
a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestLocationClientImpl.java
+++ 
b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestLocationClientImpl.java
@@ -30,7 +30,6 @@ import io.grpc.ServerServiceDefinition;
 import io.grpc.Status;
 import io.grpc.StatusException;
 import io.grpc.StatusRuntimeException;
-import io.grpc.inprocess.InProcessChannelBuilder;
 import io.grpc.stub.StreamObserver;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -129,8 +128,7 @@ public class TestLocationClientImpl extends 
GrpcClientTestBase {
     protected void doSetup() throws Exception {
         StorageClientSettings settings =
             StorageClientSettings.newBuilder()
-                
.managedChannelBuilder(InProcessChannelBuilder.forName(serverName).directExecutor())
-                .usePlaintext(true)
+                .serviceUri("bk+inprocess://" + serverName)
                 .build();
         locationClient = new LocationClientImpl(settings, scheduler);
         locationServiceDefinition = locationService.bindService();
diff --git 
a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestMetaRangeClientImpl.java
 
b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestMetaRangeClientImpl.java
index 9716162..985b955 100644
--- 
a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestMetaRangeClientImpl.java
+++ 
b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestMetaRangeClientImpl.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import lombok.Cleanup;
 import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
 import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
 import org.apache.bookkeeper.clients.impl.channel.StorageServerChannelManager;
@@ -145,7 +146,7 @@ public class TestMetaRangeClientImpl extends 
GrpcClientTestBase {
         };
         serviceRegistry.addService(metaRangeService.bindService());
 
-        StorageServerChannel rsChannel = new StorageServerChannel(
+        @Cleanup StorageServerChannel rsChannel = new StorageServerChannel(
             
InProcessChannelBuilder.forName(serverName).directExecutor().build(),
             Optional.empty());
         serviceFuture.complete(rsChannel);
@@ -169,7 +170,7 @@ public class TestMetaRangeClientImpl extends 
GrpcClientTestBase {
         };
         serviceRegistry.addService(metaRangeService.bindService());
 
-        StorageServerChannel rsChannel = new StorageServerChannel(
+        @Cleanup StorageServerChannel rsChannel = new StorageServerChannel(
             
InProcessChannelBuilder.forName(serverName).directExecutor().build(),
             Optional.empty());
         serviceFuture.complete(rsChannel);
diff --git 
a/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/ServiceNameResolverProvider.java
 
b/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/ServiceNameResolverProvider.java
new file mode 100644
index 0000000..9e11adf
--- /dev/null
+++ 
b/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/ServiceNameResolverProvider.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.resolver;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import io.grpc.Attributes;
+import io.grpc.NameResolver;
+import io.grpc.NameResolverProvider;
+import io.grpc.internal.DnsNameResolverProvider;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
+
+/**
+ * An implementation of {@link NameResolverProvider} that provides {@link 
NameResolver}s
+ * to resolve {@link org.apache.bookkeeper.common.net.ServiceURI}.
+ */
+@Slf4j
+public final class ServiceNameResolverProvider extends NameResolverProvider {
+
+    private final DnsNameResolverProvider dnsProvider;
+    private final Resource<ExecutorService> executorResource;
+
+    public ServiceNameResolverProvider() {
+        this.dnsProvider = new DnsNameResolverProvider();
+        this.executorResource = new Resource<ExecutorService>() {
+            @Override
+            public ExecutorService create() {
+                return Executors.newSingleThreadScheduledExecutor();
+            }
+
+            @Override
+            public void close(ExecutorService instance) {
+                instance.shutdown();
+            }
+        };
+    }
+
+    @Override
+    protected boolean isAvailable() {
+        return true;
+    }
+
+    @Override
+    protected int priority() {
+        return 10;
+    }
+
+    @Nullable
+    @Override
+    public NameResolver newNameResolver(URI targetUri, Attributes params) {
+        ServiceURI serviceURI;
+        try {
+            serviceURI = ServiceURI.create(targetUri);
+        } catch (NullPointerException | IllegalArgumentException e) {
+            // invalid uri here, so return null to allow grpc to use other 
name resolvers
+            log.info("ServiceNameResolverProvider doesn't know how to resolve 
{} : cause {}",
+                targetUri, e.getMessage());
+            return null;
+        }
+
+        if (null == serviceURI.getServiceName()
+            || ServiceURI.SERVICE_BK.equals(serviceURI.getServiceName())) {
+
+            String[] hosts = serviceURI.getServiceHosts();
+            if (hosts.length == 0) {
+                // no host is found, so return null to let grpc choose other 
resolver.
+                return null;
+            } else if (hosts.length == 1) {
+                // create a dns name resolver
+                URI dnsUri = URI.create("dns:///" + hosts[0]);
+                return dnsProvider.newNameResolver(dnsUri, params);
+            } else {
+                // create a static resolver taking the list of servers.
+                List<String> hostList = new ArrayList<>();
+                for (String host : hosts) {
+                    hostList.add(host);
+                }
+                List<URI> hostUris = Lists.transform(
+                    hostList,
+                    new Function<String, URI>() {
+                        @Nullable
+                        @Override
+                        public URI apply(@Nullable String host) {
+                            return URI.create("//" + host);
+                        }
+                    }
+                );
+
+                return new StaticNameResolver(
+                    "static",
+                    executorResource,
+                    hostUris);
+            }
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public String getDefaultScheme() {
+        return ServiceURI.SERVICE_BK;
+    }
+
+    public NameResolver.Factory toFactory() {
+        return new NameResolverFactory(Lists.newArrayList(this));
+    }
+
+    private static class NameResolverFactory extends NameResolver.Factory {
+        private final List<NameResolverProvider> providers;
+
+        public NameResolverFactory(List<NameResolverProvider> providers) {
+            this.providers = providers;
+        }
+
+        @Override
+        public NameResolver newNameResolver(URI targetUri, Attributes params) {
+            checkForProviders();
+            for (NameResolverProvider provider : providers) {
+                NameResolver resolver = provider.newNameResolver(targetUri, 
params);
+                if (resolver != null) {
+                    return resolver;
+                }
+            }
+            return null;
+        }
+
+        @Override
+        public String getDefaultScheme() {
+            checkForProviders();
+            return providers.get(0).getDefaultScheme();
+        }
+
+        private void checkForProviders() {
+            checkState(!providers.isEmpty(),
+                "No NameResolverProviders found. Please check your 
configuration");
+        }
+    }
+}
diff --git 
a/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/SimpleNameResolver.java
 
b/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/StaticNameResolver.java
similarity index 94%
rename from 
stream/common/src/main/java/org/apache/bookkeeper/common/resolver/SimpleNameResolver.java
rename to 
stream/common/src/main/java/org/apache/bookkeeper/common/resolver/StaticNameResolver.java
index 7ff4e89..3f24ab9 100644
--- 
a/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/SimpleNameResolver.java
+++ 
b/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/StaticNameResolver.java
@@ -32,11 +32,11 @@ import 
org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
 /**
  * A simple name resolver that returns pre-configured host addresses.
  */
-public class SimpleNameResolver extends AbstractNameResolver {
+public class StaticNameResolver extends AbstractNameResolver {
 
     private final List<EquivalentAddressGroup> servers;
 
-    public SimpleNameResolver(String name,
+    public StaticNameResolver(String name,
                               Resource<ExecutorService> executorResource,
                               List<URI> endpoints) {
         super(name, executorResource);
diff --git 
a/stream/common/src/test/java/org/apache/bookkeeper/common/reslover/TestSimpleNameResolver.java
 
b/stream/common/src/test/java/org/apache/bookkeeper/common/reslover/TestSimpleNameResolver.java
index 58a0971..a781486 100644
--- 
a/stream/common/src/test/java/org/apache/bookkeeper/common/reslover/TestSimpleNameResolver.java
+++ 
b/stream/common/src/test/java/org/apache/bookkeeper/common/reslover/TestSimpleNameResolver.java
@@ -28,12 +28,12 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.List;
 import java.util.stream.Collectors;
-import org.apache.bookkeeper.common.resolver.SimpleNameResolver;
+import org.apache.bookkeeper.common.resolver.StaticNameResolver;
 import org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
 import org.junit.Test;
 
 /**
- * Unit test of {@link SimpleNameResolver}.
+ * Unit test of {@link StaticNameResolver}.
  */
 public class TestSimpleNameResolver {
 
@@ -61,7 +61,7 @@ public class TestSimpleNameResolver {
             .collect(Collectors.toList());
 
         @SuppressWarnings("unchecked") // for the mock
-            SimpleNameResolver nameResolver = new SimpleNameResolver(
+            StaticNameResolver nameResolver = new StaticNameResolver(
             "test-name-resolver",
             mock(Resource.class),
             uris);
diff --git 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/DLUtils.java
 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/DLUtils.java
index c89db47..635f86f 100644
--- 
a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/DLUtils.java
+++ 
b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/DLUtils.java
@@ -26,16 +26,12 @@ import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.List;
-import org.apache.commons.lang.StringUtils;
+import org.apache.bookkeeper.common.net.ServiceURI;
 import org.apache.distributedlog.DistributedLogConstants;
 import org.apache.distributedlog.LogSegmentMetadata;
 import org.apache.distributedlog.exceptions.InvalidStreamNameException;
 import org.apache.distributedlog.exceptions.UnexpectedException;
 
-
-
-
-
 /**
  * Utilities about DL implementations like uri, log segments, metadata 
serialization and deserialization.
  */
@@ -228,17 +224,14 @@ public class DLUtils {
      * @return the normalized uri
      */
     public static URI normalizeURI(URI uri) {
-        checkNotNull(uri, "DistributedLog uri is null");
-        String scheme = uri.getScheme();
-        checkNotNull(scheme, "Invalid distributedlog uri : " + uri);
-        scheme = scheme.toLowerCase();
-        String[] schemeParts = StringUtils.split(scheme, '-');
-        checkArgument(Objects.equal(DistributedLogConstants.SCHEME_PREFIX, 
schemeParts[0].toLowerCase()),
+        ServiceURI serviceURI = ServiceURI.create(uri);
+        checkNotNull(serviceURI.getServiceName(), "Invalid distributedlog uri 
: " + uri);
+        checkArgument(Objects.equal(DistributedLogConstants.SCHEME_PREFIX, 
serviceURI.getServiceName()),
                 "Unknown distributedlog scheme found : " + uri);
         URI normalizedUri;
         try {
             normalizedUri = new URI(
-                    schemeParts[0],     // remove backend info
+                    serviceURI.getServiceName(),     // remove backend info
                     uri.getAuthority(),
                     uri.getPath(),
                     uri.getQuery(),
diff --git 
a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/namespace/TestNamespaceBuilder.java
 
b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/namespace/TestNamespaceBuilder.java
index 7ee7cb3..b905c41 100644
--- 
a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/namespace/TestNamespaceBuilder.java
+++ 
b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/namespace/TestNamespaceBuilder.java
@@ -28,9 +28,6 @@ import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.junit.Test;
 
-
-
-
 /**
  * Test Namespace Builder.
  */
diff --git 
a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
 
b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
index ff0cda2..e5a6df4 100644
--- 
a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
+++ 
b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
@@ -29,11 +29,13 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.clients.exceptions.NamespaceExistsException;
+import org.apache.bookkeeper.clients.utils.NetUtils;
 import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
 import org.apache.bookkeeper.common.component.LifecycleComponent;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -226,12 +228,18 @@ public class StreamCluster
         executor.shutdown();
     }
 
+
     private void createDefaultNamespaces() throws Exception {
+        String serviceUri = String.format(
+            "bk://%s/",
+            getRpcEndpoints().stream()
+                .map(endpoint -> NetUtils.endpointToString(endpoint))
+                .collect(Collectors.joining(",")));
         StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(getRpcEndpoints().toArray(new 
Endpoint[getRpcEndpoints().size()]))
+            .serviceUri(serviceUri)
             .usePlaintext(true)
             .build();
-        log.info("RpcEndpoints are : {}", settings.endpoints());
+        log.info("Service uri are : {}", serviceUri);
         String namespaceName = "default";
         try (StorageAdminClient admin = StorageClientBuilder.newBuilder()
             .withSettings(settings)
diff --git 
a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
 
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
index fe7c914..a7d8549 100644
--- 
a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
+++ 
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
@@ -78,8 +78,13 @@ public abstract class StreamClusterTestBase extends 
BookKeeperClusterTestBase {
     //
 
     protected static StorageClientSettings newStorageClientSettings() {
+        String serviceUri = String.format(
+            "bk://%s/",
+            getExsternalStreamEndpoints().stream()
+                .map(endpoint -> NetUtils.endpointToString(endpoint))
+                .collect(Collectors.joining(",")));
         return StorageClientSettings.newBuilder()
-            .addEndpoints(getExsternalStreamEndpoints().toArray(new 
Endpoint[getNumBookies()]))
+            .serviceUri(serviceUri)
             .endpointResolver(endpoint -> {
                 String internalEndpointStr = 
NetUtils.endpointToString(endpoint);
                 String externalEndpointStr =

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to