This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new e34dd38  [TABLE SERVICE] Move integration tests under 
`stream/tests/integration` to `tests/integration/cluster`
e34dd38 is described below

commit e34dd384c865a3494079ec9f38e4a0d87bd0ce51
Author: Sijie Guo <si...@apache.org>
AuthorDate: Tue May 22 19:11:16 2018 -0700

    [TABLE SERVICE] Move integration tests under `stream/tests/integration` to 
`tests/integration/cluster`
    
    Descriptions of the changes in this PR:
    
    The original integration tests were written based a non-dockerized 
standalone stream cluster. Moved them to use
    the dockerized integration test framework. So all the integration tests are 
actually testing the table service run as part of bookies.
    
    This change is based on #1422 .
    a371ff2 is the change in this PR to be reviewed.
    
    Master Issue: #1205
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Jia Zhai <None>
    
    This closes #1423 from sijie/move_more_stream_it_tests
---
 .../bookkeeper/clients/StorageClientImpl.java      |   1 -
 .../clients/config/StorageClientSettings.java      |  11 ++
 .../clients/impl/channel/StorageServerChannel.java |  19 +++-
 .../impl/channel/StorageServerChannelManager.java  |   2 +-
 .../internal/StorageServerClientManagerImpl.java   |   2 +-
 .../clients/resolver/EndpointResolver.java         |  48 +++++++++
 stream/pom.xml                                     |   1 -
 .../util/StorageContainerPlacementPolicy.java      |   8 ++
 .../bookkeeper/stream/cluster/StreamCluster.java   |   3 +-
 .../bookkeeper/stream/server/StorageServer.java    |  25 +++--
 .../server/StreamStorageLifecycleComponent.java    |   1 -
 .../api/sc/StorageContainerManagerFactory.java     |   4 +-
 .../stream/storage/RangeStoreBuilder.java          |  22 +++-
 .../stream/storage/impl/RangeStoreImpl.java        |   8 +-
 .../stream/storage/impl/TestRangeStoreImpl.java    |   2 +-
 stream/tests/integration/pom.xml                   | 111 ---------------------
 .../tests/integration/StorageClientTest.java       | 111 ---------------------
 .../tests/integration/StorageServerTestBase.java   |  80 ---------------
 .../src/test/resources/log4j.properties            |  55 ----------
 stream/tests/pom.xml                               |  33 ------
 .../integration/stream/LocationClientTest.java     |   6 +-
 .../stream}/StorageAdminClientTest.java            |  37 ++-----
 .../integration/stream/StreamClusterTestBase.java  |  34 +++++++
 .../integration/stream}/TableClientSimpleTest.java |  44 ++------
 .../tests/integration/stream}/TableClientTest.java |  44 ++------
 .../tests/integration/topologies/BKCluster.java    |  19 +++-
 26 files changed, 208 insertions(+), 523 deletions(-)

diff --git 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
index aff322c..37c8223 100644
--- 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
+++ 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
@@ -116,7 +116,6 @@ class StorageClientImpl extends AbstractAutoAsyncCloseable 
implements StorageCli
             serverManager.close();
             closeFuture.complete(null);
             SharedResourceManager.shared().release(resources.scheduler(), 
scheduler);
-            scheduler.shutdown();
         });
     }
 
diff --git 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
index 20b5821..87768fa 100644
--- 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
+++ 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
@@ -24,6 +24,7 @@ import io.grpc.ManagedChannelBuilder;
 import io.grpc.NameResolver;
 import java.util.List;
 import java.util.Optional;
+import org.apache.bookkeeper.clients.resolver.EndpointResolver;
 import org.apache.bookkeeper.clients.utils.ClientConstants;
 import org.apache.bookkeeper.common.util.Backoff;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
@@ -57,6 +58,15 @@ public interface StorageClientSettings {
     List<Endpoint> endpoints();
 
     /**
+     * Return the endpoint resolver for resolving individual endpoints.
+     *
+     * <p>The default resolver is an identity resolver.
+     *
+     * @return the endpoint resolver for resolving endpoints.
+     */
+    EndpointResolver endpointResolver();
+
+    /**
      * Returns the builder to create the managed channel.
      *
      * @return
@@ -99,6 +109,7 @@ public interface StorageClientSettings {
             numWorkerThreads(Runtime.getRuntime().availableProcessors());
             usePlaintext(true);
             backoffPolicy(ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
+            endpointResolver(EndpointResolver.identity());
         }
 
         @Override
diff --git 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
index e8e72db..34dc957 100644
--- 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
+++ 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
@@ -24,6 +24,8 @@ import io.grpc.ManagedChannelBuilder;
 import java.util.Optional;
 import java.util.function.Function;
 import javax.annotation.concurrent.GuardedBy;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.clients.resolver.EndpointResolver;
 import org.apache.bookkeeper.clients.utils.GrpcUtils;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc;
@@ -42,8 +44,12 @@ import 
org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc.TableServiceF
  */
 public class StorageServerChannel implements AutoCloseable {
 
-    public static Function<Endpoint, StorageServerChannel> factory(boolean 
usePlaintext) {
-        return (endpoint) -> new StorageServerChannel(endpoint, 
Optional.empty(), usePlaintext);
+    public static Function<Endpoint, StorageServerChannel> 
factory(StorageClientSettings settings) {
+        return (endpoint) -> new StorageServerChannel(
+            endpoint,
+            Optional.empty(),
+            settings.usePlaintext(),
+            settings.endpointResolver());
     }
 
     private final Optional<String> token;
@@ -63,14 +69,17 @@ public class StorageServerChannel implements AutoCloseable {
      *
      * @param endpoint range server endpoint.
      * @param token    token used to access range server
+     * @param usePlainText whether to plain text protocol or not
      */
     public StorageServerChannel(Endpoint endpoint,
                                 Optional<String> token,
-                                boolean usePlainText) {
+                                boolean usePlainText,
+                                EndpointResolver endpointResolver) {
         this.token = token;
+        Endpoint resolvedEndpoint = endpointResolver.resolve(endpoint);
         this.channel = ManagedChannelBuilder.forAddress(
-            endpoint.getHostname(),
-            endpoint.getPort())
+            resolvedEndpoint.getHostname(),
+            resolvedEndpoint.getPort())
             .usePlaintext(usePlainText)
             .build();
     }
diff --git 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannelManager.java
 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannelManager.java
index 67b3d0f..308d25d 100644
--- 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannelManager.java
+++ 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannelManager.java
@@ -40,7 +40,7 @@ public class StorageServerChannelManager implements 
AutoCloseable {
     private final Function<Endpoint, StorageServerChannel> channelFactory;
 
     public StorageServerChannelManager(StorageClientSettings settings) {
-        this(StorageServerChannel.factory(settings.usePlaintext()));
+        this(StorageServerChannel.factory(settings));
     }
 
     @VisibleForTesting
diff --git 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java
 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java
index d219b6a..3ae3b6b 100644
--- 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java
+++ 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java
@@ -65,7 +65,7 @@ public class StorageServerClientManagerImpl
         this(
             settings,
             schedulerResource,
-            StorageServerChannel.factory(settings.usePlaintext()));
+            StorageServerChannel.factory(settings));
     }
 
     public StorageServerClientManagerImpl(StorageClientSettings settings,
diff --git 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/resolver/EndpointResolver.java
 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/resolver/EndpointResolver.java
new file mode 100644
index 0000000..e5002b1
--- /dev/null
+++ 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/resolver/EndpointResolver.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.resolver;
+
+import org.apache.bookkeeper.stream.proto.common.Endpoint;
+
+/**
+ * Resolve an endpoint to another endpoint.
+ *
+ * <p>The resolver can be used for resolving the right ip address for an 
advertised endpoint. It is typically useful
+ * in dockerized integration tests, where the test clients are typically 
outside of the docker network.
+ */
+public interface EndpointResolver {
+
+    /**
+     * Returns a resolver that always returns its input endpoint.
+     *
+     * @return a function that always returns its input endpoint
+     */
+    static EndpointResolver identity() {
+        return endpoint -> endpoint;
+    }
+
+    /**
+     * Resolve <tt>endpoint</tt> to another endpoint.
+     *
+     * @param endpoint endpoint to resolve
+     * @return the resolved endpoint.
+     */
+    Endpoint resolve(Endpoint endpoint);
+
+}
diff --git a/stream/pom.xml b/stream/pom.xml
index 208e6d0..f110d52 100644
--- a/stream/pom.xml
+++ b/stream/pom.xml
@@ -38,7 +38,6 @@
     <module>storage</module>
     <module>server</module>
     <module>cli</module>
-    <module>tests</module>
   </modules>
 
   <build>
diff --git 
a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/StorageContainerPlacementPolicy.java
 
b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/StorageContainerPlacementPolicy.java
index 8c886ea..340a731 100644
--- 
a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/StorageContainerPlacementPolicy.java
+++ 
b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/StorageContainerPlacementPolicy.java
@@ -31,8 +31,16 @@ package org.apache.bookkeeper.stream.protocol.util;
 /**
  * Placement policy to place ranges to group.
  */
+@FunctionalInterface
 public interface StorageContainerPlacementPolicy {
 
+    @FunctionalInterface
+    interface Factory {
+
+        StorageContainerPlacementPolicy newPlacementPolicy();
+
+    }
+
     long placeStreamRange(long streamId, long rangeId);
 
 }
diff --git 
a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
 
b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
index 11bc8af..ff0cda2 100644
--- 
a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
+++ 
b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
@@ -187,8 +187,7 @@ public class StreamCluster
                     bookiePort, grpcPort, bkDir, rangesStoreDir, 
spec.serveReadOnlyTable);
                 server = StorageServer.buildStorageServer(
                     serverConf,
-                    grpcPort,
-                    spec.numServers() * 2);
+                    grpcPort);
                 server.start();
                 log.info("Started storage server at (bookie port = {}, grpc 
port = {})",
                     bookiePort, grpcPort);
diff --git 
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
 
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
index e0b87ef..94c8587 100644
--- 
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
+++ 
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
@@ -55,6 +55,7 @@ import 
org.apache.bookkeeper.stream.storage.impl.cluster.ClusterControllerImpl;
 import 
org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterControllerLeaderSelector;
 import 
org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore;
 import 
org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerController;
+import 
org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
 import org.apache.bookkeeper.stream.storage.impl.sc.ZkStorageContainerManager;
 import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactoryImpl;
 import org.apache.commons.configuration.CompositeConfiguration;
@@ -143,8 +144,7 @@ public class StorageServer {
         try {
             storageServer = buildStorageServer(
                 conf,
-                grpcPort,
-                1024);
+                grpcPort);
         } catch (ConfigurationException e) {
             log.error("Invalid storage configuration", e);
             return ExitCode.INVALID_CONF.code();
@@ -168,15 +168,13 @@ public class StorageServer {
     }
 
     public static LifecycleComponent buildStorageServer(CompositeConfiguration 
conf,
-                                                        int grpcPort,
-                                                        int 
numStorageContainers)
+                                                        int grpcPort)
             throws UnknownHostException, ConfigurationException {
-        return buildStorageServer(conf, grpcPort, numStorageContainers, true, 
NullStatsLogger.INSTANCE);
+        return buildStorageServer(conf, grpcPort, true, 
NullStatsLogger.INSTANCE);
     }
 
     public static LifecycleComponent buildStorageServer(CompositeConfiguration 
conf,
                                                         int grpcPort,
-                                                        int 
numStorageContainers,
                                                         boolean 
startBookieAndStartProvider,
                                                         StatsLogger 
externalStatsLogger)
         throws ConfigurationException, UnknownHostException {
@@ -250,12 +248,21 @@ public class StorageServer {
             .withStorageConfiguration(storageConf)
             // the storage resources shared across multiple components
             .withStorageResources(storageResources)
-            // the number of storage containers
-            .withNumStorageContainers(numStorageContainers)
+            // the placement policy
+            .withStorageContainerPlacementPolicyFactory(() -> {
+                long numStorageContainers;
+                try (ZkClusterMetadataStore store = new ZkClusterMetadataStore(
+                    curatorProviderService.get(),
+                    ZKMetadataDriverBase.resolveZkServers(bkServerConf),
+                    ZK_METADATA_ROOT_PATH)) {
+                    numStorageContainers = 
store.getClusterMetadata().getNumStorageContainers();
+                }
+                return StorageContainerPlacementPolicyImpl.of((int) 
numStorageContainers);
+            })
             // the default log backend uri
             .withDefaultBackendUri(dlNamespaceProvider.getDlogUri())
             // with zk-based storage container manager
-            .withStorageContainerManagerFactory((ignored, storeConf, registry) 
->
+            .withStorageContainerManagerFactory((storeConf, registry) ->
                 new ZkStorageContainerManager(
                     myEndpoint,
                     storageConf,
diff --git 
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
 
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
index 5e6b0b5..6cf73b0 100644
--- 
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
+++ 
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StreamStorageLifecycleComponent.java
@@ -42,7 +42,6 @@ public class StreamStorageLifecycleComponent extends 
ServerLifecycleComponent {
         this.streamStorage = StorageServer.buildStorageServer(
             conf.getUnderlyingConf(),
             ssConf.getGrpcPort(),
-            1024, /* indicator */
             false,
             statsLogger.scope("stream"));
     }
diff --git 
a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerManagerFactory.java
 
b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerManagerFactory.java
index daa130b..b89a465 100644
--- 
a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerManagerFactory.java
+++ 
b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerManagerFactory.java
@@ -24,13 +24,11 @@ public interface StorageContainerManagerFactory {
     /**
      * Create a storage container manager to manage lifecycles of {@link 
StorageContainer}.
      *
-     * @param numStorageContainers num of storage containers.
      * @param conf                 storage configuration
      * @param registry             storage container registry
      * @return storage container manager.
      */
-    StorageContainerManager create(int numStorageContainers,
-                                   StorageConfiguration conf,
+    StorageContainerManager create(StorageConfiguration conf,
                                    StorageContainerRegistry registry);
 
 }
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java
index 1bd5510..c03b374 100644
--- 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/RangeStoreBuilder.java
@@ -19,10 +19,12 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 import java.net.URI;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import 
org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
 import org.apache.bookkeeper.stream.storage.api.RangeStore;
 import 
org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactory;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
 import org.apache.bookkeeper.stream.storage.impl.RangeStoreImpl;
+import 
org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
 import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
 
 /**
@@ -38,8 +40,9 @@ public final class RangeStoreBuilder {
     private StorageConfiguration storeConf = null;
     private StorageResources storeResources = null;
     private StorageContainerManagerFactory scmFactory = null;
+    private StorageContainerPlacementPolicy.Factory placementPolicyFactory = 
() ->
+        StorageContainerPlacementPolicyImpl.of(1024);
     private MVCCStoreFactory mvccStoreFactory = null;
-    private int numStorageContainers = 1024;
     private URI defaultBackendUri = null;
 
     private RangeStoreBuilder() {
@@ -52,7 +55,19 @@ public final class RangeStoreBuilder {
      * @return range store builder
      */
     public RangeStoreBuilder withNumStorageContainers(int 
numStorageContainers) {
-        this.numStorageContainers = numStorageContainers;
+        this.placementPolicyFactory = () -> 
StorageContainerPlacementPolicyImpl.of(numStorageContainers);
+        return this;
+    }
+
+    /**
+     * Build the range store with the provided <tt>placementPolicyFactory</tt>.
+     *
+     * @param placementPolicyFactory placement policy factor to create 
placement policies.
+     * @return range store builder.
+     */
+    public RangeStoreBuilder withStorageContainerPlacementPolicyFactory(
+        StorageContainerPlacementPolicy.Factory placementPolicyFactory) {
+        this.placementPolicyFactory = placementPolicyFactory;
         return this;
     }
 
@@ -130,6 +145,7 @@ public final class RangeStoreBuilder {
         checkNotNull(storeConf, "StorageConfiguration is not provided");
         checkNotNull(mvccStoreFactory, "MVCCStoreFactory is not provided");
         checkNotNull(defaultBackendUri, "Default backend uri is not provided");
+        checkNotNull(placementPolicyFactory, "Storage Container Placement 
Policy Factory is not provided");
 
         return new RangeStoreImpl(
             storeConf,
@@ -137,7 +153,7 @@ public final class RangeStoreBuilder {
             scmFactory,
             mvccStoreFactory,
             defaultBackendUri,
-            numStorageContainers,
+            placementPolicyFactory,
             statsLogger);
     }
 
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java
index 42fe40b..f175a55 100644
--- 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/RangeStoreImpl.java
@@ -48,7 +48,6 @@ import 
org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactor
 import 
org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRoutingService;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
 import 
org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerFactory;
-import 
org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
 import 
org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerRegistryImpl;
 import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
 
@@ -71,15 +70,14 @@ public class RangeStoreImpl
                           StorageContainerManagerFactory factory,
                           MVCCStoreFactory mvccStoreFactory,
                           URI defaultBackendUri,
-                          int numStorageContainers,
+                          StorageContainerPlacementPolicy.Factory 
placementPolicyFactory,
                           StatsLogger statsLogger) {
         super("range-service", conf, statsLogger);
         this.schedulerResource = schedulerResource;
         this.scheduler = SharedResourceManager.shared().get(schedulerResource);
         this.scmFactory = factory;
-        StorageContainerPlacementPolicy placementPolicy =
-            StorageContainerPlacementPolicyImpl.of(numStorageContainers);
         this.storeFactory = mvccStoreFactory;
+        StorageContainerPlacementPolicy placementPolicy = 
placementPolicyFactory.newPlacementPolicy();
         this.scRegistry = new StorageContainerRegistryImpl(
             new DefaultStorageContainerFactory(
                 conf,
@@ -88,7 +86,7 @@ public class RangeStoreImpl
                 storeFactory,
                 defaultBackendUri),
             scheduler);
-        this.scManager = scmFactory.create(numStorageContainers, conf, 
scRegistry);
+        this.scManager = scmFactory.create(conf, scRegistry);
     }
 
     @Override
diff --git 
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java
 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java
index 1befe82..ac4bb78 100644
--- 
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java
+++ 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestRangeStoreImpl.java
@@ -211,7 +211,7 @@ public class TestRangeStoreImpl {
         rangeStore = (RangeStoreImpl) RangeStoreBuilder.newBuilder()
             .withStorageConfiguration(storageConf)
             .withStorageResources(storageResources)
-            .withStorageContainerManagerFactory((numScs, storeConf, rgRegistry)
+            .withStorageContainerManagerFactory((storeConf, rgRegistry)
                 -> new LocalStorageContainerManager(endpoint, storeConf, 
rgRegistry, 2))
             .withRangeStoreFactory(storeFactory)
             
.withDefaultBackendUri(URI.create("distributedlog://127.0.0.1/stream/storage"))
diff --git a/stream/tests/integration/pom.xml b/stream/tests/integration/pom.xml
deleted file mode 100644
index 77e697a..0000000
--- a/stream/tests/integration/pom.xml
+++ /dev/null
@@ -1,111 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd"; 
xmlns="http://maven.apache.org/POM/4.0.0";
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";>
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.bookkeeper.tests</groupId>
-    <artifactId>stream-storage-tests-parent</artifactId>
-    <version>4.8.0-SNAPSHOT</version>
-  </parent>
-  <artifactId>stream-storage-integration-test</artifactId>
-  <name>Apache BookKeeper :: Stream Storage :: Tests :: Integration</name>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.bookkeeper</groupId>
-      <artifactId>stream-storage-java-client</artifactId>
-      <version>${project.parent.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.bookkeeper</groupId>
-      <artifactId>stream-storage-server</artifactId>
-      <version>${project.parent.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.distributedlog</groupId>
-      <artifactId>distributedlog-core</artifactId>
-      <version>${project.parent.version}</version>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-jar-plugin</artifactId>
-        <version>${maven-jar-plugin.version}</version>
-        <executions>
-          <execution>
-            <goals>
-              <goal>test-jar</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <version>${maven-surefire-plugin.version}</version>
-        <configuration>
-          <!-- only run tests when -DstreamIntegrationTests is specified //-->
-          <skipTests>true</skipTests>
-          <redirectTestOutputToFile>true</redirectTestOutputToFile>
-          <argLine>-Xmx3G -Djava.net.preferIPv4Stack=true 
-XX:MaxDirectMemorySize=2G</argLine>
-          <forkMode>always</forkMode>
-          <forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-checkstyle-plugin</artifactId>
-      </plugin>
-    </plugins>
-  </build>
-  <profiles>
-    <profile>
-      <id>streamIntegrationTests</id>
-      <activation>
-        <property>
-          <name>streamIntegrationTests</name>
-        </property>
-      </activation>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-surefire-plugin</artifactId>
-            <configuration>
-              <skipTests>false</skipTests>
-            </configuration>
-          </plugin>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-deploy-plugin</artifactId>
-            <version>${maven-deploy-plugin.version}</version>
-            <configuration>
-              <skip>true</skip>
-            </configuration>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-  </profiles>
-</project>
diff --git 
a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java
 
b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java
deleted file mode 100644
index ebe99f0..0000000
--- 
a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageClientTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.tests.integration;
-
-import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_RETENTION_POLICY;
-import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_SEGMENT_ROLLING_POLICY;
-import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_SPLIT_POLICY;
-import static org.junit.Assert.assertEquals;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.api.StorageClient;
-import org.apache.bookkeeper.clients.StorageClientBuilder;
-import org.apache.bookkeeper.clients.admin.StorageAdminClient;
-import org.apache.bookkeeper.clients.config.StorageClientSettings;
-import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
-import org.apache.bookkeeper.stream.proto.RangeKeyType;
-import org.apache.bookkeeper.stream.proto.StreamConfiguration;
-import org.apache.bookkeeper.stream.proto.StreamProperties;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-/**
- * Integration test for stream client test.
- */
-@Slf4j
-public class StorageClientTest extends StorageServerTestBase {
-
-    @Rule
-    public final TestName testName = new TestName();
-
-    private String nsName;
-    private String streamName;
-    private StorageAdminClient adminClient;
-    private StorageClient client;
-    private final StreamConfiguration streamConf = 
StreamConfiguration.newBuilder()
-        .setKeyType(RangeKeyType.HASH)
-        .setInitialNumRanges(4)
-        .setMinNumRanges(4)
-        .setRetentionPolicy(DEFAULT_RETENTION_POLICY)
-        .setRollingPolicy(DEFAULT_SEGMENT_ROLLING_POLICY)
-        .setSplitPolicy(DEFAULT_SPLIT_POLICY)
-        .build();
-    private final NamespaceConfiguration colConf = 
NamespaceConfiguration.newBuilder()
-        .setDefaultStreamConf(streamConf)
-        .build();
-
-    @Override
-    protected void doSetup() throws Exception {
-        StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(cluster.getRpcEndpoints().toArray(new 
Endpoint[cluster.getRpcEndpoints().size()]))
-            .usePlaintext(true)
-            .build();
-        adminClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .buildAdmin();
-        nsName = "test_namespace";
-        FutureUtils.result(
-            adminClient.createNamespace(nsName, colConf));
-        client = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .withNamespace(nsName)
-            .build();
-        streamName = "test_stream";
-        createStream(streamName);
-    }
-
-    @Override
-    protected void doTeardown() throws Exception {
-        if (null != client) {
-            client.closeAsync();
-        }
-        if (null != adminClient) {
-            adminClient.closeAsync();
-        }
-    }
-
-    private void createStream(String streamName) throws Exception {
-        FutureUtils.result(
-            adminClient.createStream(
-                nsName,
-                streamName,
-                streamConf));
-    }
-
-    @Test
-    public void testAdmin() throws Exception {
-        StreamProperties properties =
-            FutureUtils.result(adminClient.getStream(nsName, streamName));
-        assertEquals(
-            StreamConfiguration.newBuilder(streamConf)
-                
.setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
-                .build()
-            , properties.getStreamConf());
-    }
-
-}
diff --git 
a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java
 
b/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java
deleted file mode 100644
index 21dc004..0000000
--- 
a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageServerTestBase.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.tests.integration;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.stream.cluster.StreamCluster;
-import org.apache.bookkeeper.stream.cluster.StreamClusterSpec;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-
-/**
- * Test Base for Range Server related tests.
- */
-@Slf4j
-public abstract class StorageServerTestBase {
-
-    static {
-        // org.apache.zookeeper.test.ClientBase uses FourLetterWordMain, from 
3.5.3 four letter words
-        // are disabled by default due to security reasons
-        System.setProperty("zookeeper.4lw.commands.whitelist", "*");
-    }
-
-    @Rule
-    public final TemporaryFolder testDir = new TemporaryFolder();
-
-    protected StreamClusterSpec spec;
-    protected StreamCluster cluster;
-
-    protected StorageServerTestBase() {
-        this(StreamClusterSpec.builder()
-            .baseConf(new CompositeConfiguration())
-            .numServers(3)
-            .build());
-    }
-
-    protected StorageServerTestBase(StreamClusterSpec spec) {
-        this.spec = spec;
-    }
-
-    @Before
-    public void setUp() throws Exception {
-        spec = spec.storageRootDir(testDir.newFolder("tests"));
-        this.cluster = StreamCluster.build(spec);
-        this.cluster.start();
-        doSetup();
-    }
-
-    protected abstract void doSetup() throws Exception;
-
-    @After
-    public void tearDown() throws Exception {
-        doTeardown();
-        if (null != this.cluster) {
-            this.cluster.stop();
-        }
-    }
-
-    protected abstract void doTeardown() throws Exception;
-
-}
diff --git a/stream/tests/integration/src/test/resources/log4j.properties 
b/stream/tests/integration/src/test/resources/log4j.properties
deleted file mode 100644
index 614fe75..0000000
--- a/stream/tests/integration/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,55 +0,0 @@
-#/**
-# * Copyright 2007 The Apache Software Foundation
-# *
-# * Licensed to the Apache Software Foundation (ASF) under one
-# * or more contributor license agreements.  See the NOTICE file
-# * distributed with this work for additional information
-# * regarding copyright ownership.  The ASF licenses this file
-# * to you under the Apache License, Version 2.0 (the
-# * "License"); you may not use this file except in compliance
-# * with the License.  You may obtain a copy of the License at
-# *
-# *     http://www.apache.org/licenses/LICENSE-2.0
-# *
-# * Unless required by applicable law or agreed to in writing, software
-# * distributed under the License is distributed on an "AS IS" BASIS,
-# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# * See the License for the specific language governing permissions and
-# * limitations under the License.
-# */
-
-#
-# DisributedLog Logging Configuration
-#
-
-# Example with rolling log file
-log4j.rootLogger=INFO, CONSOLE
-
-#disable zookeeper logging
-log4j.logger.org.apache.zookeeper=OFF
-#Set the bookkeeper level to warning
-log4j.logger.org.apache.bookkeeper=INFO
-#disable helix logging
-log4j.logger.org.apache.helix=OFF
-
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.Threshold=DEBUG
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - 
[%t:%C{1}@%L] - %m%n
-
-# Add ROLLINGFILE to rootLogger to get log file output
-#    Log DEBUG level and above messages to a log file
-#log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
-#log4j.appender.ROLLINGFILE.Threshold=INFO
-#log4j.appender.ROLLINGFILE.File=stream.log
-#log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
-#log4j.appender.ROLLINGFILE.DatePattern='.'yyyy-MM-dd-HH-mm
-#log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - 
[%t:%C{1}@%L] - %m%n
-
-log4j.appender.R=org.apache.log4j.RollingFileAppender
-log4j.appender.R.Threshold=TRACE
-log4j.appender.R.File=target/error.log
-log4j.appender.R.MaxFileSize=200MB
-log4j.appender.R.MaxBackupIndex=7
-log4j.appender.R.layout=org.apache.log4j.PatternLayout
-log4j.appender.R.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - 
%m%n
diff --git a/stream/tests/pom.xml b/stream/tests/pom.xml
deleted file mode 100644
index 0f6029b..0000000
--- a/stream/tests/pom.xml
+++ /dev/null
@@ -1,33 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd"; 
xmlns="http://maven.apache.org/POM/4.0.0";
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";>
-  <packaging>pom</packaging>
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.bookkeeper</groupId>
-    <artifactId>stream-storage-parent</artifactId>
-    <version>4.8.0-SNAPSHOT</version>
-  </parent>
-  <groupId>org.apache.bookkeeper.tests</groupId>
-  <artifactId>stream-storage-tests-parent</artifactId>
-  <name>Apache BookKeeper :: Stream Storage :: Tests</name>
-  <modules>
-    <module>integration</module>
-  </modules>
-</project>
diff --git 
a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
 
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
index 42b428e..92e39c3 100644
--- 
a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
+++ 
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/LocationClientTest.java
@@ -52,12 +52,8 @@ public class LocationClientTest extends 
StreamClusterTestBase {
             .name("location-client-test")
             .numThreads(1)
             .build();
-        StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(getExsternalStreamEndpoints().toArray(new 
Endpoint[getNumBookies()]))
-            .usePlaintext(true)
-            .build();
         client = new LocationClientImpl(
-            settings,
+            newStorageClientSettings(),
             scheduler);
     }
 
diff --git 
a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java
 
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StorageAdminClientTest.java
similarity index 86%
rename from 
stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java
rename to 
tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StorageAdminClientTest.java
index 03edfb8..569cf7c 100644
--- 
a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/StorageAdminClientTest.java
+++ 
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StorageAdminClientTest.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.bookkeeper.stream.tests.integration;
+package org.apache.bookkeeper.tests.integration.stream;
 
 import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
 import static org.junit.Assert.assertEquals;
@@ -39,6 +39,8 @@ import org.apache.bookkeeper.stream.proto.StreamConfiguration;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -46,37 +48,23 @@ import org.junit.rules.TestName;
 /**
  * Integration test for stream admin client test.
  */
-public class StorageAdminClientTest extends StorageServerTestBase {
+public class StorageAdminClientTest extends StreamClusterTestBase {
 
     @Rule
     public final TestName testName = new TestName();
 
-    private OrderedScheduler scheduler;
     private StorageAdminClient adminClient;
 
-    @Override
-    protected void doSetup() throws Exception {
-        scheduler = OrderedScheduler.newSchedulerBuilder()
-            .name("admin-client-test")
-            .numThreads(1)
-            .build();
-        StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(cluster.getRpcEndpoints().toArray(new 
Endpoint[cluster.getRpcEndpoints().size()]))
-            .usePlaintext(true)
-            .build();
-        adminClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .buildAdmin();
+    @Before
+    public void setup() {
+        adminClient = createStorageAdminClient(newStorageClientSettings());
     }
 
-    @Override
-    protected void doTeardown() throws Exception {
+    @After
+    public void teardown() {
         if (null != adminClient) {
             adminClient.close();
         }
-        if (null != scheduler) {
-            scheduler.shutdown();
-        }
     }
 
     @Test
@@ -152,11 +140,6 @@ public class StorageAdminClientTest extends 
StorageServerTestBase {
             .build();
         StreamProperties streamProps = 
FutureUtils.result(adminClient.createStream(nsName, streamName, streamConf));
         assertEquals(streamName, streamProps.getStreamName());
-        assertEquals(
-            StreamConfiguration.newBuilder(streamConf)
-                
.setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
-                .build(),
-            streamProps.getStreamConf());
 
         // create a duplicated stream
         try {
diff --git 
a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
 
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
index b86cc72..fe7c914 100644
--- 
a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
+++ 
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/StreamClusterTestBase.java
@@ -21,6 +21,10 @@ package org.apache.bookkeeper.tests.integration.stream;
 import java.util.List;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.api.StorageClient;
+import org.apache.bookkeeper.clients.StorageClientBuilder;
+import org.apache.bookkeeper.clients.admin.StorageAdminClient;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.clients.utils.NetUtils;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import 
org.apache.bookkeeper.tests.integration.cluster.BookKeeperClusterTestBase;
@@ -69,5 +73,35 @@ public abstract class StreamClusterTestBase extends 
BookKeeperClusterTestBase {
             .collect(Collectors.toList());
     }
 
+    //
+    // Test Util Methods
+    //
+
+    protected static StorageClientSettings newStorageClientSettings() {
+        return StorageClientSettings.newBuilder()
+            .addEndpoints(getExsternalStreamEndpoints().toArray(new 
Endpoint[getNumBookies()]))
+            .endpointResolver(endpoint -> {
+                String internalEndpointStr = 
NetUtils.endpointToString(endpoint);
+                String externalEndpointStr =
+                    
bkCluster.resolveExternalGrpcEndpointStr(internalEndpointStr);
+                log.info("Resolve endpoint {} to {}", internalEndpointStr, 
externalEndpointStr);
+                return NetUtils.parseEndpoint(externalEndpointStr);
+            })
+            .usePlaintext(true)
+            .build();
+    }
+
+    protected static StorageAdminClient 
createStorageAdminClient(StorageClientSettings settings) {
+        return StorageClientBuilder.newBuilder()
+            .withSettings(settings)
+            .buildAdmin();
+    }
+
+    protected static StorageClient createStorageClient(StorageClientSettings 
settings, String namespace) {
+        return StorageClientBuilder.newBuilder()
+            .withSettings(settings)
+            .withNamespace(namespace)
+            .build();
+    }
 
 }
diff --git 
a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java
 
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
similarity index 85%
rename from 
stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java
rename to 
tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
index 7add6d7..d1ff091 100644
--- 
a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientSimpleTest.java
+++ 
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.bookkeeper.stream.tests.integration;
+package org.apache.bookkeeper.tests.integration.stream;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
@@ -38,15 +38,14 @@ import org.apache.bookkeeper.api.kv.PTable;
 import org.apache.bookkeeper.api.kv.exceptions.KvApiException;
 import org.apache.bookkeeper.api.kv.result.Code;
 import org.apache.bookkeeper.api.kv.result.KeyValue;
-import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
 import org.apache.bookkeeper.stream.proto.StreamConfiguration;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -55,47 +54,31 @@ import org.junit.rules.TestName;
  * Integration test for table service.
  */
 @Slf4j
-public class TableClientSimpleTest extends StorageServerTestBase {
+public class TableClientSimpleTest extends StreamClusterTestBase {
 
     @Rule
     public final TestName testName = new TestName();
 
     private final String namespace = "test_namespace";
-    private OrderedScheduler scheduler;
     private StorageAdminClient adminClient;
     private StorageClient storageClient;
 
-    @Override
-    protected void doSetup() throws Exception {
-        scheduler = OrderedScheduler.newSchedulerBuilder()
-            .name("table-client-test")
-            .numThreads(1)
-            .build();
-        StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(cluster.getRpcEndpoints().toArray(new 
Endpoint[cluster.getRpcEndpoints().size()]))
-            .usePlaintext(true)
-            .build();
+    @Before
+    public void setup() {
+        StorageClientSettings settings = newStorageClientSettings();
         String namespace = "test_namespace";
-        adminClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .buildAdmin();
-        storageClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .withNamespace(namespace)
-            .build();
+        adminClient = createStorageAdminClient(settings);
+        storageClient = createStorageClient(settings, namespace);
     }
 
-    @Override
-    protected void doTeardown() throws Exception {
+    @After
+    public void teardown() {
         if (null != adminClient) {
             adminClient.close();
         }
         if (null != storageClient) {
             storageClient.close();
         }
-        if (null != scheduler) {
-            scheduler.shutdown();
-        }
     }
 
     private static ByteBuf getLKey(int i) {
@@ -123,11 +106,6 @@ public class TableClientSimpleTest extends 
StorageServerTestBase {
         StreamProperties streamProps = result(
             adminClient.createStream(namespace, streamName, streamConf));
         assertEquals(streamName, streamProps.getStreamName());
-        assertEquals(
-            StreamConfiguration.newBuilder(streamConf)
-                
.setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
-                .build(),
-            streamProps.getStreamConf());
 
         // Open the table
         PTable<ByteBuf, ByteBuf> table = 
result(storageClient.openPTable(streamName));
diff --git 
a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java
 
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java
similarity index 89%
rename from 
stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java
rename to 
tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java
index 7db265b..0dff7fa 100644
--- 
a/stream/tests/integration/src/test/java/org/apache/bookkeeper/stream/tests/integration/TableClientTest.java
+++ 
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.bookkeeper.stream.tests.integration;
+package org.apache.bookkeeper.tests.integration.stream;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
@@ -47,16 +47,15 @@ import org.apache.bookkeeper.api.kv.result.PutResult;
 import org.apache.bookkeeper.api.kv.result.RangeResult;
 import org.apache.bookkeeper.api.kv.result.Result;
 import org.apache.bookkeeper.api.kv.result.TxnResult;
-import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
 import org.apache.bookkeeper.stream.proto.StreamConfiguration;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
-import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -65,48 +64,32 @@ import org.junit.rules.TestName;
  * Integration test for table service.
  */
 @Slf4j
-public class TableClientTest extends StorageServerTestBase {
+public class TableClientTest extends StreamClusterTestBase {
 
     @Rule
     public final TestName testName = new TestName();
 
     private final String namespace = "test_namespace";
-    private OrderedScheduler scheduler;
     private StorageAdminClient adminClient;
     private StorageClient storageClient;
     private final OptionFactory<ByteBuf> optionFactory = new 
OptionFactoryImpl<>();
 
-    @Override
-    protected void doSetup() throws Exception {
-        scheduler = OrderedScheduler.newSchedulerBuilder()
-            .name("table-client-test")
-            .numThreads(1)
-            .build();
-        StorageClientSettings settings = StorageClientSettings.newBuilder()
-            .addEndpoints(cluster.getRpcEndpoints().toArray(new 
Endpoint[cluster.getRpcEndpoints().size()]))
-            .usePlaintext(true)
-            .build();
+    @Before
+    public void setup() {
+        StorageClientSettings settings = newStorageClientSettings();
         String namespace = "test_namespace";
-        adminClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .buildAdmin();
-        storageClient = StorageClientBuilder.newBuilder()
-            .withSettings(settings)
-            .withNamespace(namespace)
-            .build();
+        adminClient = createStorageAdminClient(settings);
+        storageClient = createStorageClient(settings, namespace);
     }
 
-    @Override
-    protected void doTeardown() throws Exception {
+    @After
+    public void teardown() {
         if (null != adminClient) {
             adminClient.close();
         }
         if (null != storageClient) {
             storageClient.close();
         }
-        if (null != scheduler) {
-            scheduler.shutdown();
-        }
     }
 
     private static ByteBuf getLKey(int i) {
@@ -134,11 +117,6 @@ public class TableClientTest extends StorageServerTestBase 
{
         StreamProperties streamProps = FutureUtils.result(
             adminClient.createStream(namespace, streamName, streamConf));
         assertEquals(streamName, streamProps.getStreamName());
-        assertEquals(
-            StreamConfiguration.newBuilder(streamConf)
-                
.setBackendServiceUrl(cluster.getDefaultBackendUri().toString())
-                .build(),
-            streamProps.getStreamConf());
 
         // Open the table
         PTable<ByteBuf, ByteBuf> table = 
FutureUtils.result(storageClient.openPTable(streamName));
diff --git 
a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
 
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
index 80ae906..9a7cd86 100644
--- 
a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
+++ 
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/topologies/BKCluster.java
@@ -18,6 +18,8 @@
 
 package org.apache.bookkeeper.tests.integration.topologies;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -60,6 +62,7 @@ public class BKCluster {
     private final Network network;
     private final MetadataStoreContainer metadataContainer;
     private final Map<String, BookieContainer> bookieContainers;
+    private final Map<String, String> internalEndpointsToExternalEndpoints;
     private final int numBookies;
     private final String extraServerComponents;
     private volatile boolean enableContainerLog;
@@ -72,6 +75,7 @@ public class BKCluster {
             .withNetwork(network)
             .withNetworkAliases(ZKContainer.HOST_NAME);
         this.bookieContainers = Maps.newTreeMap();
+        this.internalEndpointsToExternalEndpoints = Maps.newConcurrentMap();
         this.numBookies = spec.numBookies();
         this.extraServerComponents = spec.extraServerComponents();
         this.enableContainerLog = spec.enableContainerLog();
@@ -114,6 +118,13 @@ public class BKCluster {
         createBookies("bookie", numBookies);
     }
 
+    public String resolveExternalGrpcEndpointStr(String 
internalGrpcEndpointStr) {
+        String externalGrpcEndpointStr = 
internalEndpointsToExternalEndpoints.get(internalGrpcEndpointStr);
+        checkNotNull(externalGrpcEndpointStr,
+            "No internal grpc endpoint is found : " + internalGrpcEndpointStr);
+        return externalGrpcEndpointStr;
+    }
+
     public void stop() {
         synchronized (this) {
             bookieContainers.values().forEach(BookieContainer::stop);
@@ -155,6 +166,7 @@ public class BKCluster {
         synchronized (this) {
             container = bookieContainers.remove(bookieName);
             if (null != container) {
+                
internalEndpointsToExternalEndpoints.remove(container.getInternalGrpcEndpointStr());
                 container.stop();
             }
         }
@@ -175,8 +187,6 @@ public class BKCluster {
                 if (enableContainerLog) {
                     container.tailContainerLog();
                 }
-
-                log.info("Created bookie {}", bookieName);
                 bookieContainers.put(bookieName, container);
             }
         }
@@ -184,6 +194,11 @@ public class BKCluster {
         if (shouldStart) {
             log.info("Starting bookie {}", bookieName);
             container.start();
+            log.info("Started bookie {} : internal endpoint = {}, external 
endpoint = {}",
+                bookieName, container.getInternalGrpcEndpointStr(), 
container.getExternalGrpcEndpointStr());
+            internalEndpointsToExternalEndpoints.put(
+                container.getInternalGrpcEndpointStr(),
+                container.getExternalGrpcEndpointStr());
         }
         return container;
     }

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to