sijie closed pull request #1721: [table service][storage] add routing table for 
proxying table service requests
URL: https://github.com/apache/bookkeeper/pull/1721
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff of a fork-based pull request once
it has been merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
index 76c763138d..e5206de6a3 100644
--- 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
+++ 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
@@ -58,6 +58,7 @@
 
     private final Optional<String> token;
     private final Channel channel;
+    private final StorageServerChannel interceptedServerChannel;
 
     @GuardedBy("this")
     private RootRangeServiceFutureStub rootRangeService;
@@ -87,6 +88,11 @@ public StorageServerChannel(Endpoint endpoint,
             resolvedEndpoint.getPort())
             .usePlaintext(usePlainText)
             .build();
+        this.interceptedServerChannel = null;
+    }
+
+    public Channel getGrpcChannel() {
+        return channel;
     }
 
     @VisibleForTesting
@@ -97,8 +103,15 @@ public StorageServerChannel(ManagedChannel channel,
 
     protected StorageServerChannel(Channel channel,
                                    Optional<String> token) {
+        this(channel, token, null);
+    }
+
+    private StorageServerChannel(Channel channel,
+                                 Optional<String> token,
+                                 StorageServerChannel 
interceptedServerChannel) {
         this.token = token;
         this.channel = channel;
+        this.interceptedServerChannel = interceptedServerChannel;
     }
 
     public synchronized RootRangeServiceFutureStub getRootRangeService() {
@@ -153,13 +166,18 @@ public StorageServerChannel 
intercept(ClientInterceptor... interceptors) {
             interceptors);
         return new StorageServerChannel(
             interceptedChannel,
-            this.token);
+            this.token,
+            this);
     }
 
     @Override
     public void close() {
-        if (channel instanceof ManagedChannel) {
-            ((ManagedChannel) channel).shutdown();
+        if (interceptedServerChannel != null) {
+            interceptedServerChannel.close();
+        } else {
+            if (channel instanceof ManagedChannel) {
+                ((ManagedChannel) channel).shutdown();
+            }
         }
     }
 }
diff --git 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java
 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java
index a89aaf7d23..abc9c6d23a 100644
--- 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java
+++ 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java
@@ -49,7 +49,7 @@
     private ProtocolInternalUtils() {
     }
 
-    static HashStreamRanges createActiveRanges(GetActiveRangesResponse 
response) {
+    public static HashStreamRanges createActiveRanges(GetActiveRangesResponse 
response) {
         TreeMap<Long, RangeProperties> ranges = Maps.newTreeMap();
         long lastEndKey = Long.MIN_VALUE;
         for (RelatedRanges rr : response.getRangesList()) {
diff --git 
a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java
 
b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java
index 1dd5f864aa..e1260665ad 100644
--- 
a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java
+++ 
b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java
@@ -19,6 +19,9 @@
 package org.apache.bookkeeper.clients.impl.channel;
 
 import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import io.grpc.ManagedChannel;
 import io.grpc.Server;
@@ -68,4 +71,13 @@ public void testBasic() {
         channel.close();
     }
 
+    @Test
+    public void testIntercept() {
+        ManagedChannel channel = mock(ManagedChannel.class);
+        StorageServerChannel ssChannel = new StorageServerChannel(channel, 
Optional.empty());
+        StorageServerChannel interceptedChannel = ssChannel.intercept(1L);
+        interceptedChannel.close();
+        verify(channel, times(1)).shutdown();
+    }
+
 }
diff --git 
a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
 
b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
index 0088c4b308..b888b45353 100644
--- 
a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
+++ 
b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
@@ -19,6 +19,8 @@
 package org.apache.bookkeeper.stream.protocol;
 
 import io.grpc.Metadata;
+import org.apache.bookkeeper.common.grpc.netty.IdentityBinaryMarshaller;
+import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
 import org.apache.bookkeeper.stream.proto.FixedRangeSplitPolicy;
 import org.apache.bookkeeper.stream.proto.RangeKeyType;
 import org.apache.bookkeeper.stream.proto.RetentionPolicy;
@@ -115,4 +117,24 @@ private ProtocolConstants() {
     public static final String ROUTING_KEY = "rk" + 
Metadata.BINARY_HEADER_SUFFIX;
     public static final String STREAM_ID_KEY = "sid-" + 
Metadata.BINARY_HEADER_SUFFIX;
     public static final String RANGE_ID_KEY = "rid-" + 
Metadata.BINARY_HEADER_SUFFIX;
+
+    // the metadata keys in grpc call metadata
+    public static final Metadata.Key<Long> SCID_METADATA_KEY = Metadata.Key.of(
+        SC_ID_KEY,
+        LongBinaryMarshaller.of()
+    );
+    public static final Metadata.Key<Long> RID_METADATA_KEY = Metadata.Key.of(
+        RANGE_ID_KEY,
+        LongBinaryMarshaller.of()
+    );
+    public static final Metadata.Key<Long> SID_METADATA_KEY = Metadata.Key.of(
+        STREAM_ID_KEY,
+        LongBinaryMarshaller.of()
+    );
+    public static final Metadata.Key<byte[]> RK_METADATA_KEY = Metadata.Key.of(
+        ROUTING_KEY,
+        IdentityBinaryMarshaller.of()
+    );
+
+
 }
diff --git 
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
 
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
index 97ff66ad2b..4385d56e1f 100644
--- 
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
+++ 
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
@@ -25,6 +25,9 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import 
org.apache.bookkeeper.clients.impl.internal.StorageServerClientManagerImpl;
 import org.apache.bookkeeper.common.component.ComponentStarter;
 import org.apache.bookkeeper.common.component.LifecycleComponent;
 import org.apache.bookkeeper.common.component.LifecycleComponentStack;
@@ -54,6 +57,7 @@
 import org.apache.bookkeeper.stream.storage.impl.cluster.ClusterControllerImpl;
 import 
org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterControllerLeaderSelector;
 import 
org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore;
+import 
org.apache.bookkeeper.stream.storage.impl.routing.RoutingHeaderProxyInterceptor;
 import 
org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerController;
 import 
org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
 import org.apache.bookkeeper.stream.storage.impl.sc.ZkStorageContainerManager;
@@ -249,6 +253,10 @@ public static LifecycleComponent 
buildStorageServer(CompositeConfiguration conf,
             dlConf,
             rootStatsLogger.scope("dlog"));
 
+        // client settings for the proxy channels
+        StorageClientSettings proxyClientSettings = 
StorageClientSettings.newBuilder()
+            .serviceUri("bk://localhost:" + grpcPort)
+            .build();
         // Create range (stream) store
         StorageContainerStoreBuilder storageContainerStoreBuilder = 
StorageContainerStoreBuilder.newBuilder()
             .withStatsLogger(rootStatsLogger.scope("storage"))
@@ -286,7 +294,15 @@ public static LifecycleComponent 
buildStorageServer(CompositeConfiguration conf,
                     () -> new DLCheckpointStore(dlNamespaceProvider.get()),
                     storageConf.getRangeStoreDirs(),
                     storageResources,
-                    storageConf.getServeReadOnlyTables()));
+                    storageConf.getServeReadOnlyTables()))
+            // with client manager for proxying grpc requests
+            .withStorageServerClientManager(() -> new 
StorageServerClientManagerImpl(
+                proxyClientSettings,
+                storageResources.scheduler(),
+                StorageServerChannel.factory(proxyClientSettings)
+                    // intercept the channel to attach routing header
+                    .andThen(channel -> channel.intercept(new 
RoutingHeaderProxyInterceptor()))
+            ));
         StorageService storageService = new StorageService(
             storageConf, storageContainerStoreBuilder, 
rootStatsLogger.scope("storage"));
 
diff --git 
a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java
 
b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java
index 57634d32d1..a05106058e 100644
--- 
a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java
+++ 
b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java
@@ -35,6 +35,15 @@
      */
     StorageContainer getStorageContainer(long storageContainerId);
 
+    /**
+     * Get the instance of storage container {@code storageContainerId}.
+     *
+     * @param storageContainerId storage container id
+     * @param defaultContainer the default container to return if the 
container doesn't exist in the registry
+     * @return the instance of the storage container.
+     */
+    StorageContainer getStorageContainer(long storageContainerId, 
StorageContainer defaultContainer);
+
     /**
      * Start the storage container in this registry.
      *
diff --git a/stream/storage/impl/pom.xml b/stream/storage/impl/pom.xml
index 52cef7fb33..a2ce3e0de9 100644
--- a/stream/storage/impl/pom.xml
+++ b/stream/storage/impl/pom.xml
@@ -66,6 +66,13 @@
       <artifactId>stream-storage-tests-common</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>stream-storage-java-client-base</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
index 9c96139dfa..cac363f41e 100644
--- 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
@@ -17,6 +17,8 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.net.URI;
+import java.util.function.Supplier;
+import 
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import 
org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
@@ -46,6 +48,7 @@ public static StorageContainerStoreBuilder newBuilder() {
         StorageContainerPlacementPolicyImpl.of(1024);
     private MVCCStoreFactory mvccStoreFactory = null;
     private URI defaultBackendUri = null;
+    private Supplier<StorageServerClientManager> clientManagerSupplier;
 
     private StorageContainerStoreBuilder() {
     }
@@ -131,12 +134,25 @@ public StorageContainerStoreBuilder 
withDefaultBackendUri(URI uri) {
         return this;
     }
 
+    /**
+     * Supplier to provide client manager for proxying requests.
+     *
+     * @param clientManagerSupplier client manager supplier
+     * @return storage container store builder
+     */
+    public StorageContainerStoreBuilder withStorageServerClientManager(
+        Supplier<StorageServerClientManager> clientManagerSupplier) {
+        this.clientManagerSupplier = clientManagerSupplier;
+        return this;
+    }
+
     public StorageContainerStore build() {
         checkNotNull(scmFactory, "StorageContainerManagerFactory is not 
provided");
         checkNotNull(storeConf, "StorageConfiguration is not provided");
         checkNotNull(mvccStoreFactory, "MVCCStoreFactory is not provided");
         checkNotNull(defaultBackendUri, "Default backend uri is not provided");
         checkNotNull(placementPolicyFactory, "Storage Container Placement 
Policy Factory is not provided");
+        checkNotNull(clientManagerSupplier, "Storage server client manager is 
not provided");
 
         RangeStoreServiceFactoryImpl serviceFactory = new 
RangeStoreServiceFactoryImpl(
             storeConf,
@@ -152,6 +168,7 @@ public StorageContainerStore build() {
             storeConf,
             scmFactory,
             containerServiceFactory,
+            clientManagerSupplier.get(),
             statsLogger);
     }
 
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java
index 56fcba2a8d..1271e85e3c 100644
--- 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java
@@ -15,15 +15,19 @@
 package org.apache.bookkeeper.stream.storage.impl;
 
 import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.INVALID_STORAGE_CONTAINER_ID;
-import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.SC_ID_KEY;
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.RID_METADATA_KEY;
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.RK_METADATA_KEY;
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.SCID_METADATA_KEY;
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.SID_METADATA_KEY;
 
 import io.grpc.Channel;
 import io.grpc.Metadata;
 import io.grpc.ServerCall;
 import java.io.IOException;
+import 
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
 import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
-import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.proto.RangeProperties;
 import org.apache.bookkeeper.stream.storage.api.StorageContainerStore;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManager;
@@ -31,11 +35,15 @@
 import 
org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRoutingService;
 import 
org.apache.bookkeeper.stream.storage.api.sc.StorageContainerServiceFactory;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
+import org.apache.bookkeeper.stream.storage.impl.routing.RangeRoutingTable;
+import org.apache.bookkeeper.stream.storage.impl.routing.RangeRoutingTableImpl;
+import 
org.apache.bookkeeper.stream.storage.impl.routing.StorageContainerProxyChannelManager;
+import 
org.apache.bookkeeper.stream.storage.impl.routing.StorageContainerProxyChannelManagerImpl;
 import 
org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerFactory;
 import 
org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerRegistryImpl;
 
 /**
- * KeyRange Service.
+ * Storage Container Store manages the containers and routes the requests 
accordingly.
  */
 public class StorageContainerStoreImpl
     extends AbstractLifecycleComponent<StorageConfiguration>
@@ -45,11 +53,14 @@
     private final StorageContainerRegistryImpl scRegistry;
     private final StorageContainerManager scManager;
     private final StorageContainerServiceFactory serviceFactory;
-    private final Metadata.Key<Long> scIdKey;
+    private final StorageServerClientManager ssClientManager;
+    private final RangeRoutingTable routingTable;
+    private final StorageContainerProxyChannelManager proxyChannelManager;
 
     public StorageContainerStoreImpl(StorageConfiguration conf,
                                      StorageContainerManagerFactory 
managerFactory,
                                      StorageContainerServiceFactory 
serviceFactory,
+                                     StorageServerClientManager 
ssClientManager,
                                      StatsLogger statsLogger) {
         super("range-service", conf, statsLogger);
         this.scmFactory = managerFactory;
@@ -57,9 +68,14 @@ public StorageContainerStoreImpl(StorageConfiguration conf,
             new DefaultStorageContainerFactory(serviceFactory));
         this.scManager = scmFactory.create(conf, scRegistry);
         this.serviceFactory = serviceFactory;
-        this.scIdKey = Metadata.Key.of(
-            SC_ID_KEY,
-            LongBinaryMarshaller.of());
+        this.ssClientManager = ssClientManager;
+        if (ssClientManager == null) {
+            this.proxyChannelManager = null;
+            this.routingTable = null;
+        } else {
+            this.proxyChannelManager = new 
StorageContainerProxyChannelManagerImpl(ssClientManager);
+            this.routingTable = new RangeRoutingTableImpl(ssClientManager);
+        }
     }
 
     @Override
@@ -83,6 +99,10 @@ protected void doStart() {
 
     @Override
     protected void doStop() {
+        // it doesn't have to be blocked at waiting closing proxy channels to 
be completed.
+        if (null != ssClientManager) {
+            this.ssClientManager.closeAsync();
+        }
         this.scManager.stop();
         this.scRegistry.close();
     }
@@ -97,17 +117,69 @@ StorageContainer getStorageContainer(long scId) {
         return scRegistry.getStorageContainer(scId);
     }
 
+    StorageContainer getStorageContainer(long scId, StorageContainer 
defaultContainer) {
+        return scRegistry.getStorageContainer(scId, defaultContainer);
+    }
+
     //
     // Utils for proxies
     //
 
     @Override
     public Channel findChannel(ServerCall<?, ?> serverCall, Metadata headers) {
-        Long scId = headers.get(scIdKey);
-        if (null == scId) {
-            // use the invalid storage container id, so it will fail the 
request.
-            scId = INVALID_STORAGE_CONTAINER_ID;
+        Long scId = headers.get(SCID_METADATA_KEY);
+        if (null != scId) {
+            // if a request is sent directly to a container, then find the 
container
+            StorageContainer container = getStorageContainer(scId, null);
+            if (null != container) {
+                return container.getChannel();
+            } else {
+                if (scId == 0L && null != proxyChannelManager) {
+                    // root container, we attempt to always proxy the requests 
for root container
+                    Channel channel = 
proxyChannelManager.getStorageContainerChannel(0L);
+                    if (null != channel) {
+                        return channel;
+                    } else {
+                        // no channel found to proxy the request, fail the 
request with 404 container
+                        return 
getStorageContainer(INVALID_STORAGE_CONTAINER_ID).getChannel();
+                    }
+                } else {
+                    // no container is found and the scId is not the root 
container
+                    // then fail the request with 404 container
+                    return 
getStorageContainer(INVALID_STORAGE_CONTAINER_ID).getChannel();
+                }
+            }
+        } else {
+            // if a request is not sent directly to a container, then check if
+            // streamId + routingKey is attached in the header. if so, figure 
out
+            // which the range id and storage container id to route the 
request.
+            byte[] routingKey = headers.get(RK_METADATA_KEY);
+            Long streamId = headers.get(SID_METADATA_KEY);
+            if (null != routingKey && null != streamId && null != routingTable 
&& null != proxyChannelManager) {
+                RangeProperties rangeProps = routingTable.getRange(streamId, 
routingKey);
+                if (null != rangeProps) {
+                    long containerId = rangeProps.getStorageContainerId();
+                    long rangeId = rangeProps.getRangeId();
+                    // if we find the routing information, we can update the 
headers
+                    headers.put(SCID_METADATA_KEY, containerId);
+                    headers.put(RID_METADATA_KEY, rangeId);
+                    // if we find the container id, we can check whether the 
container is owned locally.
+                    // if so, forward the request to the container.
+                    StorageContainer container = 
getStorageContainer(containerId, null);
+                    if (null == container) {
+                        // the container doesn't belong to here, then find the 
channel to forward the request
+                        Channel channel = 
proxyChannelManager.getStorageContainerChannel(containerId);
+                        if (null != channel) {
+                            return channel;
+                        }
+                    } else {
+                        // we found the container exists in the registry to 
serve the request
+                        return container.getChannel();
+                    }
+                }
+            }
+            // there is no storage container id or routing key, then fail the 
request and ask client to retry.
+            return 
getStorageContainer(INVALID_STORAGE_CONTAINER_ID).getChannel();
         }
-        return getStorageContainer(scId).getChannel();
     }
 }
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTable.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTable.java
new file mode 100644
index 0000000000..bb80142149
--- /dev/null
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import org.apache.bookkeeper.stream.proto.RangeProperties;
+
+/**
+ * A routing table that resolves which range should serve a request.
+ */
+public interface RangeRoutingTable {
+
+    /**
+     * Get the range metadata for stream {@code streamId} and routing key {@code routingKey}.
+     *
+     * @param streamId stream id
+     * @param routingKey routing key
+     * @return the range that serves this routing key, or {@code null} if it is not known (yet)
+     */
+    RangeProperties getRange(long streamId, byte[] routingKey);
+
+}
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java
new file mode 100644
index 0000000000..b9727c9037
--- /dev/null
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
+import org.apache.bookkeeper.clients.impl.routing.RangeRouter;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.router.BytesHashRouter;
+import org.apache.bookkeeper.stream.proto.RangeProperties;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
+
+/**
+ * A default implementation of {@link RangeRoutingTable}.
+ *
+ * <p>Range routers are cached per stream. When no router is cached for a stream, a fetch of the
+ * stream's active ranges is triggered in the background and lookups return {@code null} until
+ * that fetch completes successfully.
+ */
+@Slf4j
+public class RangeRoutingTableImpl implements RangeRoutingTable {
+
+    // client manager used for opening the meta range clients to fetch active ranges
+    private final StorageServerClientManager manager;
+    // cached range routers, keyed by stream id
+    private final ConcurrentLongHashMap<RangeRouter<byte[]>> ranges;
+
+    // outstanding fetch requests, keyed by stream id
+    private final ConcurrentLongHashMap<CompletableFuture<RangeRouter<byte[]>>> outstandingUpdates;
+
+    public RangeRoutingTableImpl(StorageServerClientManager manager) {
+        this.manager = manager;
+        this.ranges = new ConcurrentLongHashMap<>();
+        this.outstandingUpdates = new ConcurrentLongHashMap<>();
+    }
+
+    @VisibleForTesting
+    RangeRouter<byte[]> getRangeRouter(long streamId) {
+        return ranges.get(streamId);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Returns {@code null} when no range router is cached for {@code streamId} yet; in that case
+     * a background fetch is triggered so that a later retry can succeed.
+     */
+    @Override
+    public RangeProperties getRange(long streamId, byte[] routingKey) {
+        RangeRouter<byte[]> router = ranges.get(streamId);
+        if (null == router) {
+            // trigger to fetch stream metadata, but return `null` since
+            // the range router is not ready, let the client backoff and retry.
+            fetchStreamRanges(streamId);
+            return null;
+        } else {
+            return router.getRangeProperties(routingKey);
+        }
+    }
+
+    @VisibleForTesting
+    CompletableFuture<RangeRouter<byte[]>> getOutstandingFetchRequest(long streamId) {
+        return outstandingUpdates.get(streamId);
+    }
+
+    /**
+     * Fetch the active ranges of stream {@code streamId} and cache a router built from them.
+     *
+     * <p>At most one fetch per stream is outstanding at a time.
+     *
+     * @param streamId stream id
+     */
+    private void fetchStreamRanges(long streamId) {
+        if (null != outstandingUpdates.get(streamId)) {
+            // there is already an outstanding fetch request, do nothing
+            return;
+        }
+        final CompletableFuture<RangeRouter<byte[]>> newFetchFuture = new CompletableFuture<>();
+        // use `putIfAbsent` rather than `put`: a plain `put` would overwrite the future registered by a
+        // concurrent caller, whose `remove(streamId, future)` would then never match and leave a stale
+        // entry behind that blocks all subsequent fetches for this stream.
+        if (null != outstandingUpdates.putIfAbsent(streamId, newFetchFuture)) {
+            // some one already triggers the fetch
+            return;
+        }
+        FutureUtils.proxyTo(
+            manager.openMetaRangeClient(streamId)
+                .thenCompose(metaRangeClient -> metaRangeClient.getActiveDataRanges())
+                .thenApply(hashStreamRanges -> {
+                    RangeRouter<byte[]> router = new RangeRouter<>(BytesHashRouter.of());
+                    router.setRanges(hashStreamRanges);
+                    return router;
+                })
+                .whenComplete((router, cause) -> {
+                    if (null == cause) {
+                        ranges.put(streamId, router);
+                    } else {
+                        log.warn("Failed to fetch the active ranges for stream {}", streamId, cause);
+                    }
+                    outstandingUpdates.remove(streamId, newFetchFuture);
+                }),
+            newFetchFuture
+        );
+    }
+}
diff --git 
a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptor.java
similarity index 70%
rename from 
stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java
rename to 
stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptor.java
index 2ef970f92f..377f6df2c6 100644
--- 
a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptor.java
@@ -17,11 +17,11 @@
  * under the License.
  */
 
-package org.apache.bookkeeper.clients.impl.kv.interceptors;
+package org.apache.bookkeeper.stream.storage.impl.routing;
 
-import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.RANGE_ID_KEY;
-import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROUTING_KEY;
-import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.STREAM_ID_KEY;
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.RID_METADATA_KEY;
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.RK_METADATA_KEY;
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.SID_METADATA_KEY;
 
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.MessageLite;
@@ -34,6 +34,9 @@
 import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
 import io.grpc.Metadata;
 import io.grpc.MethodDescriptor;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.PooledByteBufAllocator;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -41,8 +44,6 @@
 import java.util.Map;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.common.grpc.netty.IdentityBinaryMarshaller;
-import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
 import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
@@ -50,25 +51,13 @@
 import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.commons.codec.binary.Hex;
 
 /**
  * A client interceptor that intercepting kv rpcs to attach routing 
information.
  */
 @Slf4j
-public class RoutingHeaderClientInterceptor implements ClientInterceptor {
-
-    static final Metadata.Key<Long> RID_METADATA_KEY = Metadata.Key.of(
-        RANGE_ID_KEY,
-        LongBinaryMarshaller.of()
-    );
-    static final Metadata.Key<Long> SID_METADATA_KEY = Metadata.Key.of(
-        STREAM_ID_KEY,
-        LongBinaryMarshaller.of()
-    );
-    static final Metadata.Key<byte[]> RK_METADATA_KEY = Metadata.Key.of(
-        ROUTING_KEY,
-        IdentityBinaryMarshaller.of()
-    );
+public class RoutingHeaderProxyInterceptor implements ClientInterceptor {
 
     /**
      * Table request mutator that mutates a table service rpc request to attach
@@ -175,53 +164,75 @@ ReqT intercept(ReqT request,
                                                                CallOptions 
callOptions,
                                                                Channel next) {
         if (log.isTraceEnabled()) {
-            log.trace("Intercepting method {}", method.getFullMethodName());
+            log.trace("Intercepting method {} : req marshaller = {}, resp 
marshaller = {}",
+                method.getFullMethodName(),
+                method.getRequestMarshaller(),
+                method.getResponseMarshaller());
         }
         InterceptorDescriptor<?> descriptor = 
kvRpcMethods.get(method.getFullMethodName());
-        if (null != descriptor) {
-            return new SimpleForwardingClientCall<ReqT, 
RespT>(next.newCall(method, callOptions)) {
-
-                private Long rid = null;
-                private Long sid = null;
-                private byte[] rk = null;
+        return new SimpleForwardingClientCall<ReqT, 
RespT>(next.newCall(method, callOptions)) {
 
-                @Override
-                public void start(Listener<RespT> responseListener, Metadata 
headers) {
-                    // capture routing information from headers
-                    sid = headers.get(SID_METADATA_KEY);
-                    rid = headers.get(RID_METADATA_KEY);
-                    rk  = headers.get(RK_METADATA_KEY);
-                    if (log.isTraceEnabled()) {
-                        log.trace("Intercepting request with header : sid = 
{}, rid = {}, rk = {}",
-                            sid, rid, rk);
-                    }
+            private Long rid = null;
+            private Long sid = null;
+            private byte[] rk = null;
 
-                    delegate().start(responseListener, headers);
+            @Override
+            public void start(Listener<RespT> responseListener, Metadata 
headers) {
+                // capture routing information from headers
+                sid = headers.get(SID_METADATA_KEY);
+                rid = headers.get(RID_METADATA_KEY);
+                rk  = headers.get(RK_METADATA_KEY);
+                if (log.isTraceEnabled()) {
+                    log.trace("Intercepting request with header : sid = {}, 
rid = {}, rk = {}",
+                        sid, rid, rk);
                 }
 
-                @Override
-                public void sendMessage(ReqT message) {
-                    ReqT interceptedMessage;
-                    if (null == rid || null == sid || null == rk) {
-                        // we don't have enough information to form the new 
routing header
-                        // so do nothing
-                        interceptedMessage = message;
-                    } else {
-                        interceptedMessage = interceptMessage(
-                            method,
-                            descriptor,
-                            message,
-                            sid,
-                            rid,
-                            rk
-                        );
-                    }
-                    delegate().sendMessage(interceptedMessage);
+                delegate().start(responseListener, headers);
+            }
+
+            @Override
+            public void sendMessage(ReqT message) {
+                ReqT interceptedMessage;
+                if (null == rid || null == sid || null == rk || null == 
descriptor) {
+                    // we don't have enough information to form the new 
routing header
+                    // we simply copy the bytes and generate a new message to 
forward
+                    // the request payload
+                    interceptedMessage = interceptMessage(method, message);
+                } else {
+                    interceptedMessage = interceptMessage(
+                        method,
+                        descriptor,
+                        message,
+                        sid,
+                        rid,
+                        rk
+                    );
                 }
-            };
-        } else {
-            return next.newCall(method, callOptions);
+                delegate().sendMessage(interceptedMessage);
+            }
+        };
+    }
+
+    /**
+     * Re-materialize a request that carries no usable routing information, so that
+     * its payload can be forwarded unchanged.
+     *
+     * <p>The message is serialized with the method's request marshaller, its bytes are
+     * copied into a pooled buffer, and the buffer is parsed back with the same marshaller.
+     *
+     * @param method  descriptor of the rpc being proxied
+     * @param message request message to forward
+     * @return a message parsed from a copy of the serialized bytes of {@code message}
+     * @throws RuntimeException if the serialized bytes can not be sized or copied
+     */
+    private <ReqT, RespT> ReqT interceptMessage(MethodDescriptor<ReqT, RespT> method, ReqT message) {
+        InputStream is = method.getRequestMarshaller().stream(message);
+        int bytes;
+        try {
+            bytes = is.available();
+        } catch (IOException e) {
+            log.warn("Encountered exceptions in getting available bytes of message", e);
+            throw new RuntimeException("Encountered exception in intercepting message", e);
+        }
+        // the payload size is known, so size the pooled buffer up front instead of
+        // starting from the allocator's default capacity and growing it
+        ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer(bytes);
+        try {
+            // writeBytes reports how many bytes it actually transferred, which may be
+            // fewer than requested; keep copying until the whole payload is in the buffer
+            int remaining = bytes;
+            while (remaining > 0) {
+                int transferred = buffer.writeBytes(is, remaining);
+                if (transferred <= 0) {
+                    // end of stream (or no progress): forward what has been copied
+                    break;
+                }
+                remaining -= transferred;
+            }
+        } catch (IOException e) {
+            log.warn("Encountered exceptions in transferring bytes to the buffer", e);
+            buffer.release();
+            throw new RuntimeException("Encountered exceptions in transferring bytes to the buffer", e);
         }
+        // the stream is created with releaseOnClose = true, so the buffer is released
+        // when the stream handed to the marshaller is closed
+        return method
+            .getRequestMarshaller()
+            .parse(new ByteBufInputStream(buffer, true));
     }
 
     private <ReqT, TableReqT extends MessageLite> ReqT interceptMessage(
@@ -237,7 +248,9 @@ public void sendMessage(ReqT message) {
         } else {
             try {
                 return interceptTableRequest(method, descriptor, message, sid, 
rid, rk);
-            } catch (IOException ioe) {
+            } catch (Throwable t) {
+                log.error("Failed to intercept table request (sid = {}, rid = 
{}, rk = {}) : ",
+                    sid, rid, Hex.encodeHexString(rk), t);
                 return message;
             }
         }
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManager.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManager.java
new file mode 100644
index 0000000000..deb054e150
--- /dev/null
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManager.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import io.grpc.Channel;
+
+/**
+ * A manager that manages all the proxy channels to other storage containers.
+ */
+public interface StorageContainerProxyChannelManager {
+
+    /**
+     * Get the channel to storage container {@code scId}.
+     *
+     * <p>The method can return {@code null} if the channel is not ready, so callers
+     * must be prepared to handle a missing channel.
+     *
+     * @param scId storage container id
+     * @return channel to the given storage container, or {@code null} if it can't connect
+     */
+    Channel getStorageContainerChannel(long scId);
+
+}
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImpl.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImpl.java
new file mode 100644
index 0000000000..ed69e0f040
--- /dev/null
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import io.grpc.Channel;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import 
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
+
+/**
+ * Default implementation of {@link StorageContainerProxyChannelManager}.
+ *
+ * <p>Channels are resolved through a {@link StorageServerClientManager}. A lookup only
+ * inspects the current state of the channel future, it never waits for it to complete.
+ */
+public class StorageContainerProxyChannelManagerImpl implements StorageContainerProxyChannelManager {
+
+    // we can ideally just talk to the location service directly.
+    // however currently storage container is separated from actual services to make a clean interface
+    // so for now proxy channel manager will be acting as a client to talk to its local server to
+    // get location related information
+    private final StorageServerClientManager ssClientManager;
+
+    /**
+     * Create a proxy channel manager on top of the given client manager.
+     *
+     * @param clientManager client manager used to look up storage container channels
+     */
+    public StorageContainerProxyChannelManagerImpl(StorageServerClientManager clientManager) {
+        this.ssClientManager = clientManager;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Returns {@code null} when the channel future is absent, still pending, completed
+     * exceptionally (including cancellation), or completed with a {@code null} channel.
+     */
+    @Override
+    public Channel getStorageContainerChannel(long scId) {
+        StorageContainerChannel channel = ssClientManager.getStorageContainerChannel(scId);
+        // this will trigger creating the channel for the first time
+        CompletableFuture<StorageServerChannel> serverChannelFuture = channel.getStorageContainerChannelFuture();
+        if (null == serverChannelFuture
+            || !serverChannelFuture.isDone()
+            || serverChannelFuture.isCompletedExceptionally()) {
+            // a failed or cancelled future is also "done"; calling join() on it would throw
+            // instead of reporting "no channel" as the interface contract promises
+            return null;
+        }
+        // the future completed normally, so join() returns immediately without throwing
+        StorageServerChannel serverChannel = serverChannelFuture.join();
+        return null == serverChannel ? null : serverChannel.getGrpcChannel();
+    }
+
+}
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/package-info.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/package-info.java
new file mode 100644
index 0000000000..5d4437ca36
--- /dev/null
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Classes related to grpc request routing.
+ */
+package org.apache.bookkeeper.stream.storage.impl.routing;
\ No newline at end of file
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
index 48330b7e13..160d9a8647 100644
--- 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
@@ -58,7 +58,12 @@ public int getNumStorageContainers() {
 
     @Override
     public StorageContainer getStorageContainer(long storageContainerId) {
-        return containers.getOrDefault(storageContainerId, 
StorageContainer404.of());
+        return getStorageContainer(storageContainerId, 
StorageContainer404.of());
+    }
+
+    @Override
+    public StorageContainer getStorageContainer(long storageContainerId, 
StorageContainer defaultContainer) {
+        return containers.getOrDefault(storageContainerId, defaultContainer);
     }
 
     @Override
diff --git 
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java
 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java
index 3d3a0ad1b5..59d6a1306b 100644
--- 
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java
+++ 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java
@@ -18,6 +18,7 @@
 import static org.mockito.Mockito.mock;
 
 import java.net.URI;
+import 
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
 import org.apache.bookkeeper.stream.storage.api.StorageContainerStore;
 import 
org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactory;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
@@ -47,6 +48,7 @@ public void testBuildNullConfiguration() {
             .withStorageResources(StorageResources.create())
             .withRangeStoreFactory(storeFactory)
             .withDefaultBackendUri(uri)
+            .withStorageServerClientManager(() -> 
mock(StorageServerClientManager.class))
             .build();
     }
 
@@ -57,6 +59,7 @@ public void testBuildNullResources() {
             
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
             .withStorageResources(null)
             .withRangeStoreFactory(storeFactory)
+            .withStorageServerClientManager(() -> 
mock(StorageServerClientManager.class))
             .withDefaultBackendUri(uri)
             .build();
     }
@@ -68,6 +71,7 @@ public void testBuildNullRGManagerFactory() {
             .withStorageContainerManagerFactory(null)
             .withStorageResources(StorageResources.create())
             .withRangeStoreFactory(storeFactory)
+            .withStorageServerClientManager(() -> 
mock(StorageServerClientManager.class))
             .withDefaultBackendUri(uri)
             .build();
     }
@@ -79,6 +83,7 @@ public void testBuildNullStoreFactory() {
             
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
             .withStorageResources(StorageResources.create())
             .withRangeStoreFactory(null)
+            .withStorageServerClientManager(() -> 
mock(StorageServerClientManager.class))
             .withDefaultBackendUri(uri)
             .build();
     }
@@ -90,10 +95,23 @@ public void testBuildNullDefaultBackendUri() {
             
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
             .withStorageResources(StorageResources.create())
             .withRangeStoreFactory(storeFactory)
+            .withStorageServerClientManager(() -> 
mock(StorageServerClientManager.class))
             .withDefaultBackendUri(null)
             .build();
     }
 
+    /**
+     * Building a store with a {@code null} storage server client manager supplier
+     * is expected to fail with a {@link NullPointerException}; every other builder
+     * input is populated.
+     */
+    @Test(expected = NullPointerException.class)
+    public void testBuildStorageServerClientManager() {
+        StorageContainerStoreBuilder.newBuilder()
+            .withStorageConfiguration(mock(StorageConfiguration.class))
+            
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
+            .withStorageResources(StorageResources.create())
+            .withRangeStoreFactory(storeFactory)
+            .withStorageServerClientManager(null)
+            .withDefaultBackendUri(uri)
+            .build();
+    }
+
     @Test
     public void testBuild() {
         StorageContainerStore storageContainerStore = 
StorageContainerStoreBuilder.newBuilder()
@@ -101,6 +119,7 @@ public void testBuild() {
             
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
             .withStorageResources(StorageResources.create())
             .withRangeStoreFactory(storeFactory)
+            .withStorageServerClientManager(() -> 
mock(StorageServerClientManager.class))
             .withDefaultBackendUri(uri)
             .build();
         assertTrue(storageContainerStore instanceof StorageContainerStoreImpl);
diff --git 
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
index b9d5e69798..6ae3658dad 100644
--- 
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
+++ 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
@@ -85,6 +85,7 @@
 import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc;
 import 
org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceFutureStub;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.apache.bookkeeper.stream.storage.StorageResources;
 import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
 import 
org.apache.bookkeeper.stream.storage.api.service.RangeStoreServiceFactory;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
@@ -133,6 +134,7 @@ private static Endpoint createEndpoint(String hostname,
         NamespaceConfiguration.newBuilder()
             .setDefaultStreamConf(DEFAULT_STREAM_CONF)
             .build();
+    private final StorageResources resources = StorageResources.create();
     private RangeStoreService mockRangeStoreService;
     private StorageContainerStoreImpl rangeStore;
     private Server server;
@@ -230,6 +232,7 @@ public void setUp() throws Exception {
             (storeConf, rgRegistry)
                 -> new LocalStorageContainerManager(endpoint, storeConf, 
rgRegistry, 2),
             new 
RangeStoreContainerServiceFactoryImpl(rangeStoreServiceFactory),
+            null,
             NullStatsLogger.INSTANCE);
 
         rangeStore.start();
diff --git 
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImplTest.java
 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImplTest.java
new file mode 100644
index 0000000000..8201ba714d
--- /dev/null
+++ 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImplTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import io.grpc.stub.StreamObserver;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.internal.ProtocolInternalUtils;
+import org.apache.bookkeeper.clients.impl.routing.RangeRouter;
+import org.apache.bookkeeper.common.router.BytesHashRouter;
+import org.apache.bookkeeper.stream.proto.RangeProperties;
+import org.apache.bookkeeper.stream.proto.StreamConfiguration;
+import org.apache.bookkeeper.stream.proto.StreamProperties;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
+import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
+import 
org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRangeServiceImplBase;
+import org.apache.bookkeeper.stream.proto.storage.RelatedRanges;
+import org.apache.bookkeeper.stream.proto.storage.RelationType;
+import 
org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceImplBase;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.apache.bookkeeper.stream.protocol.util.ProtoUtils;
+import 
org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
+import org.junit.Test;
+
+/**
+ * Unit test {@link RangeRoutingTableImpl}.
+ */
+public class RangeRoutingTableImplTest extends GrpcClientTestBase {
+
+    // upper bound on how long a test polls for the routing table cache to be populated,
+    // so a regression fails the test instead of hanging the build
+    private static final long CACHE_POLL_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
+
+    private final long scId = 1234L;
+    private final long streamId = 123456L;
+    private GetActiveRangesResponse getActiveRangesResponse;
+    // controls when (and how) the mocked meta range service answers getActiveRanges
+    private CompletableFuture<GetActiveRangesResponse> responseSupplier;
+    private StreamProperties props;
+    private List<RangeProperties> rangeProps;
+    private RangeRoutingTableImpl routingTable;
+    // reference router built from the same response, used to compute expected ranges
+    private RangeRouter<byte[]> rangeRouter;
+
+    @Override
+    protected void doSetup() throws Exception {
+        this.props = StreamProperties.newBuilder()
+            .setStorageContainerId(scId)
+            .setStreamId(streamId)
+            .setStreamName("metaclient-stream")
+            .setStreamConf(StreamConfiguration.newBuilder().build())
+            .build();
+        this.rangeProps = ProtoUtils.split(
+            streamId,
+            24,
+            23456L,
+            StorageContainerPlacementPolicyImpl.of(4)
+        );
+        final GetActiveRangesResponse.Builder getActiveRangesResponseBuilder = GetActiveRangesResponse.newBuilder();
+        for (RangeProperties range : rangeProps) {
+            RelatedRanges.Builder rrBuilder = RelatedRanges.newBuilder()
+                .setProps(range)
+                .setType(RelationType.PARENTS)
+                .addAllRelatedRanges(Collections.emptyList());
+            getActiveRangesResponseBuilder.addRanges(rrBuilder);
+        }
+        this.getActiveRangesResponse = getActiveRangesResponseBuilder
+            .setCode(StatusCode.SUCCESS)
+            .build();
+        // root range service that always resolves the stream successfully
+        RootRangeServiceImplBase rootRangeService = new RootRangeServiceImplBase() {
+            @Override
+            public void getStream(GetStreamRequest request,
+                                  StreamObserver<GetStreamResponse> responseObserver) {
+                responseObserver.onNext(GetStreamResponse.newBuilder()
+                    .setCode(StatusCode.SUCCESS)
+                    .setStreamProps(props)
+                    .build());
+                responseObserver.onCompleted();
+            }
+        };
+        serviceRegistry.addService(rootRangeService);
+
+        this.responseSupplier = new CompletableFuture<>();
+        // register a meta range service whose answer is gated by the response supplier:
+        // it blocks until the test completes the supplier, then replies or fails accordingly
+        MetaRangeServiceImplBase metaRangeService = new MetaRangeServiceImplBase() {
+            @Override
+            public void getActiveRanges(GetActiveRangesRequest request,
+                                        StreamObserver<GetActiveRangesResponse> responseObserver) {
+                try {
+                    responseObserver.onNext(responseSupplier.get());
+                    responseObserver.onCompleted();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    responseObserver.onError(e);
+                } catch (ExecutionException e) {
+                    responseObserver.onError(e);
+                }
+            }
+        };
+        serviceRegistry.addService(metaRangeService);
+
+        this.routingTable = new RangeRoutingTableImpl(serverManager);
+        this.rangeRouter = new RangeRouter<>(BytesHashRouter.of());
+        this.rangeRouter.setRanges(ProtocolInternalUtils.createActiveRanges(getActiveRangesResponse));
+    }
+
+    @Override
+    protected void doTeardown() throws Exception {
+    }
+
+    @Test
+    public void testGetRange() throws Exception {
+        String key = "foo";
+        byte[] keyBytes = key.getBytes(UTF_8);
+        RangeProperties range = routingTable.getRange(streamId, keyBytes);
+        // the first get will return null since there is nothing in
+        assertNull(range);
+        // the fetch request is outstanding
+        CompletableFuture<RangeRouter<byte[]>> outstandingFetchFuture =
+            routingTable.getOutstandingFetchRequest(streamId);
+        assertNotNull(outstandingFetchFuture);
+        assertFalse(outstandingFetchFuture.isDone());
+
+        // complete the response supplier, so the fetch request can complete to update the cache
+        responseSupplier.complete(getActiveRangesResponse);
+
+        // wait until the stuff is cached, but give up after a bounded amount of time
+        final long deadlineNanos = System.nanoTime() + CACHE_POLL_TIMEOUT_NANOS;
+        while (null == routingTable.getRangeRouter(streamId)) {
+            if (System.nanoTime() - deadlineNanos > 0) {
+                fail("Range router for stream " + streamId + " is not cached in time");
+            }
+            TimeUnit.MILLISECONDS.sleep(100);
+        }
+
+        // if the router is created, it should return the cached router
+        range = routingTable.getRange(streamId, keyBytes);
+        assertNotNull(range);
+        assertEquals(rangeRouter.getRangeProperties(keyBytes), range);
+    }
+
+    @Test
+    public void testGetRangeException() throws Exception {
+        String key = "foo";
+        byte[] keyBytes = key.getBytes(UTF_8);
+        RangeProperties range = routingTable.getRange(streamId, keyBytes);
+        // the first get will return null since there is nothing in
+        assertNull(range);
+        // the fetch request is outstanding
+        CompletableFuture<RangeRouter<byte[]>> outstandingFetchFuture =
+            routingTable.getOutstandingFetchRequest(streamId);
+        assertNotNull(outstandingFetchFuture);
+        assertFalse(outstandingFetchFuture.isDone());
+
+        // fail the response supplier, so the fetch request fails instead of updating the cache
+        responseSupplier.completeExceptionally(new Exception("fetch failed"));
+
+        // wait until the fetch is done.
+        try {
+            outstandingFetchFuture.get();
+            fail("Fetch request should fail");
+        } catch (Exception e) {
+            // expected
+        }
+
+        // once the fetch is done, nothing should be cached and the outstanding fetch request should be removed
+        assertNull(routingTable.getRangeRouter(streamId));
+        assertNull(routingTable.getOutstandingFetchRequest(streamId));
+    }
+
+}
diff --git 
a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java
 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptorTest.java
similarity index 94%
rename from 
stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java
rename to 
stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptorTest.java
index 745118b049..43c060b12c 100644
--- 
a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java
+++ 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptorTest.java
@@ -17,9 +17,12 @@
  * under the License.
  */
 
-package org.apache.bookkeeper.clients.impl.kv.interceptors;
+package org.apache.bookkeeper.stream.storage.impl.routing;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.RID_METADATA_KEY;
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.RK_METADATA_KEY;
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.SID_METADATA_KEY;
 import static org.junit.Assert.assertEquals;
 
 import com.google.protobuf.ByteString;
@@ -54,10 +57,10 @@
 import org.junit.Test;
 
 /**
- * Unit test {@link RoutingHeaderClientInterceptor}.
+ * Unit test {@link RoutingHeaderProxyInterceptor}.
  */
 @Slf4j
-public class RoutingHeaderClientInterceptorTest extends GrpcClientTestBase {
+public class RoutingHeaderProxyInterceptorTest extends GrpcClientTestBase {
 
     private final long streamId = 1234L;
     private final long rangeId = 2345L;
@@ -136,11 +139,12 @@ public void put(PutRequest request, 
StreamObserver<PutResponse> responseObserver
         };
         serviceRegistry.addService(tableService.bindService());
 
+
         this.channel = new StorageServerChannel(
             
InProcessChannelBuilder.forName(serverName).directExecutor().build(),
             Optional.empty()
         ).intercept(
-            new RoutingHeaderClientInterceptor(),
+            new RoutingHeaderProxyInterceptor(),
             new ClientInterceptor() {
                 @Override
                 public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
@@ -152,15 +156,15 @@ protected void checkedStart(Listener<RespT> responseListener, Metadata headers)
                             log.info("Intercept the request with routing information : sid = {}, rid = {}, rk = {}",
                                 streamId, rangeId, new String(routingKey, UTF_8));
                             headers.put(
-                                RoutingHeaderClientInterceptor.RID_METADATA_KEY,
+                                RID_METADATA_KEY,
                                 rangeId
                             );
                             headers.put(
-                                RoutingHeaderClientInterceptor.SID_METADATA_KEY,
+                                SID_METADATA_KEY,
                                 streamId
                             );
                             headers.put(
-                                RoutingHeaderClientInterceptor.RK_METADATA_KEY,
+                                RK_METADATA_KEY,
                                 routingKey
                             );
                             delegate().start(responseListener, headers);
@@ -173,6 +177,7 @@ protected void checkedStart(Listener<RespT> responseListener, Metadata headers)
 
     @Override
     protected void doTeardown() {
+        channel.close();
     }
 
     @Test
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImplTest.java
new file mode 100644
index 0000000000..bb94843fe0
--- /dev/null
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImplTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import io.grpc.Channel;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.stream.proto.storage.GetStorageContainerEndpointRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetStorageContainerEndpointResponse;
+import org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointRequest;
+import org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.apache.bookkeeper.stream.proto.storage.StorageContainerEndpoint;
+import org.apache.bookkeeper.stream.proto.storage.StorageContainerServiceGrpc.StorageContainerServiceImplBase;
+import org.junit.Test;
+
+/**
+ * Unit testing {@link StorageContainerProxyChannelManagerImpl}.
+ */
+public class StorageContainerProxyChannelManagerImplTest extends GrpcClientTestBase {
+
+    private final long scId = 1234L;
+    private StorageContainerProxyChannelManagerImpl proxyChannelManager;
+
+    @Override
+    protected void doSetup() throws Exception {
+        this.proxyChannelManager = new StorageContainerProxyChannelManagerImpl(serverManager);
+    }
+
+    @Override
+    protected void doTeardown() throws Exception {
+    }
+
+
+    @Test
+    public void testGetStorageContainerChannel() throws Exception {
+        final CompletableFuture<GetStorageContainerEndpointRequest> receivedRequest = new CompletableFuture<>();
+        final CompletableFuture<GetStorageContainerEndpointResponse> responseSupplier = new CompletableFuture<>();
+        StorageContainerServiceImplBase scService = new StorageContainerServiceImplBase() {
+            @Override
+            public void getStorageContainerEndpoint(
+                    GetStorageContainerEndpointRequest request,
+                    StreamObserver<GetStorageContainerEndpointResponse> responseObserver) {
+                receivedRequest.complete(request);
+                try {
+                    responseObserver.onNext(responseSupplier.get());
+                    responseObserver.onCompleted();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    responseObserver.onError(e);
+                } catch (ExecutionException e) {
+                    responseObserver.onError(e);
+                }
+            }
+        };
+        serviceRegistry.addService(scService.bindService());
+
+        Channel channel = proxyChannelManager.getStorageContainerChannel(scId);
+        // if the location service doesn't respond, the channel will be null
+        assertNull(channel);
+
+        // complete the location service request
+        responseSupplier.complete(getResponse(receivedRequest.get()));
+        while ((channel = proxyChannelManager.getStorageContainerChannel(scId)) == null) {
+            TimeUnit.MILLISECONDS.sleep(100);
+        }
+        assertNotNull(channel);
+    }
+
+    private static GetStorageContainerEndpointResponse getResponse(GetStorageContainerEndpointRequest request) {
+        GetStorageContainerEndpointResponse.Builder respBuilder =
+            GetStorageContainerEndpointResponse.newBuilder();
+        respBuilder.setStatusCode(StatusCode.SUCCESS);
+        for (OneStorageContainerEndpointRequest oneReq : request.getRequestsList()) {
+            OneStorageContainerEndpointResponse oneResp = OneStorageContainerEndpointResponse.newBuilder()
+                .setEndpoint(StorageContainerEndpoint.newBuilder()
+                    .setStorageContainerId(oneReq.getStorageContainer())
+                    .setRevision(oneReq.getRevision() + 1)
+                    .setRwEndpoint(ENDPOINT))
+                .build();
+            respBuilder.addResponses(oneResp);
+        }
+        return respBuilder.build();
+    }
+
+
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to