sijie closed pull request #1721: [table service][storage] add routing table for
proxying table service requests
URL: https://github.com/apache/bookkeeper/pull/1721
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance.
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
index 76c763138d..e5206de6a3 100644
---
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
+++
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
@@ -58,6 +58,7 @@
private final Optional<String> token;
private final Channel channel;
+ private final StorageServerChannel interceptedServerChannel;
@GuardedBy("this")
private RootRangeServiceFutureStub rootRangeService;
@@ -87,6 +88,11 @@ public StorageServerChannel(Endpoint endpoint,
resolvedEndpoint.getPort())
.usePlaintext(usePlainText)
.build();
+ this.interceptedServerChannel = null;
+ }
+
+ public Channel getGrpcChannel() {
+ return channel;
}
@VisibleForTesting
@@ -97,8 +103,15 @@ public StorageServerChannel(ManagedChannel channel,
protected StorageServerChannel(Channel channel,
Optional<String> token) {
+ this(channel, token, null);
+ }
+
+ private StorageServerChannel(Channel channel,
+ Optional<String> token,
+ StorageServerChannel
interceptedServerChannel) {
this.token = token;
this.channel = channel;
+ this.interceptedServerChannel = interceptedServerChannel;
}
public synchronized RootRangeServiceFutureStub getRootRangeService() {
@@ -153,13 +166,18 @@ public StorageServerChannel
intercept(ClientInterceptor... interceptors) {
interceptors);
return new StorageServerChannel(
interceptedChannel,
- this.token);
+ this.token,
+ this);
}
@Override
public void close() {
- if (channel instanceof ManagedChannel) {
- ((ManagedChannel) channel).shutdown();
+ if (interceptedServerChannel != null) {
+ interceptedServerChannel.close();
+ } else {
+ if (channel instanceof ManagedChannel) {
+ ((ManagedChannel) channel).shutdown();
+ }
}
}
}
diff --git
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java
index a89aaf7d23..abc9c6d23a 100644
---
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java
+++
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java
@@ -49,7 +49,7 @@
private ProtocolInternalUtils() {
}
- static HashStreamRanges createActiveRanges(GetActiveRangesResponse
response) {
+ public static HashStreamRanges createActiveRanges(GetActiveRangesResponse
response) {
TreeMap<Long, RangeProperties> ranges = Maps.newTreeMap();
long lastEndKey = Long.MIN_VALUE;
for (RelatedRanges rr : response.getRangesList()) {
diff --git
a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java
b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java
index 1dd5f864aa..e1260665ad 100644
---
a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java
+++
b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java
@@ -19,6 +19,9 @@
package org.apache.bookkeeper.clients.impl.channel;
import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import io.grpc.ManagedChannel;
import io.grpc.Server;
@@ -68,4 +71,13 @@ public void testBasic() {
channel.close();
}
+ @Test
+ public void testIntercept() {
+ ManagedChannel channel = mock(ManagedChannel.class);
+ StorageServerChannel ssChannel = new StorageServerChannel(channel,
Optional.empty());
+ StorageServerChannel interceptedChannel = ssChannel.intercept(1L);
+ interceptedChannel.close();
+ verify(channel, times(1)).shutdown();
+ }
+
}
diff --git
a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
index 0088c4b308..b888b45353 100644
---
a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
+++
b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
@@ -19,6 +19,8 @@
package org.apache.bookkeeper.stream.protocol;
import io.grpc.Metadata;
+import org.apache.bookkeeper.common.grpc.netty.IdentityBinaryMarshaller;
+import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
import org.apache.bookkeeper.stream.proto.FixedRangeSplitPolicy;
import org.apache.bookkeeper.stream.proto.RangeKeyType;
import org.apache.bookkeeper.stream.proto.RetentionPolicy;
@@ -115,4 +117,24 @@ private ProtocolConstants() {
public static final String ROUTING_KEY = "rk" +
Metadata.BINARY_HEADER_SUFFIX;
public static final String STREAM_ID_KEY = "sid-" +
Metadata.BINARY_HEADER_SUFFIX;
public static final String RANGE_ID_KEY = "rid-" +
Metadata.BINARY_HEADER_SUFFIX;
+
+ // the metadata keys in grpc call metadata
+ public static final Metadata.Key<Long> SCID_METADATA_KEY = Metadata.Key.of(
+ SC_ID_KEY,
+ LongBinaryMarshaller.of()
+ );
+ public static final Metadata.Key<Long> RID_METADATA_KEY = Metadata.Key.of(
+ RANGE_ID_KEY,
+ LongBinaryMarshaller.of()
+ );
+ public static final Metadata.Key<Long> SID_METADATA_KEY = Metadata.Key.of(
+ STREAM_ID_KEY,
+ LongBinaryMarshaller.of()
+ );
+ public static final Metadata.Key<byte[]> RK_METADATA_KEY = Metadata.Key.of(
+ ROUTING_KEY,
+ IdentityBinaryMarshaller.of()
+ );
+
+
}
diff --git
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
index 97ff66ad2b..4385d56e1f 100644
---
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
+++
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
@@ -25,6 +25,9 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import
org.apache.bookkeeper.clients.impl.internal.StorageServerClientManagerImpl;
import org.apache.bookkeeper.common.component.ComponentStarter;
import org.apache.bookkeeper.common.component.LifecycleComponent;
import org.apache.bookkeeper.common.component.LifecycleComponentStack;
@@ -54,6 +57,7 @@
import org.apache.bookkeeper.stream.storage.impl.cluster.ClusterControllerImpl;
import
org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterControllerLeaderSelector;
import
org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore;
+import
org.apache.bookkeeper.stream.storage.impl.routing.RoutingHeaderProxyInterceptor;
import
org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerController;
import
org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
import org.apache.bookkeeper.stream.storage.impl.sc.ZkStorageContainerManager;
@@ -249,6 +253,10 @@ public static LifecycleComponent
buildStorageServer(CompositeConfiguration conf,
dlConf,
rootStatsLogger.scope("dlog"));
+ // client settings for the proxy channels
+ StorageClientSettings proxyClientSettings =
StorageClientSettings.newBuilder()
+ .serviceUri("bk://localhost:" + grpcPort)
+ .build();
// Create range (stream) store
StorageContainerStoreBuilder storageContainerStoreBuilder =
StorageContainerStoreBuilder.newBuilder()
.withStatsLogger(rootStatsLogger.scope("storage"))
@@ -286,7 +294,15 @@ public static LifecycleComponent
buildStorageServer(CompositeConfiguration conf,
() -> new DLCheckpointStore(dlNamespaceProvider.get()),
storageConf.getRangeStoreDirs(),
storageResources,
- storageConf.getServeReadOnlyTables()));
+ storageConf.getServeReadOnlyTables()))
+ // with client manager for proxying grpc requests
+ .withStorageServerClientManager(() -> new
StorageServerClientManagerImpl(
+ proxyClientSettings,
+ storageResources.scheduler(),
+ StorageServerChannel.factory(proxyClientSettings)
+ // intercept the channel to attach routing header
+ .andThen(channel -> channel.intercept(new
RoutingHeaderProxyInterceptor()))
+ ));
StorageService storageService = new StorageService(
storageConf, storageContainerStoreBuilder,
rootStatsLogger.scope("storage"));
diff --git
a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java
b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java
index 57634d32d1..a05106058e 100644
---
a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java
+++
b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java
@@ -35,6 +35,15 @@
*/
StorageContainer getStorageContainer(long storageContainerId);
+ /**
+ * Get the instance of storage container {@code storageContainerId}.
+ *
+ * @param storageContainerId storage container id
+ * @param defaultContainer the default container to return if the
container doesn't exist in the registry
+ * @return the instance of the storage container.
+ */
+ StorageContainer getStorageContainer(long storageContainerId,
StorageContainer defaultContainer);
+
/**
* Start the storage container in this registry.
*
diff --git a/stream/storage/impl/pom.xml b/stream/storage/impl/pom.xml
index 52cef7fb33..a2ce3e0de9 100644
--- a/stream/storage/impl/pom.xml
+++ b/stream/storage/impl/pom.xml
@@ -66,6 +66,13 @@
<artifactId>stream-storage-tests-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.bookkeeper</groupId>
+ <artifactId>stream-storage-java-client-base</artifactId>
+ <version>${project.parent.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
index 9c96139dfa..cac363f41e 100644
---
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
@@ -17,6 +17,8 @@
import static com.google.common.base.Preconditions.checkNotNull;
import java.net.URI;
+import java.util.function.Supplier;
+import
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import
org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
@@ -46,6 +48,7 @@ public static StorageContainerStoreBuilder newBuilder() {
StorageContainerPlacementPolicyImpl.of(1024);
private MVCCStoreFactory mvccStoreFactory = null;
private URI defaultBackendUri = null;
+ private Supplier<StorageServerClientManager> clientManagerSupplier;
private StorageContainerStoreBuilder() {
}
@@ -131,12 +134,25 @@ public StorageContainerStoreBuilder
withDefaultBackendUri(URI uri) {
return this;
}
+ /**
+ * Supplier to provide client manager for proxying requests.
+ *
+ * @param clientManagerSupplier client manager supplier
+ * @return storage container store builder
+ */
+ public StorageContainerStoreBuilder withStorageServerClientManager(
+ Supplier<StorageServerClientManager> clientManagerSupplier) {
+ this.clientManagerSupplier = clientManagerSupplier;
+ return this;
+ }
+
public StorageContainerStore build() {
checkNotNull(scmFactory, "StorageContainerManagerFactory is not
provided");
checkNotNull(storeConf, "StorageConfiguration is not provided");
checkNotNull(mvccStoreFactory, "MVCCStoreFactory is not provided");
checkNotNull(defaultBackendUri, "Default backend uri is not provided");
checkNotNull(placementPolicyFactory, "Storage Container Placement
Policy Factory is not provided");
+ checkNotNull(clientManagerSupplier, "Storage server client manager is
not provided");
RangeStoreServiceFactoryImpl serviceFactory = new
RangeStoreServiceFactoryImpl(
storeConf,
@@ -152,6 +168,7 @@ public StorageContainerStore build() {
storeConf,
scmFactory,
containerServiceFactory,
+ clientManagerSupplier.get(),
statsLogger);
}
diff --git
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java
index 56fcba2a8d..1271e85e3c 100644
---
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java
@@ -15,15 +15,19 @@
package org.apache.bookkeeper.stream.storage.impl;
import static
org.apache.bookkeeper.stream.protocol.ProtocolConstants.INVALID_STORAGE_CONTAINER_ID;
-import static
org.apache.bookkeeper.stream.protocol.ProtocolConstants.SC_ID_KEY;
+import static
org.apache.bookkeeper.stream.protocol.ProtocolConstants.RID_METADATA_KEY;
+import static
org.apache.bookkeeper.stream.protocol.ProtocolConstants.RK_METADATA_KEY;
+import static
org.apache.bookkeeper.stream.protocol.ProtocolConstants.SCID_METADATA_KEY;
+import static
org.apache.bookkeeper.stream.protocol.ProtocolConstants.SID_METADATA_KEY;
import io.grpc.Channel;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import java.io.IOException;
+import
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
-import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.proto.RangeProperties;
import org.apache.bookkeeper.stream.storage.api.StorageContainerStore;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManager;
@@ -31,11 +35,15 @@
import
org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRoutingService;
import
org.apache.bookkeeper.stream.storage.api.sc.StorageContainerServiceFactory;
import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
+import org.apache.bookkeeper.stream.storage.impl.routing.RangeRoutingTable;
+import org.apache.bookkeeper.stream.storage.impl.routing.RangeRoutingTableImpl;
+import
org.apache.bookkeeper.stream.storage.impl.routing.StorageContainerProxyChannelManager;
+import
org.apache.bookkeeper.stream.storage.impl.routing.StorageContainerProxyChannelManagerImpl;
import
org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerFactory;
import
org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerRegistryImpl;
/**
- * KeyRange Service.
+ * Storage Container Store manages the containers and routes the requests
accordingly.
*/
public class StorageContainerStoreImpl
extends AbstractLifecycleComponent<StorageConfiguration>
@@ -45,11 +53,14 @@
private final StorageContainerRegistryImpl scRegistry;
private final StorageContainerManager scManager;
private final StorageContainerServiceFactory serviceFactory;
- private final Metadata.Key<Long> scIdKey;
+ private final StorageServerClientManager ssClientManager;
+ private final RangeRoutingTable routingTable;
+ private final StorageContainerProxyChannelManager proxyChannelManager;
public StorageContainerStoreImpl(StorageConfiguration conf,
StorageContainerManagerFactory
managerFactory,
StorageContainerServiceFactory
serviceFactory,
+ StorageServerClientManager
ssClientManager,
StatsLogger statsLogger) {
super("range-service", conf, statsLogger);
this.scmFactory = managerFactory;
@@ -57,9 +68,14 @@ public StorageContainerStoreImpl(StorageConfiguration conf,
new DefaultStorageContainerFactory(serviceFactory));
this.scManager = scmFactory.create(conf, scRegistry);
this.serviceFactory = serviceFactory;
- this.scIdKey = Metadata.Key.of(
- SC_ID_KEY,
- LongBinaryMarshaller.of());
+ this.ssClientManager = ssClientManager;
+ if (ssClientManager == null) {
+ this.proxyChannelManager = null;
+ this.routingTable = null;
+ } else {
+ this.proxyChannelManager = new
StorageContainerProxyChannelManagerImpl(ssClientManager);
+ this.routingTable = new RangeRoutingTableImpl(ssClientManager);
+ }
}
@Override
@@ -83,6 +99,10 @@ protected void doStart() {
@Override
protected void doStop() {
+ // there is no need to block here waiting for the proxy channels to finish closing.
+ if (null != ssClientManager) {
+ this.ssClientManager.closeAsync();
+ }
this.scManager.stop();
this.scRegistry.close();
}
@@ -97,17 +117,69 @@ StorageContainer getStorageContainer(long scId) {
return scRegistry.getStorageContainer(scId);
}
+ StorageContainer getStorageContainer(long scId, StorageContainer
defaultContainer) {
+ return scRegistry.getStorageContainer(scId, defaultContainer);
+ }
+
//
// Utils for proxies
//
@Override
public Channel findChannel(ServerCall<?, ?> serverCall, Metadata headers) {
- Long scId = headers.get(scIdKey);
- if (null == scId) {
- // use the invalid storage container id, so it will fail the
request.
- scId = INVALID_STORAGE_CONTAINER_ID;
+ Long scId = headers.get(SCID_METADATA_KEY);
+ if (null != scId) {
+ // if a request is sent directly to a container, then find the
container
+ StorageContainer container = getStorageContainer(scId, null);
+ if (null != container) {
+ return container.getChannel();
+ } else {
+ if (scId == 0L && null != proxyChannelManager) {
+ // root container, we attempt to always proxy the requests
for root container
+ Channel channel =
proxyChannelManager.getStorageContainerChannel(0L);
+ if (null != channel) {
+ return channel;
+ } else {
+ // no channel found to proxy the request, fail the
request with 404 container
+ return
getStorageContainer(INVALID_STORAGE_CONTAINER_ID).getChannel();
+ }
+ } else {
+ // no container is found and the scId is not the root
container
+ // then fail the request with 404 container
+ return
getStorageContainer(INVALID_STORAGE_CONTAINER_ID).getChannel();
+ }
+ }
+ } else {
+ // if a request is not sent directly to a container, then check if
+ // streamId + routingKey is attached in the header. if so, figure out
+ // which range id and storage container id to route the request to.
+ byte[] routingKey = headers.get(RK_METADATA_KEY);
+ Long streamId = headers.get(SID_METADATA_KEY);
+ if (null != routingKey && null != streamId && null != routingTable
&& null != proxyChannelManager) {
+ RangeProperties rangeProps = routingTable.getRange(streamId,
routingKey);
+ if (null != rangeProps) {
+ long containerId = rangeProps.getStorageContainerId();
+ long rangeId = rangeProps.getRangeId();
+ // if we find the routing information, we can update the
headers
+ headers.put(SCID_METADATA_KEY, containerId);
+ headers.put(RID_METADATA_KEY, rangeId);
+ // if we find the container id, we can check whether the
container is owned locally.
+ // if so, forward the request to the container.
+ StorageContainer container =
getStorageContainer(containerId, null);
+ if (null == container) {
+ // the container doesn't belong to here, then find the
channel to forward the request
+ Channel channel =
proxyChannelManager.getStorageContainerChannel(containerId);
+ if (null != channel) {
+ return channel;
+ }
+ } else {
+ // we found the container exists in the registry to
serve the request
+ return container.getChannel();
+ }
+ }
+ }
+ // there is no storage container id or routing key, then fail the
request and ask client to retry.
+ return
getStorageContainer(INVALID_STORAGE_CONTAINER_ID).getChannel();
}
- return getStorageContainer(scId).getChannel();
}
}
diff --git
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTable.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTable.java
new file mode 100644
index 0000000000..bb80142149
--- /dev/null
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import org.apache.bookkeeper.stream.proto.RangeProperties;
+
+/**
+ * A routing table used for figuring out the range that a request should be forwarded to.
+ */
+public interface RangeRoutingTable {
+
+    /**
+     * Look up the metadata of the range that serves the routing key {@code routingKey}
+     * in the stream {@code streamId}.
+     *
+     * @param streamId id of the stream
+     * @param routingKey routing key of the request
+     * @return the properties of the range that serves this routing key
+     */
+    RangeProperties getRange(long streamId, byte[] routingKey);
+
+}
diff --git
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java
new file mode 100644
index 0000000000..b9727c9037
--- /dev/null
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
+import org.apache.bookkeeper.clients.impl.routing.RangeRouter;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.router.BytesHashRouter;
+import org.apache.bookkeeper.stream.proto.RangeProperties;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
+
+/**
+ * A default implementation of {@link RangeRoutingTable}.
+ *
+ * <p>Range routers are cached per stream id. When no router is cached for a stream, a lookup
+ * triggers an asynchronous fetch of the stream's active data ranges through the
+ * {@link StorageServerClientManager} and returns {@code null}. The router becomes visible to
+ * later lookups once that fetch completes successfully.
+ */
+@Slf4j
+public class RangeRoutingTableImpl implements RangeRoutingTable {
+
+    // client manager used for opening the meta range client of a stream
+    private final StorageServerClientManager manager;
+    // cached range routers, keyed by stream id
+    private final ConcurrentLongHashMap<RangeRouter<byte[]>> ranges;
+
+    // outstanding fetch requests, keyed by stream id
+    private final ConcurrentLongHashMap<CompletableFuture<RangeRouter<byte[]>>> outstandingUpdates;
+
+    /**
+     * Create a routing table that fetches range metadata through {@code manager}.
+     *
+     * @param manager client manager used for fetching the active ranges of streams
+     */
+    public RangeRoutingTableImpl(StorageServerClientManager manager) {
+        this.manager = manager;
+        this.ranges = new ConcurrentLongHashMap<>();
+        this.outstandingUpdates = new ConcurrentLongHashMap<>();
+    }
+
+    /**
+     * Return the cached range router of stream {@code streamId}.
+     *
+     * @param streamId stream id
+     * @return the cached router, or {@code null} if none is cached
+     */
+    @VisibleForTesting
+    RangeRouter<byte[]> getRangeRouter(long streamId) {
+        return ranges.get(streamId);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Returns {@code null} when no router is cached for the stream yet; in that case a
+     * fetch of the stream's ranges is triggered in the background.
+     */
+    @Override
+    public RangeProperties getRange(long streamId, byte[] routingKey) {
+        RangeRouter<byte[]> router = ranges.get(streamId);
+        if (null == router) {
+            // trigger to fetch stream metadata, but return `null` since
+            // the range router is not ready, let the client backoff and retry.
+            fetchStreamRanges(streamId);
+            return null;
+        } else {
+            return router.getRangeProperties(routingKey);
+        }
+    }
+
+    /**
+     * Return the outstanding fetch request of stream {@code streamId}.
+     *
+     * @param streamId stream id
+     * @return the future of the in-flight fetch, or {@code null} if there is none
+     */
+    @VisibleForTesting
+    CompletableFuture<RangeRouter<byte[]>> getOutstandingFetchRequest(long streamId) {
+        return outstandingUpdates.get(streamId);
+    }
+
+    /**
+     * Fetch the active data ranges of stream {@code streamId} and cache the resulting router.
+     *
+     * <p>At most one fetch per stream is registered as outstanding at a time; the entry is
+     * removed when the fetch completes, whether it succeeded or failed.
+     */
+    private void fetchStreamRanges(long streamId) {
+        if (null != outstandingUpdates.get(streamId)) {
+            // there is already an outstanding fetch request, do nothing
+            return;
+        }
+        final CompletableFuture<RangeRouter<byte[]>> newFetchFuture = new CompletableFuture<>();
+        // register atomically: a plain `put` would overwrite the future registered by a
+        // concurrent caller, whose conditional `remove` below would then never match, leaving
+        // a stale entry that suppresses every later fetch for this stream.
+        if (null != outstandingUpdates.putIfAbsent(streamId, newFetchFuture)) {
+            // someone else already triggered the fetch
+            return;
+        }
+        FutureUtils.proxyTo(
+            manager.openMetaRangeClient(streamId)
+                .thenCompose(metaRangeClient -> metaRangeClient.getActiveDataRanges())
+                .thenApply(hashStreamRanges -> {
+                    RangeRouter<byte[]> router = new RangeRouter<>(BytesHashRouter.of());
+                    router.setRanges(hashStreamRanges);
+                    return router;
+                })
+                .whenComplete((router, cause) -> {
+                    // only cache the router when the fetch succeeded
+                    if (null == cause) {
+                        ranges.put(streamId, router);
+                    }
+                    // remove only our own registration
+                    outstandingUpdates.remove(streamId, newFetchFuture);
+                }),
+            newFetchFuture
+        );
+    }
+}
diff --git
a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptor.java
similarity index 70%
rename from
stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java
rename to
stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptor.java
index 2ef970f92f..377f6df2c6 100644
---
a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptor.java
@@ -17,11 +17,11 @@
* under the License.
*/
-package org.apache.bookkeeper.clients.impl.kv.interceptors;
+package org.apache.bookkeeper.stream.storage.impl.routing;
-import static
org.apache.bookkeeper.stream.protocol.ProtocolConstants.RANGE_ID_KEY;
-import static
org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROUTING_KEY;
-import static
org.apache.bookkeeper.stream.protocol.ProtocolConstants.STREAM_ID_KEY;
+import static
org.apache.bookkeeper.stream.protocol.ProtocolConstants.RID_METADATA_KEY;
+import static
org.apache.bookkeeper.stream.protocol.ProtocolConstants.RK_METADATA_KEY;
+import static
org.apache.bookkeeper.stream.protocol.ProtocolConstants.SID_METADATA_KEY;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.MessageLite;
@@ -34,6 +34,9 @@
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.PooledByteBufAllocator;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -41,8 +44,6 @@
import java.util.Map;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.common.grpc.netty.IdentityBinaryMarshaller;
-import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
@@ -50,25 +51,13 @@
import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc;
import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.commons.codec.binary.Hex;
/**
* A client interceptor that intercepting kv rpcs to attach routing
information.
*/
@Slf4j
-public class RoutingHeaderClientInterceptor implements ClientInterceptor {
-
- static final Metadata.Key<Long> RID_METADATA_KEY = Metadata.Key.of(
- RANGE_ID_KEY,
- LongBinaryMarshaller.of()
- );
- static final Metadata.Key<Long> SID_METADATA_KEY = Metadata.Key.of(
- STREAM_ID_KEY,
- LongBinaryMarshaller.of()
- );
- static final Metadata.Key<byte[]> RK_METADATA_KEY = Metadata.Key.of(
- ROUTING_KEY,
- IdentityBinaryMarshaller.of()
- );
+public class RoutingHeaderProxyInterceptor implements ClientInterceptor {
/**
* Table request mutator that mutates a table service rpc request to attach
@@ -175,53 +164,75 @@ ReqT intercept(ReqT request,
CallOptions
callOptions,
Channel next) {
if (log.isTraceEnabled()) {
- log.trace("Intercepting method {}", method.getFullMethodName());
+ log.trace("Intercepting method {} : req marshaller = {}, resp
marshaller = {}",
+ method.getFullMethodName(),
+ method.getRequestMarshaller(),
+ method.getResponseMarshaller());
}
InterceptorDescriptor<?> descriptor =
kvRpcMethods.get(method.getFullMethodName());
- if (null != descriptor) {
- return new SimpleForwardingClientCall<ReqT,
RespT>(next.newCall(method, callOptions)) {
-
- private Long rid = null;
- private Long sid = null;
- private byte[] rk = null;
+ return new SimpleForwardingClientCall<ReqT,
RespT>(next.newCall(method, callOptions)) {
- @Override
- public void start(Listener<RespT> responseListener, Metadata
headers) {
- // capture routing information from headers
- sid = headers.get(SID_METADATA_KEY);
- rid = headers.get(RID_METADATA_KEY);
- rk = headers.get(RK_METADATA_KEY);
- if (log.isTraceEnabled()) {
- log.trace("Intercepting request with header : sid =
{}, rid = {}, rk = {}",
- sid, rid, rk);
- }
+ private Long rid = null;
+ private Long sid = null;
+ private byte[] rk = null;
- delegate().start(responseListener, headers);
+ @Override
+ public void start(Listener<RespT> responseListener, Metadata
headers) {
+ // capture routing information from headers
+ sid = headers.get(SID_METADATA_KEY);
+ rid = headers.get(RID_METADATA_KEY);
+ rk = headers.get(RK_METADATA_KEY);
+ if (log.isTraceEnabled()) {
+ log.trace("Intercepting request with header : sid = {},
rid = {}, rk = {}",
+ sid, rid, rk);
}
- @Override
- public void sendMessage(ReqT message) {
- ReqT interceptedMessage;
- if (null == rid || null == sid || null == rk) {
- // we don't have enough information to form the new
routing header
- // so do nothing
- interceptedMessage = message;
- } else {
- interceptedMessage = interceptMessage(
- method,
- descriptor,
- message,
- sid,
- rid,
- rk
- );
- }
- delegate().sendMessage(interceptedMessage);
+ delegate().start(responseListener, headers);
+ }
+
+ @Override
+ public void sendMessage(ReqT message) {
+ ReqT interceptedMessage;
+ if (null == rid || null == sid || null == rk || null ==
descriptor) {
+ // we don't have enough information to form the new
routing header
+ // we simply copy the bytes and generate a new message to
forward
+ // the request payload
+ interceptedMessage = interceptMessage(method, message);
+ } else {
+ interceptedMessage = interceptMessage(
+ method,
+ descriptor,
+ message,
+ sid,
+ rid,
+ rk
+ );
}
- };
- } else {
- return next.newCall(method, callOptions);
+ delegate().sendMessage(interceptedMessage);
+ }
+ };
+ }
+
+ private <ReqT, RespT> ReqT interceptMessage(MethodDescriptor<ReqT, RespT>
method, ReqT message) {
+ InputStream is = method.getRequestMarshaller().stream(message);
+ int bytes;
+ try {
+ bytes = is.available();
+ } catch (IOException e) {
+ log.warn("Encountered exceptions in getting available bytes of
message", e);
+ throw new RuntimeException("Encountered exception in intercepting
message", e);
+ }
+ ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer();
+ try {
+ buffer.writeBytes(is, bytes);
+ } catch (IOException e) {
+ log.warn("Encountered exceptions in transferring bytes to the
buffer", e);
+ buffer.release();
+ throw new RuntimeException("Encountered exceptions in transferring
bytes to the buffer", e);
}
+ return method
+ .getRequestMarshaller()
+ .parse(new ByteBufInputStream(buffer, true));
}
private <ReqT, TableReqT extends MessageLite> ReqT interceptMessage(
@@ -237,7 +248,9 @@ public void sendMessage(ReqT message) {
} else {
try {
return interceptTableRequest(method, descriptor, message, sid,
rid, rk);
- } catch (IOException ioe) {
+ } catch (Throwable t) {
+ log.error("Failed to intercept table request (sid = {}, rid =
{}, rk = {}) : ",
+ sid, rid, Hex.encodeHexString(rk), t);
return message;
}
}
diff --git
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManager.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManager.java
new file mode 100644
index 0000000000..deb054e150
--- /dev/null
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManager.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import io.grpc.Channel;
+
+/**
+ * A manager that manages all the proxy channels to other storage containers.
+ */
+public interface StorageContainerProxyChannelManager {
+
+ /**
+ * Get the channel to storage container {@code scId}.
+ *
+ * <p>The method can return {@code null} if the channel is not ready, so callers
+ * must be prepared to handle a {@code null} result.
+ *
+ * @param scId storage container id
+ * @return channel to the given storage container, or {@code null} if it can't connect
+ */
+ Channel getStorageContainerChannel(long scId);
+
+}
diff --git
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImpl.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImpl.java
new file mode 100644
index 0000000000..ed69e0f040
--- /dev/null
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import io.grpc.Channel;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
+
+/**
+ * Default implementation of {@link StorageContainerProxyChannelManager}.
+ *
+ * <p>Channels are looked up through a {@link StorageServerClientManager}; this class only
+ * inspects the state of the future handed out for a storage container and never waits on it.
+ */
+public class StorageContainerProxyChannelManagerImpl implements StorageContainerProxyChannelManager {
+
+    // we can ideally just talk to the location service directly.
+    // however currently storage container is separated from actual services to make a clean interface
+    // so for now proxy channel manager will be acting as a client to talk to its local server to
+    // get location related information
+    private final StorageServerClientManager ssClientManager;
+
+    /**
+     * Create a proxy channel manager backed by the given client manager.
+     *
+     * @param clientManager client manager used to look up storage container channels
+     */
+    public StorageContainerProxyChannelManagerImpl(StorageServerClientManager clientManager) {
+        this.ssClientManager = clientManager;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Returns {@code null} when the server channel future is absent, still pending, or has
+     * completed exceptionally (including cancellation). The future is only joined once it is
+     * known to have completed normally, so this call neither blocks nor throws because of a
+     * failed lookup.
+     */
+    @Override
+    public Channel getStorageContainerChannel(long scId) {
+        StorageContainerChannel channel = ssClientManager.getStorageContainerChannel(scId);
+        // this will trigger creating the channel for the first time
+        CompletableFuture<StorageServerChannel> serverChannelFuture = channel.getStorageContainerChannelFuture();
+        if (null == serverChannelFuture
+            || !serverChannelFuture.isDone()
+            || serverChannelFuture.isCompletedExceptionally()) {
+            // isDone() is also true for failed or cancelled futures, where join() would throw;
+            // report those the same way as "not ready yet"
+            return null;
+        }
+        StorageServerChannel serverChannel = serverChannelFuture.join();
+        return null == serverChannel ? null : serverChannel.getGrpcChannel();
+    }
+
+}
diff --git
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/package-info.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/package-info.java
new file mode 100644
index 0000000000..5d4437ca36
--- /dev/null
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Classes related to grpc request routing.
+ */
+package org.apache.bookkeeper.stream.storage.impl.routing;
\ No newline at end of file
diff --git
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
index 48330b7e13..160d9a8647 100644
---
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
+++
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
@@ -58,7 +58,12 @@ public int getNumStorageContainers() {
@Override
public StorageContainer getStorageContainer(long storageContainerId) {
- return containers.getOrDefault(storageContainerId,
StorageContainer404.of());
+ return getStorageContainer(storageContainerId,
StorageContainer404.of());
+ }
+
+ @Override
+ public StorageContainer getStorageContainer(long storageContainerId,
StorageContainer defaultContainer) { // caller chooses the fallback instead of the fixed StorageContainer404
+ return containers.getOrDefault(storageContainerId, defaultContainer); // defaultContainer is returned when 'containers' has no entry for the id
}
@Override
diff --git
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java
index 3d3a0ad1b5..59d6a1306b 100644
---
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java
+++
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java
@@ -18,6 +18,7 @@
import static org.mockito.Mockito.mock;
import java.net.URI;
+import
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
import org.apache.bookkeeper.stream.storage.api.StorageContainerStore;
import
org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactory;
import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
@@ -47,6 +48,7 @@ public void testBuildNullConfiguration() {
.withStorageResources(StorageResources.create())
.withRangeStoreFactory(storeFactory)
.withDefaultBackendUri(uri)
+ .withStorageServerClientManager(() ->
mock(StorageServerClientManager.class))
.build();
}
@@ -57,6 +59,7 @@ public void testBuildNullResources() {
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
.withStorageResources(null)
.withRangeStoreFactory(storeFactory)
+ .withStorageServerClientManager(() ->
mock(StorageServerClientManager.class))
.withDefaultBackendUri(uri)
.build();
}
@@ -68,6 +71,7 @@ public void testBuildNullRGManagerFactory() {
.withStorageContainerManagerFactory(null)
.withStorageResources(StorageResources.create())
.withRangeStoreFactory(storeFactory)
+ .withStorageServerClientManager(() ->
mock(StorageServerClientManager.class))
.withDefaultBackendUri(uri)
.build();
}
@@ -79,6 +83,7 @@ public void testBuildNullStoreFactory() {
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
.withStorageResources(StorageResources.create())
.withRangeStoreFactory(null)
+ .withStorageServerClientManager(() ->
mock(StorageServerClientManager.class))
.withDefaultBackendUri(uri)
.build();
}
@@ -90,10 +95,23 @@ public void testBuildNullDefaultBackendUri() {
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
.withStorageResources(StorageResources.create())
.withRangeStoreFactory(storeFactory)
+ .withStorageServerClientManager(() ->
mock(StorageServerClientManager.class))
.withDefaultBackendUri(null)
.build();
}
+ @Test(expected = NullPointerException.class)
+ public void testBuildStorageServerClientManager() { // every other builder input is valid; only the client manager argument is null
+ StorageContainerStoreBuilder.newBuilder()
+ .withStorageConfiguration(mock(StorageConfiguration.class))
+
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
+ .withStorageResources(StorageResources.create())
+ .withRangeStoreFactory(storeFactory)
+ .withStorageServerClientManager(null) // input under test: null where the other tests pass a lambda
+ .withDefaultBackendUri(uri)
+ .build();
+ }
+
@Test
public void testBuild() {
StorageContainerStore storageContainerStore =
StorageContainerStoreBuilder.newBuilder()
@@ -101,6 +119,7 @@ public void testBuild() {
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
.withStorageResources(StorageResources.create())
.withRangeStoreFactory(storeFactory)
+ .withStorageServerClientManager(() ->
mock(StorageServerClientManager.class))
.withDefaultBackendUri(uri)
.build();
assertTrue(storageContainerStore instanceof StorageContainerStoreImpl);
diff --git
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
index b9d5e69798..6ae3658dad 100644
---
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
+++
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
@@ -85,6 +85,7 @@
import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc;
import
org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceFutureStub;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.apache.bookkeeper.stream.storage.StorageResources;
import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
import
org.apache.bookkeeper.stream.storage.api.service.RangeStoreServiceFactory;
import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
@@ -133,6 +134,7 @@ private static Endpoint createEndpoint(String hostname,
NamespaceConfiguration.newBuilder()
.setDefaultStreamConf(DEFAULT_STREAM_CONF)
.build();
+ private final StorageResources resources = StorageResources.create();
private RangeStoreService mockRangeStoreService;
private StorageContainerStoreImpl rangeStore;
private Server server;
@@ -230,6 +232,7 @@ public void setUp() throws Exception {
(storeConf, rgRegistry)
-> new LocalStorageContainerManager(endpoint, storeConf,
rgRegistry, 2),
new
RangeStoreContainerServiceFactoryImpl(rangeStoreServiceFactory),
+ null,
NullStatsLogger.INSTANCE);
rangeStore.start();
diff --git
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImplTest.java
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImplTest.java
new file mode 100644
index 0000000000..8201ba714d
--- /dev/null
+++
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImplTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import io.grpc.stub.StreamObserver;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.internal.ProtocolInternalUtils;
+import org.apache.bookkeeper.clients.impl.routing.RangeRouter;
+import org.apache.bookkeeper.common.router.BytesHashRouter;
+import org.apache.bookkeeper.stream.proto.RangeProperties;
+import org.apache.bookkeeper.stream.proto.StreamConfiguration;
+import org.apache.bookkeeper.stream.proto.StreamProperties;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
+import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
+import
org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRangeServiceImplBase;
+import org.apache.bookkeeper.stream.proto.storage.RelatedRanges;
+import org.apache.bookkeeper.stream.proto.storage.RelationType;
+import
org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceImplBase;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.apache.bookkeeper.stream.protocol.util.ProtoUtils;
+import
org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
+import org.junit.Test;
+
+/**
+ * Unit test {@link RangeRoutingTable}.
+ *
+ * <p>{@code doSetup} registers a root range service that answers {@code getStream} immediately
+ * and a meta range service whose {@code getActiveRanges} reply is held back until the test
+ * completes {@code responseSupplier}. Each test therefore decides when, and with what outcome,
+ * the routing table's fetch finishes.
+ */
+public class RangeRoutingTableImplTest extends GrpcClientTestBase {
+
+ private final long scId = 1234L;
+ private final long streamId = 123456L;
+ private GetActiveRangesResponse getActiveRangesResponse;
+ private CompletableFuture<GetActiveRangesResponse> responseSupplier;
+ private StreamProperties props;
+ private List<RangeProperties> rangeProps;
+ private RangeRoutingTableImpl routingTable;
+ private RangeRouter<byte[]> rangeRouter;
+
+ @Override
+ protected void doSetup() throws Exception {
+ this.props = StreamProperties.newBuilder()
+ .setStorageContainerId(scId)
+ .setStreamId(streamId)
+ .setStreamName("metaclient-stream")
+ .setStreamConf(StreamConfiguration.newBuilder().build())
+ .build();
+ this.rangeProps = ProtoUtils.split(
+ streamId,
+ 24,
+ 23456L,
+ StorageContainerPlacementPolicyImpl.of(4)
+ );
+ final GetActiveRangesResponse.Builder getActiveRangesResponseBuilder =
GetActiveRangesResponse.newBuilder();
+ for (RangeProperties range : rangeProps) {
+ RelatedRanges.Builder rrBuilder = RelatedRanges.newBuilder()
+ .setProps(range)
+ .setType(RelationType.PARENTS)
+ .addAllRelatedRanges(Collections.emptyList());
+ getActiveRangesResponseBuilder.addRanges(rrBuilder);
+ }
+ this.getActiveRangesResponse = getActiveRangesResponseBuilder
+ .setCode(StatusCode.SUCCESS)
+ .build();
+ RootRangeServiceImplBase rootRangeService = new
RootRangeServiceImplBase() {
+ @Override
+ public void getStream(GetStreamRequest request,
+ StreamObserver<GetStreamResponse>
responseObserver) {
+ responseObserver.onNext(GetStreamResponse.newBuilder()
+ .setCode(StatusCode.SUCCESS)
+ .setStreamProps(props)
+ .build());
+ responseObserver.onCompleted();
+ }
+ };
+ serviceRegistry.addService(rootRangeService);
+
+ this.responseSupplier = new CompletableFuture<>();
+ // register a meta range service whose reply is controlled by responseSupplier
+ MetaRangeServiceImplBase metaRangeService = new
MetaRangeServiceImplBase() {
+ @Override
+ public void getActiveRanges(GetActiveRangesRequest request,
+
StreamObserver<GetActiveRangesResponse> responseObserver) {
+ try {
+ responseObserver.onNext(responseSupplier.get()); // blocks this handler until the test completes responseSupplier
+ responseObserver.onCompleted();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ responseObserver.onError(e);
+ } catch (ExecutionException e) {
+ responseObserver.onError(e); // reached when the test completes responseSupplier exceptionally
+ }
+ }
+ };
+ serviceRegistry.addService(metaRangeService);
+
+ this.routingTable = new RangeRoutingTableImpl(serverManager);
+ this.rangeRouter = new RangeRouter<>(BytesHashRouter.of()); // reference router built from the same response, used as the expected answer
+
this.rangeRouter.setRanges(ProtocolInternalUtils.createActiveRanges(getActiveRangesResponse));
+ }
+
+ @Override
+ protected void doTeardown() throws Exception {
+ }
+
+ @Test
+ public void testGetRange() throws Exception {
+ String key = "foo";
+ byte[] keyBytes = key.getBytes(UTF_8);
+ RangeProperties rangeProps = routingTable.getRange(streamId, keyBytes); // local shadows the List<RangeProperties> field of the same name
+ // the first get returns null since nothing is cached for the stream yet
+ assertNull(rangeProps);
+ // the fetch request is outstanding
+ CompletableFuture<RangeRouter<byte[]>> outstandingFetchFuture =
+ routingTable.getOutstandingFetchRequest(streamId);
+ assertNotNull(outstandingFetchFuture);
+ assertFalse(outstandingFetchFuture.isDone());
+
+ // complete the response supplier, so the fetch request can complete
to update the cache
+ responseSupplier.complete(getActiveRangesResponse);
+
+ // wait until the stuff is cached.
+ while (null == routingTable.getRangeRouter(streamId)) { // no upper bound in this loop; it spins until the router appears
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+
+ // if the router is created, it should return the cached router
+ rangeProps = routingTable.getRange(streamId, keyBytes);
+ assertNotNull(rangeProps);
+ assertEquals(rangeRouter.getRangeProperties(keyBytes), rangeProps);
+ }
+
+ @Test
+ public void testGetRangeException() throws Exception {
+ String key = "foo";
+ byte[] keyBytes = key.getBytes(UTF_8);
+ RangeProperties rangeProps = routingTable.getRange(streamId, keyBytes); // local shadows the List<RangeProperties> field of the same name
+ // the first get returns null since nothing is cached for the stream yet
+ assertNull(rangeProps);
+ // the fetch request is outstanding
+ CompletableFuture<RangeRouter<byte[]>> outstandingFetchFuture =
+ routingTable.getOutstandingFetchRequest(streamId);
+ assertNotNull(outstandingFetchFuture);
+ assertFalse(outstandingFetchFuture.isDone());
+
+ // complete the response supplier, so the fetch request can complete
to update the cache
+ responseSupplier.completeExceptionally(new Exception("fetch failed"));
+
+ // wait until the fetch is done.
+ try {
+ outstandingFetchFuture.get();
+ fail("Fetch request should fail");
+ } catch (Exception e) {
+ // expected
+ }
+
+ // once the fetch is done, nothing should be cached and the
outstanding fetch request should be removed
+ assertNull(routingTable.getRangeRouter(streamId));
+ assertNull(routingTable.getOutstandingFetchRequest(streamId));
+ }
+
+}
diff --git
a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptorTest.java
similarity index 94%
rename from
stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java
rename to
stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptorTest.java
index 745118b049..43c060b12c 100644
---
a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java
+++
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptorTest.java
@@ -17,9 +17,12 @@
* under the License.
*/
-package org.apache.bookkeeper.clients.impl.kv.interceptors;
+package org.apache.bookkeeper.stream.storage.impl.routing;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.bookkeeper.stream.protocol.ProtocolConstants.RID_METADATA_KEY;
+import static
org.apache.bookkeeper.stream.protocol.ProtocolConstants.RK_METADATA_KEY;
+import static
org.apache.bookkeeper.stream.protocol.ProtocolConstants.SID_METADATA_KEY;
import static org.junit.Assert.assertEquals;
import com.google.protobuf.ByteString;
@@ -54,10 +57,10 @@
import org.junit.Test;
/**
- * Unit test {@link RoutingHeaderClientInterceptor}.
+ * Unit test {@link RoutingHeaderProxyInterceptor}.
*/
@Slf4j
-public class RoutingHeaderClientInterceptorTest extends GrpcClientTestBase {
+public class RoutingHeaderProxyInterceptorTest extends GrpcClientTestBase {
private final long streamId = 1234L;
private final long rangeId = 2345L;
@@ -136,11 +139,12 @@ public void put(PutRequest request,
StreamObserver<PutResponse> responseObserver
};
serviceRegistry.addService(tableService.bindService());
+
this.channel = new StorageServerChannel(
InProcessChannelBuilder.forName(serverName).directExecutor().build(),
Optional.empty()
).intercept(
- new RoutingHeaderClientInterceptor(),
+ new RoutingHeaderProxyInterceptor(),
new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT>
interceptCall(MethodDescriptor<ReqT, RespT> method,
@@ -152,15 +156,15 @@ protected void checkedStart(Listener<RespT>
responseListener, Metadata headers)
log.info("Intercept the request with routing
information : sid = {}, rid = {}, rk = {}",
streamId, rangeId, new String(routingKey,
UTF_8));
headers.put(
-
RoutingHeaderClientInterceptor.RID_METADATA_KEY,
+ RID_METADATA_KEY,
rangeId
);
headers.put(
-
RoutingHeaderClientInterceptor.SID_METADATA_KEY,
+ SID_METADATA_KEY,
streamId
);
headers.put(
- RoutingHeaderClientInterceptor.RK_METADATA_KEY,
+ RK_METADATA_KEY,
routingKey
);
delegate().start(responseListener, headers);
@@ -173,6 +177,7 @@ protected void checkedStart(Listener<RespT>
responseListener, Metadata headers)
@Override
protected void doTeardown() {
+ channel.close();
}
@Test
diff --git
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImplTest.java
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImplTest.java
new file mode 100644
index 0000000000..bb94843fe0
--- /dev/null
+++
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImplTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import io.grpc.Channel;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import
org.apache.bookkeeper.stream.proto.storage.GetStorageContainerEndpointRequest;
+import
org.apache.bookkeeper.stream.proto.storage.GetStorageContainerEndpointResponse;
+import
org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointRequest;
+import
org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.apache.bookkeeper.stream.proto.storage.StorageContainerEndpoint;
+import
org.apache.bookkeeper.stream.proto.storage.StorageContainerServiceGrpc.StorageContainerServiceImplBase;
+import org.junit.Test;
+
+/**
+ * Unit testing {@link StorageContainerProxyChannelManagerImpl}.
+ *
+ * <p>The test registers a storage container service whose {@code getStorageContainerEndpoint}
+ * reply is held back until the test completes a future. It first asserts that the manager
+ * returns {@code null} while that reply is pending, then supplies the reply and polls until a
+ * channel is returned.
+ */
+public class StorageContainerProxyChannelManagerImplTest extends
GrpcClientTestBase {
+
+ private final long scId = 1234L;
+ private StorageContainerProxyChannelManagerImpl proxyChannelManager;
+
+ @Override
+ protected void doSetup() throws Exception {
+ this.proxyChannelManager = new
StorageContainerProxyChannelManagerImpl(serverManager);
+ }
+
+ @Override
+ protected void doTeardown() throws Exception {
+ }
+
+
+ @Test
+ public void testGetStorageContainerChannel() throws Exception {
+ final CompletableFuture<GetStorageContainerEndpointRequest>
receivedRequest = new CompletableFuture<>();
+ final CompletableFuture<GetStorageContainerEndpointResponse>
responseSupplier = new CompletableFuture<>();
+ StorageContainerServiceImplBase scService = new
StorageContainerServiceImplBase() {
+ @Override
+ public void getStorageContainerEndpoint(
+ GetStorageContainerEndpointRequest request,
+ StreamObserver<GetStorageContainerEndpointResponse>
responseObserver) {
+ receivedRequest.complete(request); // publish the request so the test can build a matching response
+ try {
+ responseObserver.onNext(responseSupplier.get()); // blocks this handler until the test completes responseSupplier
+ responseObserver.onCompleted();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ responseObserver.onError(e);
+ } catch (ExecutionException e) {
+ responseObserver.onError(e);
+ }
+ }
+ };
+ serviceRegistry.addService(scService.bindService());
+
+ Channel channel = proxyChannelManager.getStorageContainerChannel(scId);
+ // if the location service doesn't respond, the channel will be null
+ assertNull(channel);
+
+ // complete the location service request
+ responseSupplier.complete(getResponse(receivedRequest.get())); // receivedRequest.get() waits for the service handler to see the request
+ while ((channel =
proxyChannelManager.getStorageContainerChannel(scId)) == null) { // no upper bound in this loop; it spins until a channel is returned
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ assertNotNull(channel);
+ }
+
+ private static GetStorageContainerEndpointResponse
getResponse(GetStorageContainerEndpointRequest request) { // builds a SUCCESS response with one endpoint entry per requested container
+ GetStorageContainerEndpointResponse.Builder respBuilder =
+ GetStorageContainerEndpointResponse.newBuilder();
+ respBuilder.setStatusCode(StatusCode.SUCCESS);
+ for (OneStorageContainerEndpointRequest oneReq :
request.getRequestsList()) {
+ OneStorageContainerEndpointResponse oneResp =
OneStorageContainerEndpointResponse.newBuilder()
+ .setEndpoint(StorageContainerEndpoint.newBuilder()
+ .setStorageContainerId(oneReq.getStorageContainer())
+ .setRevision(oneReq.getRevision() + 1) // reply carries a revision one higher than the one requested
+ .setRwEndpoint(ENDPOINT))
+ .build();
+ respBuilder.addResponses(oneResp);
+ }
+ return respBuilder.build();
+ }
+
+
+
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services