This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new bb411ab9 [FLINK-29252] Support create table-store table with 
'connector'='table-store'
bb411ab9 is described below

commit bb411ab9d5ffdf11cc071353e2fbc1e184327085
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Oct 26 10:32:38 2022 +0800

    [FLINK-29252] Support create table-store table with 
'connector'='table-store'
    
    This closes #327
---
 docs/content/docs/development/create-table.md      |  67 +++++++++++-
 .../shortcodes/generated/core_configuration.html   |   6 ++
 .../connector/TableStoreConnectorFactory.java      |  25 +++++
 .../table/store/connector/MappingTableITCase.java  | 118 +++++++++++++++++++++
 .../org/apache/flink/table/store/CoreOptions.java  |   7 ++
 5 files changed, 218 insertions(+), 5 deletions(-)

diff --git a/docs/content/docs/development/create-table.md 
b/docs/content/docs/development/create-table.md
index 66a2de62..93dde939 100644
--- a/docs/content/docs/development/create-table.md
+++ b/docs/content/docs/development/create-table.md
@@ -26,9 +26,17 @@ under the License.
 
 # Create Table
 
-## Catalog
+## Managed Table in Table Store Catalog
 
-Table Store uses its own catalog to manage all the databases and tables. Users 
need to configure the type `table-store` and a root directory `warehouse` to 
use it.
+If you need to manage tables uniformly according to the catalog and database.
+You can consider using the Table Store Catalog to create managed tables. In
+this case, creating tables will actually create file structures, and deleting
+tables will actually delete table data.
+
+### Catalog
+
+Table Store uses its own catalog to manage all the databases and tables. Users
+need to configure the type `table-store` and a root directory `warehouse` to 
use it.
 
 ```sql
 CREATE CATALOG my_catalog WITH (
@@ -46,7 +54,58 @@ Table Store catalog supports SQL DDL commands:
 - `SHOW DATABASES`
 - `SHOW TABLES`
 
-## Syntax
+### Create Managed Table
+
+```sql
+CREATE TABLE MyTable (
+   user_id BIGINT,
+   item_id BIGINT,
+   behavior STRING,
+   dt STRING,
+   PRIMARY KEY (dt, user_id) NOT ENFORCED
+) PARTITIONED BY (dt);
+```
+
+This will create a directory under 
`${warehouse}/${database_name}.db/${table_name}`.
+
+## Mapping Table in Generic Catalog
+
+If you do not want to create a Table Store Catalog, you only want to read and 
write
+a table separately. You can use the mapping table, which is a standard Flink 
connector
+table.
+
+The SQL `CREATE TABLE T (..) WITH ('connector'='table-store', 'path'='...')` 
will
+create a Table Store table in current catalog, the catalog should support 
generic
+Flink connector tables, the available catalogs are `GenericInMemoryCatalog` 
(by default)
+and `HiveCatalog`. The generic catalog only manages the mapping relationship 
between
+tables and underlying file structure in `path`, but does not really create and 
delete
+tables.
+
+- By default, the mapping table needs to be mapped to an actual underlying 
file structure
+  in FileSystem `path`. If the file structure in `path` does not exist, an 
exception will be thrown.
+- If you want to create the file structure automatically when reading or 
writing a table,
+  you can configure `auto-create` to `true`: `CREATE TABLE T (..) WITH 
('connector'='table-store', 'path'='...', 'auto-create'='true')`.
+
+For example:
+
+```sql
+CREATE TABLE MyTable (
+   user_id BIGINT,
+   item_id BIGINT,
+   behavior STRING,
+   dt STRING,
+   PRIMARY KEY (dt, user_id) NOT ENFORCED
+) PARTITIONED BY (dt) WITH (
+  'connector'='table-store',
+  'path'='hdfs://nn:8020/my_table_path',
+  'auto-create'='true'
+);
+
+-- This will create a directory structure under path.
+INSERT INTO MyTable SELECT ...;
+```
+
+## Table Syntax
 
 ```sql
 CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
@@ -82,8 +141,6 @@ __Note:__
   field, you can achieve the unique effect.
   {{< /hint >}}
 
-This will create a directory under 
`${warehouse}/${database_name}.db/${table_name}`.
-
 ## Table Options
 
 Important options include the following:
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 2867cc98..d6200a94 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -8,6 +8,12 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            <td><h5>auto-create</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to create underlying storage when reading and writing 
the table.</td>
+        </tr>
         <tr>
             <td><h5>bucket</h5></td>
             <td style="word-wrap: break-word;">1</td>
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
index 69e738e7..dd07c9e6 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
@@ -18,15 +18,22 @@
 
 package org.apache.flink.table.store.connector;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.connector.sink.TableStoreSink;
 import org.apache.flink.table.store.file.catalog.CatalogLock;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
 
 import javax.annotation.Nullable;
 
+import static org.apache.flink.table.store.CoreOptions.AUTO_CREATE;
 import static 
org.apache.flink.table.store.connector.FlinkCatalogFactory.IDENTIFIER;
 
 /** A table store {@link DynamicTableFactory} to create source and sink. */
@@ -59,6 +66,7 @@ public class TableStoreConnectorFactory extends 
AbstractTableStoreFactory {
                     context.getClassLoader(),
                     context.isTemporary());
         }
+        createTableIfNeeded(context);
         return super.createDynamicTableSource(context);
     }
 
@@ -74,11 +82,28 @@ public class TableStoreConnectorFactory extends 
AbstractTableStoreFactory {
                     context.getClassLoader(),
                     context.isTemporary());
         }
+        createTableIfNeeded(context);
         TableStoreSink sink = (TableStoreSink) 
super.createDynamicTableSink(context);
         sink.setLockFactory(lockFactory);
         return sink;
     }
 
+    private void createTableIfNeeded(Context context) {
+        ResolvedCatalogTable table = context.getCatalogTable();
+        Configuration options = Configuration.fromMap(table.getOptions());
+        if (options.get(AUTO_CREATE)) {
+            Path tablePath = CoreOptions.path(table.getOptions());
+            SchemaManager schemaManager = new SchemaManager(tablePath);
+            if (!schemaManager.latest().isPresent()) {
+                try {
+                    
schemaManager.commitNewVersion(UpdateSchema.fromCatalogTable(table));
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
     private boolean isFlinkTable(Context context) {
         String identifier = 
context.getCatalogTable().getOptions().get(FactoryUtil.CONNECTOR.key());
         return identifier != null && !IDENTIFIER.equals(identifier);
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/MappingTableITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/MappingTableITCase.java
new file mode 100644
index 00000000..7f909390
--- /dev/null
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/MappingTableITCase.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** ITCase for mapping table api. */
+public class MappingTableITCase extends AbstractTestBase {
+
+    private TableEnvironment tEnv;
+    private String path;
+
+    @Before
+    public void before() throws IOException {
+        tEnv = 
TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+        path = TEMPORARY_FOLDER.newFolder().toURI().toString();
+    }
+
+    @Test
+    public void testCreateEmptyMappingTable() {
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE T (i INT, j INT) WITH ("
+                                + "'connector'='table-store', 'path'='%s')",
+                        path));
+        assertThrows(
+                ValidationException.class,
+                () -> tEnv.executeSql("INSERT INTO T VALUES (1, 2), (3, 
4)").await(),
+                "Schema file not found in location");
+    }
+
+    @Test
+    public void testCreateMappingTable() throws ExecutionException, 
InterruptedException {
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE T (i INT, j INT) WITH ("
+                                + "'connector'='table-store', 'path'='%s', 
'auto-create'='true')",
+                        path));
+        tEnv.executeSql("INSERT INTO T VALUES (1, 2), (3, 4)").await();
+
+        tEnv.executeSql("DROP TABLE T");
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE T (i INT, j INT) WITH ("
+                                + "'connector'='table-store', 'path'='%s')",
+                        path));
+
+        List<Row> result = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM 
T").collect());
+        assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2), Row.of(3, 
4));
+    }
+
+    @Test
+    public void testCreateTemporaryTableRepeat() throws Exception {
+        for (int i = 0; i < 5; i++) {
+            tEnv.executeSql(
+                    String.format(
+                            "CREATE TABLE T (i INT, j INT) WITH ("
+                                    + "'connector'='table-store', 'path'='%s', 
'auto-create'='true')",
+                            path));
+            tEnv.executeSql("SELECT * FROM T").collect().close();
+            tEnv.executeSql("DROP TABLE T");
+        }
+    }
+
+    @Test
+    public void testCreateTemporaryTableConflict() throws Exception {
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE T (i INT, j INT) WITH ("
+                                + "'connector'='table-store', 'path'='%s', 
'auto-create'='true')",
+                        path));
+        tEnv.executeSql("SELECT * FROM T").collect().close();
+        tEnv.executeSql("DROP TABLE T");
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE T (i INT, j INT, k INT) WITH ("
+                                + "'connector'='table-store', 'path'='%s', 
'auto-create'='true')",
+                        path));
+
+        assertThrows(
+                ValidationException.class,
+                () -> tEnv.executeSql("SELECT * FROM T").collect().close(),
+                "Flink schema and store schema are not the same");
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 10c4308f..45360b54 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -362,6 +362,13 @@ public class CoreOptions implements Serializable {
                     .defaultValue("debezium-json")
                     .withDescription("Specify the message format of log 
system.");
 
+    public static final ConfigOption<Boolean> AUTO_CREATE =
+            ConfigOptions.key("auto-create")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to create underlying storage when reading 
and writing the table.");
+
     private final Configuration options;
 
     public CoreOptions(Map<String, String> options) {

Reply via email to