This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push: new 26c77d2 [TABLE SERVICE] [CLIENT] storage client can open tables on a different namespace 26c77d2 is described below commit 26c77d24f848a42c8e339820dbea869233f9475e Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Mon Oct 8 18:02:35 2018 -0700 [TABLE SERVICE] [CLIENT] storage client can open tables on a different namespace Descriptions of the changes in this PR: *Motivation* In some cases, we use the same storage client to open tables under different namespaces. It would be good if a single client instance could do that. *Changes* - Add methods to `StorageClient` to open tables under different namespaces - Add `asClient` to `StorageAdminClient` to convert admin client to storage client Author: Sijie Guo <si...@apache.org> Reviewers: Enrico Olivelli <eolive...@gmail.com>, Jia Zhai <None> This closes #1733 from sijie/improve_client_interface --- .../org/apache/bookkeeper/api/StorageClient.java | 32 +++++++ .../bookkeeper/clients/SimpleClientBase.java | 28 ++++-- .../clients/SimpleStorageClientImpl.java | 46 ++++++++-- .../bookkeeper/clients/StorageClientBuilder.java | 2 +- .../bookkeeper/clients/StorageClientImpl.java | 64 +++++++++---- .../admin/SimpleStorageAdminClientImpl.java | 12 +++ .../clients/admin/StorageAdminClient.java | 65 +++++++++++++ .../clients/admin/StorageAdminClientImpl.java | 38 ++++---- .../bookkeeper/clients/StorageClientImplTest.java | 101 ++++++++++++++++++++- .../clients/admin/TestStorageAdminClientImpl.java | 9 +- 10 files changed, 341 insertions(+), 56 deletions(-) diff --git a/stream/api/src/main/java/org/apache/bookkeeper/api/StorageClient.java b/stream/api/src/main/java/org/apache/bookkeeper/api/StorageClient.java index 969a8bb..419a828 100644 --- a/stream/api/src/main/java/org/apache/bookkeeper/api/StorageClient.java +++ b/stream/api/src/main/java/org/apache/bookkeeper/api/StorageClient.java @@ -33,8 +33,40 @@ import 
org.apache.bookkeeper.common.util.AutoAsyncCloseable; @Evolving public interface StorageClient extends AutoAsyncCloseable { + /** + * Open a {@link PTable} <tt>table</tt> under <tt>namespace</tt>. + * + * @param namespace namespace + * @param table table name + * @return a future represents the open result + */ + CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String namespace, String table); + + /** + * Open a {@link PTable} <tt>table</tt> under the default namespace of this client. + * The default namespace is configured when creating {@link StorageClient}. + * + * @param table table name + * @return a future represents the open result + */ CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String table); + /** + * Open a {@link Table} <tt>table</tt> under <tt>namespace</tt>. + * + * @param namespace namespace + * @param table table name + * @return a future represents the open result + */ + CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String namespace, String table); + + /** + * Open a {@link Table} <tt>table</tt> under the default namespace of this client. + * The default namespace is configured when creating {@link StorageClient}. 
+ * + * @param table table name + * @return a future represents the open result + */ CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String table); } diff --git a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleClientBase.java b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleClientBase.java index 3f6bc77..c213687 100644 --- a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleClientBase.java +++ b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleClientBase.java @@ -38,9 +38,11 @@ import org.apache.bookkeeper.common.util.SharedResourceManager.Resource; */ public class SimpleClientBase extends AbstractAutoAsyncCloseable { + protected final StorageClientSettings settings; protected final Resource<OrderedScheduler> schedulerResource; protected final OrderedScheduler scheduler; protected final ManagedChannel managedChannel; + protected final boolean ownChannel; protected final Channel channel; protected final RetryUtils retryUtils; @@ -50,11 +52,23 @@ public class SimpleClientBase extends AbstractAutoAsyncCloseable { protected SimpleClientBase(StorageClientSettings settings, Resource<OrderedScheduler> schedulerResource) { - this.managedChannel = GrpcChannels.createChannelBuilder(settings.serviceUri(), settings).build(); + this( + settings, + schedulerResource, + GrpcChannels.createChannelBuilder(settings.serviceUri(), settings).build(), + true); + } + + protected SimpleClientBase(StorageClientSettings settings, + Resource<OrderedScheduler> schedulerResource, + ManagedChannel managedChannel, + boolean ownChannel) { + this.settings = settings; + this.managedChannel = managedChannel; + this.ownChannel = ownChannel; this.channel = ClientInterceptors.intercept( managedChannel, new StorageContainerClientInterceptor(0L)); - this.schedulerResource = schedulerResource; this.scheduler = SharedResourceManager.shared().get(schedulerResource); this.retryUtils = 
RetryUtils.create(settings.backoffPolicy(), scheduler); @@ -62,10 +76,10 @@ public class SimpleClientBase extends AbstractAutoAsyncCloseable { @Override protected void closeAsyncOnce(CompletableFuture<Void> closeFuture) { - managedChannel.shutdown(); - scheduler.submit(() -> { - SharedResourceManager.shared().release(schedulerResource, scheduler); - closeFuture.complete(null); - }); + if (ownChannel) { + managedChannel.shutdown(); + } + SharedResourceManager.shared().release(schedulerResource, scheduler); + closeFuture.complete(null); } } diff --git a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleStorageClientImpl.java b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleStorageClientImpl.java index 4817dcc..79ffb0e 100644 --- a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleStorageClientImpl.java +++ b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/SimpleStorageClientImpl.java @@ -24,6 +24,7 @@ import static org.apache.bookkeeper.common.util.ListenableFutures.fromListenable import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createGetStreamRequest; import io.grpc.CallOptions; +import io.grpc.ManagedChannel; import io.netty.buffer.ByteBuf; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -38,6 +39,8 @@ import org.apache.bookkeeper.clients.impl.kv.PByteBufSimpleTableImpl; import org.apache.bookkeeper.clients.utils.GrpcUtils; import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.bookkeeper.common.util.ExceptionUtils; +import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.common.util.SharedResourceManager.Resource; import org.apache.bookkeeper.stream.proto.StorageType; import org.apache.bookkeeper.stream.proto.StreamProperties; import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc; @@ -48,17 +51,28 @@ import 
org.apache.bookkeeper.stream.proto.storage.StatusCode; * The implementation of {@link StorageClient} client. */ @Slf4j -class SimpleStorageClientImpl extends SimpleClientBase implements StorageClient { +public class SimpleStorageClientImpl extends SimpleClientBase implements StorageClient { private static final String COMPONENT_NAME = SimpleStorageClientImpl.class.getSimpleName(); - private final String namespaceName; + private final String defaultNamespace; private final RootRangeServiceFutureStub rootRangeService; public SimpleStorageClientImpl(String namespaceName, StorageClientSettings settings) { super(settings); - this.namespaceName = namespaceName; + this.defaultNamespace = namespaceName; + this.rootRangeService = GrpcUtils.configureGrpcStub( + RootRangeServiceGrpc.newFutureStub(channel), + Optional.empty()); + } + + public SimpleStorageClientImpl(String namespaceName, + StorageClientSettings settings, + Resource<OrderedScheduler> schedulerResource, + ManagedChannel channel) { + super(settings, schedulerResource, channel, false); + this.defaultNamespace = namespaceName; this.rootRangeService = GrpcUtils.configureGrpcStub( RootRangeServiceGrpc.newFutureStub(channel), Optional.empty()); @@ -69,28 +83,42 @@ class SimpleStorageClientImpl extends SimpleClientBase implements StorageClient // @Override - public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String streamName) { + public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String tableName) { + return openPTable(defaultNamespace, tableName); + } + + @Override + public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String namespaceName, + String tableName) { return ExceptionUtils.callAndHandleClosedAsync( COMPONENT_NAME, isClosed(), - (future) -> openStreamAsTableImpl(streamName, future)); + (future) -> openTableImpl(namespaceName, tableName, future)); } @Override public CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String table) { - return openPTable(table) + return 
openTable(defaultNamespace, table); + } + + @Override + public CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String namespaceName, + String table) { + return openPTable(namespaceName, table) .thenApply(pTable -> new ByteBufTableImpl(pTable)); } - private void openStreamAsTableImpl(String streamName, - CompletableFuture<PTable<ByteBuf, ByteBuf>> future) { + private void openTableImpl(String namespaceName, + String streamName, + CompletableFuture<PTable<ByteBuf, ByteBuf>> future) { CompletableFuture<StreamProperties> getStreamFuture = retryUtils.execute(() -> fromListenableFuture(rootRangeService.getStream( createGetStreamRequest(namespaceName, streamName))) ).thenCompose(resp -> { if (StatusCode.SUCCESS == resp.getCode()) { StreamProperties streamProps = resp.getStreamProps(); - log.info("Retrieved table properties for table {} : {}", streamName, streamProps); + log.info("Retrieved table properties for table {}/{} : {}", + namespaceName, streamName, streamProps); if (StorageType.TABLE != streamProps.getStreamConf().getStorageType()) { return FutureUtils.exception(new ApiException( "Can't open a non-table storage entity : " + streamProps.getStreamConf().getStorageType())); diff --git a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientBuilder.java b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientBuilder.java index 3a7ff4c..82bad79 100644 --- a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientBuilder.java +++ b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientBuilder.java @@ -105,7 +105,7 @@ public class StorageClientBuilder implements Supplier<StorageClient> { if (settings.enableServerSideRouting()) { return new SimpleStorageAdminClientImpl(settings); } else { - return new StorageAdminClientImpl(settings); + return new StorageAdminClientImpl(settings, ClientResources.create()); } } diff --git 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java index 9d49556..2b5bc48 100644 --- a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java +++ b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java @@ -44,57 +44,85 @@ import org.apache.bookkeeper.stream.proto.StreamProperties; * The implementation of {@link StorageClient} client. */ @Slf4j -class StorageClientImpl extends AbstractAutoAsyncCloseable implements StorageClient { +public class StorageClientImpl extends AbstractAutoAsyncCloseable implements StorageClient { private static final String COMPONENT_NAME = StorageClientImpl.class.getSimpleName(); - private final String namespaceName; + private final String defaultNamespace; private final StorageClientSettings settings; private final ClientResources resources; private final OrderedScheduler scheduler; // clients private final StorageServerClientManager serverManager; + private final boolean ownServerManager; + + StorageClientImpl(String namespaceName, + StorageClientSettings settings, + ClientResources resources) { + this( + namespaceName, + settings, + resources, + new StorageServerClientManagerImpl(settings, resources.scheduler()), + true); + } public StorageClientImpl(String namespaceName, StorageClientSettings settings, - ClientResources resources) { - this.namespaceName = namespaceName; + ClientResources resources, + StorageServerClientManager serverManager, + boolean ownServerManager) { + this.defaultNamespace = namespaceName; this.settings = settings; this.resources = resources; - this.serverManager = new StorageServerClientManagerImpl(settings, resources.scheduler()); + this.serverManager = serverManager; + this.ownServerManager = ownServerManager; this.scheduler = SharedResourceManager.shared().get(resources.scheduler()); - } - 
CompletableFuture<StreamProperties> getStreamProperties(String streamName) { + CompletableFuture<StreamProperties> getStreamProperties(String namespaceName, + String streamName) { return this.serverManager.getRootRangeClient().getStream(namespaceName, streamName); } // - // Materialized Views + // Tables // @Override - public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String streamName) { + public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String tableName) { + return openPTable(defaultNamespace, tableName); + } + + @Override + public CompletableFuture<PTable<ByteBuf, ByteBuf>> openPTable(String namespaceName, + String tableName) { return ExceptionUtils.callAndHandleClosedAsync( COMPONENT_NAME, isClosed(), - (future) -> openStreamAsTableImpl(streamName, future)); + (future) -> openTableImpl(namespaceName, tableName, future)); } @Override public CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String table) { - return openPTable(table) + return openTable(defaultNamespace, table); + } + + @Override + public CompletableFuture<Table<ByteBuf, ByteBuf>> openTable(String namespaceName, + String table) { + return openPTable(namespaceName, table) .thenApply(pTable -> new ByteBufTableImpl(pTable)); } - private void openStreamAsTableImpl(String streamName, - CompletableFuture<PTable<ByteBuf, ByteBuf>> future) { + private void openTableImpl(String namespaceName, + String tableName, + CompletableFuture<PTable<ByteBuf, ByteBuf>> future) { FutureUtils.proxyTo( - getStreamProperties(streamName).thenComposeAsync(props -> { + getStreamProperties(namespaceName, tableName).thenComposeAsync(props -> { if (log.isInfoEnabled()) { - log.info("Retrieved table properties for table {} : {}", streamName, props); + log.info("Retrieved table properties for table {}/{} : {}", namespaceName, tableName, props); } if (StorageType.TABLE != props.getStreamConf().getStorageType()) { return FutureUtils.exception(new ApiException( @@ -102,7 +130,7 @@ class StorageClientImpl 
extends AbstractAutoAsyncCloseable implements StorageCli ); } return new PByteBufTableImpl( - streamName, + tableName, props, serverManager, scheduler.chooseThread(props.getStreamId()), @@ -120,7 +148,9 @@ class StorageClientImpl extends AbstractAutoAsyncCloseable implements StorageCli @Override protected void closeAsyncOnce(CompletableFuture<Void> closeFuture) { scheduler.submit(() -> { - serverManager.close(); + if (ownServerManager) { + serverManager.close(); + } closeFuture.complete(null); SharedResourceManager.shared().release(resources.scheduler(), scheduler); }); diff --git a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/SimpleStorageAdminClientImpl.java b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/SimpleStorageAdminClientImpl.java index cbacbc3..04072e1 100644 --- a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/SimpleStorageAdminClientImpl.java +++ b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/SimpleStorageAdminClientImpl.java @@ -30,7 +30,9 @@ import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createGetStr import java.util.Optional; import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.api.StorageClient; import org.apache.bookkeeper.clients.SimpleClientBase; +import org.apache.bookkeeper.clients.SimpleStorageClientImpl; import org.apache.bookkeeper.clients.config.StorageClientSettings; import org.apache.bookkeeper.clients.utils.ClientResources; import org.apache.bookkeeper.clients.utils.GrpcUtils; @@ -69,6 +71,16 @@ public class SimpleStorageAdminClientImpl extends SimpleClientBase implements St } @Override + public StorageClient asClient(String namespace) { + return new SimpleStorageClientImpl( + namespace, + settings, + schedulerResource, + managedChannel + ); + } + + @Override public CompletableFuture<NamespaceProperties> createNamespace(String namespace, NamespaceConfiguration conf) { return 
retryUtils.execute(() -> fromListenableFuture(rootRangeService.createNamespace(createCreateNamespaceRequest(namespace, conf))) diff --git a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClient.java b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClient.java index 55cc63a..ec3485a 100644 --- a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClient.java +++ b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClient.java @@ -18,6 +18,9 @@ package org.apache.bookkeeper.clients.admin; import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.api.StorageClient; +import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public; +import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving; import org.apache.bookkeeper.common.util.AutoAsyncCloseable; import org.apache.bookkeeper.stream.proto.NamespaceConfiguration; import org.apache.bookkeeper.stream.proto.NamespaceProperties; @@ -27,21 +30,83 @@ import org.apache.bookkeeper.stream.proto.StreamProperties; /** * A storage admin client. */ +@Public +@Evolving public interface StorageAdminClient extends AutoAsyncCloseable { + /** + * Convert the storage admin client to a client. + * + * @return storage client + */ + default StorageClient asClient() { + return asClient(null); + } + + /** + * Convert the storage admin client to a client with default <tt>namespace</tt>. + * + * @param namespace namespace + * @return storage client + */ + StorageClient asClient(String namespace); + + /** + * Create a <code>namespace</code> with the provided namespace configuration <tt>conf</tt>. 
+ * + * @param namespace namespace + * @param conf namespace configuration + * @return a future represents the creation result + */ CompletableFuture<NamespaceProperties> createNamespace(String namespace, NamespaceConfiguration conf); + /** + * Delete a <code>namespace</code>. + * + * @param namespace namespace + * @return a future represents the deletion result + */ CompletableFuture<Boolean> deleteNamespace(String namespace); + /** + * Get the namespace properties of a given <code>namespace</code>. + * + * @param namespace namespace + * @return a future represents the get result + */ CompletableFuture<NamespaceProperties> getNamespace(String namespace); + /** + * Create a stream <code>streamName</code> under namespace <code>namespace</code> + * with the provided stream configuration <tt>streamConfiguration</tt>. + * + * @param namespace namespace + * @param streamName stream name + * @param streamConfiguration stream configuration + * @return a future represents the creation result + */ CompletableFuture<StreamProperties> createStream(String namespace, String streamName, StreamConfiguration streamConfiguration); + /** + * Delete a <code>stream</code> from the provided <tt>namespace</tt>. + * + * @param namespace namespace + * @param streamName stream name + * @return a future represents the deletion result + */ CompletableFuture<Boolean> deleteStream(String namespace, String streamName); + /** + * Retrieve the stream properties of a given <code>stream</code> under + * the provided <code>namespace</code>. 
+ * + * @param namespace namespace + * @param streamName stream name + * @return a future represents the get result + */ CompletableFuture<StreamProperties> getStream(String namespace, String streamName); } diff --git a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClientImpl.java b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClientImpl.java index 8c392a0..ea571bd 100644 --- a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClientImpl.java +++ b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/admin/StorageAdminClientImpl.java @@ -18,19 +18,18 @@ package org.apache.bookkeeper.clients.admin; -import com.google.common.annotations.VisibleForTesting; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.api.StorageClient; +import org.apache.bookkeeper.clients.StorageClientImpl; import org.apache.bookkeeper.clients.config.StorageClientSettings; import org.apache.bookkeeper.clients.impl.internal.StorageServerClientManagerImpl; import org.apache.bookkeeper.clients.impl.internal.api.RootRangeClient; import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager; import org.apache.bookkeeper.clients.utils.ClientResources; import org.apache.bookkeeper.common.util.AbstractAutoAsyncCloseable; -import org.apache.bookkeeper.common.util.OrderedScheduler; -import org.apache.bookkeeper.common.util.SharedResourceManager.Resource; import org.apache.bookkeeper.stream.proto.NamespaceConfiguration; import org.apache.bookkeeper.stream.proto.NamespaceProperties; import org.apache.bookkeeper.stream.proto.StreamConfiguration; @@ -43,6 +42,8 @@ import org.apache.bookkeeper.stream.proto.StreamProperties; public class StorageAdminClientImpl extends AbstractAutoAsyncCloseable implements 
StorageAdminClient { // clients + private final StorageClientSettings settings; + private final ClientResources resources; private final StorageServerClientManager clientManager; private final RootRangeClient rootRangeClient; @@ -50,31 +51,32 @@ public class StorageAdminClientImpl extends AbstractAutoAsyncCloseable implement * Create a stream admin client with provided {@code withSettings}. * * @param settings withSettings to create an admin client. + * @param resources resources used by this client */ - public StorageAdminClientImpl(StorageClientSettings settings) { + public StorageAdminClientImpl(StorageClientSettings settings, + ClientResources resources) { this( settings, - ClientResources.create().scheduler()); - } - - /** - * Create a stream admin client with provided {@code withSettings} and {@code scheduler}. - * - * @param settings withSettings to create an admin client. - * @param schedulerResource scheduler to execute. - */ - public StorageAdminClientImpl(StorageClientSettings settings, - Resource<OrderedScheduler> schedulerResource) { - this(() -> new StorageServerClientManagerImpl(settings, schedulerResource)); + resources, + () -> new StorageServerClientManagerImpl(settings, resources.scheduler())); } - @VisibleForTesting - StorageAdminClientImpl(Supplier<StorageServerClientManager> factory) { + StorageAdminClientImpl(StorageClientSettings settings, + ClientResources resources, + Supplier<StorageServerClientManager> factory) { + this.settings = settings; + this.resources = resources; this.clientManager = factory.get(); this.rootRangeClient = this.clientManager.getRootRangeClient(); } @Override + public StorageClient asClient(String namespace) { + return new StorageClientImpl( + namespace, settings, resources, clientManager, false); + } + + @Override public CompletableFuture<NamespaceProperties> createNamespace(String namespace, NamespaceConfiguration colConf) { return rootRangeClient.createNamespace(namespace, colConf); diff --git 
a/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/StorageClientImplTest.java b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/StorageClientImplTest.java index 6ae9035..9f0e6df 100644 --- a/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/StorageClientImplTest.java +++ b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/StorageClientImplTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -90,7 +91,7 @@ public class StorageClientImplTest extends GrpcClientTestBase { .setStorageType(StorageType.TABLE) .build()) .build(); - when(client.getStreamProperties(anyString())) + when(client.getStreamProperties(anyString(), anyString())) .thenReturn(FutureUtils.value(streamProps)); PByteBufTableImpl tableImpl = mock(PByteBufTableImpl.class); @@ -109,13 +110,58 @@ public class StorageClientImplTest extends GrpcClientTestBase { @SuppressWarnings("unchecked") @Test + public void testOpenPTableDiffernetNamespace() throws Exception { + StreamProperties tableProps1 = StreamProperties.newBuilder(STREAM_PROPERTIES) + .setStreamName("table1") + .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF) + .setStorageType(StorageType.TABLE) + .build()) + .build(); + when(client.getStreamProperties(eq(NAMESPACE), eq("table1"))) + .thenReturn(FutureUtils.value(tableProps1)); + + StreamProperties tableProps2 = StreamProperties.newBuilder(STREAM_PROPERTIES) + .setStreamName("table2") + .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF) + .setStorageType(StorageType.TABLE) + .build()) + .build(); + when(client.getStreamProperties(eq(NAMESPACE), eq("table2"))) + 
.thenReturn(FutureUtils.value(tableProps2)); + + PByteBufTableImpl tableImpl1 = mock(PByteBufTableImpl.class); + when(tableImpl1.initialize()).thenReturn(FutureUtils.value(tableImpl1)); + PByteBufTableImpl tableImpl2 = mock(PByteBufTableImpl.class); + when(tableImpl2.initialize()).thenReturn(FutureUtils.value(tableImpl2)); + + PowerMockito.whenNew(PByteBufTableImpl.class) + .withAnyArguments() + .thenReturn(tableImpl1); + + PTable<ByteBuf, ByteBuf> returnedTableImpl1 = FutureUtils.result( + client.openPTable("table1") + ); + assertSame(tableImpl1, returnedTableImpl1); + + PowerMockito.whenNew(PByteBufTableImpl.class) + .withAnyArguments() + .thenReturn(tableImpl2); + + PTable<ByteBuf, ByteBuf> returnedTableImpl2 = FutureUtils.result( + client.openPTable("table2") + ); + assertSame(tableImpl2, returnedTableImpl2); + } + + @SuppressWarnings("unchecked") + @Test public void testOpenTable() throws Exception { StreamProperties streamProps = StreamProperties.newBuilder(STREAM_PROPERTIES) .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF) .setStorageType(StorageType.TABLE) .build()) .build(); - when(client.getStreamProperties(anyString())) + when(client.getStreamProperties(anyString(), anyString())) .thenReturn(FutureUtils.value(streamProps)); PByteBufTableImpl tableImpl = mock(PByteBufTableImpl.class); @@ -136,13 +182,62 @@ public class StorageClientImplTest extends GrpcClientTestBase { @SuppressWarnings("unchecked") @Test + public void testOpenTableWithDifferentNamespace() throws Exception { + StreamProperties tableProps1 = StreamProperties.newBuilder(STREAM_PROPERTIES) + .setStreamName("table1") + .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF) + .setStorageType(StorageType.TABLE) + .build()) + .build(); + when(client.getStreamProperties(eq(NAMESPACE), eq("table1"))) + .thenReturn(FutureUtils.value(tableProps1)); + + StreamProperties tableProps2 = StreamProperties.newBuilder(STREAM_PROPERTIES) + .setStreamName("table2") + 
.setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF) + .setStorageType(StorageType.TABLE) + .build()) + .build(); + when(client.getStreamProperties(eq(NAMESPACE), eq("table2"))) + .thenReturn(FutureUtils.value(tableProps2)); + + PByteBufTableImpl tableImpl1 = mock(PByteBufTableImpl.class); + when(tableImpl1.initialize()).thenReturn(FutureUtils.value(tableImpl1)); + PByteBufTableImpl tableImpl2 = mock(PByteBufTableImpl.class); + when(tableImpl2.initialize()).thenReturn(FutureUtils.value(tableImpl2)); + + PowerMockito.whenNew(PByteBufTableImpl.class) + .withAnyArguments() + .thenReturn(tableImpl1); + + Table<ByteBuf, ByteBuf> returnedTableImpl1 = FutureUtils.result( + client.openTable("table1") + ); + assertTrue(returnedTableImpl1 instanceof ByteBufTableImpl); + ByteBufTableImpl bytesTableImpl1 = (ByteBufTableImpl) returnedTableImpl1; + assertSame(tableImpl1, Whitebox.getInternalState(bytesTableImpl1, "underlying")); + + PowerMockito.whenNew(PByteBufTableImpl.class) + .withAnyArguments() + .thenReturn(tableImpl2); + + Table<ByteBuf, ByteBuf> returnedTableImpl2 = FutureUtils.result( + client.openTable("table2") + ); + assertTrue(returnedTableImpl2 instanceof ByteBufTableImpl); + ByteBufTableImpl bytesTableImpl2 = (ByteBufTableImpl) returnedTableImpl2; + assertSame(tableImpl2, Whitebox.getInternalState(bytesTableImpl2, "underlying")); + } + + @SuppressWarnings("unchecked") + @Test public void testOpenPTableIllegalOp() throws Exception { StreamProperties streamProps = StreamProperties.newBuilder(STREAM_PROPERTIES) .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF) .setStorageType(StorageType.STREAM) .build()) .build(); - when(client.getStreamProperties(anyString())) + when(client.getStreamProperties(anyString(), anyString())) .thenReturn(FutureUtils.value(streamProps)); PByteBufTableImpl tableImpl = mock(PByteBufTableImpl.class); diff --git 
a/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/admin/TestStorageAdminClientImpl.java b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/admin/TestStorageAdminClientImpl.java index 0787abc..d9a791a 100644 --- a/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/admin/TestStorageAdminClientImpl.java +++ b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/admin/TestStorageAdminClientImpl.java @@ -25,8 +25,10 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import org.apache.bookkeeper.clients.config.StorageClientSettings; import org.apache.bookkeeper.clients.impl.internal.api.RootRangeClient; import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager; +import org.apache.bookkeeper.clients.utils.ClientResources; import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.bookkeeper.stream.proto.NamespaceConfiguration; import org.apache.bookkeeper.stream.proto.NamespaceProperties; @@ -66,7 +68,12 @@ public class TestStorageAdminClientImpl { @Before public void setUp() { when(mockManager.getRootRangeClient()).thenReturn(mockRootRangeClient); - this.adminClient = new StorageAdminClientImpl(() -> mockManager); + this.adminClient = new StorageAdminClientImpl( + StorageClientSettings.newBuilder() + .serviceUri("bk://localhost:4181") + .build(), + ClientResources.create(), + () -> mockManager); } @Test