This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new d3e9d56 [TABLE SERVICE] Set storage type on creating tables
d3e9d56 is described below
commit d3e9d56c18df337fa22da47aa8458bde5b74d141
Author: Sijie Guo <[email protected]>
AuthorDate: Tue Jul 24 13:11:39 2018 +0200
[TABLE SERVICE] Set storage type on creating tables
Descriptions of the changes in this PR:
*Motivation*
We are using same metadata objects for both streams and tables. and we are
using a storage model to distinguish them. We need to make sure we setting
the storage
model when creating tables.
*Changes*
Set storage type on creating tables.
Author: Sijie Guo <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>
This closes #1551 from sijie/set_storage_type
---
stream/clients/java/all/pom.xml | 7 +
.../bookkeeper/clients/StorageClientImpl.java | 11 +-
.../bookkeeper/clients/StorageClientImplTest.java | 162 +++++++++++++++++++++
.../integration/stream/TableClientSimpleTest.java | 2 +
.../tests/integration/stream/TableClientTest.java | 2 +
.../cli/commands/table/CreateTableCommand.java | 6 +-
6 files changed, 187 insertions(+), 3 deletions(-)
diff --git a/stream/clients/java/all/pom.xml b/stream/clients/java/all/pom.xml
index 2519b33..395dbcb 100644
--- a/stream/clients/java/all/pom.xml
+++ b/stream/clients/java/all/pom.xml
@@ -32,6 +32,13 @@
<artifactId>stream-storage-java-kv-client</artifactId>
<version>${project.parent.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.bookkeeper</groupId>
+ <artifactId>stream-storage-java-client-base</artifactId>
+ <version>${project.parent.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
index 37c8223..fca1e5a 100644
---
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
+++
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.api.StorageClient;
+import org.apache.bookkeeper.api.exceptions.ApiException;
import org.apache.bookkeeper.api.kv.PTable;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
@@ -36,6 +37,7 @@ import
org.apache.bookkeeper.common.util.AbstractAutoAsyncCloseable;
import org.apache.bookkeeper.common.util.ExceptionUtils;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.common.util.SharedResourceManager;
+import org.apache.bookkeeper.stream.proto.StorageType;
import org.apache.bookkeeper.stream.proto.StreamProperties;
/**
@@ -65,7 +67,7 @@ class StorageClientImpl extends AbstractAutoAsyncCloseable
implements StorageCli
}
- private CompletableFuture<StreamProperties> getStreamProperties(String
streamName) {
+ CompletableFuture<StreamProperties> getStreamProperties(String streamName)
{
return
this.serverManager.getRootRangeClient().getStream(namespaceName, streamName);
}
@@ -92,7 +94,12 @@ class StorageClientImpl extends AbstractAutoAsyncCloseable
implements StorageCli
FutureUtils.proxyTo(
getStreamProperties(streamName).thenComposeAsync(props -> {
if (log.isInfoEnabled()) {
- log.info("Retrieved stream properties for stream {} : {}",
streamName, props);
+ log.info("Retrieved table properties for table {} : {}",
streamName, props);
+ }
+ if (StorageType.TABLE !=
props.getStreamConf().getStorageType()) {
+ return FutureUtils.exception(new ApiException(
+ "Can't open a non-table storage entity : " +
props.getStreamConf().getStorageType())
+ );
}
return new PByteBufTableImpl(
streamName,
diff --git
a/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/StorageClientImplTest.java
b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/StorageClientImplTest.java
new file mode 100644
index 0000000..6ae9035
--- /dev/null
+++
b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/StorageClientImplTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients;
+
+import static
org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import io.netty.buffer.ByteBuf;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.api.exceptions.ApiException;
+import org.apache.bookkeeper.api.kv.PTable;
+import org.apache.bookkeeper.api.kv.Table;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.kv.ByteBufTableImpl;
+import org.apache.bookkeeper.clients.impl.kv.PByteBufTableImpl;
+import org.apache.bookkeeper.clients.utils.ClientResources;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.proto.StorageType;
+import org.apache.bookkeeper.stream.proto.StreamConfiguration;
+import org.apache.bookkeeper.stream.proto.StreamProperties;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+/**
+ * Unit test {@link StorageClientImpl}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+ StorageClientImpl.class,
+})
+@Slf4j
+public class StorageClientImplTest extends GrpcClientTestBase {
+
+ private static final String NAMESPACE = "test-namespace";
+ private static final String STREAM_NAME = "test-stream-name";
+ private static final StreamProperties STREAM_PROPERTIES =
StreamProperties.newBuilder()
+ .setStreamId(1234L)
+ .setStreamConf(DEFAULT_STREAM_CONF)
+ .setStreamName(STREAM_NAME)
+ .setStorageContainerId(16)
+ .build();
+
+ private StorageClientImpl client;
+
+ @Override
+ protected void doSetup() {
+ this.client = spy(new StorageClientImpl(
+ NAMESPACE,
+ settings,
+ ClientResources.create()
+ ));
+ }
+
+ @Override
+ protected void doTeardown() {
+ this.client.close();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testOpenPTable() throws Exception {
+ StreamProperties streamProps =
StreamProperties.newBuilder(STREAM_PROPERTIES)
+ .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+ .setStorageType(StorageType.TABLE)
+ .build())
+ .build();
+ when(client.getStreamProperties(anyString()))
+ .thenReturn(FutureUtils.value(streamProps));
+
+ PByteBufTableImpl tableImpl = mock(PByteBufTableImpl.class);
+ when(tableImpl.initialize()).thenReturn(FutureUtils.value(tableImpl));
+
+ PowerMockito.whenNew(PByteBufTableImpl.class)
+ .withAnyArguments()
+ .thenReturn(tableImpl);
+
+ PTable<ByteBuf, ByteBuf> returnedTableImpl = FutureUtils.result(
+ client.openPTable(STREAM_NAME)
+ );
+
+ assertSame(tableImpl, returnedTableImpl);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testOpenTable() throws Exception {
+ StreamProperties streamProps =
StreamProperties.newBuilder(STREAM_PROPERTIES)
+ .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+ .setStorageType(StorageType.TABLE)
+ .build())
+ .build();
+ when(client.getStreamProperties(anyString()))
+ .thenReturn(FutureUtils.value(streamProps));
+
+ PByteBufTableImpl tableImpl = mock(PByteBufTableImpl.class);
+ when(tableImpl.initialize()).thenReturn(FutureUtils.value(tableImpl));
+
+ PowerMockito.whenNew(PByteBufTableImpl.class)
+ .withAnyArguments()
+ .thenReturn(tableImpl);
+
+ Table<ByteBuf, ByteBuf> returnedTableImpl = FutureUtils.result(
+ client.openTable(STREAM_NAME)
+ );
+ assertTrue(returnedTableImpl instanceof ByteBufTableImpl);
+ ByteBufTableImpl bytesTableImpl = (ByteBufTableImpl) returnedTableImpl;
+
+ assertSame(tableImpl, Whitebox.getInternalState(bytesTableImpl,
"underlying"));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testOpenPTableIllegalOp() throws Exception {
+ StreamProperties streamProps =
StreamProperties.newBuilder(STREAM_PROPERTIES)
+ .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+ .setStorageType(StorageType.STREAM)
+ .build())
+ .build();
+ when(client.getStreamProperties(anyString()))
+ .thenReturn(FutureUtils.value(streamProps));
+
+ PByteBufTableImpl tableImpl = mock(PByteBufTableImpl.class);
+ when(tableImpl.initialize()).thenReturn(FutureUtils.value(tableImpl));
+
+ PowerMockito.whenNew(PByteBufTableImpl.class)
+ .withAnyArguments()
+ .thenReturn(tableImpl);
+
+ try {
+ FutureUtils.result(client.openPTable(STREAM_NAME));
+ fail("Should fail #openTable on opening a non-table storage
entity");
+ } catch (ApiException sae) {
+ // expected exception
+ }
+ }
+}
diff --git
a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
index e88dfc3..c07b3eb 100644
---
a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
+++
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
@@ -43,6 +43,7 @@ import
org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.common.testing.annotations.FlakyTest;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.bookkeeper.stream.proto.NamespaceProperties;
+import org.apache.bookkeeper.stream.proto.StorageType;
import org.apache.bookkeeper.stream.proto.StreamConfiguration;
import org.apache.bookkeeper.stream.proto.StreamProperties;
import org.junit.After;
@@ -102,6 +103,7 @@ public class TableClientSimpleTest extends
StreamClusterTestBase {
// Create a stream
String streamName = testName.getMethodName() + "_stream";
StreamConfiguration streamConf =
StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+ .setStorageType(StorageType.TABLE)
.build();
StreamProperties streamProps = result(
adminClient.createStream(namespace, streamName, streamConf));
diff --git
a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java
index 6a287da..467e455 100644
---
a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java
+++
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java
@@ -53,6 +53,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.testing.annotations.FlakyTest;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.bookkeeper.stream.proto.NamespaceProperties;
+import org.apache.bookkeeper.stream.proto.StorageType;
import org.apache.bookkeeper.stream.proto.StreamConfiguration;
import org.apache.bookkeeper.stream.proto.StreamProperties;
import org.junit.After;
@@ -113,6 +114,7 @@ public class TableClientTest extends StreamClusterTestBase {
// Create a stream
String streamName = testName.getMethodName() + "_stream";
StreamConfiguration streamConf =
StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+ .setStorageType(StorageType.TABLE)
.build();
StreamProperties streamProps = FutureUtils.result(
adminClient.createStream(namespace, streamName, streamConf));
diff --git
a/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/CreateTableCommand.java
b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/CreateTableCommand.java
index 784c8c2..5b50d1f 100644
---
a/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/CreateTableCommand.java
+++
b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/CreateTableCommand.java
@@ -24,6 +24,8 @@ import static
org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_ST
import org.apache.bookkeeper.clients.admin.StorageAdminClient;
import org.apache.bookkeeper.stream.cli.commands.AdminCommand;
import
org.apache.bookkeeper.stream.cli.commands.table.CreateTableCommand.Flags;
+import org.apache.bookkeeper.stream.proto.StorageType;
+import org.apache.bookkeeper.stream.proto.StreamConfiguration;
import org.apache.bookkeeper.stream.proto.StreamProperties;
import org.apache.bookkeeper.tools.common.BKFlags;
import org.apache.bookkeeper.tools.framework.CliFlags;
@@ -66,7 +68,9 @@ public class CreateTableCommand extends AdminCommand<Flags> {
admin.createStream(
globalFlags.namespace,
streamName,
- DEFAULT_STREAM_CONF));
+ StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+ .setStorageType(StorageType.TABLE)
+ .build()));
spec.console().println("Successfully created stream '" + streamName +
"':");
spec.console().println(nsProps);
}