This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new d3e9d56  [TABLE SERVICE] Set storage type on creating tables
d3e9d56 is described below

commit d3e9d56c18df337fa22da47aa8458bde5b74d141
Author: Sijie Guo <[email protected]>
AuthorDate: Tue Jul 24 13:11:39 2018 +0200

    [TABLE SERVICE] Set storage type on creating tables
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    We are using the same metadata objects for both streams and tables, and we
    use a storage model to distinguish them. We need to make sure we set the
    storage model when creating tables.
    
    *Changes*
    
    Set storage type on creating tables.
    
    Author: Sijie Guo <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>
    
    This closes #1551 from sijie/set_storage_type
---
 stream/clients/java/all/pom.xml                    |   7 +
 .../bookkeeper/clients/StorageClientImpl.java      |  11 +-
 .../bookkeeper/clients/StorageClientImplTest.java  | 162 +++++++++++++++++++++
 .../integration/stream/TableClientSimpleTest.java  |   2 +
 .../tests/integration/stream/TableClientTest.java  |   2 +
 .../cli/commands/table/CreateTableCommand.java     |   6 +-
 6 files changed, 187 insertions(+), 3 deletions(-)

diff --git a/stream/clients/java/all/pom.xml b/stream/clients/java/all/pom.xml
index 2519b33..395dbcb 100644
--- a/stream/clients/java/all/pom.xml
+++ b/stream/clients/java/all/pom.xml
@@ -32,6 +32,13 @@
       <artifactId>stream-storage-java-kv-client</artifactId>
       <version>${project.parent.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>stream-storage-java-client-base</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
index 37c8223..fca1e5a 100644
--- 
a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
+++ 
b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.api.StorageClient;
+import org.apache.bookkeeper.api.exceptions.ApiException;
 import org.apache.bookkeeper.api.kv.PTable;
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
@@ -36,6 +37,7 @@ import 
org.apache.bookkeeper.common.util.AbstractAutoAsyncCloseable;
 import org.apache.bookkeeper.common.util.ExceptionUtils;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.common.util.SharedResourceManager;
+import org.apache.bookkeeper.stream.proto.StorageType;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
 
 /**
@@ -65,7 +67,7 @@ class StorageClientImpl extends AbstractAutoAsyncCloseable 
implements StorageCli
 
     }
 
-    private CompletableFuture<StreamProperties> getStreamProperties(String 
streamName) {
+    CompletableFuture<StreamProperties> getStreamProperties(String streamName) 
{
         return 
this.serverManager.getRootRangeClient().getStream(namespaceName, streamName);
     }
 
@@ -92,7 +94,12 @@ class StorageClientImpl extends AbstractAutoAsyncCloseable 
implements StorageCli
         FutureUtils.proxyTo(
             getStreamProperties(streamName).thenComposeAsync(props -> {
                 if (log.isInfoEnabled()) {
-                    log.info("Retrieved stream properties for stream {} : {}", 
streamName, props);
+                    log.info("Retrieved table properties for table {} : {}", 
streamName, props);
+                }
+                if (StorageType.TABLE != 
props.getStreamConf().getStorageType()) {
+                    return FutureUtils.exception(new ApiException(
+                        "Can't open a non-table storage entity : " + 
props.getStreamConf().getStorageType())
+                    );
                 }
                 return new PByteBufTableImpl(
                     streamName,
diff --git 
a/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/StorageClientImplTest.java
 
b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/StorageClientImplTest.java
new file mode 100644
index 0000000..6ae9035
--- /dev/null
+++ 
b/stream/clients/java/all/src/test/java/org/apache/bookkeeper/clients/StorageClientImplTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients;
+
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import io.netty.buffer.ByteBuf;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.api.exceptions.ApiException;
+import org.apache.bookkeeper.api.kv.PTable;
+import org.apache.bookkeeper.api.kv.Table;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.kv.ByteBufTableImpl;
+import org.apache.bookkeeper.clients.impl.kv.PByteBufTableImpl;
+import org.apache.bookkeeper.clients.utils.ClientResources;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.proto.StorageType;
+import org.apache.bookkeeper.stream.proto.StreamConfiguration;
+import org.apache.bookkeeper.stream.proto.StreamProperties;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+/**
+ * Unit test {@link StorageClientImpl}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+    StorageClientImpl.class,
+})
+@Slf4j
+public class StorageClientImplTest extends GrpcClientTestBase {
+
+    private static final String NAMESPACE = "test-namespace";
+    private static final String STREAM_NAME = "test-stream-name";
+    private static final StreamProperties STREAM_PROPERTIES = 
StreamProperties.newBuilder()
+        .setStreamId(1234L)
+        .setStreamConf(DEFAULT_STREAM_CONF)
+        .setStreamName(STREAM_NAME)
+        .setStorageContainerId(16)
+        .build();
+
+    private StorageClientImpl client;
+
+    @Override
+    protected void doSetup() {
+        this.client = spy(new StorageClientImpl(
+            NAMESPACE,
+            settings,
+            ClientResources.create()
+        ));
+    }
+
+    @Override
+    protected void doTeardown() {
+        this.client.close();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testOpenPTable() throws Exception {
+        StreamProperties streamProps = 
StreamProperties.newBuilder(STREAM_PROPERTIES)
+            .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+                .setStorageType(StorageType.TABLE)
+                .build())
+            .build();
+        when(client.getStreamProperties(anyString()))
+            .thenReturn(FutureUtils.value(streamProps));
+
+        PByteBufTableImpl tableImpl = mock(PByteBufTableImpl.class);
+        when(tableImpl.initialize()).thenReturn(FutureUtils.value(tableImpl));
+
+        PowerMockito.whenNew(PByteBufTableImpl.class)
+            .withAnyArguments()
+            .thenReturn(tableImpl);
+
+        PTable<ByteBuf, ByteBuf> returnedTableImpl = FutureUtils.result(
+            client.openPTable(STREAM_NAME)
+        );
+
+        assertSame(tableImpl, returnedTableImpl);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testOpenTable() throws Exception {
+        StreamProperties streamProps = 
StreamProperties.newBuilder(STREAM_PROPERTIES)
+            .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+                .setStorageType(StorageType.TABLE)
+                .build())
+            .build();
+        when(client.getStreamProperties(anyString()))
+            .thenReturn(FutureUtils.value(streamProps));
+
+        PByteBufTableImpl tableImpl = mock(PByteBufTableImpl.class);
+        when(tableImpl.initialize()).thenReturn(FutureUtils.value(tableImpl));
+
+        PowerMockito.whenNew(PByteBufTableImpl.class)
+            .withAnyArguments()
+            .thenReturn(tableImpl);
+
+        Table<ByteBuf, ByteBuf> returnedTableImpl = FutureUtils.result(
+            client.openTable(STREAM_NAME)
+        );
+        assertTrue(returnedTableImpl instanceof ByteBufTableImpl);
+        ByteBufTableImpl bytesTableImpl = (ByteBufTableImpl) returnedTableImpl;
+
+        assertSame(tableImpl, Whitebox.getInternalState(bytesTableImpl, 
"underlying"));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testOpenPTableIllegalOp() throws Exception {
+        StreamProperties streamProps = 
StreamProperties.newBuilder(STREAM_PROPERTIES)
+            .setStreamConf(StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+                .setStorageType(StorageType.STREAM)
+                .build())
+            .build();
+        when(client.getStreamProperties(anyString()))
+            .thenReturn(FutureUtils.value(streamProps));
+
+        PByteBufTableImpl tableImpl = mock(PByteBufTableImpl.class);
+        when(tableImpl.initialize()).thenReturn(FutureUtils.value(tableImpl));
+
+        PowerMockito.whenNew(PByteBufTableImpl.class)
+            .withAnyArguments()
+            .thenReturn(tableImpl);
+
+        try {
+            FutureUtils.result(client.openPTable(STREAM_NAME));
+            fail("Should fail #openTable on opening a non-table storage 
entity");
+        } catch (ApiException sae) {
+            // expected exception
+        }
+    }
+}
diff --git 
a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
 
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
index e88dfc3..c07b3eb 100644
--- 
a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
+++ 
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientSimpleTest.java
@@ -43,6 +43,7 @@ import 
org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.common.testing.annotations.FlakyTest;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
+import org.apache.bookkeeper.stream.proto.StorageType;
 import org.apache.bookkeeper.stream.proto.StreamConfiguration;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
 import org.junit.After;
@@ -102,6 +103,7 @@ public class TableClientSimpleTest extends 
StreamClusterTestBase {
         // Create a stream
         String streamName = testName.getMethodName() + "_stream";
         StreamConfiguration streamConf = 
StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+            .setStorageType(StorageType.TABLE)
             .build();
         StreamProperties streamProps = result(
             adminClient.createStream(namespace, streamName, streamConf));
diff --git 
a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java
 
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java
index 6a287da..467e455 100644
--- 
a/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java
+++ 
b/tests/integration/cluster/src/test/java/org/apache/bookkeeper/tests/integration/stream/TableClientTest.java
@@ -53,6 +53,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.testing.annotations.FlakyTest;
 import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
 import org.apache.bookkeeper.stream.proto.NamespaceProperties;
+import org.apache.bookkeeper.stream.proto.StorageType;
 import org.apache.bookkeeper.stream.proto.StreamConfiguration;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
 import org.junit.After;
@@ -113,6 +114,7 @@ public class TableClientTest extends StreamClusterTestBase {
         // Create a stream
         String streamName = testName.getMethodName() + "_stream";
         StreamConfiguration streamConf = 
StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+            .setStorageType(StorageType.TABLE)
             .build();
         StreamProperties streamProps = FutureUtils.result(
             adminClient.createStream(namespace, streamName, streamConf));
diff --git 
a/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/CreateTableCommand.java
 
b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/CreateTableCommand.java
index 784c8c2..5b50d1f 100644
--- 
a/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/CreateTableCommand.java
+++ 
b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/CreateTableCommand.java
@@ -24,6 +24,8 @@ import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_ST
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
 import org.apache.bookkeeper.stream.cli.commands.AdminCommand;
 import 
org.apache.bookkeeper.stream.cli.commands.table.CreateTableCommand.Flags;
+import org.apache.bookkeeper.stream.proto.StorageType;
+import org.apache.bookkeeper.stream.proto.StreamConfiguration;
 import org.apache.bookkeeper.stream.proto.StreamProperties;
 import org.apache.bookkeeper.tools.common.BKFlags;
 import org.apache.bookkeeper.tools.framework.CliFlags;
@@ -66,7 +68,9 @@ public class CreateTableCommand extends AdminCommand<Flags> {
             admin.createStream(
                 globalFlags.namespace,
                 streamName,
-                DEFAULT_STREAM_CONF));
+                StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
+                    .setStorageType(StorageType.TABLE)
+                    .build()));
         spec.console().println("Successfully created stream '" + streamName + 
"':");
         spec.console().println(nsProps);
     }

Reply via email to