sijie closed pull request #1733: [table service][client] storage client can 
open tables on a different namespace
URL: https://github.com/apache/bookkeeper/pull/1733
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/stream/api/src/main/java/org/apache/bookkeeper/api/StorageClient.java 
b/stream/api/src/main/java/org/apache/bookkeeper/api/StorageClient.java
index 969a8bbfe5..419a8289e3 100644
--- a/stream/api/src/main/java/org/apache/bookkeeper/api/StorageClient.java
+++ b/stream/api/src/main/java/org/apache/bookkeeper/api/StorageClient.java
@@ -33,8 +33,40 @@
 @Evolving
 public interface StorageClient extends AutoAsyncCloseable {
 
+    /**
+     * Open a {@link PTable} <tt>table</tt> under <tt>namespace</tt>.
+     *
+     * @param namespace namespace
+     * @param table table name
+     * @return a future represents the open result
+     */
+    CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String namespace, 
String table);
+
+    /**
+     * Open a {@link PTable} <tt>table</tt> under the default namespace of 
this client.
+     * The default namespace is configured when creating {@link StorageClient}.
+     *
+     * @param table table name
+     * @return a future represents the open result
+     */
     CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String table);
 
+    /**
+     * Open a {@link Table} <tt>table</tt> under <tt>namespace</tt>.
+     *
+     * @param namespace namespace
+     * @param table table name
+     * @return a future represents the open result
+     */
+    CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String namespace, 
String table);
+
+    /**
+     * Open a {@link Table} <tt>table</tt> under <tt>namespace</tt>.
+     * The default namespace is configured when creating {@link StorageClient}.
+     *
+     * @param table table name
+     * @return a future represents the open result
+     */
     CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String table);
 
 }
diff --git 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleClientBase.java
 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleClientBase.java
index 3f6bc776ce..c213687e91 100644
--- 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleClientBase.java
+++ 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleClientBase.java
@@ -38,9 +38,11 @@
  */
 public class SimpleClientBase extends AbstractAutoAsyncCloseable {
 
+    protected final StorageClientSettings settings;
     protected final Resource<OrderedScheduler> schedulerResource;
     protected final OrderedScheduler scheduler;
     protected final ManagedChannel managedChannel;
+    protected final boolean ownChannel;
     protected final Channel channel;
     protected final RetryUtils retryUtils;
 
@@ -50,11 +52,23 @@ protected SimpleClientBase(StorageClientSettings settings) {
 
     protected SimpleClientBase(StorageClientSettings settings,
                                Resource<OrderedScheduler> schedulerResource) {
-        this.managedChannel = 
GrpcChannels.createChannelBuilder(settings.serviceUri(), settings).build();
+        this(
+            settings,
+            schedulerResource,
+            GrpcChannels.createChannelBuilder(settings.serviceUri(), 
settings).build(),
+            true);
+    }
+
+    protected SimpleClientBase(StorageClientSettings settings,
+                               Resource<OrderedScheduler> schedulerResource,
+                               ManagedChannel managedChannel,
+                               boolean ownChannel) {
+        this.settings = settings;
+        this.managedChannel = managedChannel;
+        this.ownChannel = ownChannel;
         this.channel = ClientInterceptors.intercept(
             managedChannel,
             new StorageContainerClientInterceptor(0L));
-
         this.schedulerResource = schedulerResource;
         this.scheduler = SharedResourceManager.shared().get(schedulerResource);
         this.retryUtils = RetryUtils.create(settings.backoffPolicy(), 
scheduler);
@@ -62,10 +76,10 @@ protected SimpleClientBase(StorageClientSettings settings,
 
     @Override
     protected void closeAsyncOnce(CompletableFuture<Void> closeFuture) {
-        managedChannel.shutdown();
-        scheduler.submit(() -> {
-            SharedResourceManager.shared().release(schedulerResource, 
scheduler);
-            closeFuture.complete(null);
-        });
+        if (ownChannel) {
+            managedChannel.shutdown();
+        }
+        SharedResourceManager.shared().release(schedulerResource, scheduler);
+        closeFuture.complete(null);
     }
 }
diff --git 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleStorageClientImpl.java
 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleStorageClientImpl.java
index 4817dccb1d..79ffb0ee2a 100644
--- 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleStorageClientImpl.java
+++ 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleStorageClientImpl.java
@@ -24,6 +24,7 @@
 import static 
org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createGetStreamRequest;
 
 import io.grpc.CallOptions;
+import io.grpc.ManagedChannel;
 import io.netty.buffer.ByteBuf;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -38,6 +39,8 @@
 import org.apache.bookkeeper.clients.utils.GrpcUtils;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.util.ExceptionUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
 import org.apache.bookkeeper.stream.proto.StorageType;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
 import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc;
@@ -48,17 +51,28 @@
  * The implementation of {@link StorageClient} client.
  */
 @Slf4j
-class SimpleStorageClientImpl extends SimpleClientBase implements 
StorageClient {
+public class SimpleStorageClientImpl extends SimpleClientBase implements 
StorageClient {
 
     private static final String COMPONENT_NAME = 
SimpleStorageClientImpl.class.getSimpleName();
 
-    private final String namespaceName;
+    private final String defaultNamespace;
     private final RootRangeServiceFutureStub rootRangeService;
 
     public SimpleStorageClientImpl(String namespaceName,
                                    StorageClientSettings settings) {
         super(settings);
-        this.namespaceName = namespaceName;
+        this.defaultNamespace = namespaceName;
+        this.rootRangeService = GrpcUtils.configureGrpcStub(
+            RootRangeServiceGrpc.newFutureStub(channel),
+            Optional.empty());
+    }
+
+    public SimpleStorageClientImpl(String namespaceName,
+                                   StorageClientSettings settings,
+                                   Resource<OrderedScheduler> 
schedulerResource,
+                                   ManagedChannel channel) {
+        super(settings, schedulerResource, channel, false);
+        this.defaultNamespace = namespaceName;
         this.rootRangeService = GrpcUtils.configureGrpcStub(
             RootRangeServiceGrpc.newFutureStub(channel),
             Optional.empty());
@@ -69,28 +83,42 @@ public SimpleStorageClientImpl(String namespaceName,
     //
 
     @Override
-    public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String 
streamName) {
+    public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String 
tableName) {
+        return openPTable(defaultNamespace, tableName);
+    }
+
+    @Override
+    public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String 
namespaceName,
+                                                                  String 
tableName) {
         return ExceptionUtils.callAndHandleClosedAsync(
             COMPONENT_NAME,
             isClosed(),
-            (future) -> openStreamAsTableImpl(streamName, future));
+            (future) -> openTableImpl(namespaceName, tableName, future));
     }
 
     @Override
     public CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String table) {
-        return openPTable(table)
+        return openTable(defaultNamespace, table);
+    }
+
+    @Override
+    public CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String 
namespaceName,
+                                                                String table) {
+        return openPTable(namespaceName, table)
             .thenApply(pTable -> new ByteBufTableImpl(pTable));
     }
 
-    private void openStreamAsTableImpl(String streamName,
-                                       CompletableFuture<PTable<ByteBuf, 
ByteBuf>> future) {
+    private void openTableImpl(String namespaceName,
+                               String streamName,
+                               CompletableFuture<PTable<ByteBuf, ByteBuf>> 
future) {
         CompletableFuture<StreamProperties> getStreamFuture = 
retryUtils.execute(() ->
             fromListenableFuture(rootRangeService.getStream(
                 createGetStreamRequest(namespaceName, streamName)))
         ).thenCompose(resp -> {
             if (StatusCode.SUCCESS == resp.getCode()) {
                 StreamProperties streamProps = resp.getStreamProps();
-                log.info("Retrieved table properties for table {} : {}", 
streamName, streamProps);
+                log.info("Retrieved table properties for table {}/{} : {}",
+                    namespaceName, streamName, streamProps);
                 if (StorageType.TABLE != 
streamProps.getStreamConf().getStorageType()) {
                     return FutureUtils.exception(new ApiException(
                         "Can't open a non-table storage entity : " + 
streamProps.getStreamConf().getStorageType()));
diff --git 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientBuilder.java
 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientBuilder.java
index 3a7ff4c8ee..82bad79a4d 100644
--- 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientBuilder.java
+++ 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientBuilder.java
@@ -105,7 +105,7 @@ public StorageAdminClient buildAdmin() {
         if (settings.enableServerSideRouting()) {
             return new SimpleStorageAdminClientImpl(settings);
         } else {
-            return new StorageAdminClientImpl(settings);
+            return new StorageAdminClientImpl(settings, 
ClientResources.create());
         }
     }
 
diff --git 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
index 9d49556970..2b5bc4817c 100644
--- 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
+++ 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
@@ -44,57 +44,85 @@
  * The implementation of {@link StorageClient} client.
  */
 @Slf4j
-class StorageClientImpl extends AbstractAutoAsyncCloseable implements 
StorageClient {
+public class StorageClientImpl extends AbstractAutoAsyncCloseable implements 
StorageClient {
 
     private static final String COMPONENT_NAME = 
StorageClientImpl.class.getSimpleName();
 
-    private final String namespaceName;
+    private final String defaultNamespace;
     private final StorageClientSettings settings;
     private final ClientResources resources;
     private final OrderedScheduler scheduler;
 
     // clients
     private final StorageServerClientManager serverManager;
+    private final boolean ownServerManager;
+
+    StorageClientImpl(String namespaceName,
+                      StorageClientSettings settings,
+                      ClientResources resources) {
+        this(
+            namespaceName,
+            settings,
+            resources,
+            new StorageServerClientManagerImpl(settings, 
resources.scheduler()),
+            true);
+    }
 
     public StorageClientImpl(String namespaceName,
                              StorageClientSettings settings,
-                             ClientResources resources) {
-        this.namespaceName = namespaceName;
+                             ClientResources resources,
+                             StorageServerClientManager serverManager,
+                             boolean ownServerManager) {
+        this.defaultNamespace = namespaceName;
         this.settings = settings;
         this.resources = resources;
-        this.serverManager = new StorageServerClientManagerImpl(settings, 
resources.scheduler());
+        this.serverManager = serverManager;
+        this.ownServerManager = ownServerManager;
         this.scheduler = 
SharedResourceManager.shared().get(resources.scheduler());
-
     }
 
-    CompletableFuture<StreamProperties> getStreamProperties(String streamName) 
{
+    CompletableFuture<StreamProperties> getStreamProperties(String 
namespaceName,
+                                                            String streamName) 
{
         return 
this.serverManager.getRootRangeClient().getStream(namespaceName, streamName);
     }
 
     //
-    // Materialized Views
+    // Tables
     //
 
     @Override
-    public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String 
streamName) {
+    public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String 
tableName) {
+        return openPTable(defaultNamespace, tableName);
+    }
+
+    @Override
+    public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String 
namespaceName,
+                                                                  String 
tableName) {
         return ExceptionUtils.callAndHandleClosedAsync(
             COMPONENT_NAME,
             isClosed(),
-            (future) -> openStreamAsTableImpl(streamName, future));
+            (future) -> openTableImpl(namespaceName, tableName, future));
     }
 
     @Override
     public CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String table) {
-        return openPTable(table)
+        return openTable(defaultNamespace, table);
+    }
+
+    @Override
+    public CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String 
namespaceName,
+                                                                String table) {
+        return openPTable(namespaceName, table)
             .thenApply(pTable -> new ByteBufTableImpl(pTable));
     }
 
-    private void openStreamAsTableImpl(String streamName,
-                                       CompletableFuture<PTable<ByteBuf, 
ByteBuf>> future) {
+    private void openTableImpl(String namespaceName,
+                               String tableName,
+                               CompletableFuture<PTable<ByteBuf, ByteBuf>> 
future) {
         FutureUtils.proxyTo(
-            getStreamProperties(streamName).thenComposeAsync(props -> {
+            getStreamProperties(namespaceName, 
tableName).thenComposeAsync(props -> {
                 if (log.isInfoEnabled()) {
-                    log.info("Retrieved table properties for table {} : {}", 
streamName, props);
+                    log.info("Retrieved table properties for table {}/{} : 
{}", namespaceName, tableName, props);
                 }
                 if (StorageType.TABLE != 
props.getStreamConf().getStorageType()) {
                     return FutureUtils.exception(new ApiException(
@@ -102,7 +130,7 @@ private void openStreamAsTableImpl(String streamName,
                     );
                 }
                 return new PByteBufTableImpl(
-                    streamName,
+                    tableName,
                     props,
                     serverManager,
                     scheduler.chooseThread(props.getStreamId()),
@@ -120,7 +148,9 @@ private void openStreamAsTableImpl(String streamName,
     @Override
     protected void closeAsyncOnce(CompletableFuture<Void> closeFuture) {
         scheduler.submit(() -> {
-            serverManager.close();
+            if (ownServerManager) {
+                serverManager.close();
+            }
             closeFuture.complete(null);
             SharedResourceManager.shared().release(resources.scheduler(), 
scheduler);
         });
diff --git 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/SimpleStorageAdminClientImpl.java
 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/SimpleStorageAdminClientImpl.java
index cbacbc396d..04072e1952 100644
--- 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/SimpleStorageAdminClientImpl.java
+++ 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/SimpleStorageAdminClientImpl.java
@@ -30,7 +30,9 @@
 
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.api.StorageClient;
 import org.apache.bookkeeper.clients.SimpleClientBase;
+import org.apache.bookkeeper.clients.SimpleStorageClientImpl;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.clients.utils.ClientResources;
 import org.apache.bookkeeper.clients.utils.GrpcUtils;
@@ -68,6 +70,16 @@ public SimpleStorageAdminClientImpl(StorageClientSettings 
settings,
             Optional.empty());
     }
 
+    @Override
+    public StorageClient asClient(String namespace) {
+        return new SimpleStorageClientImpl(
+            namespace,
+            settings,
+            schedulerResource,
+            managedChannel
+        );
+    }
+
     @Override
     public CompletableFuture<NamespaceProperties> createNamespace(String 
namespace, NamespaceConfiguration conf) {
         return retryUtils.execute(() ->
diff --git 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClient.java
 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClient.java
index 55cc63a0ef..ec3485aecc 100644
--- 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClient.java
+++ 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClient.java
@@ -18,6 +18,9 @@
 package org.apache.bookkeeper.clients.admin;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.api.StorageClient;
+import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
 import org.apache.bookkeeper.common.util.AutoAsyncCloseable;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
@@ -27,21 +30,83 @@
 /**
  * A storage admin client.
  */
+@Public
+@Evolving
 public interface StorageAdminClient extends AutoAsyncCloseable {
 
+    /**
+     * Convert the storage admin client to a client.
+     *
+     * @return storage client
+     */
+    default StorageClient asClient() {
+        return asClient(null);
+    }
+
+    /**
+     * Convert the storage admin client to a client with default 
<tt>namespace</tt>.
+     *
+     * @param namespace namespace
+     * @return storage client
+     */
+    StorageClient asClient(String namespace);
+
+    /**
+     * Create a <code>namespace</code> with the provided namespace 
configuration <tt>conf</tt>.
+     *
+     * @param namespace namespace
+     * @param conf namespace configuration
+     * @return a future represent the creation result
+     */
     CompletableFuture<NamespaceProperties> createNamespace(String namespace,
                                                            
NamespaceConfiguration conf);
 
+    /**
+     * Delete a <code>namespace</code>.
+     *
+     * @param namespace namespace
+     * @return a future represents the deletion result
+     */
     CompletableFuture<Boolean> deleteNamespace(String namespace);
 
+    /**
+     * Get the namespace properties of a given <code>namespace</code>.
+     *
+     * @param namespace namespace
+     * @return a future represents the get result
+     */
     CompletableFuture<NamespaceProperties> getNamespace(String namespace);
 
+    /**
+     * Create a stream <code>streamName</code> under namespace 
<code>namespace</code>
+     * with the provided stream configuration <tt>streamConfiguration</tt>.
+     *
+     * @param namespace namespace
+     * @param streamName stream name
+     * @param streamConfiguration stream configuration
+     * @return a future represents the creation result
+     */
     CompletableFuture<StreamProperties> createStream(String namespace,
                                                      String streamName,
                                                      StreamConfiguration 
streamConfiguration);
 
+    /**
+     * Delete a <code>stream</code> from the provided <tt>namespace</tt>.
+     *
+     * @param namespace namespace
+     * @param streamName stream name
+     * @return a future represents the deletion result
+     */
     CompletableFuture<Boolean> deleteStream(String namespace, String 
streamName);
 
+    /**
+     * Retrieve the stream properties of a given <code>stream</code> under
+     * the provided <code>namespace</code>.
+     *
+     * @param namespace namespace
+     * @param streamName stream name
+     * @return a future represents the get result
+     */
     CompletableFuture<StreamProperties> getStream(String namespace, String 
streamName);
 
 }
diff --git 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClientImpl.java
 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClientImpl.java
index 8c392a07aa..ea571bdf50 100644
--- 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClientImpl.java
+++ 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClientImpl.java
@@ -18,19 +18,18 @@
 
 package org.apache.bookkeeper.clients.admin;
 
-import com.google.common.annotations.VisibleForTesting;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.api.StorageClient;
+import org.apache.bookkeeper.clients.StorageClientImpl;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import 
org.apache.bookkeeper.clients.impl.internal.StorageServerClientManagerImpl;
 import org.apache.bookkeeper.clients.impl.internal.api.RootRangeClient;
 import 
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
 import org.apache.bookkeeper.clients.utils.ClientResources;
 import org.apache.bookkeeper.common.util.AbstractAutoAsyncCloseable;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
-import org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
 import org.apache.bookkeeper.stream.proto.StreamConfiguration;
@@ -43,6 +42,8 @@
 public class StorageAdminClientImpl extends AbstractAutoAsyncCloseable 
implements StorageAdminClient {
 
     // clients
+    private final StorageClientSettings settings;
+    private final ClientResources resources;
     private final StorageServerClientManager clientManager;
     private final RootRangeClient rootRangeClient;
 
@@ -50,30 +51,31 @@
      * Create a stream admin client with provided {@code withSettings}.
      *
      * @param settings withSettings to create an admin client.
+     * @param resources resources used by this client
      */
-    public StorageAdminClientImpl(StorageClientSettings settings) {
+    public StorageAdminClientImpl(StorageClientSettings settings,
+                                  ClientResources resources) {
         this(
             settings,
-            ClientResources.create().scheduler());
-    }
-
-    /**
-     * Create a stream admin client with provided {@code withSettings} and 
{@code scheduler}.
-     *
-     * @param settings          withSettings to create an admin client.
-     * @param schedulerResource scheduler to execute.
-     */
-    public StorageAdminClientImpl(StorageClientSettings settings,
-                                  Resource<OrderedScheduler> 
schedulerResource) {
-        this(() -> new StorageServerClientManagerImpl(settings, 
schedulerResource));
+            resources,
+            () -> new StorageServerClientManagerImpl(settings, 
resources.scheduler()));
     }
 
-    @VisibleForTesting
-    StorageAdminClientImpl(Supplier<StorageServerClientManager> factory) {
+    StorageAdminClientImpl(StorageClientSettings settings,
+                           ClientResources resources,
+                           Supplier<StorageServerClientManager> factory) {
+        this.settings = settings;
+        this.resources = resources;
         this.clientManager = factory.get();
         this.rootRangeClient = this.clientManager.getRootRangeClient();
     }
 
+    @Override
+    public StorageClient asClient(String namespace) {
+        return new StorageClientImpl(
+            namespace, settings, resources, clientManager, false);
+    }
+
     @Override
     public CompletableFuture<NamespaceProperties> createNamespace(String 
namespace,
                                                                   
NamespaceConfiguration colConf) {
diff --git 
a/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/StorageClientImplTest.java
 
b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/StorageClientImplTest.java
index 6ae9035a55..9f0e6dfe8d 100644
--- 
a/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/StorageClientImplTest.java
+++ 
b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/StorageClientImplTest.java
@@ -23,6 +23,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -90,7 +91,7 @@ public void testOpenPTable() throws Exception {
                 .setStorageType(StorageType.TABLE)
                 .build())
             .build();
-        when(client.getStreamProperties(anyString()))
+        when(client.getStreamProperties(anyString(), anyString()))
             .thenReturn(FutureUtils.value(streamProps));
 
         PByteBufTableImpl tableImpl = mock(PByteBufTableImpl.class);
@@ -107,6 +108,51 @@ public void testOpenPTable() throws Exception {
         assertSame(tableImpl, returnedTableImpl);
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testOpenPTableDiffernetNamespace() throws Exception {
+        StreamProperties tableProps1 = 
StreamProperties.newBuilder(STREAM_PROPERTIES)
+            .setStreamName("table1")
+            .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+                .setStorageType(StorageType.TABLE)
+                .build())
+            .build();
+        when(client.getStreamProperties(eq(NAMESPACE), eq("table1")))
+            .thenReturn(FutureUtils.value(tableProps1));
+
+        StreamProperties tableProps2 = 
StreamProperties.newBuilder(STREAM_PROPERTIES)
+            .setStreamName("table2")
+            .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+                .setStorageType(StorageType.TABLE)
+                .build())
+            .build();
+        when(client.getStreamProperties(eq(NAMESPACE), eq("table2")))
+            .thenReturn(FutureUtils.value(tableProps2));
+
+        PByteBufTableImpl tableImpl1 = mock(PByteBufTableImpl.class);
+        
when(tableImpl1.initialize()).thenReturn(FutureUtils.value(tableImpl1));
+        PByteBufTableImpl tableImpl2 = mock(PByteBufTableImpl.class);
+        
when(tableImpl2.initialize()).thenReturn(FutureUtils.value(tableImpl2));
+
+        PowerMockito.whenNew(PByteBufTableImpl.class)
+            .withAnyArguments()
+            .thenReturn(tableImpl1);
+
+        PTable<ByteBuf, ByteBuf> returnedTableImpl1 = FutureUtils.result(
+            client.openPTable("table1")
+        );
+        assertSame(tableImpl1, returnedTableImpl1);
+
+        PowerMockito.whenNew(PByteBufTableImpl.class)
+            .withAnyArguments()
+            .thenReturn(tableImpl2);
+
+        PTable<ByteBuf, ByteBuf> returnedTableImpl2 = FutureUtils.result(
+            client.openPTable("table2")
+        );
+        assertSame(tableImpl2, returnedTableImpl2);
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void testOpenTable() throws Exception {
@@ -115,7 +161,7 @@ public void testOpenTable() throws Exception {
                 .setStorageType(StorageType.TABLE)
                 .build())
             .build();
-        when(client.getStreamProperties(anyString()))
+        when(client.getStreamProperties(anyString(), anyString()))
             .thenReturn(FutureUtils.value(streamProps));
 
         PByteBufTableImpl tableImpl = mock(PByteBufTableImpl.class);
@@ -134,6 +180,55 @@ public void testOpenTable() throws Exception {
         assertSame(tableImpl, Whitebox.getInternalState(bytesTableImpl, 
"underlying"));
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testOpenTableWithDifferentNamespace() throws Exception {
+        StreamProperties tableProps1 = 
StreamProperties.newBuilder(STREAM_PROPERTIES)
+            .setStreamName("table1")
+            .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+                .setStorageType(StorageType.TABLE)
+                .build())
+            .build();
+        when(client.getStreamProperties(eq(NAMESPACE), eq("table1")))
+            .thenReturn(FutureUtils.value(tableProps1));
+
+        StreamProperties tableProps2 = 
StreamProperties.newBuilder(STREAM_PROPERTIES)
+            .setStreamName("table2")
+            .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+                .setStorageType(StorageType.TABLE)
+                .build())
+            .build();
+        when(client.getStreamProperties(eq(NAMESPACE), eq("table2")))
+            .thenReturn(FutureUtils.value(tableProps2));
+
+        PByteBufTableImpl tableImpl1 = mock(PByteBufTableImpl.class);
+        
when(tableImpl1.initialize()).thenReturn(FutureUtils.value(tableImpl1));
+        PByteBufTableImpl tableImpl2 = mock(PByteBufTableImpl.class);
+        
when(tableImpl2.initialize()).thenReturn(FutureUtils.value(tableImpl2));
+
+        PowerMockito.whenNew(PByteBufTableImpl.class)
+            .withAnyArguments()
+            .thenReturn(tableImpl1);
+
+        Table<ByteBuf, ByteBuf> returnedTableImpl1 = FutureUtils.result(
+            client.openTable("table1")
+        );
+        assertTrue(returnedTableImpl1 instanceof ByteBufTableImpl);
+        ByteBufTableImpl bytesTableImpl1 = (ByteBufTableImpl) 
returnedTableImpl1;
+        assertSame(tableImpl1, Whitebox.getInternalState(bytesTableImpl1, 
"underlying"));
+
+        PowerMockito.whenNew(PByteBufTableImpl.class)
+            .withAnyArguments()
+            .thenReturn(tableImpl2);
+
+        Table<ByteBuf, ByteBuf> returnedTableImpl2 = FutureUtils.result(
+            client.openTable("table2")
+        );
+        assertTrue(returnedTableImpl2 instanceof ByteBufTableImpl);
+        ByteBufTableImpl bytesTableImpl2 = (ByteBufTableImpl) 
returnedTableImpl2;
+        assertSame(tableImpl2, Whitebox.getInternalState(bytesTableImpl2, 
"underlying"));
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void testOpenPTableIllegalOp() throws Exception {
@@ -142,7 +237,7 @@ public void testOpenPTableIllegalOp() throws Exception {
                 .setStorageType(StorageType.STREAM)
                 .build())
             .build();
-        when(client.getStreamProperties(anyString()))
+        when(client.getStreamProperties(anyString(), anyString()))
             .thenReturn(FutureUtils.value(streamProps));
 
         PByteBufTableImpl tableImpl = mock(PByteBufTableImpl.class);
diff --git 
a/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/admin/TestStorageAdminClientImpl.java
 
b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/admin/TestStorageAdminClientImpl.java
index 0787abc33e..d9a791a5ef 100644
--- 
a/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/admin/TestStorageAdminClientImpl.java
+++ 
b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/admin/TestStorageAdminClientImpl.java
@@ -25,8 +25,10 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.clients.impl.internal.api.RootRangeClient;
 import 
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
+import org.apache.bookkeeper.clients.utils.ClientResources;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
@@ -66,7 +68,12 @@
     @Before
     public void setUp() {
         when(mockManager.getRootRangeClient()).thenReturn(mockRootRangeClient);
-        this.adminClient = new StorageAdminClientImpl(() -> mockManager);
+        this.adminClient = new StorageAdminClientImpl(
+            StorageClientSettings.newBuilder()
+                .serviceUri("bk://localhost:4181")
+                .build(),
+            ClientResources.create(),
+            () -> mockManager);
     }
 
     @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to