This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push: new e34dd38 [TABLE SERVICE] Move integration tests under `stream/tests/integration` to `tests/integration/cluster` e34dd38 is described below commit e34dd384c865a3494079ec9f38e4a0d87bd0ce51 Author: Sijie Guo <si...@apache.org> AuthorDate: Tue May 22 19:11:16 2018 -0700 [TABLE SERVICE] Move integration tests under `stream/tests/integration` to `tests/integration/cluster` Descriptions of the changes in this PR: The original integration tests were written based on a non-dockerized standalone stream cluster. Moved them to use the dockerized integration test framework. So all the integration tests are actually testing the table service run as part of bookies. This change is based on #1422 . a371ff2 is the change in this PR to be reviewed. Master Issue: #1205 Author: Sijie Guo <si...@apache.org> Reviewers: Jia Zhai <None> This closes #1423 from sijie/move_more_stream_it_tests --- .../bookkeeper/clients/StorageClientImpl.java | 1 - .../clients/config/StorageClientSettings.java | 11 ++ .../clients/impl/channel/StorageServerChannel.java | 19 +++- .../impl/channel/StorageServerChannelManager.java | 2 +- .../internal/StorageServerClientManagerImpl.java | 2 +- .../clients/resolver/EndpointResolver.java | 48 +++++++++ stream/pom.xml | 1 - .../util/StorageContainerPlacementPolicy.java | 8 ++ .../bookkeeper/stream/cluster/StreamCluster.java | 3 +- .../bookkeeper/stream/server/StorageServer.java | 25 +++-- .../server/StreamStorageLifecycleComponent.java | 1 - .../api/sc/StorageContainerManagerFactory.java | 4 +- .../stream/storage/RangeStoreBuilder.java | 22 +++- .../stream/storage/impl/RangeStoreImpl.java | 8 +- .../stream/storage/impl/TestRangeStoreImpl.java | 2 +- stream/tests/integration/pom.xml | 111 --------------------- .../tests/integration/StorageClientTest.java | 111 --------------------- .../tests/integration/StorageServerTestBase.java | 80 --------------- .../src/test/resources/log4j.properties | 55 
---------- stream/tests/pom.xml | 33 ------ .../integration/stream/LocationClientTest.java | 6 +- .../stream}/StorageAdminClientTest.java | 37 ++----- .../integration/stream/StreamClusterTestBase.java | 34 +++++++ .../integration/stream}/TableClientSimpleTest.java | 44 ++------ .../tests/integration/stream}/TableClientTest.java | 44 ++------ .../tests/integration/topologies/BKCluster.java | 19 +++- 26 files changed, 208 insertions(+), 523 deletions(-) diff --git a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java index aff322c..37c8223 100644 --- a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java +++ b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java @@ -116,7 +116,6 @@ class StorageClientImpl extends AbstractAutoAsyncCloseable implements StorageCli serverManager.close(); closeFuture.complete(null); SharedResourceManager.shared().release(resources.scheduler(), scheduler); - scheduler.shutdown(); }); } diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java index 20b5821..87768fa 100644 --- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java +++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java @@ -24,6 +24,7 @@ import io.grpc.ManagedChannelBuilder; import io.grpc.NameResolver; import java.util.List; import java.util.Optional; +import org.apache.bookkeeper.clients.resolver.EndpointResolver; import org.apache.bookkeeper.clients.utils.ClientConstants; import org.apache.bookkeeper.common.util.Backoff; import org.apache.bookkeeper.stream.proto.common.Endpoint; @@ -57,6 +58,15 @@ public 
interface StorageClientSettings { List<Endpoint> endpoints(); /** + * Return the endpoint resolver for resolving individual endpoints. + * + * <p>The default resolver is an identity resolver. + * + * @return the endpoint resolver for resolving endpoints. + */ + EndpointResolver endpointResolver(); + + /** * Returns the builder to create the managed channel. * * @return @@ -99,6 +109,7 @@ public interface StorageClientSettings { numWorkerThreads(Runtime.getRuntime().availableProcessors()); usePlaintext(true); backoffPolicy(ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY); + endpointResolver(EndpointResolver.identity()); } @Override diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java index e8e72db..34dc957 100644 --- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java +++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java @@ -24,6 +24,8 @@ import io.grpc.ManagedChannelBuilder; import java.util.Optional; import java.util.function.Function; import javax.annotation.concurrent.GuardedBy; +import org.apache.bookkeeper.clients.config.StorageClientSettings; +import org.apache.bookkeeper.clients.resolver.EndpointResolver; import org.apache.bookkeeper.clients.utils.GrpcUtils; import org.apache.bookkeeper.stream.proto.common.Endpoint; import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc; @@ -42,8 +44,12 @@ import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc.TableServiceF */ public class StorageServerChannel implements AutoCloseable { - public static Function<Endpoint, StorageServerChannel> factory(boolean usePlaintext) { - return (endpoint) -> new StorageServerChannel(endpoint, Optional.empty(), usePlaintext); + public static Function<Endpoint, 
StorageServerChannel> factory(StorageClientSettings settings) { + return (endpoint) -> new StorageServerChannel( + endpoint, + Optional.empty(), + settings.usePlaintext(), + settings.endpointResolver()); } private final Optional<String> token; @@ -63,14 +69,17 @@ public class StorageServerChannel implements AutoCloseable { * * @param endpoint range server endpoint. * @param token token used to access range server + * @param usePlainText whether to plain text protocol or not */ public StorageServerChannel(Endpoint endpoint, Optional<String> token, - boolean usePlainText) { + boolean usePlainText, + EndpointResolver endpointResolver) { this.token = token; + Endpoint resolvedEndpoint = endpointResolver.resolve(endpoint); this.channel = ManagedChannelBuilder.forAddress( - endpoint.getHostname(), - endpoint.getPort()) + resolvedEndpoint.getHostname(), + resolvedEndpoint.getPort()) .usePlaintext(usePlainText) .build(); } diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannelManager.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannelManager.java index 67b3d0f..308d25d 100644 --- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannelManager.java +++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannelManager.java @@ -40,7 +40,7 @@ public class StorageServerChannelManager implements AutoCloseable { private final Function<Endpoint, StorageServerChannel> channelFactory; public StorageServerChannelManager(StorageClientSettings settings) { - this(StorageServerChannel.factory(settings.usePlaintext())); + this(StorageServerChannel.factory(settings)); } @VisibleForTesting diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java index d219b6a..3ae3b6b 100644 --- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java +++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java @@ -65,7 +65,7 @@ public class StorageServerClientManagerImpl this( settings, schedulerResource, - StorageServerChannel.factory(settings.usePlaintext())); + StorageServerChannel.factory(settings)); } public StorageServerClientManagerImpl(StorageClientSettings settings, diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/resolver/EndpointResolver.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/resolver/EndpointResolver.java new file mode 100644 index 0000000..e5002b1 --- /dev/null +++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/resolver/EndpointResolver.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.bookkeeper.clients.resolver; + +import org.apache.bookkeeper.stream.proto.common.Endpoint; + +/** + * Resolve an endpoint to another endpoint. + * + * <p>The resolver can be used for resolving the right ip address for an advertised endpoint. It is typically useful + * in dockerized integration tests, where the test clients are typically outside of the docker network. + */ +public interface EndpointResolver { + + /** + * Returns a resolver that always returns its input endpoint. + * + * @return a function that always returns its input endpoint + */ + static EndpointResolver identity() { + return endpoint -> endpoint; + } + + /** + * Resolve <tt>endpoint</tt> to another endpoint. + * + * @param endpoint endpoint to resolve + * @return the resolved endpoint. + */ + Endpoint resolve(Endpoint endpoint); + +} diff --git a/stream/pom.xml b/stream/pom.xml index 208e6d0..f110d52 100644 --- a/stream/pom.xml +++ b/stream/pom.xml @@ -38,7 +38,6 @@ <module>storage</module> <module>server</module> <module>cli</module> - <module>tests</module> </modules> <build> diff --git a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/StorageContainerPlacementPolicy.java b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/StorageContainerPlacementPolicy.java index 8c886ea..340a731 100644 --- a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/StorageContainerPlacementPolicy.java +++ b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/StorageContainerPlacementPolicy.java @@ -31,8 +31,16 @@ package org.apache.bookkeeper.stream.protocol.util; /** * Placement policy to place ranges to group. 
*/ +@FunctionalInterface public interface StorageContainerPlacementPolicy { + @FunctionalInterface + interface Factory { + + StorageContainerPlacementPolicy newPlacementPolicy(); + + } + long placeStreamRange(long streamId, long rangeId); } diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java index 11bc8af..ff0cda2 100644 --- a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java +++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java @@ -187,8 +187,7 @@ public class StreamCluster bookiePort, grpcPort, bkDir, rangesStoreDir, spec.serveReadOnlyTable); server = StorageServer.buildStorageServer( serverConf, - grpcPort, - spec.numServers() * 2); + grpcPort); server.start(); log.info("Started storage server at (bookie port = {}, grpc port = {})", bookiePort, grpcPort); diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java index e0b87ef..94c8587 100644 --- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java +++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java @@ -55,6 +55,7 @@ import org.apache.bookkeeper.stream.storage.impl.cluster.ClusterControllerImpl; import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterControllerLeaderSelector; import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore; import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerController; +import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl; import org.apache.bookkeeper.stream.storage.impl.sc.ZkStorageContainerManager; import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactoryImpl; import 
org.apache.commons.configuration.CompositeConfiguration; @@ -143,8 +144,7 @@ public class StorageServer { try { storageServer = buildStorageServer( conf, - grpcPort, - 1024); + grpcPort); } catch (ConfigurationException e) { log.error("Invalid storage configuration", e); return ExitCode.INVALID_CONF.code(); @@ -168,15 +168,13 @@ public class StorageServer { } public static LifecycleComponent buildStorageServer(CompositeConfiguration conf, - int grpcPort, - int numStorageContainers) + int grpcPort) throws UnknownHostException, ConfigurationException { - return buildStorageServer(conf, grpcPort, numStorageContainers, true, NullStatsLogger.INSTANCE); + return buildStorageServer(conf, grpcPort, true, NullStatsLogger.INSTANCE); } public static LifecycleComponent buildStorageServer(CompositeConfiguration conf, int grpcPort, - int numStorageContainers, boolean startBookieAndStartProvider, StatsLogger externalStatsLogger) throws ConfigurationException, UnknownHostException { @@ -250,12 +248,21 @@ public class StorageServer { .withStorageConfiguration(storageConf) // the storage resources shared across multiple components .withStorageResources(storageResources) - // the number of storage containers - .withNumStorageContainers(numStorageContainers) + // the placement policy + .withStorageContainerPlacementPolicyFactory(() -> { + long numStorageContainers; + try (ZkClusterMetadataStore store = new ZkClusterMetadataStore( + curatorProviderService.get(), + ZKMetadataDriverBase.resolveZkServers(bkServerConf), + ZK_METADATA_ROOT_PATH)) { + numStorageContainers = store.getClusterMetadata().getNumStorageContainers(); + } + return StorageContainerPlacementPolicyImpl.of((int) numStorageContainers); + }) // the default log backend uri .withDefaultBackendUri(dlNamespaceProvider.getDlogUri()) // with zk-based storage container manager - .withStorageContainerManagerFactory((ignored, storeConf, registry) -> + .withStorageContainerManagerFactory((storeConf, registry) -> new 
ZkStorageContainerManager( myEndpoint, storageConf, diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java index 5e6b0b5..6cf73b0 100644 --- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java +++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java @@ -42,7 +42,6 @@ public class StreamStorageLifecycleComponent extends ServerLifecycleComponent { this.streamStorage = StorageServer.buildStorageServer( conf.getUnderlyingConf(), ssConf.getGrpcPort(), - 1024, /* indicator */ false, statsLogger.scope("stream")); } diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerManagerFactory.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerManagerFactory.java index daa130b..b89a465 100644 --- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerManagerFactory.java +++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerManagerFactory.java @@ -24,13 +24,11 @@ public interface StorageContainerManagerFactory { /** * Create a storage container manager to manage lifecycles of {@link StorageContainer}. * - * @param numStorageContainers num of storage containers. * @param conf storage configuration * @param registry storage container registry * @return storage container manager. 
*/ - StorageContainerManager create(int numStorageContainers, - StorageConfiguration conf, + StorageContainerManager create(StorageConfiguration conf, StorageContainerRegistry registry); } diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java index 1bd5510..c03b374 100644 --- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java @@ -19,10 +19,12 @@ import static com.google.common.base.Preconditions.checkNotNull; import java.net.URI; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy; import org.apache.bookkeeper.stream.storage.api.RangeStore; import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactory; import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration; import org.apache.bookkeeper.stream.storage.impl.RangeStoreImpl; +import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl; import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory; /** @@ -38,8 +40,9 @@ public final class RangeStoreBuilder { private StorageConfiguration storeConf = null; private StorageResources storeResources = null; private StorageContainerManagerFactory scmFactory = null; + private StorageContainerPlacementPolicy.Factory placementPolicyFactory = () -> + StorageContainerPlacementPolicyImpl.of(1024); private MVCCStoreFactory mvccStoreFactory = null; - private int numStorageContainers = 1024; private URI defaultBackendUri = null; private RangeStoreBuilder() { @@ -52,7 +55,19 @@ public final class RangeStoreBuilder { * @return range store builder */ public RangeStoreBuilder 
withNumStorageContainers(int numStorageContainers) { - this.numStorageContainers = numStorageContainers; + this.placementPolicyFactory = () -> StorageContainerPlacementPolicyImpl.of(numStorageContainers); + return this; + } + + /** + * Build the range store with the provided <tt>placementPolicyFactory</tt>. + * + * @param placementPolicyFactory placement policy factor to create placement policies. + * @return range store builder. + */ + public RangeStoreBuilder withStorageContainerPlacementPolicyFactory( + StorageContainerPlacementPolicy.Factory placementPolicyFactory) { + this.placementPolicyFactory = placementPolicyFactory; return this; } @@ -130,6 +145,7 @@ public final class RangeStoreBuilder { checkNotNull(storeConf, "StorageConfiguration is not provided"); checkNotNull(mvccStoreFactory, "MVCCStoreFactory is not provided"); checkNotNull(defaultBackendUri, "Default backend uri is not provided"); + checkNotNull(placementPolicyFactory, "Storage Container Placement Policy Factory is not provided"); return new RangeStoreImpl( storeConf, @@ -137,7 +153,7 @@ public final class RangeStoreBuilder { scmFactory, mvccStoreFactory, defaultBackendUri, - numStorageContainers, + placementPolicyFactory, statsLogger); } diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java index 42fe40b..f175a55 100644 --- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java @@ -48,7 +48,6 @@ import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactor import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRoutingService; import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration; import 
org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerFactory; -import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl; import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerRegistryImpl; import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory; @@ -71,15 +70,14 @@ public class RangeStoreImpl StorageContainerManagerFactory factory, MVCCStoreFactory mvccStoreFactory, URI defaultBackendUri, - int numStorageContainers, + StorageContainerPlacementPolicy.Factory placementPolicyFactory, StatsLogger statsLogger) { super("range-service", conf, statsLogger); this.schedulerResource = schedulerResource; this.scheduler = SharedResourceManager.shared().get(schedulerResource); this.scmFactory = factory; - StorageContainerPlacementPolicy placementPolicy = - StorageContainerPlacementPolicyImpl.of(numStorageContainers); this.storeFactory = mvccStoreFactory; + StorageContainerPlacementPolicy placementPolicy = placementPolicyFactory.newPlacementPolicy(); this.scRegistry = new StorageContainerRegistryImpl( new DefaultStorageContainerFactory( conf, @@ -88,7 +86,7 @@ public class RangeStoreImpl storeFactory, defaultBackendUri), scheduler); - this.scManager = scmFactory.create(numStorageContainers, conf, scRegistry); + this.scManager = scmFactory.create(conf, scRegistry); } @Override diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java index 1befe82..ac4bb78 100644 --- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java +++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java @@ -211,7 +211,7 @@ public class TestRangeStoreImpl { rangeStore = (RangeStoreImpl) RangeStoreBuilder.newBuilder() .withStorageConfiguration(storageConf) 
.withStorageResources(storageResources) - .withStorageContainerManagerFactory((numScs, storeConf, rgRegistry) + .withStorageContainerManagerFactory((storeConf, rgRegistry) -> new LocalStorageContainerManager(endpoint, storeConf, rgRegistry, 2)) .withRangeStoreFactory(storeFactory) .withDefaultBackendUri(URI.create("distributedlog://127.0.0.1/stream/storage")) diff --git a/stream/tests/integration/pom.xml b/stream/tests/integration/pom.xml deleted file mode 100644 index 77e697a..0000000 --- a/stream/tests/integration/pom.xml +++ /dev/null @@ -1,111 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
---> -<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.bookkeeper.tests</groupId> - <artifactId>stream-storage-tests-parent</artifactId> - <version>4.8.0-SNAPSHOT</version> - </parent> - <artifactId>stream-storage-integration-test</artifactId> - <name>Apache BookKeeper :: Stream Storage :: Tests :: Integration</name> - - <dependencies> - <dependency> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>stream-storage-java-client</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>stream-storage-server</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>org.apache.distributedlog</groupId> - <artifactId>distributedlog-core</artifactId> - <version>${project.parent.version}</version> - <classifier>tests</classifier> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <version>${maven-jar-plugin.version}</version> - <executions> - <execution> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <version>${maven-surefire-plugin.version}</version> - <configuration> - <!-- only run tests when -DstreamIntegrationTests is specified //--> - <skipTests>true</skipTests> - <redirectTestOutputToFile>true</redirectTestOutputToFile> - <argLine>-Xmx3G -Djava.net.preferIPv4Stack=true -XX:MaxDirectMemorySize=2G</argLine> - <forkMode>always</forkMode> - <forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds> - </configuration> - 
</plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - </plugin> - </plugins> - </build> - <profiles> - <profile> - <id>streamIntegrationTests</id> - <activation> - <property> - <name>streamIntegrationTests</name> - </property> - </activation> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <skipTests>false</skipTests> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-deploy-plugin</artifactId> - <version>${maven-deploy-plugin.version}</version> - <configuration> - <skip>true</skip> - </configuration> - </plugin> - </plugins> - </build> - </profile> - </profiles> -</project> diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java deleted file mode 100644 index ebe99f0..0000000 --- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.bookkeeper.stream.tests.integration; - -import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_RETENTION_POLICY; -import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_SEGMENT_ROLLING_POLICY; -import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_SPLIT_POLICY; -import static org.junit.Assert.assertEquals; - -import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.api.StorageClient; -import org.apache.bookkeeper.clients.StorageClientBuilder; -import org.apache.bookkeeper.clients.admin.StorageAdminClient; -import org.apache.bookkeeper.clients.config.StorageClientSettings; -import org.apache.bookkeeper.common.concurrent.FutureUtils; -import org.apache.bookkeeper.stream.proto.NamespaceConfiguration; -import org.apache.bookkeeper.stream.proto.RangeKeyType; -import org.apache.bookkeeper.stream.proto.StreamConfiguration; -import org.apache.bookkeeper.stream.proto.StreamProperties; -import org.apache.bookkeeper.stream.proto.common.Endpoint; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; - -/** - * Integration test for stream client test. 
- */ -@Slf4j -public class StorageClientTest extends StorageServerTestBase { - - @Rule - public final TestName testName = new TestName(); - - private String nsName; - private String streamName; - private StorageAdminClient adminClient; - private StorageClient client; - private final StreamConfiguration streamConf = StreamConfiguration.newBuilder() - .setKeyType(RangeKeyType.HASH) - .setInitialNumRanges(4) - .setMinNumRanges(4) - .setRetentionPolicy(DEFAULT_RETENTION_POLICY) - .setRollingPolicy(DEFAULT_SEGMENT_ROLLING_POLICY) - .setSplitPolicy(DEFAULT_SPLIT_POLICY) - .build(); - private final NamespaceConfiguration colConf = NamespaceConfiguration.newBuilder() - .setDefaultStreamConf(streamConf) - .build(); - - @Override - protected void doSetup() throws Exception { - StorageClientSettings settings = StorageClientSettings.newBuilder() - .addEndpoints(cluster.getRpcEndpoints().toArray(new Endpoint[cluster.getRpcEndpoints().size()])) - .usePlaintext(true) - .build(); - adminClient = StorageClientBuilder.newBuilder() - .withSettings(settings) - .buildAdmin(); - nsName = "test_namespace"; - FutureUtils.result( - adminClient.createNamespace(nsName, colConf)); - client = StorageClientBuilder.newBuilder() - .withSettings(settings) - .withNamespace(nsName) - .build(); - streamName = "test_stream"; - createStream(streamName); - } - - @Override - protected void doTeardown() throws Exception { - if (null != client) { - client.closeAsync(); - } - if (null != adminClient) { - adminClient.closeAsync(); - } - } - - private void createStream(String streamName) throws Exception { - FutureUtils.result( - adminClient.createStream( - nsName, - streamName, - streamConf)); - } - - @Test - public void testAdmin() throws Exception { - StreamProperties properties = - FutureUtils.result(adminClient.getStream(nsName, streamName)); - assertEquals( - StreamConfiguration.newBuilder(streamConf) - .setBackendServiceUrl(cluster.getDefaultBackendUri().toString()) - .build() - , 
properties.getStreamConf()); - } - -} diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java deleted file mode 100644 index 21dc004..0000000 --- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.bookkeeper.stream.tests.integration; - -import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.stream.cluster.StreamCluster; -import org.apache.bookkeeper.stream.cluster.StreamClusterSpec; -import org.apache.commons.configuration.CompositeConfiguration; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; - - -/** - * Test Base for Range Server related tests. 
- */ -@Slf4j -public abstract class StorageServerTestBase { - - static { - // org.apache.zookeeper.test.ClientBase uses FourLetterWordMain, from 3.5.3 four letter words - // are disabled by default due to security reasons - System.setProperty("zookeeper.4lw.commands.whitelist", "*"); - } - - @Rule - public final TemporaryFolder testDir = new TemporaryFolder(); - - protected StreamClusterSpec spec; - protected StreamCluster cluster; - - protected StorageServerTestBase() { - this(StreamClusterSpec.builder() - .baseConf(new CompositeConfiguration()) - .numServers(3) - .build()); - } - - protected StorageServerTestBase(StreamClusterSpec spec) { - this.spec = spec; - } - - @Before - public void setUp() throws Exception { - spec = spec.storageRootDir(testDir.newFolder("tests")); - this.cluster = StreamCluster.build(spec); - this.cluster.start(); - doSetup(); - } - - protected abstract void doSetup() throws Exception; - - @After - public void tearDown() throws Exception { - doTeardown(); - if (null != this.cluster) { - this.cluster.stop(); - } - } - - protected abstract void doTeardown() throws Exception; - -} diff --git a/stream/tests/integration/src/test/resources/log4j.properties b/stream/tests/integration/src/test/resources/log4j.properties deleted file mode 100644 index 614fe75..0000000 --- a/stream/tests/integration/src/test/resources/log4j.properties +++ /dev/null @@ -1,55 +0,0 @@ -#/** -# * Copyright 2007 The Apache Software Foundation -# * -# * Licensed to the Apache Software Foundation (ASF) under one -# * or more contributor license agreements. See the NOTICE file -# * distributed with this work for additional information -# * regarding copyright ownership. The ASF licenses this file -# * to you under the Apache License, Version 2.0 (the -# * "License"); you may not use this file except in compliance -# * with the License. 
You may obtain a copy of the License at -# * -# * http://www.apache.org/licenses/LICENSE-2.0 -# * -# * Unless required by applicable law or agreed to in writing, software -# * distributed under the License is distributed on an "AS IS" BASIS, -# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# * See the License for the specific language governing permissions and -# * limitations under the License. -# */ - -# -# DisributedLog Logging Configuration -# - -# Example with rolling log file -log4j.rootLogger=INFO, CONSOLE - -#disable zookeeper logging -log4j.logger.org.apache.zookeeper=OFF -#Set the bookkeeper level to warning -log4j.logger.org.apache.bookkeeper=INFO -#disable helix logging -log4j.logger.org.apache.helix=OFF - -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.Threshold=DEBUG -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n - -# Add ROLLINGFILE to rootLogger to get log file output -# Log DEBUG level and above messages to a log file -#log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender -#log4j.appender.ROLLINGFILE.Threshold=INFO -#log4j.appender.ROLLINGFILE.File=stream.log -#log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout -#log4j.appender.ROLLINGFILE.DatePattern='.'yyyy-MM-dd-HH-mm -#log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n - -log4j.appender.R=org.apache.log4j.RollingFileAppender -log4j.appender.R.Threshold=TRACE -log4j.appender.R.File=target/error.log -log4j.appender.R.MaxFileSize=200MB -log4j.appender.R.MaxBackupIndex=7 -log4j.appender.R.layout=org.apache.log4j.PatternLayout -log4j.appender.R.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n diff --git a/stream/tests/pom.xml b/stream/tests/pom.xml deleted file mode 100644 index 0f6029b..0000000 --- a/stream/tests/pom.xml +++ /dev/null @@ -1,33 
+0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> - <packaging>pom</packaging> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.bookkeeper</groupId> - <artifactId>stream-storage-parent</artifactId> - <version>4.8.0-SNAPSHOT</version> - </parent> - <groupId>org.apache.bookkeeper.tests</groupId> - <artifactId>stream-storage-tests-parent</artifactId> - <name>Apache BookKeeper :: Stream Storage :: Tests</name> - <modules> - <module>integration</module> - </modules> -</project> diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java index 42b428e..92e39c3 100644 --- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java +++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java @@ 
-52,12 +52,8 @@ public class LocationClientTest extends StreamClusterTestBase { .name("location-client-test") .numThreads(1) .build(); - StorageClientSettings settings = StorageClientSettings.newBuilder() - .addEndpoints(getExsternalStreamEndpoints().toArray(new Endpoint[getNumBookies()])) - .usePlaintext(true) - .build(); client = new LocationClientImpl( - settings, + newStorageClientSettings(), scheduler); } diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StorageAdminClientTest.java similarity index 86% rename from stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java rename to tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StorageAdminClientTest.java index 03edfb8..569cf7c 100644 --- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java +++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StorageAdminClientTest.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.bookkeeper.stream.tests.integration; +package org.apache.bookkeeper.tests.integration.stream; import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF; import static org.junit.Assert.assertEquals; @@ -39,6 +39,8 @@ import org.apache.bookkeeper.stream.proto.StreamConfiguration; import org.apache.bookkeeper.stream.proto.StreamProperties; import org.apache.bookkeeper.stream.proto.common.Endpoint; import org.apache.bookkeeper.stream.proto.storage.StatusCode; +import org.junit.After; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -46,37 +48,23 @@ import org.junit.rules.TestName; /** * Integration test for stream admin client test. */ -public class StorageAdminClientTest extends StorageServerTestBase { +public class StorageAdminClientTest extends StreamClusterTestBase { @Rule public final TestName testName = new TestName(); - private OrderedScheduler scheduler; private StorageAdminClient adminClient; - @Override - protected void doSetup() throws Exception { - scheduler = OrderedScheduler.newSchedulerBuilder() - .name("admin-client-test") - .numThreads(1) - .build(); - StorageClientSettings settings = StorageClientSettings.newBuilder() - .addEndpoints(cluster.getRpcEndpoints().toArray(new Endpoint[cluster.getRpcEndpoints().size()])) - .usePlaintext(true) - .build(); - adminClient = StorageClientBuilder.newBuilder() - .withSettings(settings) - .buildAdmin(); + @Before + public void setup() { + adminClient = createStorageAdminClient(newStorageClientSettings()); } - @Override - protected void doTeardown() throws Exception { + @After + public void teardown() { if (null != adminClient) { adminClient.close(); } - if (null != scheduler) { - scheduler.shutdown(); - } } @Test @@ -152,11 +140,6 @@ public class StorageAdminClientTest extends StorageServerTestBase { .build(); StreamProperties streamProps = FutureUtils.result(adminClient.createStream(nsName, streamName, 
streamConf)); assertEquals(streamName, streamProps.getStreamName()); - assertEquals( - StreamConfiguration.newBuilder(streamConf) - .setBackendServiceUrl(cluster.getDefaultBackendUri().toString()) - .build(), - streamProps.getStreamConf()); // create a duplicated stream try { diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java index b86cc72..fe7c914 100644 --- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java +++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java @@ -21,6 +21,10 @@ package org.apache.bookkeeper.tests.integration.stream; import java.util.List; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.api.StorageClient; +import org.apache.bookkeeper.clients.StorageClientBuilder; +import org.apache.bookkeeper.clients.admin.StorageAdminClient; +import org.apache.bookkeeper.clients.config.StorageClientSettings; import org.apache.bookkeeper.clients.utils.NetUtils; import org.apache.bookkeeper.stream.proto.common.Endpoint; import org.apache.bookkeeper.tests.integration.cluster.BookKeeperClusterTestBase; @@ -69,5 +73,35 @@ public abstract class StreamClusterTestBase extends BookKeeperClusterTestBase { .collect(Collectors.toList()); } + // + // Test Util Methods + // + + protected static StorageClientSettings newStorageClientSettings() { + return StorageClientSettings.newBuilder() + .addEndpoints(getExsternalStreamEndpoints().toArray(new Endpoint[getNumBookies()])) + .endpointResolver(endpoint -> { + String internalEndpointStr = NetUtils.endpointToString(endpoint); + String externalEndpointStr = + bkCluster.resolveExternalGrpcEndpointStr(internalEndpointStr); + log.info("Resolve endpoint {} to {}", 
internalEndpointStr, externalEndpointStr); + return NetUtils.parseEndpoint(externalEndpointStr); + }) + .usePlaintext(true) + .build(); + } + + protected static StorageAdminClient createStorageAdminClient(StorageClientSettings settings) { + return StorageClientBuilder.newBuilder() + .withSettings(settings) + .buildAdmin(); + } + + protected static StorageClient createStorageClient(StorageClientSettings settings, String namespace) { + return StorageClientBuilder.newBuilder() + .withSettings(settings) + .withNamespace(namespace) + .build(); + } } diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java similarity index 85% rename from stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java rename to tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java index 7add6d7..d1ff091 100644 --- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java +++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.bookkeeper.stream.tests.integration; +package org.apache.bookkeeper.tests.integration.stream; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; @@ -38,15 +38,14 @@ import org.apache.bookkeeper.api.kv.PTable; import org.apache.bookkeeper.api.kv.exceptions.KvApiException; import org.apache.bookkeeper.api.kv.result.Code; import org.apache.bookkeeper.api.kv.result.KeyValue; -import org.apache.bookkeeper.clients.StorageClientBuilder; import org.apache.bookkeeper.clients.admin.StorageAdminClient; import org.apache.bookkeeper.clients.config.StorageClientSettings; -import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.stream.proto.NamespaceConfiguration; import org.apache.bookkeeper.stream.proto.NamespaceProperties; import org.apache.bookkeeper.stream.proto.StreamConfiguration; import org.apache.bookkeeper.stream.proto.StreamProperties; -import org.apache.bookkeeper.stream.proto.common.Endpoint; +import org.junit.After; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -55,47 +54,31 @@ import org.junit.rules.TestName; * Integration test for table service. 
*/ @Slf4j -public class TableClientSimpleTest extends StorageServerTestBase { +public class TableClientSimpleTest extends StreamClusterTestBase { @Rule public final TestName testName = new TestName(); private final String namespace = "test_namespace"; - private OrderedScheduler scheduler; private StorageAdminClient adminClient; private StorageClient storageClient; - @Override - protected void doSetup() throws Exception { - scheduler = OrderedScheduler.newSchedulerBuilder() - .name("table-client-test") - .numThreads(1) - .build(); - StorageClientSettings settings = StorageClientSettings.newBuilder() - .addEndpoints(cluster.getRpcEndpoints().toArray(new Endpoint[cluster.getRpcEndpoints().size()])) - .usePlaintext(true) - .build(); + @Before + public void setup() { + StorageClientSettings settings = newStorageClientSettings(); String namespace = "test_namespace"; - adminClient = StorageClientBuilder.newBuilder() - .withSettings(settings) - .buildAdmin(); - storageClient = StorageClientBuilder.newBuilder() - .withSettings(settings) - .withNamespace(namespace) - .build(); + adminClient = createStorageAdminClient(settings); + storageClient = createStorageClient(settings, namespace); } - @Override - protected void doTeardown() throws Exception { + @After + public void teardown() { if (null != adminClient) { adminClient.close(); } if (null != storageClient) { storageClient.close(); } - if (null != scheduler) { - scheduler.shutdown(); - } } private static ByteBuf getLKey(int i) { @@ -123,11 +106,6 @@ public class TableClientSimpleTest extends StorageServerTestBase { StreamProperties streamProps = result( adminClient.createStream(namespace, streamName, streamConf)); assertEquals(streamName, streamProps.getStreamName()); - assertEquals( - StreamConfiguration.newBuilder(streamConf) - .setBackendServiceUrl(cluster.getDefaultBackendUri().toString()) - .build(), - streamProps.getStreamConf()); // Open the table PTable<ByteBuf, ByteBuf> table = 
result(storageClient.openPTable(streamName)); diff --git a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java similarity index 89% rename from stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java rename to tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java index 7db265b..0dff7fa 100644 --- a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java +++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.bookkeeper.stream.tests.integration; +package org.apache.bookkeeper.tests.integration.stream; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF; @@ -47,16 +47,15 @@ import org.apache.bookkeeper.api.kv.result.PutResult; import org.apache.bookkeeper.api.kv.result.RangeResult; import org.apache.bookkeeper.api.kv.result.Result; import org.apache.bookkeeper.api.kv.result.TxnResult; -import org.apache.bookkeeper.clients.StorageClientBuilder; import org.apache.bookkeeper.clients.admin.StorageAdminClient; import org.apache.bookkeeper.clients.config.StorageClientSettings; import org.apache.bookkeeper.common.concurrent.FutureUtils; -import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.stream.proto.NamespaceConfiguration; import org.apache.bookkeeper.stream.proto.NamespaceProperties; import org.apache.bookkeeper.stream.proto.StreamConfiguration; import org.apache.bookkeeper.stream.proto.StreamProperties; -import 
org.apache.bookkeeper.stream.proto.common.Endpoint; +import org.junit.After; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -65,48 +64,32 @@ import org.junit.rules.TestName; * Integration test for table service. */ @Slf4j -public class TableClientTest extends StorageServerTestBase { +public class TableClientTest extends StreamClusterTestBase { @Rule public final TestName testName = new TestName(); private final String namespace = "test_namespace"; - private OrderedScheduler scheduler; private StorageAdminClient adminClient; private StorageClient storageClient; private final OptionFactory<ByteBuf> optionFactory = new OptionFactoryImpl<>(); - @Override - protected void doSetup() throws Exception { - scheduler = OrderedScheduler.newSchedulerBuilder() - .name("table-client-test") - .numThreads(1) - .build(); - StorageClientSettings settings = StorageClientSettings.newBuilder() - .addEndpoints(cluster.getRpcEndpoints().toArray(new Endpoint[cluster.getRpcEndpoints().size()])) - .usePlaintext(true) - .build(); + @Before + public void setup() { + StorageClientSettings settings = newStorageClientSettings(); String namespace = "test_namespace"; - adminClient = StorageClientBuilder.newBuilder() - .withSettings(settings) - .buildAdmin(); - storageClient = StorageClientBuilder.newBuilder() - .withSettings(settings) - .withNamespace(namespace) - .build(); + adminClient = createStorageAdminClient(settings); + storageClient = createStorageClient(settings, namespace); } - @Override - protected void doTeardown() throws Exception { + @After + public void teardown() { if (null != adminClient) { adminClient.close(); } if (null != storageClient) { storageClient.close(); } - if (null != scheduler) { - scheduler.shutdown(); - } } private static ByteBuf getLKey(int i) { @@ -134,11 +117,6 @@ public class TableClientTest extends StorageServerTestBase { StreamProperties streamProps = FutureUtils.result( 
adminClient.createStream(namespace, streamName, streamConf)); assertEquals(streamName, streamProps.getStreamName()); - assertEquals( - StreamConfiguration.newBuilder(streamConf) - .setBackendServiceUrl(cluster.getDefaultBackendUri().toString()) - .build(), - streamProps.getStreamConf()); // Open the table PTable<ByteBuf, ByteBuf> table = FutureUtils.result(storageClient.openPTable(streamName)); diff --git a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java index 80ae906..9a7cd86 100644 --- a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java +++ b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java @@ -18,6 +18,8 @@ package org.apache.bookkeeper.tests.integration.topologies; +import static com.google.common.base.Preconditions.checkNotNull; + import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -60,6 +62,7 @@ public class BKCluster { private final Network network; private final MetadataStoreContainer metadataContainer; private final Map<String, BookieContainer> bookieContainers; + private final Map<String, String> internalEndpointsToExternalEndpoints; private final int numBookies; private final String extraServerComponents; private volatile boolean enableContainerLog; @@ -72,6 +75,7 @@ public class BKCluster { .withNetwork(network) .withNetworkAliases(ZKContainer.HOST_NAME); this.bookieContainers = Maps.newTreeMap(); + this.internalEndpointsToExternalEndpoints = Maps.newConcurrentMap(); this.numBookies = spec.numBookies(); this.extraServerComponents = spec.extraServerComponents(); this.enableContainerLog = spec.enableContainerLog(); @@ -114,6 +118,13 @@ public class BKCluster { createBookies("bookie", numBookies); } + public String 
resolveExternalGrpcEndpointStr(String internalGrpcEndpointStr) { + String externalGrpcEndpointStr = internalEndpointsToExternalEndpoints.get(internalGrpcEndpointStr); + checkNotNull(externalGrpcEndpointStr, + "No internal grpc endpoint is found : " + internalGrpcEndpointStr); + return externalGrpcEndpointStr; + } + public void stop() { synchronized (this) { bookieContainers.values().forEach(BookieContainer::stop); @@ -155,6 +166,7 @@ public class BKCluster { synchronized (this) { container = bookieContainers.remove(bookieName); if (null != container) { + internalEndpointsToExternalEndpoints.remove(container.getInternalGrpcEndpointStr()); container.stop(); } } @@ -175,8 +187,6 @@ public class BKCluster { if (enableContainerLog) { container.tailContainerLog(); } - - log.info("Created bookie {}", bookieName); bookieContainers.put(bookieName, container); } } @@ -184,6 +194,11 @@ public class BKCluster { if (shouldStart) { log.info("Starting bookie {}", bookieName); container.start(); + log.info("Started bookie {} : internal endpoint = {}, external endpoint = {}", + bookieName, container.getInternalGrpcEndpointStr(), container.getExternalGrpcEndpointStr()); + internalEndpointsToExternalEndpoints.put( + container.getInternalGrpcEndpointStr(), + container.getExternalGrpcEndpointStr()); } return container; } -- To stop receiving notification emails like this one, please contact si...@apache.org.