This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 02f706f  [TABLE SERVICE] add a bookie registration based grpc name 
resolver
02f706f is described below

commit 02f706f18c6a76083bac4e0002f428d27bb8cbdb
Author: Sijie Guo <[email protected]>
AuthorDate: Fri Nov 30 16:29:01 2018 -0800

    [TABLE SERVICE] add a bookie registration based grpc name resolver
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    The table service uses a grpc name resolver to discover the live servers
    serving the table service. Currently it uses the default dns name resolver.
    In the context of Pulsar, since brokers talk to bookies via zookeeper
    service discovery, it makes sense for brokers to talk to the table service
    via zookeeper service discovery as well. So this PR adds a bookie
    registration based grpc name resolver.
    
    *Changes*
    
    Implement a bookie-registration library based grpc name resolver.
    
    
    
    
    Reviewers: Jia Zhai <None>
    
    This closes #1842 from sijie/add_zookeeper_resolver
---
 stream/bk-grpc-name-resolver/pom.xml               |  70 ++++++++++
 .../grpc/resolver/BKRegistrationNameResolver.java  | 133 ++++++++++++++++++
 .../BKRegistrationNameResolverProvider.java        |  84 +++++++++++
 .../bookkeeper/grpc/resolver/package-info.java     |  23 +++
 .../resolver/BKRegistrationNameResolverTest.java   | 155 +++++++++++++++++++++
 .../bookkeeper/grpc/resolver/GrpcChannelsTest.java |  44 ++++++
 .../bookkeeper/clients/utils/GrpcChannels.java     |  19 ++-
 .../bookkeeper/clients/utils/GrpcChannelsTest.java |  70 ++++++++++
 .../resolver/NameResolverFactoryProvider.java      |  37 +++++
 .../resolver/NameResolverProviderFactory.java      |  64 +++++++++
 .../resolver/ServiceNameResolverProvider.java      |  37 +----
 stream/pom.xml                                     |   1 +
 12 files changed, 702 insertions(+), 35 deletions(-)

diff --git a/stream/bk-grpc-name-resolver/pom.xml 
b/stream/bk-grpc-name-resolver/pom.xml
new file mode 100644
index 0000000..312adfb
--- /dev/null
+++ b/stream/bk-grpc-name-resolver/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd"; 
xmlns="http://maven.apache.org/POM/4.0.0";
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>stream-storage-parent</artifactId>
+    <groupId>org.apache.bookkeeper</groupId>
+    <version>4.9.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <groupId>org.apache.bookkeeper</groupId>
+  <artifactId>bk-grpc-name-resolver</artifactId>
+  <name>Apache BookKeeper :: Stream Storage :: Common :: BK Grpc Name 
Resolver</name>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>stream-storage-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-server</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>stream-storage-java-client-base</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-common</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-server</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git 
a/stream/bk-grpc-name-resolver/src/main/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolver.java
 
b/stream/bk-grpc-name-resolver/src/main/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolver.java
new file mode 100644
index 0000000..cc25978
--- /dev/null
+++ 
b/stream/bk-grpc-name-resolver/src/main/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolver.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.grpc.resolver;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.grpc.Attributes;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.NameResolver;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.meta.MetadataClientDriver;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+
+/**
+ * A {@link NameResolver} implementation based on bookkeeper {@link 
org.apache.bookkeeper.discover.RegistrationClient}.
+ */
+class BKRegistrationNameResolver extends NameResolver {
+
+    private final MetadataClientDriver clientDriver;
+    private final URI serviceURI;
+    private final ScheduledExecutorService executor;
+
+    private Listener listener;
+    private boolean shutdown;
+    private boolean resolving;
+
+    BKRegistrationNameResolver(MetadataClientDriver clientDriver,
+                               URI serviceURI) {
+        this.clientDriver = clientDriver;
+        this.serviceURI = serviceURI;
+        this.executor = Executors.newSingleThreadScheduledExecutor(
+            new 
ThreadFactoryBuilder().setNameFormat("registration-name-resolver").build());
+    }
+
+    @Override
+    public String getServiceAuthority() {
+        return serviceURI.getAuthority();
+    }
+
+    @Override
+    public synchronized void start(Listener listener) {
+        checkState(null == this.listener, "Resolver already started");
+        this.listener = Objects.requireNonNull(listener, "Listener is null");
+
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setMetadataServiceUri(serviceURI.toString());
+
+        try {
+            clientDriver.initialize(conf, executor, NullStatsLogger.INSTANCE, 
Optional.empty());
+        } catch (MetadataException e) {
+            throw new RuntimeException("Failed to initialize registration 
client driver at " + serviceURI, e);
+        }
+
+        resolve();
+    }
+
+    private synchronized void resolve() {
+        if (resolving || shutdown) {
+            return;
+        }
+        resolving = true;
+        this.clientDriver.getRegistrationClient().watchWritableBookies(bookies 
-> {
+            Listener savedListener;
+            synchronized (this) {
+                savedListener = listener;
+            }
+            savedListener.onAddresses(
+                hostsToEquivalentAddressGroups(bookies.getValue()),
+                Attributes.EMPTY
+            );
+        }).whenComplete((ignored, cause) -> {
+            try {
+                if (null != cause) {
+                    resolve();
+                }
+            } finally {
+                synchronized (this) {
+                    resolving = false;
+                }
+            }
+        });
+    }
+
+    private static List<EquivalentAddressGroup> 
hostsToEquivalentAddressGroups(Set<BookieSocketAddress> bookies) {
+        return bookies.stream()
+            .map(addr -> new EquivalentAddressGroup(
+                Collections.singletonList(addr.getSocketAddress()),
+                Attributes.EMPTY
+            ))
+            .collect(Collectors.toList());
+    }
+
+    @Override
+    public void shutdown() {
+        synchronized (this) {
+            if (shutdown) {
+                return;
+            }
+            shutdown = true;
+        }
+        executor.shutdown();
+        clientDriver.close();
+    }
+}
diff --git 
a/stream/bk-grpc-name-resolver/src/main/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolverProvider.java
 
b/stream/bk-grpc-name-resolver/src/main/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolverProvider.java
new file mode 100644
index 0000000..1c61de7
--- /dev/null
+++ 
b/stream/bk-grpc-name-resolver/src/main/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolverProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.grpc.resolver;
+
+import com.google.common.collect.Lists;
+import io.grpc.Attributes;
+import io.grpc.NameResolver;
+import io.grpc.NameResolverProvider;
+import java.net.URI;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.common.resolver.NameResolverFactoryProvider;
+import org.apache.bookkeeper.common.resolver.NameResolverProviderFactory;
+import org.apache.bookkeeper.meta.MetadataClientDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+
+/**
+ * An implementation of {@link NameResolverProvider} that provides {@link 
io.grpc.NameResolver}s
+ * to resolve servers registered using bookkeeper registration library.
+ */
+@Slf4j
+public class BKRegistrationNameResolverProvider extends 
NameResolverFactoryProvider {
+
+    @Override
+    protected boolean isAvailable() {
+        return true;
+    }
+
+    @Override
+    protected int priority() {
+        return 100;
+    }
+
+    @Nullable
+    @Override
+    public NameResolver newNameResolver(URI targetUri, Attributes params) {
+        ServiceURI serviceURI;
+        try {
+            serviceURI = ServiceURI.create(targetUri);
+        } catch (NullPointerException | IllegalArgumentException e) {
+            // invalid uri here, so return null to allow grpc to use other 
name resolvers
+            log.info("BKRegistrationNameResolverProvider doesn't know how to 
resolve {} : cause {}",
+                targetUri, e.getMessage());
+            return null;
+        }
+
+        MetadataClientDriver clientDriver;
+        try {
+            clientDriver = 
MetadataDrivers.getClientDriver(serviceURI.getUri());
+            return new BKRegistrationNameResolver(clientDriver, 
serviceURI.getUri());
+        } catch (IllegalArgumentException iae) {
+            log.error("Unknown service uri : {}", serviceURI, iae);
+            return null;
+        }
+    }
+
+    @Override
+    public String getDefaultScheme() {
+        return ServiceURI.SERVICE_ZK;
+    }
+
+    @Override
+    public NameResolver.Factory toFactory() {
+        return new NameResolverProviderFactory(Lists.newArrayList(this));
+    }
+}
diff --git 
a/stream/bk-grpc-name-resolver/src/main/java/org/apache/bookkeeper/grpc/resolver/package-info.java
 
b/stream/bk-grpc-name-resolver/src/main/java/org/apache/bookkeeper/grpc/resolver/package-info.java
new file mode 100644
index 0000000..5e82b08
--- /dev/null
+++ 
b/stream/bk-grpc-name-resolver/src/main/java/org/apache/bookkeeper/grpc/resolver/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * A collection of grpc resolvers.
+ */
+package org.apache.bookkeeper.grpc.resolver;
\ No newline at end of file
diff --git 
a/stream/bk-grpc-name-resolver/src/test/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolverTest.java
 
b/stream/bk-grpc-name-resolver/src/test/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolverTest.java
new file mode 100644
index 0000000..726c8b0
--- /dev/null
+++ 
b/stream/bk-grpc-name-resolver/src/test/java/org/apache/bookkeeper/grpc/resolver/BKRegistrationNameResolverTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.grpc.resolver;
+
+import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
+import static org.junit.Assert.assertEquals;
+
+import io.grpc.Attributes;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.NameResolver;
+import io.grpc.NameResolver.Listener;
+import io.grpc.Status;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+import lombok.Cleanup;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link BKRegistrationNameResolver}.
+ */
+public class BKRegistrationNameResolverTest extends BookKeeperClusterTestCase {
+
+    private static final String ROOT_PATH = "/resolver-test";
+    private static final String SERVERS_PATH = ROOT_PATH + "/servers";
+
+    private final BKRegistrationNameResolverProvider resolverProvider;
+
+    private MetadataBookieDriver bookieDriver;
+    private URI serviceUri;
+
+    public BKRegistrationNameResolverTest() {
+        super(0);
+        this.resolverProvider = new BKRegistrationNameResolverProvider();
+    }
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+
+        zkc.transaction()
+            .create(ROOT_PATH, new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT)
+            .create(SERVERS_PATH, new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT)
+            .create(SERVERS_PATH + "/" + AVAILABLE_NODE, new byte[0], 
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
+            .commit();
+
+        serviceUri = URI.create("zk://" + zkUtil.getZooKeeperConnectString() + 
SERVERS_PATH);
+
+
+        ServerConfiguration serverConf = new ServerConfiguration();
+        serverConf.setMetadataServiceUri(serviceUri.toString());
+        bookieDriver = MetadataDrivers.getBookieDriver(serviceUri);
+        bookieDriver.initialize(serverConf, () -> {}, 
NullStatsLogger.INSTANCE);
+    }
+
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        bookieDriver.close();
+
+        super.tearDown();
+    }
+
+    @Test
+    public void testNameResolver() throws Exception {
+        int numServers = 3;
+
+        Set<SocketAddress> addressSet = new HashSet<>();
+        for (int i = 0; i < numServers; i++) {
+            InetSocketAddress address = new InetSocketAddress("127.0.0.1", 
3181 + i);
+            addressSet.add(address);
+            bookieDriver.getRegistrationManager().registerBookie(
+                "127.0.0.1:" + (3181 + i), false
+            );
+        }
+
+        LinkedBlockingQueue<List<EquivalentAddressGroup>> notifications = new 
LinkedBlockingQueue<>();
+
+
+        @Cleanup("shutdown")
+        NameResolver resolver = resolverProvider.newNameResolver(serviceUri, 
Attributes.EMPTY);
+        resolver.start(new Listener() {
+            @Override
+            public void onAddresses(List<EquivalentAddressGroup> servers, 
Attributes attributes) {
+                notifications.add(servers);
+            }
+
+            @Override
+            public void onError(Status error) {
+
+            }
+        });
+
+        List<EquivalentAddressGroup> groups = notifications.take();
+        assertEquals(numServers, groups.size());
+
+        Set<SocketAddress> receivedSet = groups.stream()
+            .map(group -> group.getAddresses().get(0))
+            .collect(Collectors.toSet());
+        assertEquals(addressSet, receivedSet);
+
+        // add 3 more servers
+
+        for (int i = numServers; i < 2 * numServers; i++) {
+            InetSocketAddress address = new InetSocketAddress("127.0.0.1", 
3181 + i);
+            addressSet.add(address);
+            bookieDriver.getRegistrationManager().registerBookie(
+                "127.0.0.1:" + (3181 + i), false
+            );
+        }
+
+        List<EquivalentAddressGroup> notification = notifications.take();
+        while (notification.size() < 2 * numServers) {
+            notification = notifications.take();
+        }
+        assertEquals(2 * numServers, notification.size());
+        receivedSet = notification.stream()
+            .map(group -> group.getAddresses().get(0))
+            .collect(Collectors.toSet());
+        assertEquals(addressSet, receivedSet);
+    }
+
+}
diff --git 
a/stream/bk-grpc-name-resolver/src/test/java/org/apache/bookkeeper/grpc/resolver/GrpcChannelsTest.java
 
b/stream/bk-grpc-name-resolver/src/test/java/org/apache/bookkeeper/grpc/resolver/GrpcChannelsTest.java
new file mode 100644
index 0000000..9df1bf1
--- /dev/null
+++ 
b/stream/bk-grpc-name-resolver/src/test/java/org/apache/bookkeeper/grpc/resolver/GrpcChannelsTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.grpc.resolver;
+
+import static org.junit.Assert.assertTrue;
+
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.netty.NettyChannelBuilder;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.clients.utils.GrpcChannels;
+import org.junit.Test;
+
+/**
+ * Unit test {@link org.apache.bookkeeper.clients.utils.GrpcChannels} with 
registration based name resolver.
+ */
+public class GrpcChannelsTest {
+
+    @Test
+    public void testZKServiceUri() {
+        String serviceUri = "zk://127.0.0.1/stream/servers";
+        ManagedChannelBuilder builder = GrpcChannels.createChannelBuilder(
+            serviceUri,
+            StorageClientSettings.newBuilder().serviceUri(serviceUri).build());
+        assertTrue(builder instanceof NettyChannelBuilder);
+    }
+
+}
diff --git 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/GrpcChannels.java
 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/GrpcChannels.java
index 8588359..f05caec 100644
--- 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/GrpcChannels.java
+++ 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/GrpcChannels.java
@@ -23,7 +23,9 @@ import io.grpc.inprocess.InProcessChannelBuilder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.common.resolver.NameResolverFactoryProvider;
 import org.apache.bookkeeper.common.resolver.ServiceNameResolverProvider;
+import org.apache.bookkeeper.common.util.ReflectionUtils;
 
 /**
  * Utils to create grpc channels.
@@ -32,6 +34,8 @@ import 
org.apache.bookkeeper.common.resolver.ServiceNameResolverProvider;
 public final class GrpcChannels {
 
     private static final String BACKEND_INPROCESS = "inprocess";
+    private static final String BK_REG_NAME_RESOLVER_PROVIDER =
+        
"org.apache.bookkeeper.grpc.resolver.BKRegistrationNameResolverProvider";
 
     private GrpcChannels() {}
 
@@ -51,9 +55,22 @@ public final class GrpcChannels {
             // this is an inprocess service, so build an inprocess channel.
             String serviceName = uri.getServiceHosts()[0];
             builder = 
InProcessChannelBuilder.forName(serviceName).directExecutor();
-        } else {
+        } else if (null == uri.getServiceName() || 
ServiceURI.SERVICE_BK.equals(uri.getServiceName())) {
             builder = ManagedChannelBuilder.forTarget(serviceUri)
                 .nameResolverFactory(new 
ServiceNameResolverProvider().toFactory());
+        } else {
+            NameResolverFactoryProvider provider;
+            try {
+                provider = ReflectionUtils.newInstance(
+                    BK_REG_NAME_RESOLVER_PROVIDER,
+                    NameResolverFactoryProvider.class);
+            } catch (RuntimeException re) {
+                log.error("It seems that you don't have 
`bk-grpc-name-resolver` in your class path."
+                    + " Please make sure you include it as your application's 
dependency.");
+                throw re;
+            }
+            builder = ManagedChannelBuilder.forTarget(serviceUri)
+                .nameResolverFactory(provider.toFactory());
         }
         if (settings.usePlaintext()) {
             builder = builder.usePlaintext();
diff --git 
a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/GrpcChannelsTest.java
 
b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/GrpcChannelsTest.java
new file mode 100644
index 0000000..3e3e81e
--- /dev/null
+++ 
b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/GrpcChannelsTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.clients.utils;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.netty.NettyChannelBuilder;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.junit.Test;
+
+/**
+ * Unit test {@link GrpcChannels}.
+ */
+public class GrpcChannelsTest {
+
+    @Test
+    public void testInprocessServiceUri() {
+        String serviceUri = "bk+inprocess://service";
+        ManagedChannelBuilder builder = GrpcChannels.createChannelBuilder(
+            serviceUri,
+            StorageClientSettings.newBuilder().serviceUri(serviceUri).build()
+        );
+        assertTrue(builder instanceof InProcessChannelBuilder);
+    }
+
+    @Test
+    public void testBKServiceUri() {
+        String serviceUri = "bk://127.0.0.1";
+        ManagedChannelBuilder builder = GrpcChannels.createChannelBuilder(
+            serviceUri,
+            StorageClientSettings.newBuilder().serviceUri(serviceUri).build()
+        );
+        assertTrue(builder instanceof NettyChannelBuilder);
+    }
+
+    @Test
+    public void testZKServiceUri() {
+        String serviceUri = "zk://127.0.0.1/stream/servers";
+        try {
+            GrpcChannels.createChannelBuilder(
+                serviceUri,
+                
StorageClientSettings.newBuilder().serviceUri(serviceUri).build()
+            );
+            fail("Should fail to create grpc channel because 
`bk-grpc-name-resolver` is not in the classpath");
+        } catch (RuntimeException re) {
+            assertTrue(re.getCause() instanceof ClassNotFoundException);
+        }
+    }
+
+}
diff --git 
a/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/NameResolverFactoryProvider.java
 
b/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/NameResolverFactoryProvider.java
new file mode 100644
index 0000000..c8edeae
--- /dev/null
+++ 
b/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/NameResolverFactoryProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.resolver;
+
+import io.grpc.NameResolver;
+import io.grpc.NameResolverProvider;
+
+/**
+ * A {@link NameResolverProvider} that provides method to convert back to 
{@link NameResolver.Factory}.
+ */
+public abstract class NameResolverFactoryProvider extends NameResolverProvider 
{
+
+    /**
+     * Convert the provider to a {@link NameResolver.Factory}.
+     *
+     * @return the name resolver factory.
+     */
+    public abstract NameResolver.Factory toFactory();
+
+}
diff --git 
a/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/NameResolverProviderFactory.java
 
b/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/NameResolverProviderFactory.java
new file mode 100644
index 0000000..571ef6f
--- /dev/null
+++ 
b/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/NameResolverProviderFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.resolver;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import io.grpc.Attributes;
+import io.grpc.NameResolver;
+import io.grpc.NameResolverProvider;
+import java.net.URI;
+import java.util.List;
+
+/**
+ * A {@link NameResolver.Factory} that delegates to a list of {@link NameResolverProvider}s, tried in order.
+ */
+public class NameResolverProviderFactory extends NameResolver.Factory {
+
+    private final List<NameResolverProvider> providers; // consulted in list order; stored as given, not copied
+
+    public NameResolverProviderFactory(List<NameResolverProvider> providers) {
+        this.providers = providers;
+    }
+
+    @Override
+    public NameResolver newNameResolver(URI targetUri, Attributes params) {
+        checkForProviders(); // throws IllegalStateException when the list is empty
+        for (NameResolverProvider provider : providers) {
+            NameResolver resolver = provider.newNameResolver(targetUri, 
params);
+            if (resolver != null) { // the first provider that yields a resolver wins
+                return resolver;
+            }
+        }
+        return null; // no provider produced a resolver for this target
+    }
+
+    @Override
+    public String getDefaultScheme() {
+        checkForProviders();
+        return providers.get(0).getDefaultScheme(); // the first provider determines the default scheme
+    }
+
+    private void checkForProviders() { // guards both public methods against an empty provider list
+        checkState(!providers.isEmpty(),
+            "No NameResolverProviders found. Please check your configuration");
+    }
+
+}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/ServiceNameResolverProvider.java b/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/ServiceNameResolverProvider.java
index 9e11adf..05bf7b5 100644
--- a/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/ServiceNameResolverProvider.java
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/resolver/ServiceNameResolverProvider.java
@@ -18,8 +18,6 @@
 
 package org.apache.bookkeeper.common.resolver;
 
-import static com.google.common.base.Preconditions.checkState;
-
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import io.grpc.Attributes;
@@ -41,7 +39,7 @@ import org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
  * to resolve {@link org.apache.bookkeeper.common.net.ServiceURI}.
  */
 @Slf4j
-public final class ServiceNameResolverProvider extends NameResolverProvider {
+public final class ServiceNameResolverProvider extends 
NameResolverFactoryProvider {
 
     private final DnsNameResolverProvider dnsProvider;
     private final Resource<ExecutorService> executorResource;
@@ -127,38 +125,9 @@ public final class ServiceNameResolverProvider extends NameResolverProvider {
         return ServiceURI.SERVICE_BK;
     }
 
+    @Override
     public NameResolver.Factory toFactory() {
-        return new NameResolverFactory(Lists.newArrayList(this));
+        return new NameResolverProviderFactory(Lists.newArrayList(this));
     }
 
-    private static class NameResolverFactory extends NameResolver.Factory {
-        private final List<NameResolverProvider> providers;
-
-        public NameResolverFactory(List<NameResolverProvider> providers) {
-            this.providers = providers;
-        }
-
-        @Override
-        public NameResolver newNameResolver(URI targetUri, Attributes params) {
-            checkForProviders();
-            for (NameResolverProvider provider : providers) {
-                NameResolver resolver = provider.newNameResolver(targetUri, 
params);
-                if (resolver != null) {
-                    return resolver;
-                }
-            }
-            return null;
-        }
-
-        @Override
-        public String getDefaultScheme() {
-            checkForProviders();
-            return providers.get(0).getDefaultScheme();
-        }
-
-        private void checkForProviders() {
-            checkState(!providers.isEmpty(),
-                "No NameResolverProviders found. Please check your 
configuration");
-        }
-    }
 }
diff --git a/stream/pom.xml b/stream/pom.xml
index f486681..e8a0491 100644
--- a/stream/pom.xml
+++ b/stream/pom.xml
@@ -38,6 +38,7 @@
     <module>clients</module>
     <module>storage</module>
     <module>server</module>
+    <module>bk-grpc-name-resolver</module>
   </modules>
 
   <build>

Reply via email to