This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 26c77d2  [TABLE SERVICE] [CLIENT] storage client can open tables on a 
different namespace
26c77d2 is described below

commit 26c77d24f848a42c8e339820dbea869233f9475e
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Mon Oct 8 18:02:35 2018 -0700

    [TABLE SERVICE] [CLIENT] storage client can open tables on a different 
namespace
    
    Descriptions of the changes in this PR:
    
    
    *Motivation*
    
    In some cases, we use the same storage client to open tables under different namespaces.
    It would be good if a single client instance could do that.
    
    *Changes*
    
    - Add methods to `StorageClient` to open tables under different namespaces
    - Add `asClient` to `StorageAdminClient` to convert admin client to storage 
client
    
    
    
    
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Enrico Olivelli <eolive...@gmail.com>, Jia Zhai <None>
    
    This closes #1733 from sijie/improve_client_interface
---
 .../org/apache/bookkeeper/api/StorageClient.java   |  32 +++++++
 .../bookkeeper/clients/SimpleClientBase.java       |  28 ++++--
 .../clients/SimpleStorageClientImpl.java           |  46 ++++++++--
 .../bookkeeper/clients/StorageClientBuilder.java   |   2 +-
 .../bookkeeper/clients/StorageClientImpl.java      |  64 +++++++++----
 .../admin/SimpleStorageAdminClientImpl.java        |  12 +++
 .../clients/admin/StorageAdminClient.java          |  65 +++++++++++++
 .../clients/admin/StorageAdminClientImpl.java      |  38 ++++----
 .../bookkeeper/clients/StorageClientImplTest.java  | 101 ++++++++++++++++++++-
 .../clients/admin/TestStorageAdminClientImpl.java  |   9 +-
 10 files changed, 341 insertions(+), 56 deletions(-)

diff --git 
a/stream/api/src/main/java/org/apache/bookkeeper/api/StorageClient.java 
b/stream/api/src/main/java/org/apache/bookkeeper/api/StorageClient.java
index 969a8bb..419a828 100644
--- a/stream/api/src/main/java/org/apache/bookkeeper/api/StorageClient.java
+++ b/stream/api/src/main/java/org/apache/bookkeeper/api/StorageClient.java
@@ -33,8 +33,40 @@ import org.apache.bookkeeper.common.util.AutoAsyncCloseable;
 @Evolving
 public interface StorageClient extends AutoAsyncCloseable {
 
+    /**
+     * Open a {@link PTable} <tt>table</tt> under <tt>namespace</tt>.
+     *
+     * @param namespace namespace
+     * @param table table name
+     * @return a future represents the open result
+     */
+    CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String namespace, 
String table);
+
+    /**
+     * Open a {@link PTable} <tt>table</tt> under the default namespace of 
this client.
+     * The default namespace is configured when creating {@link StorageClient}.
+     *
+     * @param table table name
+     * @return a future represents the open result
+     */
     CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String table);
 
+    /**
+     * Open a {@link Table} <tt>table</tt> under <tt>namespace</tt>.
+     *
+     * @param namespace namespace
+     * @param table table name
+     * @return a future represents the open result
+     */
+    CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String namespace, 
String table);
+
+    /**
+     * Open a {@link Table} <tt>table</tt> under the default namespace of this client.
+     * The default namespace is configured when creating {@link StorageClient}.
+     *
+     * @param table table name
+     * @return a future represents the open result
+     */
     CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String table);
 
 }
diff --git 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleClientBase.java
 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleClientBase.java
index 3f6bc77..c213687 100644
--- 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleClientBase.java
+++ 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleClientBase.java
@@ -38,9 +38,11 @@ import 
org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
  */
 public class SimpleClientBase extends AbstractAutoAsyncCloseable {
 
+    protected final StorageClientSettings settings;
     protected final Resource<OrderedScheduler> schedulerResource;
     protected final OrderedScheduler scheduler;
     protected final ManagedChannel managedChannel;
+    protected final boolean ownChannel;
     protected final Channel channel;
     protected final RetryUtils retryUtils;
 
@@ -50,11 +52,23 @@ public class SimpleClientBase extends 
AbstractAutoAsyncCloseable {
 
     protected SimpleClientBase(StorageClientSettings settings,
                                Resource<OrderedScheduler> schedulerResource) {
-        this.managedChannel = 
GrpcChannels.createChannelBuilder(settings.serviceUri(), settings).build();
+        this(
+            settings,
+            schedulerResource,
+            GrpcChannels.createChannelBuilder(settings.serviceUri(), 
settings).build(),
+            true);
+    }
+
+    protected SimpleClientBase(StorageClientSettings settings,
+                               Resource<OrderedScheduler> schedulerResource,
+                               ManagedChannel managedChannel,
+                               boolean ownChannel) {
+        this.settings = settings;
+        this.managedChannel = managedChannel;
+        this.ownChannel = ownChannel;
         this.channel = ClientInterceptors.intercept(
             managedChannel,
             new StorageContainerClientInterceptor(0L));
-
         this.schedulerResource = schedulerResource;
         this.scheduler = SharedResourceManager.shared().get(schedulerResource);
         this.retryUtils = RetryUtils.create(settings.backoffPolicy(), 
scheduler);
@@ -62,10 +76,10 @@ public class SimpleClientBase extends 
AbstractAutoAsyncCloseable {
 
     @Override
     protected void closeAsyncOnce(CompletableFuture<Void> closeFuture) {
-        managedChannel.shutdown();
-        scheduler.submit(() -> {
-            SharedResourceManager.shared().release(schedulerResource, 
scheduler);
-            closeFuture.complete(null);
-        });
+        if (ownChannel) {
+            managedChannel.shutdown();
+        }
+        SharedResourceManager.shared().release(schedulerResource, scheduler);
+        closeFuture.complete(null);
     }
 }
diff --git 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleStorageClientImpl.java
 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleStorageClientImpl.java
index 4817dcc..79ffb0e 100644
--- 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleStorageClientImpl.java
+++ 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleStorageClientImpl.java
@@ -24,6 +24,7 @@ import static 
org.apache.bookkeeper.common.util.ListenableFutures.fromListenable
 import static 
org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createGetStreamRequest;
 
 import io.grpc.CallOptions;
+import io.grpc.ManagedChannel;
 import io.netty.buffer.ByteBuf;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -38,6 +39,8 @@ import 
org.apache.bookkeeper.clients.impl.kv.PByteBufSimpleTableImpl;
 import org.apache.bookkeeper.clients.utils.GrpcUtils;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.util.ExceptionUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
 import org.apache.bookkeeper.stream.proto.StorageType;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
 import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc;
@@ -48,17 +51,28 @@ import 
org.apache.bookkeeper.stream.proto.storage.StatusCode;
  * The implementation of {@link StorageClient} client.
  */
 @Slf4j
-class SimpleStorageClientImpl extends SimpleClientBase implements 
StorageClient {
+public class SimpleStorageClientImpl extends SimpleClientBase implements 
StorageClient {
 
     private static final String COMPONENT_NAME = 
SimpleStorageClientImpl.class.getSimpleName();
 
-    private final String namespaceName;
+    private final String defaultNamespace;
     private final RootRangeServiceFutureStub rootRangeService;
 
     public SimpleStorageClientImpl(String namespaceName,
                                    StorageClientSettings settings) {
         super(settings);
-        this.namespaceName = namespaceName;
+        this.defaultNamespace = namespaceName;
+        this.rootRangeService = GrpcUtils.configureGrpcStub(
+            RootRangeServiceGrpc.newFutureStub(channel),
+            Optional.empty());
+    }
+
+    public SimpleStorageClientImpl(String namespaceName,
+                                   StorageClientSettings settings,
+                                   Resource<OrderedScheduler> 
schedulerResource,
+                                   ManagedChannel channel) {
+        super(settings, schedulerResource, channel, false);
+        this.defaultNamespace = namespaceName;
         this.rootRangeService = GrpcUtils.configureGrpcStub(
             RootRangeServiceGrpc.newFutureStub(channel),
             Optional.empty());
@@ -69,28 +83,42 @@ class SimpleStorageClientImpl extends SimpleClientBase 
implements StorageClient
     //
 
     @Override
-    public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String 
streamName) {
+    public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String 
tableName) {
+        return openPTable(defaultNamespace, tableName);
+    }
+
+    @Override
+    public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String 
namespaceName,
+                                                                  String 
tableName) {
         return ExceptionUtils.callAndHandleClosedAsync(
             COMPONENT_NAME,
             isClosed(),
-            (future) -> openStreamAsTableImpl(streamName, future));
+            (future) -> openTableImpl(namespaceName, tableName, future));
     }
 
     @Override
     public CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String table) {
-        return openPTable(table)
+        return openTable(defaultNamespace, table);
+    }
+
+    @Override
+    public CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String 
namespaceName,
+                                                                String table) {
+        return openPTable(namespaceName, table)
             .thenApply(pTable -> new ByteBufTableImpl(pTable));
     }
 
-    private void openStreamAsTableImpl(String streamName,
-                                       CompletableFuture<PTable<ByteBuf, 
ByteBuf>> future) {
+    private void openTableImpl(String namespaceName,
+                               String streamName,
+                               CompletableFuture<PTable<ByteBuf, ByteBuf>> 
future) {
         CompletableFuture<StreamProperties> getStreamFuture = 
retryUtils.execute(() ->
             fromListenableFuture(rootRangeService.getStream(
                 createGetStreamRequest(namespaceName, streamName)))
         ).thenCompose(resp -> {
             if (StatusCode.SUCCESS == resp.getCode()) {
                 StreamProperties streamProps = resp.getStreamProps();
-                log.info("Retrieved table properties for table {} : {}", 
streamName, streamProps);
+                log.info("Retrieved table properties for table {}/{} : {}",
+                    namespaceName, streamName, streamProps);
                 if (StorageType.TABLE != 
streamProps.getStreamConf().getStorageType()) {
                     return FutureUtils.exception(new ApiException(
                         "Can't open a non-table storage entity : " + 
streamProps.getStreamConf().getStorageType()));
diff --git 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientBuilder.java
 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientBuilder.java
index 3a7ff4c..82bad79 100644
--- 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientBuilder.java
+++ 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientBuilder.java
@@ -105,7 +105,7 @@ public class StorageClientBuilder implements 
Supplier<StorageClient> {
         if (settings.enableServerSideRouting()) {
             return new SimpleStorageAdminClientImpl(settings);
         } else {
-            return new StorageAdminClientImpl(settings);
+            return new StorageAdminClientImpl(settings, 
ClientResources.create());
         }
     }
 
diff --git 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
index 9d49556..2b5bc48 100644
--- 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
+++ 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
@@ -44,57 +44,85 @@ import org.apache.bookkeeper.stream.proto.StreamProperties;
  * The implementation of {@link StorageClient} client.
  */
 @Slf4j
-class StorageClientImpl extends AbstractAutoAsyncCloseable implements 
StorageClient {
+public class StorageClientImpl extends AbstractAutoAsyncCloseable implements 
StorageClient {
 
     private static final String COMPONENT_NAME = 
StorageClientImpl.class.getSimpleName();
 
-    private final String namespaceName;
+    private final String defaultNamespace;
     private final StorageClientSettings settings;
     private final ClientResources resources;
     private final OrderedScheduler scheduler;
 
     // clients
     private final StorageServerClientManager serverManager;
+    private final boolean ownServerManager;
+
+    StorageClientImpl(String namespaceName,
+                      StorageClientSettings settings,
+                      ClientResources resources) {
+        this(
+            namespaceName,
+            settings,
+            resources,
+            new StorageServerClientManagerImpl(settings, 
resources.scheduler()),
+            true);
+    }
 
     public StorageClientImpl(String namespaceName,
                              StorageClientSettings settings,
-                             ClientResources resources) {
-        this.namespaceName = namespaceName;
+                             ClientResources resources,
+                             StorageServerClientManager serverManager,
+                             boolean ownServerManager) {
+        this.defaultNamespace = namespaceName;
         this.settings = settings;
         this.resources = resources;
-        this.serverManager = new StorageServerClientManagerImpl(settings, 
resources.scheduler());
+        this.serverManager = serverManager;
+        this.ownServerManager = ownServerManager;
         this.scheduler = 
SharedResourceManager.shared().get(resources.scheduler());
-
     }
 
-    CompletableFuture<StreamProperties> getStreamProperties(String streamName) 
{
+    CompletableFuture<StreamProperties> getStreamProperties(String 
namespaceName,
+                                                            String streamName) 
{
         return 
this.serverManager.getRootRangeClient().getStream(namespaceName, streamName);
     }
 
     //
-    // Materialized Views
+    // Tables
     //
 
     @Override
-    public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String 
streamName) {
+    public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String 
tableName) {
+        return openPTable(defaultNamespace, tableName);
+    }
+
+    @Override
+    public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String 
namespaceName,
+                                                                  String 
tableName) {
         return ExceptionUtils.callAndHandleClosedAsync(
             COMPONENT_NAME,
             isClosed(),
-            (future) -> openStreamAsTableImpl(streamName, future));
+            (future) -> openTableImpl(namespaceName, tableName, future));
     }
 
     @Override
     public CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String table) {
-        return openPTable(table)
+        return openTable(defaultNamespace, table);
+    }
+
+    @Override
+    public CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String 
namespaceName,
+                                                                String table) {
+        return openPTable(namespaceName, table)
             .thenApply(pTable -> new ByteBufTableImpl(pTable));
     }
 
-    private void openStreamAsTableImpl(String streamName,
-                                       CompletableFuture<PTable<ByteBuf, 
ByteBuf>> future) {
+    private void openTableImpl(String namespaceName,
+                               String tableName,
+                               CompletableFuture<PTable<ByteBuf, ByteBuf>> 
future) {
         FutureUtils.proxyTo(
-            getStreamProperties(streamName).thenComposeAsync(props -> {
+            getStreamProperties(namespaceName, 
tableName).thenComposeAsync(props -> {
                 if (log.isInfoEnabled()) {
-                    log.info("Retrieved table properties for table {} : {}", 
streamName, props);
+                    log.info("Retrieved table properties for table {}/{} : 
{}", namespaceName, tableName, props);
                 }
                 if (StorageType.TABLE != 
props.getStreamConf().getStorageType()) {
                     return FutureUtils.exception(new ApiException(
@@ -102,7 +130,7 @@ class StorageClientImpl extends AbstractAutoAsyncCloseable 
implements StorageCli
                     );
                 }
                 return new PByteBufTableImpl(
-                    streamName,
+                    tableName,
                     props,
                     serverManager,
                     scheduler.chooseThread(props.getStreamId()),
@@ -120,7 +148,9 @@ class StorageClientImpl extends AbstractAutoAsyncCloseable 
implements StorageCli
     @Override
     protected void closeAsyncOnce(CompletableFuture<Void> closeFuture) {
         scheduler.submit(() -> {
-            serverManager.close();
+            if (ownServerManager) {
+                serverManager.close();
+            }
             closeFuture.complete(null);
             SharedResourceManager.shared().release(resources.scheduler(), 
scheduler);
         });
diff --git 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/SimpleStorageAdminClientImpl.java
 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/SimpleStorageAdminClientImpl.java
index cbacbc3..04072e1 100644
--- 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/SimpleStorageAdminClientImpl.java
+++ 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/SimpleStorageAdminClientImpl.java
@@ -30,7 +30,9 @@ import static 
org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createGetStr
 
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.api.StorageClient;
 import org.apache.bookkeeper.clients.SimpleClientBase;
+import org.apache.bookkeeper.clients.SimpleStorageClientImpl;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.clients.utils.ClientResources;
 import org.apache.bookkeeper.clients.utils.GrpcUtils;
@@ -69,6 +71,16 @@ public class SimpleStorageAdminClientImpl extends 
SimpleClientBase implements St
     }
 
     @Override
+    public StorageClient asClient(String namespace) {
+        return new SimpleStorageClientImpl(
+            namespace,
+            settings,
+            schedulerResource,
+            managedChannel
+        );
+    }
+
+    @Override
     public CompletableFuture<NamespaceProperties> createNamespace(String 
namespace, NamespaceConfiguration conf) {
         return retryUtils.execute(() ->
             
fromListenableFuture(rootRangeService.createNamespace(createCreateNamespaceRequest(namespace,
 conf)))
diff --git 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClient.java
 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClient.java
index 55cc63a..ec3485a 100644
--- 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClient.java
+++ 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClient.java
@@ -18,6 +18,9 @@
 package org.apache.bookkeeper.clients.admin;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.api.StorageClient;
+import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
 import org.apache.bookkeeper.common.util.AutoAsyncCloseable;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
@@ -27,21 +30,83 @@ import org.apache.bookkeeper.stream.proto.StreamProperties;
 /**
  * A storage admin client.
  */
+@Public
+@Evolving
 public interface StorageAdminClient extends AutoAsyncCloseable {
 
+    /**
+     * Convert the storage admin client to a client.
+     *
+     * @return storage client
+     */
+    default StorageClient asClient() {
+        return asClient(null);
+    }
+
+    /**
+     * Convert the storage admin client to a client with default 
<tt>namespace</tt>.
+     *
+     * @param namespace namespace
+     * @return storage client
+     */
+    StorageClient asClient(String namespace);
+
+    /**
+     * Create a <code>namespace</code> with the provided namespace 
configuration <tt>conf</tt>.
+     *
+     * @param namespace namespace
+     * @param conf namespace configuration
+     * @return a future that represents the creation result
+     */
     CompletableFuture<NamespaceProperties> createNamespace(String namespace,
                                                            
NamespaceConfiguration conf);
 
+    /**
+     * Delete a <code>namespace</code>.
+     *
+     * @param namespace namespace
+     * @return a future represents the deletion result
+     */
     CompletableFuture<Boolean> deleteNamespace(String namespace);
 
+    /**
+     * Get the namespace properties of a given <code>namespace</code>.
+     *
+     * @param namespace namespace
+     * @return a future represents the get result
+     */
     CompletableFuture<NamespaceProperties> getNamespace(String namespace);
 
+    /**
+     * Create a stream <code>streamName</code> under namespace 
<code>namespace</code>
+     * with the provided stream configuration <tt>streamConfiguration</tt>.
+     *
+     * @param namespace namespace
+     * @param streamName stream name
+     * @param streamConfiguration stream configuration
+     * @return a future represents the creation result
+     */
     CompletableFuture<StreamProperties> createStream(String namespace,
                                                      String streamName,
                                                      StreamConfiguration 
streamConfiguration);
 
+    /**
+     * Delete a <code>stream</code> from the provided <tt>namespace</tt>.
+     *
+     * @param namespace namespace
+     * @param streamName stream name
+     * @return a future represents the deletion result
+     */
     CompletableFuture<Boolean> deleteStream(String namespace, String 
streamName);
 
+    /**
+     * Retrieve the stream properties of a given <code>stream</code> under
+     * the provided <code>namespace</code>.
+     *
+     * @param namespace namespace
+     * @param streamName stream name
+     * @return a future represents the get result
+     */
     CompletableFuture<StreamProperties> getStream(String namespace, String 
streamName);
 
 }
diff --git 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClientImpl.java
 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClientImpl.java
index 8c392a0..ea571bd 100644
--- 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClientImpl.java
+++ 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClientImpl.java
@@ -18,19 +18,18 @@
 
 package org.apache.bookkeeper.clients.admin;
 
-import com.google.common.annotations.VisibleForTesting;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.api.StorageClient;
+import org.apache.bookkeeper.clients.StorageClientImpl;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import 
org.apache.bookkeeper.clients.impl.internal.StorageServerClientManagerImpl;
 import org.apache.bookkeeper.clients.impl.internal.api.RootRangeClient;
 import 
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
 import org.apache.bookkeeper.clients.utils.ClientResources;
 import org.apache.bookkeeper.common.util.AbstractAutoAsyncCloseable;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
-import org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
 import org.apache.bookkeeper.stream.proto.StreamConfiguration;
@@ -43,6 +42,8 @@ import org.apache.bookkeeper.stream.proto.StreamProperties;
 public class StorageAdminClientImpl extends AbstractAutoAsyncCloseable 
implements StorageAdminClient {
 
     // clients
+    private final StorageClientSettings settings;
+    private final ClientResources resources;
     private final StorageServerClientManager clientManager;
     private final RootRangeClient rootRangeClient;
 
@@ -50,31 +51,32 @@ public class StorageAdminClientImpl extends 
AbstractAutoAsyncCloseable implement
      * Create a stream admin client with provided {@code withSettings}.
      *
      * @param settings withSettings to create an admin client.
+     * @param resources resources used by this client
      */
-    public StorageAdminClientImpl(StorageClientSettings settings) {
+    public StorageAdminClientImpl(StorageClientSettings settings,
+                                  ClientResources resources) {
         this(
             settings,
-            ClientResources.create().scheduler());
-    }
-
-    /**
-     * Create a stream admin client with provided {@code withSettings} and 
{@code scheduler}.
-     *
-     * @param settings          withSettings to create an admin client.
-     * @param schedulerResource scheduler to execute.
-     */
-    public StorageAdminClientImpl(StorageClientSettings settings,
-                                  Resource<OrderedScheduler> 
schedulerResource) {
-        this(() -> new StorageServerClientManagerImpl(settings, 
schedulerResource));
+            resources,
+            () -> new StorageServerClientManagerImpl(settings, 
resources.scheduler()));
     }
 
-    @VisibleForTesting
-    StorageAdminClientImpl(Supplier<StorageServerClientManager> factory) {
+    StorageAdminClientImpl(StorageClientSettings settings,
+                           ClientResources resources,
+                           Supplier<StorageServerClientManager> factory) {
+        this.settings = settings;
+        this.resources = resources;
         this.clientManager = factory.get();
         this.rootRangeClient = this.clientManager.getRootRangeClient();
     }
 
     @Override
+    public StorageClient asClient(String namespace) {
+        return new StorageClientImpl(
+            namespace, settings, resources, clientManager, false);
+    }
+
+    @Override
     public CompletableFuture<NamespaceProperties> createNamespace(String 
namespace,
                                                                   
NamespaceConfiguration colConf) {
         return rootRangeClient.createNamespace(namespace, colConf);
diff --git 
a/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/StorageClientImplTest.java
 
b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/StorageClientImplTest.java
index 6ae9035..9f0e6df 100644
--- 
a/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/StorageClientImplTest.java
+++ 
b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/StorageClientImplTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -90,7 +91,7 @@ public class StorageClientImplTest extends GrpcClientTestBase 
{
                 .setStorageType(StorageType.TABLE)
                 .build())
             .build();
-        when(client.getStreamProperties(anyString()))
+        when(client.getStreamProperties(anyString(), anyString()))
             .thenReturn(FutureUtils.value(streamProps));
 
         PByteBufTableImpl tableImpl = mock(PByteBufTableImpl.class);
@@ -109,13 +110,58 @@ public class StorageClientImplTest extends 
GrpcClientTestBase {
 
     @SuppressWarnings("unchecked")
     @Test
+    public void testOpenPTableDiffernetNamespace() throws Exception {
+        StreamProperties tableProps1 = 
StreamProperties.newBuilder(STREAM_PROPERTIES)
+            .setStreamName("table1")
+            .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+                .setStorageType(StorageType.TABLE)
+                .build())
+            .build();
+        when(client.getStreamProperties(eq(NAMESPACE), eq("table1")))
+            .thenReturn(FutureUtils.value(tableProps1));
+
+        StreamProperties tableProps2 = 
StreamProperties.newBuilder(STREAM_PROPERTIES)
+            .setStreamName("table2")
+            .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+                .setStorageType(StorageType.TABLE)
+                .build())
+            .build();
+        when(client.getStreamProperties(eq(NAMESPACE), eq("table2")))
+            .thenReturn(FutureUtils.value(tableProps2));
+
+        PByteBufTableImpl tableImpl1 = mock(PByteBufTableImpl.class);
+        
when(tableImpl1.initialize()).thenReturn(FutureUtils.value(tableImpl1));
+        PByteBufTableImpl tableImpl2 = mock(PByteBufTableImpl.class);
+        
when(tableImpl2.initialize()).thenReturn(FutureUtils.value(tableImpl2));
+
+        PowerMockito.whenNew(PByteBufTableImpl.class)
+            .withAnyArguments()
+            .thenReturn(tableImpl1);
+
+        PTable<ByteBuf, ByteBuf> returnedTableImpl1 = FutureUtils.result(
+            client.openPTable("table1")
+        );
+        assertSame(tableImpl1, returnedTableImpl1);
+
+        PowerMockito.whenNew(PByteBufTableImpl.class)
+            .withAnyArguments()
+            .thenReturn(tableImpl2);
+
+        PTable<ByteBuf, ByteBuf> returnedTableImpl2 = FutureUtils.result(
+            client.openPTable("table2")
+        );
+        assertSame(tableImpl2, returnedTableImpl2);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
     public void testOpenTable() throws Exception {
         StreamProperties streamProps = 
StreamProperties.newBuilder(STREAM_PROPERTIES)
             .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
                 .setStorageType(StorageType.TABLE)
                 .build())
             .build();
-        when(client.getStreamProperties(anyString()))
+        when(client.getStreamProperties(anyString(), anyString()))
             .thenReturn(FutureUtils.value(streamProps));
 
         PByteBufTableImpl tableImpl = mock(PByteBufTableImpl.class);
@@ -136,13 +182,62 @@ public class StorageClientImplTest extends GrpcClientTestBase {
 
     @SuppressWarnings("unchecked")
     @Test
+    public void testOpenTableWithDifferentNamespace() throws Exception {
+        StreamProperties tableProps1 = StreamProperties.newBuilder(STREAM_PROPERTIES)
+            .setStreamName("table1")
+            .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+                .setStorageType(StorageType.TABLE)
+                .build())
+            .build();
+        when(client.getStreamProperties(eq(NAMESPACE), eq("table1")))
+            .thenReturn(FutureUtils.value(tableProps1));
+
+        StreamProperties tableProps2 = StreamProperties.newBuilder(STREAM_PROPERTIES)
+            .setStreamName("table2")
+            .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+                .setStorageType(StorageType.TABLE)
+                .build())
+            .build();
+        when(client.getStreamProperties(eq(NAMESPACE), eq("table2")))
+            .thenReturn(FutureUtils.value(tableProps2));
+
+        PByteBufTableImpl tableImpl1 = mock(PByteBufTableImpl.class);
+        when(tableImpl1.initialize()).thenReturn(FutureUtils.value(tableImpl1));
+        PByteBufTableImpl tableImpl2 = mock(PByteBufTableImpl.class);
+        when(tableImpl2.initialize()).thenReturn(FutureUtils.value(tableImpl2));
+
+        PowerMockito.whenNew(PByteBufTableImpl.class)
+            .withAnyArguments()
+            .thenReturn(tableImpl1);
+
+        Table<ByteBuf, ByteBuf> returnedTableImpl1 = FutureUtils.result(
+            client.openTable("table1")
+        );
+        assertTrue(returnedTableImpl1 instanceof ByteBufTableImpl);
+        ByteBufTableImpl bytesTableImpl1 = (ByteBufTableImpl) returnedTableImpl1;
+        assertSame(tableImpl1, Whitebox.getInternalState(bytesTableImpl1, "underlying"));
+
+        PowerMockito.whenNew(PByteBufTableImpl.class)
+            .withAnyArguments()
+            .thenReturn(tableImpl2);
+
+        Table<ByteBuf, ByteBuf> returnedTableImpl2 = FutureUtils.result(
+            client.openTable("table2")
+        );
+        assertTrue(returnedTableImpl2 instanceof ByteBufTableImpl);
+        ByteBufTableImpl bytesTableImpl2 = (ByteBufTableImpl) returnedTableImpl2;
+        assertSame(tableImpl2, Whitebox.getInternalState(bytesTableImpl2, "underlying"));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
     public void testOpenPTableIllegalOp() throws Exception {
         StreamProperties streamProps = StreamProperties.newBuilder(STREAM_PROPERTIES)
             .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
                 .setStorageType(StorageType.STREAM)
                 .build())
             .build();
-        when(client.getStreamProperties(anyString()))
+        when(client.getStreamProperties(anyString(), anyString()))
             .thenReturn(FutureUtils.value(streamProps));
 
         PByteBufTableImpl tableImpl = mock(PByteBufTableImpl.class);
diff --git a/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/admin/TestStorageAdminClientImpl.java b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/admin/TestStorageAdminClientImpl.java
index 0787abc..d9a791a 100644
--- a/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/admin/TestStorageAdminClientImpl.java
+++ b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/admin/TestStorageAdminClientImpl.java
@@ -25,8 +25,10 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.clients.impl.internal.api.RootRangeClient;
 import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
+import org.apache.bookkeeper.clients.utils.ClientResources;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
@@ -66,7 +68,12 @@ public class TestStorageAdminClientImpl {
     @Before
     public void setUp() {
         when(mockManager.getRootRangeClient()).thenReturn(mockRootRangeClient);
-        this.adminClient = new StorageAdminClientImpl(() -> mockManager);
+        this.adminClient = new StorageAdminClientImpl(
+            StorageClientSettings.newBuilder()
+                .serviceUri("bk://localhost:4181")
+                .build(),
+            ClientResources.create(),
+            () -> mockManager);
     }
 
     @Test

Reply via email to