This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 26c77d2 [TABLE SERVICE] [CLIENT] storage client can open tables on a
different namespace
26c77d2 is described below
commit 26c77d24f848a42c8e339820dbea869233f9475e
Author: Sijie Guo <[email protected]>
AuthorDate: Mon Oct 8 18:02:35 2018 -0700
[TABLE SERVICE] [CLIENT] storage client can open tables on a different
namespace
Descriptions of the changes in this PR:
*Motivation*
In some cases, we are using same storage client for opening tables under
different namespaces.
It would be good that a client instance can do that.
*Changes*
- Add methods to `StorageClient` to open tables under different namespaces
- Add `asClient` to `StorageAdminClient` to convert admin client to storage
client
Author: Sijie Guo <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>
This closes #1733 from sijie/improve_client_interface
---
.../org/apache/bookkeeper/api/StorageClient.java | 32 +++++++
.../bookkeeper/clients/SimpleClientBase.java | 28 ++++--
.../clients/SimpleStorageClientImpl.java | 46 ++++++++--
.../bookkeeper/clients/StorageClientBuilder.java | 2 +-
.../bookkeeper/clients/StorageClientImpl.java | 64 +++++++++----
.../admin/SimpleStorageAdminClientImpl.java | 12 +++
.../clients/admin/StorageAdminClient.java | 65 +++++++++++++
.../clients/admin/StorageAdminClientImpl.java | 38 ++++----
.../bookkeeper/clients/StorageClientImplTest.java | 101 ++++++++++++++++++++-
.../clients/admin/TestStorageAdminClientImpl.java | 9 +-
10 files changed, 341 insertions(+), 56 deletions(-)
diff --git
a/stream/api/src/main/java/org/apache/bookkeeper/api/StorageClient.java
b/stream/api/src/main/java/org/apache/bookkeeper/api/StorageClient.java
index 969a8bb..419a828 100644
--- a/stream/api/src/main/java/org/apache/bookkeeper/api/StorageClient.java
+++ b/stream/api/src/main/java/org/apache/bookkeeper/api/StorageClient.java
@@ -33,8 +33,40 @@ import org.apache.bookkeeper.common.util.AutoAsyncCloseable;
@Evolving
public interface StorageClient extends AutoAsyncCloseable {
+ /**
+ * Open a {@link PTable} <tt>table</tt> under <tt>namespace</tt>.
+ *
+ * @param namespace namespace
+ * @param table table name
+ * @return a future represents the open result
+ */
+ CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String namespace,
String table);
+
+ /**
+ * Open a {@link PTable} <tt>table</tt> under the default namespace of
this client.
+ * The default namespace is configured when creating {@link StorageClient}.
+ *
+ * @param table table name
+ * @return a future represents the open result
+ */
CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String table);
+ /**
+ * Open a {@link Table} <tt>table</tt> under <tt>namespace</tt>.
+ *
+ * @param namespace namespace
+ * @param table table name
+ * @return a future represents the open result
+ */
+ CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String namespace,
String table);
+
+ /**
+ * Open a {@link Table} <tt>table</tt> under <tt>namespace</tt>.
+ * The default namespace is configured when creating {@link StorageClient}.
+ *
+ * @param table table name
+ * @return a future represents the open result
+ */
CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String table);
}
diff --git
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleClientBase.java
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleClientBase.java
index 3f6bc77..c213687 100644
---
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleClientBase.java
+++
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleClientBase.java
@@ -38,9 +38,11 @@ import
org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
*/
public class SimpleClientBase extends AbstractAutoAsyncCloseable {
+ protected final StorageClientSettings settings;
protected final Resource<OrderedScheduler> schedulerResource;
protected final OrderedScheduler scheduler;
protected final ManagedChannel managedChannel;
+ protected final boolean ownChannel;
protected final Channel channel;
protected final RetryUtils retryUtils;
@@ -50,11 +52,23 @@ public class SimpleClientBase extends
AbstractAutoAsyncCloseable {
protected SimpleClientBase(StorageClientSettings settings,
Resource<OrderedScheduler> schedulerResource) {
- this.managedChannel =
GrpcChannels.createChannelBuilder(settings.serviceUri(), settings).build();
+ this(
+ settings,
+ schedulerResource,
+ GrpcChannels.createChannelBuilder(settings.serviceUri(),
settings).build(),
+ true);
+ }
+
+ protected SimpleClientBase(StorageClientSettings settings,
+ Resource<OrderedScheduler> schedulerResource,
+ ManagedChannel managedChannel,
+ boolean ownChannel) {
+ this.settings = settings;
+ this.managedChannel = managedChannel;
+ this.ownChannel = ownChannel;
this.channel = ClientInterceptors.intercept(
managedChannel,
new StorageContainerClientInterceptor(0L));
-
this.schedulerResource = schedulerResource;
this.scheduler = SharedResourceManager.shared().get(schedulerResource);
this.retryUtils = RetryUtils.create(settings.backoffPolicy(),
scheduler);
@@ -62,10 +76,10 @@ public class SimpleClientBase extends
AbstractAutoAsyncCloseable {
@Override
protected void closeAsyncOnce(CompletableFuture<Void> closeFuture) {
- managedChannel.shutdown();
- scheduler.submit(() -> {
- SharedResourceManager.shared().release(schedulerResource,
scheduler);
- closeFuture.complete(null);
- });
+ if (ownChannel) {
+ managedChannel.shutdown();
+ }
+ SharedResourceManager.shared().release(schedulerResource, scheduler);
+ closeFuture.complete(null);
}
}
diff --git
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleStorageClientImpl.java
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleStorageClientImpl.java
index 4817dcc..79ffb0e 100644
---
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleStorageClientImpl.java
+++
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleStorageClientImpl.java
@@ -24,6 +24,7 @@ import static
org.apache.bookkeeper.common.util.ListenableFutures.fromListenable
import static
org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createGetStreamRequest;
import io.grpc.CallOptions;
+import io.grpc.ManagedChannel;
import io.netty.buffer.ByteBuf;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -38,6 +39,8 @@ import
org.apache.bookkeeper.clients.impl.kv.PByteBufSimpleTableImpl;
import org.apache.bookkeeper.clients.utils.GrpcUtils;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.ExceptionUtils;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
import org.apache.bookkeeper.stream.proto.StorageType;
import org.apache.bookkeeper.stream.proto.StreamProperties;
import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc;
@@ -48,17 +51,28 @@ import
org.apache.bookkeeper.stream.proto.storage.StatusCode;
* The implementation of {@link StorageClient} client.
*/
@Slf4j
-class SimpleStorageClientImpl extends SimpleClientBase implements
StorageClient {
+public class SimpleStorageClientImpl extends SimpleClientBase implements
StorageClient {
private static final String COMPONENT_NAME =
SimpleStorageClientImpl.class.getSimpleName();
- private final String namespaceName;
+ private final String defaultNamespace;
private final RootRangeServiceFutureStub rootRangeService;
public SimpleStorageClientImpl(String namespaceName,
StorageClientSettings settings) {
super(settings);
- this.namespaceName = namespaceName;
+ this.defaultNamespace = namespaceName;
+ this.rootRangeService = GrpcUtils.configureGrpcStub(
+ RootRangeServiceGrpc.newFutureStub(channel),
+ Optional.empty());
+ }
+
+ public SimpleStorageClientImpl(String namespaceName,
+ StorageClientSettings settings,
+ Resource<OrderedScheduler>
schedulerResource,
+ ManagedChannel channel) {
+ super(settings, schedulerResource, channel, false);
+ this.defaultNamespace = namespaceName;
this.rootRangeService = GrpcUtils.configureGrpcStub(
RootRangeServiceGrpc.newFutureStub(channel),
Optional.empty());
@@ -69,28 +83,42 @@ class SimpleStorageClientImpl extends SimpleClientBase
implements StorageClient
//
@Override
- public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String
streamName) {
+ public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String
tableName) {
+ return openPTable(defaultNamespace, tableName);
+ }
+
+ @Override
+ public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String
namespaceName,
+ String
tableName) {
return ExceptionUtils.callAndHandleClosedAsync(
COMPONENT_NAME,
isClosed(),
- (future) -> openStreamAsTableImpl(streamName, future));
+ (future) -> openTableImpl(namespaceName, tableName, future));
}
@Override
public CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String table) {
- return openPTable(table)
+ return openTable(defaultNamespace, table);
+ }
+
+ @Override
+ public CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String
namespaceName,
+ String table) {
+ return openPTable(namespaceName, table)
.thenApply(pTable -> new ByteBufTableImpl(pTable));
}
- private void openStreamAsTableImpl(String streamName,
- CompletableFuture<PTable<ByteBuf,
ByteBuf>> future) {
+ private void openTableImpl(String namespaceName,
+ String streamName,
+ CompletableFuture<PTable<ByteBuf, ByteBuf>>
future) {
CompletableFuture<StreamProperties> getStreamFuture =
retryUtils.execute(() ->
fromListenableFuture(rootRangeService.getStream(
createGetStreamRequest(namespaceName, streamName)))
).thenCompose(resp -> {
if (StatusCode.SUCCESS == resp.getCode()) {
StreamProperties streamProps = resp.getStreamProps();
- log.info("Retrieved table properties for table {} : {}",
streamName, streamProps);
+ log.info("Retrieved table properties for table {}/{} : {}",
+ namespaceName, streamName, streamProps);
if (StorageType.TABLE !=
streamProps.getStreamConf().getStorageType()) {
return FutureUtils.exception(new ApiException(
"Can't open a non-table storage entity : " +
streamProps.getStreamConf().getStorageType()));
diff --git
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientBuilder.java
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientBuilder.java
index 3a7ff4c..82bad79 100644
---
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientBuilder.java
+++
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientBuilder.java
@@ -105,7 +105,7 @@ public class StorageClientBuilder implements
Supplier<StorageClient> {
if (settings.enableServerSideRouting()) {
return new SimpleStorageAdminClientImpl(settings);
} else {
- return new StorageAdminClientImpl(settings);
+ return new StorageAdminClientImpl(settings,
ClientResources.create());
}
}
diff --git
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
index 9d49556..2b5bc48 100644
---
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
+++
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
@@ -44,57 +44,85 @@ import org.apache.bookkeeper.stream.proto.StreamProperties;
* The implementation of {@link StorageClient} client.
*/
@Slf4j
-class StorageClientImpl extends AbstractAutoAsyncCloseable implements
StorageClient {
+public class StorageClientImpl extends AbstractAutoAsyncCloseable implements
StorageClient {
private static final String COMPONENT_NAME =
StorageClientImpl.class.getSimpleName();
- private final String namespaceName;
+ private final String defaultNamespace;
private final StorageClientSettings settings;
private final ClientResources resources;
private final OrderedScheduler scheduler;
// clients
private final StorageServerClientManager serverManager;
+ private final boolean ownServerManager;
+
+ StorageClientImpl(String namespaceName,
+ StorageClientSettings settings,
+ ClientResources resources) {
+ this(
+ namespaceName,
+ settings,
+ resources,
+ new StorageServerClientManagerImpl(settings,
resources.scheduler()),
+ true);
+ }
public StorageClientImpl(String namespaceName,
StorageClientSettings settings,
- ClientResources resources) {
- this.namespaceName = namespaceName;
+ ClientResources resources,
+ StorageServerClientManager serverManager,
+ boolean ownServerManager) {
+ this.defaultNamespace = namespaceName;
this.settings = settings;
this.resources = resources;
- this.serverManager = new StorageServerClientManagerImpl(settings,
resources.scheduler());
+ this.serverManager = serverManager;
+ this.ownServerManager = ownServerManager;
this.scheduler =
SharedResourceManager.shared().get(resources.scheduler());
-
}
- CompletableFuture<StreamProperties> getStreamProperties(String streamName)
{
+ CompletableFuture<StreamProperties> getStreamProperties(String
namespaceName,
+ String streamName)
{
return
this.serverManager.getRootRangeClient().getStream(namespaceName, streamName);
}
//
- // Materialized Views
+ // Tables
//
@Override
- public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String
streamName) {
+ public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String
tableName) {
+ return openPTable(defaultNamespace, tableName);
+ }
+
+ @Override
+ public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String
namespaceName,
+ String
tableName) {
return ExceptionUtils.callAndHandleClosedAsync(
COMPONENT_NAME,
isClosed(),
- (future) -> openStreamAsTableImpl(streamName, future));
+ (future) -> openTableImpl(namespaceName, tableName, future));
}
@Override
public CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String table) {
- return openPTable(table)
+ return openTable(defaultNamespace, table);
+ }
+
+ @Override
+ public CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String
namespaceName,
+ String table) {
+ return openPTable(namespaceName, table)
.thenApply(pTable -> new ByteBufTableImpl(pTable));
}
- private void openStreamAsTableImpl(String streamName,
- CompletableFuture<PTable<ByteBuf,
ByteBuf>> future) {
+ private void openTableImpl(String namespaceName,
+ String tableName,
+ CompletableFuture<PTable<ByteBuf, ByteBuf>>
future) {
FutureUtils.proxyTo(
- getStreamProperties(streamName).thenComposeAsync(props -> {
+ getStreamProperties(namespaceName,
tableName).thenComposeAsync(props -> {
if (log.isInfoEnabled()) {
- log.info("Retrieved table properties for table {} : {}",
streamName, props);
+ log.info("Retrieved table properties for table {}/{} :
{}", namespaceName, tableName, props);
}
if (StorageType.TABLE !=
props.getStreamConf().getStorageType()) {
return FutureUtils.exception(new ApiException(
@@ -102,7 +130,7 @@ class StorageClientImpl extends AbstractAutoAsyncCloseable
implements StorageCli
);
}
return new PByteBufTableImpl(
- streamName,
+ tableName,
props,
serverManager,
scheduler.chooseThread(props.getStreamId()),
@@ -120,7 +148,9 @@ class StorageClientImpl extends AbstractAutoAsyncCloseable
implements StorageCli
@Override
protected void closeAsyncOnce(CompletableFuture<Void> closeFuture) {
scheduler.submit(() -> {
- serverManager.close();
+ if (ownServerManager) {
+ serverManager.close();
+ }
closeFuture.complete(null);
SharedResourceManager.shared().release(resources.scheduler(),
scheduler);
});
diff --git
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/SimpleStorageAdminClientImpl.java
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/SimpleStorageAdminClientImpl.java
index cbacbc3..04072e1 100644
---
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/SimpleStorageAdminClientImpl.java
+++
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/SimpleStorageAdminClientImpl.java
@@ -30,7 +30,9 @@ import static
org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createGetStr
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.api.StorageClient;
import org.apache.bookkeeper.clients.SimpleClientBase;
+import org.apache.bookkeeper.clients.SimpleStorageClientImpl;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.utils.ClientResources;
import org.apache.bookkeeper.clients.utils.GrpcUtils;
@@ -69,6 +71,16 @@ public class SimpleStorageAdminClientImpl extends
SimpleClientBase implements St
}
@Override
+ public StorageClient asClient(String namespace) {
+ return new SimpleStorageClientImpl(
+ namespace,
+ settings,
+ schedulerResource,
+ managedChannel
+ );
+ }
+
+ @Override
public CompletableFuture<NamespaceProperties> createNamespace(String
namespace, NamespaceConfiguration conf) {
return retryUtils.execute(() ->
fromListenableFuture(rootRangeService.createNamespace(createCreateNamespaceRequest(namespace,
conf)))
diff --git
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClient.java
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClient.java
index 55cc63a..ec3485a 100644
---
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClient.java
+++
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClient.java
@@ -18,6 +18,9 @@
package org.apache.bookkeeper.clients.admin;
import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.api.StorageClient;
+import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
import org.apache.bookkeeper.common.util.AutoAsyncCloseable;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.bookkeeper.stream.proto.NamespaceProperties;
@@ -27,21 +30,83 @@ import org.apache.bookkeeper.stream.proto.StreamProperties;
/**
* A storage admin client.
*/
+@Public
+@Evolving
public interface StorageAdminClient extends AutoAsyncCloseable {
+ /**
+ * Convert the storage admin client to a client.
+ *
+ * @return storage client
+ */
+ default StorageClient asClient() {
+ return asClient(null);
+ }
+
+ /**
+ * Convert the storage admin client to a client with default
<tt>namespace</tt>.
+ *
+ * @param namespace namespace
+ * @return storage client
+ */
+ StorageClient asClient(String namespace);
+
+ /**
+ * Create a <code>namespace</code> with the provided namespace
configuration <tt>conf</tt>.
+ *
+ * @param namespace namespace
+ * @param conf namespace configuration
+ * @return a future represent the creation result
+ */
CompletableFuture<NamespaceProperties> createNamespace(String namespace,
NamespaceConfiguration conf);
+ /**
+ * Delete a <code>namespace</code>.
+ *
+ * @param namespace namespace
+ * @return a future represents the deletion result
+ */
CompletableFuture<Boolean> deleteNamespace(String namespace);
+ /**
+ * Get the namespace properties of a given <code>namespace</code>.
+ *
+ * @param namespace namespace
+ * @return a future represents the get result
+ */
CompletableFuture<NamespaceProperties> getNamespace(String namespace);
+ /**
+ * Create a stream <code>streamName</code> under namespace
<code>namespace</code>
+ * with the provided stream configuration <tt>streamConfiguration</tt>.
+ *
+ * @param namespace namespace
+ * @param streamName stream name
+ * @param streamConfiguration stream configuration
+ * @return a future represents the creation result
+ */
CompletableFuture<StreamProperties> createStream(String namespace,
String streamName,
StreamConfiguration
streamConfiguration);
+ /**
+ * Delete a <code>stream</code> from the provided <tt>namespace</tt>.
+ *
+ * @param namespace namespace
+ * @param streamName stream name
+ * @return a future represents the deletion result
+ */
CompletableFuture<Boolean> deleteStream(String namespace, String
streamName);
+ /**
+ * Retrieve the stream properties of a given <code>stream</code> under
+ * the provided <code>namespace</code>.
+ *
+ * @param namespace namespace
+ * @param streamName stream name
+ * @return a future represents the get result
+ */
CompletableFuture<StreamProperties> getStream(String namespace, String
streamName);
}
diff --git
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClientImpl.java
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClientImpl.java
index 8c392a0..ea571bd 100644
---
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClientImpl.java
+++
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClientImpl.java
@@ -18,19 +18,18 @@
package org.apache.bookkeeper.clients.admin;
-import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.api.StorageClient;
+import org.apache.bookkeeper.clients.StorageClientImpl;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import
org.apache.bookkeeper.clients.impl.internal.StorageServerClientManagerImpl;
import org.apache.bookkeeper.clients.impl.internal.api.RootRangeClient;
import
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
import org.apache.bookkeeper.clients.utils.ClientResources;
import org.apache.bookkeeper.common.util.AbstractAutoAsyncCloseable;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
-import org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.bookkeeper.stream.proto.NamespaceProperties;
import org.apache.bookkeeper.stream.proto.StreamConfiguration;
@@ -43,6 +42,8 @@ import org.apache.bookkeeper.stream.proto.StreamProperties;
public class StorageAdminClientImpl extends AbstractAutoAsyncCloseable
implements StorageAdminClient {
// clients
+ private final StorageClientSettings settings;
+ private final ClientResources resources;
private final StorageServerClientManager clientManager;
private final RootRangeClient rootRangeClient;
@@ -50,31 +51,32 @@ public class StorageAdminClientImpl extends
AbstractAutoAsyncCloseable implement
* Create a stream admin client with provided {@code withSettings}.
*
* @param settings withSettings to create an admin client.
+ * @param resources resources used by this client
*/
- public StorageAdminClientImpl(StorageClientSettings settings) {
+ public StorageAdminClientImpl(StorageClientSettings settings,
+ ClientResources resources) {
this(
settings,
- ClientResources.create().scheduler());
- }
-
- /**
- * Create a stream admin client with provided {@code withSettings} and
{@code scheduler}.
- *
- * @param settings withSettings to create an admin client.
- * @param schedulerResource scheduler to execute.
- */
- public StorageAdminClientImpl(StorageClientSettings settings,
- Resource<OrderedScheduler>
schedulerResource) {
- this(() -> new StorageServerClientManagerImpl(settings,
schedulerResource));
+ resources,
+ () -> new StorageServerClientManagerImpl(settings,
resources.scheduler()));
}
- @VisibleForTesting
- StorageAdminClientImpl(Supplier<StorageServerClientManager> factory) {
+ StorageAdminClientImpl(StorageClientSettings settings,
+ ClientResources resources,
+ Supplier<StorageServerClientManager> factory) {
+ this.settings = settings;
+ this.resources = resources;
this.clientManager = factory.get();
this.rootRangeClient = this.clientManager.getRootRangeClient();
}
@Override
+ public StorageClient asClient(String namespace) {
+ return new StorageClientImpl(
+ namespace, settings, resources, clientManager, false);
+ }
+
+ @Override
public CompletableFuture<NamespaceProperties> createNamespace(String
namespace,
NamespaceConfiguration colConf) {
return rootRangeClient.createNamespace(namespace, colConf);
diff --git
a/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/StorageClientImplTest.java
b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/StorageClientImplTest.java
index 6ae9035..9f0e6df 100644
---
a/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/StorageClientImplTest.java
+++
b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/StorageClientImplTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -90,7 +91,7 @@ public class StorageClientImplTest extends GrpcClientTestBase
{
.setStorageType(StorageType.TABLE)
.build())
.build();
- when(client.getStreamProperties(anyString()))
+ when(client.getStreamProperties(anyString(), anyString()))
.thenReturn(FutureUtils.value(streamProps));
PByteBufTableImpl tableImpl = mock(PByteBufTableImpl.class);
@@ -109,13 +110,58 @@ public class StorageClientImplTest extends
GrpcClientTestBase {
@SuppressWarnings("unchecked")
@Test
+ public void testOpenPTableDiffernetNamespace() throws Exception {
+ StreamProperties tableProps1 =
StreamProperties.newBuilder(STREAM_PROPERTIES)
+ .setStreamName("table1")
+ .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+ .setStorageType(StorageType.TABLE)
+ .build())
+ .build();
+ when(client.getStreamProperties(eq(NAMESPACE), eq("table1")))
+ .thenReturn(FutureUtils.value(tableProps1));
+
+ StreamProperties tableProps2 =
StreamProperties.newBuilder(STREAM_PROPERTIES)
+ .setStreamName("table2")
+ .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+ .setStorageType(StorageType.TABLE)
+ .build())
+ .build();
+ when(client.getStreamProperties(eq(NAMESPACE), eq("table2")))
+ .thenReturn(FutureUtils.value(tableProps2));
+
+ PByteBufTableImpl tableImpl1 = mock(PByteBufTableImpl.class);
+
when(tableImpl1.initialize()).thenReturn(FutureUtils.value(tableImpl1));
+ PByteBufTableImpl tableImpl2 = mock(PByteBufTableImpl.class);
+
when(tableImpl2.initialize()).thenReturn(FutureUtils.value(tableImpl2));
+
+ PowerMockito.whenNew(PByteBufTableImpl.class)
+ .withAnyArguments()
+ .thenReturn(tableImpl1);
+
+ PTable<ByteBuf, ByteBuf> returnedTableImpl1 = FutureUtils.result(
+ client.openPTable("table1")
+ );
+ assertSame(tableImpl1, returnedTableImpl1);
+
+ PowerMockito.whenNew(PByteBufTableImpl.class)
+ .withAnyArguments()
+ .thenReturn(tableImpl2);
+
+ PTable<ByteBuf, ByteBuf> returnedTableImpl2 = FutureUtils.result(
+ client.openPTable("table2")
+ );
+ assertSame(tableImpl2, returnedTableImpl2);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
public void testOpenTable() throws Exception {
StreamProperties streamProps =
StreamProperties.newBuilder(STREAM_PROPERTIES)
.setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
.setStorageType(StorageType.TABLE)
.build())
.build();
- when(client.getStreamProperties(anyString()))
+ when(client.getStreamProperties(anyString(), anyString()))
.thenReturn(FutureUtils.value(streamProps));
PByteBufTableImpl tableImpl = mock(PByteBufTableImpl.class);
@@ -136,13 +182,62 @@ public class StorageClientImplTest extends
GrpcClientTestBase {
@SuppressWarnings("unchecked")
@Test
+ public void testOpenTableWithDifferentNamespace() throws Exception {
+ StreamProperties tableProps1 =
StreamProperties.newBuilder(STREAM_PROPERTIES)
+ .setStreamName("table1")
+ .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+ .setStorageType(StorageType.TABLE)
+ .build())
+ .build();
+ when(client.getStreamProperties(eq(NAMESPACE), eq("table1")))
+ .thenReturn(FutureUtils.value(tableProps1));
+
+ StreamProperties tableProps2 =
StreamProperties.newBuilder(STREAM_PROPERTIES)
+ .setStreamName("table2")
+ .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+ .setStorageType(StorageType.TABLE)
+ .build())
+ .build();
+ when(client.getStreamProperties(eq(NAMESPACE), eq("table2")))
+ .thenReturn(FutureUtils.value(tableProps2));
+
+ PByteBufTableImpl tableImpl1 = mock(PByteBufTableImpl.class);
+
when(tableImpl1.initialize()).thenReturn(FutureUtils.value(tableImpl1));
+ PByteBufTableImpl tableImpl2 = mock(PByteBufTableImpl.class);
+
when(tableImpl2.initialize()).thenReturn(FutureUtils.value(tableImpl2));
+
+ PowerMockito.whenNew(PByteBufTableImpl.class)
+ .withAnyArguments()
+ .thenReturn(tableImpl1);
+
+ Table<ByteBuf, ByteBuf> returnedTableImpl1 = FutureUtils.result(
+ client.openTable("table1")
+ );
+ assertTrue(returnedTableImpl1 instanceof ByteBufTableImpl);
+ ByteBufTableImpl bytesTableImpl1 = (ByteBufTableImpl)
returnedTableImpl1;
+ assertSame(tableImpl1, Whitebox.getInternalState(bytesTableImpl1,
"underlying"));
+
+ PowerMockito.whenNew(PByteBufTableImpl.class)
+ .withAnyArguments()
+ .thenReturn(tableImpl2);
+
+ Table<ByteBuf, ByteBuf> returnedTableImpl2 = FutureUtils.result(
+ client.openTable("table2")
+ );
+ assertTrue(returnedTableImpl2 instanceof ByteBufTableImpl);
+ ByteBufTableImpl bytesTableImpl2 = (ByteBufTableImpl)
returnedTableImpl2;
+ assertSame(tableImpl2, Whitebox.getInternalState(bytesTableImpl2,
"underlying"));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
public void testOpenPTableIllegalOp() throws Exception {
StreamProperties streamProps =
StreamProperties.newBuilder(STREAM_PROPERTIES)
.setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
.setStorageType(StorageType.STREAM)
.build())
.build();
- when(client.getStreamProperties(anyString()))
+ when(client.getStreamProperties(anyString(), anyString()))
.thenReturn(FutureUtils.value(streamProps));
PByteBufTableImpl tableImpl = mock(PByteBufTableImpl.class);
diff --git
a/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/admin/TestStorageAdminClientImpl.java
b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/admin/TestStorageAdminClientImpl.java
index 0787abc..d9a791a 100644
---
a/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/admin/TestStorageAdminClientImpl.java
+++
b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/admin/TestStorageAdminClientImpl.java
@@ -25,8 +25,10 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.impl.internal.api.RootRangeClient;
import
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
+import org.apache.bookkeeper.clients.utils.ClientResources;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.bookkeeper.stream.proto.NamespaceProperties;
@@ -66,7 +68,12 @@ public class TestStorageAdminClientImpl {
@Before
public void setUp() {
when(mockManager.getRootRangeClient()).thenReturn(mockRootRangeClient);
- this.adminClient = new StorageAdminClientImpl(() -> mockManager);
+ this.adminClient = new StorageAdminClientImpl(
+ StorageClientSettings.newBuilder()
+ .serviceUri("bk://localhost:4181")
+ .build(),
+ ClientResources.create(),
+ () -> mockManager);
}
@Test