sijie closed pull request #1721: [table service][storage] add routing table for proxying table service requests URL: https://github.com/apache/bookkeeper/pull/1721
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java index 76c763138d..e5206de6a3 100644 --- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java +++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java @@ -58,6 +58,7 @@ private final Optional<String> token; private final Channel channel; + private final StorageServerChannel interceptedServerChannel; @GuardedBy("this") private RootRangeServiceFutureStub rootRangeService; @@ -87,6 +88,11 @@ public StorageServerChannel(Endpoint endpoint, resolvedEndpoint.getPort()) .usePlaintext(usePlainText) .build(); + this.interceptedServerChannel = null; + } + + public Channel getGrpcChannel() { + return channel; } @VisibleForTesting @@ -97,8 +103,15 @@ public StorageServerChannel(ManagedChannel channel, protected StorageServerChannel(Channel channel, Optional<String> token) { + this(channel, token, null); + } + + private StorageServerChannel(Channel channel, + Optional<String> token, + StorageServerChannel interceptedServerChannel) { this.token = token; this.channel = channel; + this.interceptedServerChannel = interceptedServerChannel; } public synchronized RootRangeServiceFutureStub getRootRangeService() { @@ -153,13 +166,18 @@ public StorageServerChannel intercept(ClientInterceptor... interceptors) { interceptors); return new StorageServerChannel( interceptedChannel, - this.token); + this.token, + this); } @Override public void close() { - if (channel instanceof ManagedChannel) { - ((ManagedChannel) channel).shutdown(); + if (interceptedServerChannel != null) { + interceptedServerChannel.close(); + } else { + if (channel instanceof ManagedChannel) { + ((ManagedChannel) channel).shutdown(); + } } } } diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java index a89aaf7d23..abc9c6d23a 100644 --- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java +++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java @@ -49,7 +49,7 @@ private ProtocolInternalUtils() { } - static HashStreamRanges createActiveRanges(GetActiveRangesResponse response) { + public static HashStreamRanges createActiveRanges(GetActiveRangesResponse response) { TreeMap<Long, RangeProperties> ranges = Maps.newTreeMap(); long lastEndKey = Long.MIN_VALUE; for (RelatedRanges rr : response.getRangesList()) { diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java index 1dd5f864aa..e1260665ad 100644 --- a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java +++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java @@ -19,6 +19,9 @@ package org.apache.bookkeeper.clients.impl.channel; import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import io.grpc.ManagedChannel; import io.grpc.Server; @@ -68,4 +71,13 @@ public void testBasic() { channel.close(); } + @Test + public void testIntercept() { + ManagedChannel channel = mock(ManagedChannel.class); + StorageServerChannel ssChannel = new StorageServerChannel(channel, Optional.empty()); + StorageServerChannel interceptedChannel = ssChannel.intercept(1L); + interceptedChannel.close(); + verify(channel, times(1)).shutdown(); + } + } diff --git a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java index 0088c4b308..b888b45353 100644 --- a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java +++ b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java @@ -19,6 +19,8 @@ package org.apache.bookkeeper.stream.protocol; import io.grpc.Metadata; +import org.apache.bookkeeper.common.grpc.netty.IdentityBinaryMarshaller; +import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller; import org.apache.bookkeeper.stream.proto.FixedRangeSplitPolicy; import org.apache.bookkeeper.stream.proto.RangeKeyType; import org.apache.bookkeeper.stream.proto.RetentionPolicy; @@ -115,4 +117,24 @@ private ProtocolConstants() { public static final String ROUTING_KEY = "rk" + Metadata.BINARY_HEADER_SUFFIX; public static final String STREAM_ID_KEY = "sid-" + Metadata.BINARY_HEADER_SUFFIX; public static final String RANGE_ID_KEY = "rid-" + Metadata.BINARY_HEADER_SUFFIX; + + // the metadata keys in grpc call metadata + public static final Metadata.Key<Long> SCID_METADATA_KEY = Metadata.Key.of( + SC_ID_KEY, + LongBinaryMarshaller.of() + ); + public static final Metadata.Key<Long> RID_METADATA_KEY = Metadata.Key.of( + RANGE_ID_KEY, + LongBinaryMarshaller.of() + ); + public static final Metadata.Key<Long> SID_METADATA_KEY = Metadata.Key.of( + STREAM_ID_KEY, + LongBinaryMarshaller.of() + ); + public static final Metadata.Key<byte[]> RK_METADATA_KEY = Metadata.Key.of( + ROUTING_KEY, + IdentityBinaryMarshaller.of() + ); + + } diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java index 97ff66ad2b..4385d56e1f 100644 --- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java +++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java @@ -25,6 +25,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.clients.config.StorageClientSettings; +import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel; +import org.apache.bookkeeper.clients.impl.internal.StorageServerClientManagerImpl; import org.apache.bookkeeper.common.component.ComponentStarter; import org.apache.bookkeeper.common.component.LifecycleComponent; import org.apache.bookkeeper.common.component.LifecycleComponentStack; @@ -54,6 +57,7 @@ import org.apache.bookkeeper.stream.storage.impl.cluster.ClusterControllerImpl; import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterControllerLeaderSelector; import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore; +import org.apache.bookkeeper.stream.storage.impl.routing.RoutingHeaderProxyInterceptor; import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerController; import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl; import org.apache.bookkeeper.stream.storage.impl.sc.ZkStorageContainerManager; @@ -249,6 +253,10 @@ public static LifecycleComponent buildStorageServer(CompositeConfiguration conf, dlConf, rootStatsLogger.scope("dlog")); + // client settings for the proxy channels + StorageClientSettings proxyClientSettings = StorageClientSettings.newBuilder() + .serviceUri("bk://localhost:" + grpcPort) + .build(); // Create range (stream) store StorageContainerStoreBuilder storageContainerStoreBuilder = StorageContainerStoreBuilder.newBuilder() .withStatsLogger(rootStatsLogger.scope("storage")) @@ -286,7 +294,15 @@ public static LifecycleComponent buildStorageServer(CompositeConfiguration conf, () -> new DLCheckpointStore(dlNamespaceProvider.get()), storageConf.getRangeStoreDirs(), storageResources, - storageConf.getServeReadOnlyTables())); + storageConf.getServeReadOnlyTables())) + // with client manager for proxying grpc requests + .withStorageServerClientManager(() -> new StorageServerClientManagerImpl( + proxyClientSettings, + storageResources.scheduler(), + StorageServerChannel.factory(proxyClientSettings) + // intercept the channel to attach routing header + .andThen(channel -> channel.intercept(new RoutingHeaderProxyInterceptor())) + )); StorageService storageService = new StorageService( storageConf, storageContainerStoreBuilder, rootStatsLogger.scope("storage")); diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java index 57634d32d1..a05106058e 100644 --- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java +++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java @@ -35,6 +35,15 @@ */ StorageContainer getStorageContainer(long storageContainerId); + /** + * Get the instance of storage container {@code storageContainerId}. + * + * @param storageContainerId storage container id + * @param defaultContainer the default container to return if the container doesn't exist in the registry + * @return the instance of the storage container. + */ + StorageContainer getStorageContainer(long storageContainerId, StorageContainer defaultContainer); + /** * Start the storage container in this registry. * diff --git a/stream/storage/impl/pom.xml b/stream/storage/impl/pom.xml index 52cef7fb33..a2ce3e0de9 100644 --- a/stream/storage/impl/pom.xml +++ b/stream/storage/impl/pom.xml @@ -66,6 +66,13 @@ <artifactId>stream-storage-tests-common</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.bookkeeper</groupId> + <artifactId>stream-storage-java-client-base</artifactId> + <version>${project.parent.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java index 9c96139dfa..cac363f41e 100644 --- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java @@ -17,6 +17,8 @@ import static com.google.common.base.Preconditions.checkNotNull; import java.net.URI; +import java.util.function.Supplier; +import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy; @@ -46,6 +48,7 @@ public static StorageContainerStoreBuilder newBuilder() { StorageContainerPlacementPolicyImpl.of(1024); private MVCCStoreFactory mvccStoreFactory = null; private URI defaultBackendUri = null; + private Supplier<StorageServerClientManager> clientManagerSupplier; private StorageContainerStoreBuilder() { } @@ -131,12 +134,25 @@ public StorageContainerStoreBuilder withDefaultBackendUri(URI uri) { return this; } + /** + * Supplier to provide client manager for proxying requests. + * + * @param clientManagerSupplier client manager supplier + * @return storage container store builder + */ + public StorageContainerStoreBuilder withStorageServerClientManager( + Supplier<StorageServerClientManager> clientManagerSupplier) { + this.clientManagerSupplier = clientManagerSupplier; + return this; + } + public StorageContainerStore build() { checkNotNull(scmFactory, "StorageContainerManagerFactory is not provided"); checkNotNull(storeConf, "StorageConfiguration is not provided"); checkNotNull(mvccStoreFactory, "MVCCStoreFactory is not provided"); checkNotNull(defaultBackendUri, "Default backend uri is not provided"); checkNotNull(placementPolicyFactory, "Storage Container Placement Policy Factory is not provided"); + checkNotNull(clientManagerSupplier, "Storage server client manager is not provided"); RangeStoreServiceFactoryImpl serviceFactory = new RangeStoreServiceFactoryImpl( storeConf, @@ -152,6 +168,7 @@ public StorageContainerStore build() { storeConf, scmFactory, containerServiceFactory, + clientManagerSupplier.get(), statsLogger); } diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java index 56fcba2a8d..1271e85e3c 100644 --- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java @@ -15,15 +15,19 @@ package org.apache.bookkeeper.stream.storage.impl; import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.INVALID_STORAGE_CONTAINER_ID; -import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.SC_ID_KEY; +import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RID_METADATA_KEY; +import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RK_METADATA_KEY; +import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.SCID_METADATA_KEY; +import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.SID_METADATA_KEY; import io.grpc.Channel; import io.grpc.Metadata; import io.grpc.ServerCall; import java.io.IOException; +import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager; import org.apache.bookkeeper.common.component.AbstractLifecycleComponent; -import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller; import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.stream.proto.RangeProperties; import org.apache.bookkeeper.stream.storage.api.StorageContainerStore; import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer; import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManager; @@ -31,11 +35,15 @@ import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRoutingService; import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerServiceFactory; import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration; +import org.apache.bookkeeper.stream.storage.impl.routing.RangeRoutingTable; +import org.apache.bookkeeper.stream.storage.impl.routing.RangeRoutingTableImpl; +import org.apache.bookkeeper.stream.storage.impl.routing.StorageContainerProxyChannelManager; +import org.apache.bookkeeper.stream.storage.impl.routing.StorageContainerProxyChannelManagerImpl; import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerFactory; import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerRegistryImpl; /** - * KeyRange Service. + * Storage Container Store manages the containers and routes the requests accordingly. */ public class StorageContainerStoreImpl extends AbstractLifecycleComponent<StorageConfiguration> @@ -45,11 +53,14 @@ private final StorageContainerRegistryImpl scRegistry; private final StorageContainerManager scManager; private final StorageContainerServiceFactory serviceFactory; - private final Metadata.Key<Long> scIdKey; + private final StorageServerClientManager ssClientManager; + private final RangeRoutingTable routingTable; + private final StorageContainerProxyChannelManager proxyChannelManager; public StorageContainerStoreImpl(StorageConfiguration conf, StorageContainerManagerFactory managerFactory, StorageContainerServiceFactory serviceFactory, + StorageServerClientManager ssClientManager, StatsLogger statsLogger) { super("range-service", conf, statsLogger); this.scmFactory = managerFactory; @@ -57,9 +68,14 @@ public StorageContainerStoreImpl(StorageConfiguration conf, new DefaultStorageContainerFactory(serviceFactory)); this.scManager = scmFactory.create(conf, scRegistry); this.serviceFactory = serviceFactory; - this.scIdKey = Metadata.Key.of( - SC_ID_KEY, - LongBinaryMarshaller.of()); + this.ssClientManager = ssClientManager; + if (ssClientManager == null) { + this.proxyChannelManager = null; + this.routingTable = null; + } else { + this.proxyChannelManager = new StorageContainerProxyChannelManagerImpl(ssClientManager); + this.routingTable = new RangeRoutingTableImpl(ssClientManager); + } } @Override @@ -83,6 +99,10 @@ protected void doStart() { @Override protected void doStop() { + // it doesn't have to be blocked at waiting closing proxy channels to be completed. + if (null != ssClientManager) { + this.ssClientManager.closeAsync(); + } this.scManager.stop(); this.scRegistry.close(); } @@ -97,17 +117,69 @@ StorageContainer getStorageContainer(long scId) { return scRegistry.getStorageContainer(scId); } + StorageContainer getStorageContainer(long scId, StorageContainer defaultContainer) { + return scRegistry.getStorageContainer(scId, defaultContainer); + } + // // Utils for proxies // @Override public Channel findChannel(ServerCall<?, ?> serverCall, Metadata headers) { - Long scId = headers.get(scIdKey); - if (null == scId) { - // use the invalid storage container id, so it will fail the request. - scId = INVALID_STORAGE_CONTAINER_ID; + Long scId = headers.get(SCID_METADATA_KEY); + if (null != scId) { + // if a request is sent directly to a container, then find the container + StorageContainer container = getStorageContainer(scId, null); + if (null != container) { + return container.getChannel(); + } else { + if (scId == 0L && null != proxyChannelManager) { + // root container, we attempt to always proxy the requests for root container + Channel channel = proxyChannelManager.getStorageContainerChannel(0L); + if (null != channel) { + return channel; + } else { + // no channel found to proxy the request, fail the request with 404 container + return getStorageContainer(INVALID_STORAGE_CONTAINER_ID).getChannel(); + } + } else { + // no container is found and the scId is not the root container + // then fail the request with 404 container + return getStorageContainer(INVALID_STORAGE_CONTAINER_ID).getChannel(); + } + } + } else { + // if a request is not sent directly to a container, then check if + // streamId + routingKey is attached in the header. if so, figure out + // which the range id and storage container id to route the request. + byte[] routingKey = headers.get(RK_METADATA_KEY); + Long streamId = headers.get(SID_METADATA_KEY); + if (null != routingKey && null != streamId && null != routingTable && null != proxyChannelManager) { + RangeProperties rangeProps = routingTable.getRange(streamId, routingKey); + if (null != rangeProps) { + long containerId = rangeProps.getStorageContainerId(); + long rangeId = rangeProps.getRangeId(); + // if we find the routing information, we can update the headers + headers.put(SCID_METADATA_KEY, containerId); + headers.put(RID_METADATA_KEY, rangeId); + // if we find the container id, we can check whether the container is owned locally. + // if so, forward the request to the container. + StorageContainer container = getStorageContainer(containerId, null); + if (null == container) { + // the container doesn't belong to here, then find the channel to forward the request + Channel channel = proxyChannelManager.getStorageContainerChannel(containerId); + if (null != channel) { + return channel; + } + } else { + // we found the container exists in the registry to serve the request + return container.getChannel(); + } + } + } + // there is no storage container id or routing key, then fail the request and ask client to retry. + return getStorageContainer(INVALID_STORAGE_CONTAINER_ID).getChannel(); } - return getStorageContainer(scId).getChannel(); } } diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTable.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTable.java new file mode 100644 index 0000000000..bb80142149 --- /dev/null +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTable.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.bookkeeper.stream.storage.impl.routing; + +import org.apache.bookkeeper.stream.proto.RangeProperties; + +/** + * A routing table for figuring the ranges to forward the requests. + */ +public interface RangeRoutingTable { + + /** + * Get the range metadata for <tt>streamId</tt> and the routing key <tt>routingKey</tt>. + * + * @param streamId stream id + * @param routingKey routing key + * @return the range that serves this routing key + */ + RangeProperties getRange(long streamId, byte[] routingKey); + +} diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java new file mode 100644 index 0000000000..b9727c9037 --- /dev/null +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.bookkeeper.stream.storage.impl.routing; + +import com.google.common.annotations.VisibleForTesting; +import java.util.concurrent.CompletableFuture; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager; +import org.apache.bookkeeper.clients.impl.routing.RangeRouter; +import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.bookkeeper.common.router.BytesHashRouter; +import org.apache.bookkeeper.stream.proto.RangeProperties; +import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap; + +/** + * A default implementation of {@link RangeRoutingTable}. + */ +@Slf4j +public class RangeRoutingTableImpl implements RangeRoutingTable { + + private final StorageServerClientManager manager; + private final ConcurrentLongHashMap<RangeRouter<byte[]>> ranges; + + // outstanding requests + private final ConcurrentLongHashMap<CompletableFuture<RangeRouter<byte[]>>> outstandingUpdates; + + public RangeRoutingTableImpl(StorageServerClientManager manager) { + this.manager = manager; + this.ranges = new ConcurrentLongHashMap<>(); + this.outstandingUpdates = new ConcurrentLongHashMap<>(); + } + + @VisibleForTesting + RangeRouter<byte[]> getRangeRouter(long streamId) { + return ranges.get(streamId); + } + + @Override + public RangeProperties getRange(long streamId, byte[] routingKey) { + RangeRouter<byte[]> router = ranges.get(streamId); + if (null == router) { + // trigger to fetch stream metadata, but return `null` since + // the range router is not ready, let the client backoff and retry. + fetchStreamRanges(streamId); + return null; + } else { + return router.getRangeProperties(routingKey); + } + } + + @VisibleForTesting + CompletableFuture<RangeRouter<byte[]>> getOutstandingFetchRequest(long streamId) { + return outstandingUpdates.get(streamId); + } + + private void fetchStreamRanges(long streamId) { + if (null != outstandingUpdates.get(streamId)) { + // there is already an outstanding fetch request, do nothing + return; + } + final CompletableFuture<RangeRouter<byte[]>> newFetchFuture = new CompletableFuture<>(); + if (null != outstandingUpdates.put(streamId, newFetchFuture)) { + // some one already triggers the fetch + return; + } + FutureUtils.proxyTo( + manager.openMetaRangeClient(streamId) + .thenCompose(metaRangeClient -> metaRangeClient.getActiveDataRanges()) + .thenApply(hashStreamRanges -> { + RangeRouter<byte[]> router = new RangeRouter<>(BytesHashRouter.of()); + router.setRanges(hashStreamRanges); + return router; + }) + .whenComplete((router, cause) -> { + if (null == cause) { + ranges.put(streamId, router); + } + outstandingUpdates.remove(streamId, newFetchFuture); + }), + newFetchFuture + ); + } +} diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptor.java similarity index 70% rename from stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java rename to stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptor.java index 2ef970f92f..377f6df2c6 100644 --- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptor.java @@ -17,11 +17,11 @@ * under the License. */ -package org.apache.bookkeeper.clients.impl.kv.interceptors; +package org.apache.bookkeeper.stream.storage.impl.routing; -import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RANGE_ID_KEY; -import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROUTING_KEY; -import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.STREAM_ID_KEY; +import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RID_METADATA_KEY; +import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RK_METADATA_KEY; +import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.SID_METADATA_KEY; import com.google.protobuf.CodedOutputStream; import com.google.protobuf.MessageLite; @@ -34,6 +34,9 @@ import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.PooledByteBufAllocator; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -41,8 +44,6 @@ import java.util.Map; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.common.grpc.netty.IdentityBinaryMarshaller; -import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller; import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest; import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest; import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest; @@ -50,25 +51,13 @@ import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader; import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc; import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest; +import org.apache.commons.codec.binary.Hex; /** * A client interceptor that intercepting kv rpcs to attach routing information. */ @Slf4j -public class RoutingHeaderClientInterceptor implements ClientInterceptor { - - static final Metadata.Key<Long> RID_METADATA_KEY = Metadata.Key.of( - RANGE_ID_KEY, - LongBinaryMarshaller.of() - ); - static final Metadata.Key<Long> SID_METADATA_KEY = Metadata.Key.of( - STREAM_ID_KEY, - LongBinaryMarshaller.of() - ); - static final Metadata.Key<byte[]> RK_METADATA_KEY = Metadata.Key.of( - ROUTING_KEY, - IdentityBinaryMarshaller.of() - ); +public class RoutingHeaderProxyInterceptor implements ClientInterceptor { /** * Table request mutator that mutates a table service rpc request to attach @@ -175,53 +164,75 @@ ReqT intercept(ReqT request, CallOptions callOptions, Channel next) { if (log.isTraceEnabled()) { - log.trace("Intercepting method {}", method.getFullMethodName()); + log.trace("Intercepting method {} : req marshaller = {}, resp marshaller = {}", + method.getFullMethodName(), + method.getRequestMarshaller(), + method.getResponseMarshaller()); } InterceptorDescriptor<?> descriptor = kvRpcMethods.get(method.getFullMethodName()); - if (null != descriptor) { - return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) { - - private Long rid = null; - private Long sid = null; - private byte[] rk = null; + return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) { - @Override - public void start(Listener<RespT> responseListener, Metadata headers) { - // capture routing information from headers - sid = headers.get(SID_METADATA_KEY); - rid = headers.get(RID_METADATA_KEY); - rk = headers.get(RK_METADATA_KEY); - if (log.isTraceEnabled()) { - log.trace("Intercepting request with header : sid = {}, rid = {}, rk = {}", - sid, rid, rk); - } + private Long rid = null; + private Long sid = null; + private byte[] rk = null; - delegate().start(responseListener, headers); + @Override + public void start(Listener<RespT> responseListener, Metadata headers) { + // capture routing information from headers + sid = headers.get(SID_METADATA_KEY); + rid = headers.get(RID_METADATA_KEY); + rk = headers.get(RK_METADATA_KEY); + if (log.isTraceEnabled()) { + log.trace("Intercepting request with header : sid = {}, rid = {}, rk = {}", + sid, rid, rk); } - @Override - public void sendMessage(ReqT message) { - ReqT interceptedMessage; - if (null == rid || null == sid || null == rk) { - // we don't have enough information to form the new routing header - // so do nothing - interceptedMessage = message; - } else { - interceptedMessage = interceptMessage( - method, - descriptor, - message, - sid, - rid, - rk - ); - } - delegate().sendMessage(interceptedMessage); + delegate().start(responseListener, headers); + } + + @Override + public void sendMessage(ReqT message) { + ReqT interceptedMessage; + if (null == rid || null == sid || null == rk || null == descriptor) { + // we don't have enough information to form the new routing header + // we simply copy the bytes and generate a new message to forward + // the request payload + interceptedMessage = interceptMessage(method, message); + } else { + interceptedMessage = interceptMessage( + method, + descriptor, + message, + sid, + rid, + rk + ); } - }; - } else { - return next.newCall(method, callOptions); + delegate().sendMessage(interceptedMessage); + } + }; + } + + private <ReqT, RespT> ReqT interceptMessage(MethodDescriptor<ReqT, RespT> method, ReqT message) { + InputStream is = method.getRequestMarshaller().stream(message); + int bytes; + try { + bytes = is.available(); + } catch (IOException e) { + log.warn("Encountered exceptions in getting available bytes of message", e); + throw new RuntimeException("Encountered exception in intercepting message", e); + } + ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer(); + try { + buffer.writeBytes(is, bytes); + } catch (IOException e) { + log.warn("Encountered exceptions in transferring bytes to the buffer", e); + buffer.release(); + throw new RuntimeException("Encountered exceptions in transferring bytes to the buffer", e); } + return method + .getRequestMarshaller() + .parse(new ByteBufInputStream(buffer, true)); } private <ReqT, TableReqT extends MessageLite> ReqT interceptMessage( @@ -237,7 +248,9 @@ public void sendMessage(ReqT message) { } else { try { return interceptTableRequest(method, descriptor, message, sid, rid, rk); - } catch (IOException ioe) { + } catch (Throwable t) { + log.error("Failed to intercept table request (sid = {}, rid = {}, rk = {}) : ", + sid, rid, Hex.encodeHexString(rk), t); return message; } } diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManager.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManager.java new file mode 100644 index 0000000000..deb054e150 --- /dev/null +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManager.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.bookkeeper.stream.storage.impl.routing; + +import io.grpc.Channel; + +/** + * A manager that manages all the proxy channels to other storage containers. + */ +public interface StorageContainerProxyChannelManager { + + /** + * Get the channel to storage container <tt>scId</tt>. + * + * <p>The method can return `null` if the channel is not ready. + * + * @param scId storage container id + * @return channel to the given storage container, or `null` if it can't connect + */ + Channel getStorageContainerChannel(long scId); + +} diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImpl.java new file mode 100644 index 0000000000..ed69e0f040 --- /dev/null +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImpl.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.bookkeeper.stream.storage.impl.routing; + +import io.grpc.Channel; +import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel; +import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel; +import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager; + +/** + * Default implementation of {@link StorageContainerProxyChannelManager}. + */ +public class StorageContainerProxyChannelManagerImpl implements StorageContainerProxyChannelManager { + + // we can ideally just talk to the location service directly. + // however currently storage container is separated from actual services to make a clean interface + // so for now proxy channel manager will be acting as a client to talk to its local server to + // get location related information + private final StorageServerClientManager ssClientManager; + + public StorageContainerProxyChannelManagerImpl(StorageServerClientManager clientManager) { + this.ssClientManager = clientManager; + } + + @Override + public Channel getStorageContainerChannel(long scId) { + StorageContainerChannel channel = ssClientManager.getStorageContainerChannel(scId); + // this will trigger creating the channel for the first time + CompletableFuture<StorageServerChannel> serverChannelFuture = channel.getStorageContainerChannelFuture(); + if (null != serverChannelFuture && serverChannelFuture.isDone()) { + StorageServerChannel serverChannel = serverChannelFuture.join(); + if (serverChannel != null) { + return serverChannel.getGrpcChannel(); + } else { + return null; + } + } else { + return null; + } + } + +} diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/package-info.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/package-info.java new file mode 100644 index 0000000000..5d4437ca36 --- /dev/null +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Classes related to grpc request routing. + */ +package org.apache.bookkeeper.stream.storage.impl.routing; \ No newline at end of file diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java index 48330b7e13..160d9a8647 100644 --- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java @@ -58,7 +58,12 @@ public int getNumStorageContainers() { @Override public StorageContainer getStorageContainer(long storageContainerId) { - return containers.getOrDefault(storageContainerId, StorageContainer404.of()); + return getStorageContainer(storageContainerId, StorageContainer404.of()); + } + + @Override + public StorageContainer getStorageContainer(long storageContainerId, StorageContainer defaultContainer) { + return containers.getOrDefault(storageContainerId, defaultContainer); } @Override diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java index 3d3a0ad1b5..59d6a1306b 100644 --- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java +++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java @@ -18,6 +18,7 @@ import static org.mockito.Mockito.mock; import java.net.URI; +import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager; import org.apache.bookkeeper.stream.storage.api.StorageContainerStore; import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactory; import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration; @@ -47,6 +48,7 @@ public void testBuildNullConfiguration() { .withStorageResources(StorageResources.create()) .withRangeStoreFactory(storeFactory) .withDefaultBackendUri(uri) + .withStorageServerClientManager(() -> mock(StorageServerClientManager.class)) .build(); } @@ -57,6 +59,7 @@ public void testBuildNullResources() { .withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class)) .withStorageResources(null) .withRangeStoreFactory(storeFactory) + .withStorageServerClientManager(() -> mock(StorageServerClientManager.class)) .withDefaultBackendUri(uri) .build(); } @@ -68,6 +71,7 @@ public void testBuildNullRGManagerFactory() { .withStorageContainerManagerFactory(null) .withStorageResources(StorageResources.create()) .withRangeStoreFactory(storeFactory) + .withStorageServerClientManager(() -> mock(StorageServerClientManager.class)) .withDefaultBackendUri(uri) .build(); } @@ -79,6 +83,7 @@ public void testBuildNullStoreFactory() { .withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class)) .withStorageResources(StorageResources.create()) .withRangeStoreFactory(null) + .withStorageServerClientManager(() -> mock(StorageServerClientManager.class)) .withDefaultBackendUri(uri) .build(); } @@ -90,10 +95,23 @@ public void testBuildNullDefaultBackendUri() { .withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class)) .withStorageResources(StorageResources.create()) .withRangeStoreFactory(storeFactory) + .withStorageServerClientManager(() -> mock(StorageServerClientManager.class)) .withDefaultBackendUri(null) .build(); } + @Test(expected = NullPointerException.class) + public void testBuildStorageServerClientManager() { + StorageContainerStoreBuilder.newBuilder() + .withStorageConfiguration(mock(StorageConfiguration.class)) + .withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class)) + .withStorageResources(StorageResources.create()) + .withRangeStoreFactory(storeFactory) + .withStorageServerClientManager(null) + .withDefaultBackendUri(uri) + .build(); + } + @Test public void testBuild() { StorageContainerStore storageContainerStore = StorageContainerStoreBuilder.newBuilder() @@ -101,6 +119,7 @@ public void testBuild() { .withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class)) .withStorageResources(StorageResources.create()) .withRangeStoreFactory(storeFactory) + .withStorageServerClientManager(() -> mock(StorageServerClientManager.class)) .withDefaultBackendUri(uri) .build(); assertTrue(storageContainerStore instanceof StorageContainerStoreImpl); diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java index b9d5e69798..6ae3658dad 100644 --- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java +++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java @@ -85,6 +85,7 @@ import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc; import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceFutureStub; import org.apache.bookkeeper.stream.proto.storage.StatusCode; +import org.apache.bookkeeper.stream.storage.StorageResources; import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService; import org.apache.bookkeeper.stream.storage.api.service.RangeStoreServiceFactory; import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration; @@ -133,6 +134,7 @@ private static Endpoint createEndpoint(String hostname, NamespaceConfiguration.newBuilder() .setDefaultStreamConf(DEFAULT_STREAM_CONF) .build(); + private final StorageResources resources = StorageResources.create(); private RangeStoreService mockRangeStoreService; private StorageContainerStoreImpl rangeStore; private Server server; @@ -230,6 +232,7 @@ public void setUp() throws Exception { (storeConf, rgRegistry) -> new LocalStorageContainerManager(endpoint, storeConf, rgRegistry, 2), new RangeStoreContainerServiceFactoryImpl(rangeStoreServiceFactory), + null, NullStatsLogger.INSTANCE); rangeStore.start(); diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImplTest.java new file mode 100644 index 0000000000..8201ba714d --- /dev/null +++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImplTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.bookkeeper.stream.storage.impl.routing; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +import io.grpc.stub.StreamObserver; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase; +import org.apache.bookkeeper.clients.impl.internal.ProtocolInternalUtils; +import org.apache.bookkeeper.clients.impl.routing.RangeRouter; +import org.apache.bookkeeper.common.router.BytesHashRouter; +import org.apache.bookkeeper.stream.proto.RangeProperties; +import org.apache.bookkeeper.stream.proto.StreamConfiguration; +import org.apache.bookkeeper.stream.proto.StreamProperties; +import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest; +import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse; +import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest; +import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse; +import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRangeServiceImplBase; +import org.apache.bookkeeper.stream.proto.storage.RelatedRanges; +import org.apache.bookkeeper.stream.proto.storage.RelationType; +import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceImplBase; +import org.apache.bookkeeper.stream.proto.storage.StatusCode; +import org.apache.bookkeeper.stream.protocol.util.ProtoUtils; +import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl; +import org.junit.Test; + +/** + * Unit test {@link RangeRoutingTable}. + */ +public class RangeRoutingTableImplTest extends GrpcClientTestBase { + + private final long scId = 1234L; + private final long streamId = 123456L; + private GetActiveRangesResponse getActiveRangesResponse; + private CompletableFuture<GetActiveRangesResponse> responseSupplier; + private StreamProperties props; + private List<RangeProperties> rangeProps; + private RangeRoutingTableImpl routingTable; + private RangeRouter<byte[]> rangeRouter; + + @Override + protected void doSetup() throws Exception { + this.props = StreamProperties.newBuilder() + .setStorageContainerId(scId) + .setStreamId(streamId) + .setStreamName("metaclient-stream") + .setStreamConf(StreamConfiguration.newBuilder().build()) + .build(); + this.rangeProps = ProtoUtils.split( + streamId, + 24, + 23456L, + StorageContainerPlacementPolicyImpl.of(4) + ); + final GetActiveRangesResponse.Builder getActiveRangesResponseBuilder = GetActiveRangesResponse.newBuilder(); + for (RangeProperties range : rangeProps) { + RelatedRanges.Builder rrBuilder = RelatedRanges.newBuilder() + .setProps(range) + .setType(RelationType.PARENTS) + .addAllRelatedRanges(Collections.emptyList()); + getActiveRangesResponseBuilder.addRanges(rrBuilder); + } + this.getActiveRangesResponse = getActiveRangesResponseBuilder + .setCode(StatusCode.SUCCESS) + .build(); + RootRangeServiceImplBase rootRangeService = new RootRangeServiceImplBase() { + @Override + public void getStream(GetStreamRequest request, + StreamObserver<GetStreamResponse> responseObserver) { + responseObserver.onNext(GetStreamResponse.newBuilder() + .setCode(StatusCode.SUCCESS) + .setStreamProps(props) + .build()); + responseObserver.onCompleted(); + } + }; + serviceRegistry.addService(rootRangeService); + + this.responseSupplier = new CompletableFuture<>(); + // register a good meta range service + MetaRangeServiceImplBase metaRangeService = new MetaRangeServiceImplBase() { + @Override + public void getActiveRanges(GetActiveRangesRequest request, + StreamObserver<GetActiveRangesResponse> responseObserver) { + try { + responseObserver.onNext(responseSupplier.get()); + responseObserver.onCompleted(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + responseObserver.onError(e); + } catch (ExecutionException e) { + responseObserver.onError(e); + } + } + }; + serviceRegistry.addService(metaRangeService); + + this.routingTable = new RangeRoutingTableImpl(serverManager); + this.rangeRouter = new RangeRouter<>(BytesHashRouter.of()); + this.rangeRouter.setRanges(ProtocolInternalUtils.createActiveRanges(getActiveRangesResponse)); + } + + @Override + protected void doTeardown() throws Exception { + } + + @Test + public void testGetRange() throws Exception { + String key = "foo"; + byte[] keyBytes = key.getBytes(UTF_8); + RangeProperties rangeProps = routingTable.getRange(streamId, keyBytes); + // the first get will return null since there is nothing in + assertNull(rangeProps); + // the fetch request is outstanding + CompletableFuture<RangeRouter<byte[]>> outstandingFetchFuture = + routingTable.getOutstandingFetchRequest(streamId); + assertNotNull(outstandingFetchFuture); + assertFalse(outstandingFetchFuture.isDone()); + + // complete the response supplier, so the fetch request can complete to update the cache + responseSupplier.complete(getActiveRangesResponse); + + // wait until the stuff is cached. + while (null == routingTable.getRangeRouter(streamId)) { + TimeUnit.MILLISECONDS.sleep(100); + } + + // if the router is created, it should return the cached router + rangeProps = routingTable.getRange(streamId, keyBytes); + assertNotNull(rangeProps); + assertEquals(rangeRouter.getRangeProperties(keyBytes), rangeProps); + } + + @Test + public void testGetRangeException() throws Exception { + String key = "foo"; + byte[] keyBytes = key.getBytes(UTF_8); + RangeProperties rangeProps = routingTable.getRange(streamId, keyBytes); + // the first get will return null since there is nothing in + assertNull(rangeProps); + // the fetch request is outstanding + CompletableFuture<RangeRouter<byte[]>> outstandingFetchFuture = + routingTable.getOutstandingFetchRequest(streamId); + assertNotNull(outstandingFetchFuture); + assertFalse(outstandingFetchFuture.isDone()); + + // complete the response supplier, so the fetch request can complete to update the cache + responseSupplier.completeExceptionally(new Exception("fetch failed")); + + // wait until the fetch is done. + try { + outstandingFetchFuture.get(); + fail("Fetch request should fail"); + } catch (Exception e) { + // expected + } + + // once the fetch is done, nothing should be cached and the outstanding fetch request should be removed + assertNull(routingTable.getRangeRouter(streamId)); + assertNull(routingTable.getOutstandingFetchRequest(streamId)); + } + +} diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptorTest.java similarity index 94% rename from stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java rename to stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptorTest.java index 745118b049..43c060b12c 100644 --- a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java +++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptorTest.java @@ -17,9 +17,12 @@ * under the License. */ -package org.apache.bookkeeper.clients.impl.kv.interceptors; +package org.apache.bookkeeper.stream.storage.impl.routing; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RID_METADATA_KEY; +import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RK_METADATA_KEY; +import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.SID_METADATA_KEY; import static org.junit.Assert.assertEquals; import com.google.protobuf.ByteString; @@ -54,10 +57,10 @@ import org.junit.Test; /** - * Unit test {@link RoutingHeaderClientInterceptor}. + * Unit test {@link RoutingHeaderProxyInterceptor}. */ @Slf4j -public class RoutingHeaderClientInterceptorTest extends GrpcClientTestBase { +public class RoutingHeaderProxyInterceptorTest extends GrpcClientTestBase { private final long streamId = 1234L; private final long rangeId = 2345L; @@ -136,11 +139,12 @@ public void put(PutRequest request, StreamObserver<PutResponse> responseObserver }; serviceRegistry.addService(tableService.bindService()); + this.channel = new StorageServerChannel( InProcessChannelBuilder.forName(serverName).directExecutor().build(), Optional.empty() ).intercept( - new RoutingHeaderClientInterceptor(), + new RoutingHeaderProxyInterceptor(), new ClientInterceptor() { @Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, @@ -152,15 +156,15 @@ protected void checkedStart(Listener<RespT> responseListener, Metadata headers) log.info("Intercept the request with routing information : sid = {}, rid = {}, rk = {}", streamId, rangeId, new String(routingKey, UTF_8)); headers.put( - RoutingHeaderClientInterceptor.RID_METADATA_KEY, + RID_METADATA_KEY, rangeId ); headers.put( - RoutingHeaderClientInterceptor.SID_METADATA_KEY, + SID_METADATA_KEY, streamId ); headers.put( - RoutingHeaderClientInterceptor.RK_METADATA_KEY, + RK_METADATA_KEY, routingKey ); delegate().start(responseListener, headers); @@ -173,6 +177,7 @@ protected void checkedStart(Listener<RespT> responseListener, Metadata headers) @Override protected void doTeardown() { + channel.close(); } @Test diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImplTest.java new file mode 100644 index 0000000000..bb94843fe0 --- /dev/null +++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImplTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.bookkeeper.stream.storage.impl.routing; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import io.grpc.Channel; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase; +import org.apache.bookkeeper.stream.proto.storage.GetStorageContainerEndpointRequest; +import org.apache.bookkeeper.stream.proto.storage.GetStorageContainerEndpointResponse; +import org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointRequest; +import org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointResponse; +import org.apache.bookkeeper.stream.proto.storage.StatusCode; +import org.apache.bookkeeper.stream.proto.storage.StorageContainerEndpoint; +import org.apache.bookkeeper.stream.proto.storage.StorageContainerServiceGrpc.StorageContainerServiceImplBase; +import org.junit.Test; + +/** + * Unit testing {@link StorageContainerProxyChannelManagerImpl}. + */ +public class StorageContainerProxyChannelManagerImplTest extends GrpcClientTestBase { + + private final long scId = 1234L; + private StorageContainerProxyChannelManagerImpl proxyChannelManager; + + @Override + protected void doSetup() throws Exception { + this.proxyChannelManager = new StorageContainerProxyChannelManagerImpl(serverManager); + } + + @Override + protected void doTeardown() throws Exception { + } + + + @Test + public void testGetStorageContainerChannel() throws Exception { + final CompletableFuture<GetStorageContainerEndpointRequest> receivedRequest = new CompletableFuture<>(); + final CompletableFuture<GetStorageContainerEndpointResponse> responseSupplier = new CompletableFuture<>(); + StorageContainerServiceImplBase scService = new StorageContainerServiceImplBase() { + @Override + public void getStorageContainerEndpoint( + GetStorageContainerEndpointRequest request, + StreamObserver<GetStorageContainerEndpointResponse> responseObserver) { + receivedRequest.complete(request); + try { + responseObserver.onNext(responseSupplier.get()); + responseObserver.onCompleted(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + responseObserver.onError(e); + } catch (ExecutionException e) { + responseObserver.onError(e); + } + } + }; + serviceRegistry.addService(scService.bindService()); + + Channel channel = proxyChannelManager.getStorageContainerChannel(scId); + // if the location service doesn't respond, the channel will be null + assertNull(channel); + + // complete the location service request + responseSupplier.complete(getResponse(receivedRequest.get())); + while ((channel = proxyChannelManager.getStorageContainerChannel(scId)) == null) { + TimeUnit.MILLISECONDS.sleep(100); + } + assertNotNull(channel); + } + + private static GetStorageContainerEndpointResponse getResponse(GetStorageContainerEndpointRequest request) { + GetStorageContainerEndpointResponse.Builder respBuilder = + GetStorageContainerEndpointResponse.newBuilder(); + respBuilder.setStatusCode(StatusCode.SUCCESS); + for (OneStorageContainerEndpointRequest oneReq : request.getRequestsList()) { + OneStorageContainerEndpointResponse oneResp = OneStorageContainerEndpointResponse.newBuilder() + .setEndpoint(StorageContainerEndpoint.newBuilder() + .setStorageContainerId(oneReq.getStorageContainer()) + .setRevision(oneReq.getRevision() + 1) + .setRwEndpoint(ENDPOINT)) + .build(); + respBuilder.addResponses(oneResp); + } + return respBuilder.build(); + } + + + +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services