This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new bb411ab9 [FLINK-29252] Support create table-store table with
'connector'='table-store'
bb411ab9 is described below
commit bb411ab9d5ffdf11cc071353e2fbc1e184327085
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Oct 26 10:32:38 2022 +0800
[FLINK-29252] Support create table-store table with
'connector'='table-store'
This closes #327
---
docs/content/docs/development/create-table.md | 67 +++++++++++-
.../shortcodes/generated/core_configuration.html | 6 ++
.../connector/TableStoreConnectorFactory.java | 25 +++++
.../table/store/connector/MappingTableITCase.java | 118 +++++++++++++++++++++
.../org/apache/flink/table/store/CoreOptions.java | 7 ++
5 files changed, 218 insertions(+), 5 deletions(-)
diff --git a/docs/content/docs/development/create-table.md
b/docs/content/docs/development/create-table.md
index 66a2de62..93dde939 100644
--- a/docs/content/docs/development/create-table.md
+++ b/docs/content/docs/development/create-table.md
@@ -26,9 +26,17 @@ under the License.
# Create Table
-## Catalog
+## Managed Table in Table Store Catalog
-Table Store uses its own catalog to manage all the databases and tables. Users
need to configure the type `table-store` and a root directory `warehouse` to
use it.
+If you need to manage tables uniformly according to the catalog and database,
+you can consider using the Table Store Catalog to create managed tables. In
+this case, creating tables will actually create file structures, and deleting
+tables will actually delete table data.
+
+### Catalog
+
+Table Store uses its own catalog to manage all the databases and tables. Users
+need to configure the type `table-store` and a root directory `warehouse` to
use it.
```sql
CREATE CATALOG my_catalog WITH (
@@ -46,7 +54,58 @@ Table Store catalog supports SQL DDL commands:
- `SHOW DATABASES`
- `SHOW TABLES`
-## Syntax
+### Create Managed Table
+
+```sql
+CREATE TABLE MyTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ behavior STRING,
+ dt STRING,
+ PRIMARY KEY (dt, user_id) NOT ENFORCED
+) PARTITIONED BY (dt);
+```
+
+This will create a directory under
`${warehouse}/${database_name}.db/${table_name}`.
+
+## Mapping Table in Generic Catalog
+
+If you do not want to create a Table Store Catalog and only want to read and
+write a single table, you can use the mapping table, which is a standard Flink
+connector table.
+
+The SQL `CREATE TABLE T (..) WITH ('connector'='table-store', 'path'='...')`
+will create a Table Store table in the current catalog. The catalog should
+support generic Flink connector tables; the available catalogs are
+`GenericInMemoryCatalog` (by default) and `HiveCatalog`. The generic catalog
+only manages the mapping relationship between tables and the underlying file
+structure in `path`, but does not really create and delete tables.
+
+- By default, the mapping table needs to be mapped to an actual underlying
file structure
+ in FileSystem `path`. If the file structure in `path` does not exist, an
exception will be thrown.
+- If you want to create the file structure automatically when reading or
writing a table,
+ you can configure `auto-create` to `true`: `CREATE TABLE T (..) WITH
('connector'='table-store', 'path'='...', 'auto-create'='true')`.
+
+For example:
+
+```sql
+CREATE TABLE MyTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ behavior STRING,
+ dt STRING,
+ PRIMARY KEY (dt, user_id) NOT ENFORCED
+) PARTITIONED BY (dt) WITH (
+ 'connector'='table-store',
+ 'path'='hdfs://nn:8020/my_table_path',
+ 'auto-create'='true'
+);
+
+-- This will create a directory structure under path.
+INSERT INTO MyTable SELECT ...;
+```
+
+## Table Syntax
```sql
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
@@ -82,8 +141,6 @@ __Note:__
field, you can achieve the unique effect.
{{< /hint >}}
-This will create a directory under
`${warehouse}/${database_name}.db/${table_name}`.
-
## Table Options
Important options include the following:
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 2867cc98..d6200a94 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -8,6 +8,12 @@
</tr>
</thead>
<tbody>
+ <tr>
+ <td><h5>auto-create</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to create underlying storage when reading and writing
the table.</td>
+ </tr>
<tr>
<td><h5>bucket</h5></td>
<td style="word-wrap: break-word;">1</td>
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
index 69e738e7..dd07c9e6 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreConnectorFactory.java
@@ -18,15 +18,22 @@
package org.apache.flink.table.store.connector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.sink.TableStoreSink;
import org.apache.flink.table.store.file.catalog.CatalogLock;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
import javax.annotation.Nullable;
+import static org.apache.flink.table.store.CoreOptions.AUTO_CREATE;
import static
org.apache.flink.table.store.connector.FlinkCatalogFactory.IDENTIFIER;
/** A table store {@link DynamicTableFactory} to create source and sink. */
@@ -59,6 +66,7 @@ public class TableStoreConnectorFactory extends
AbstractTableStoreFactory {
context.getClassLoader(),
context.isTemporary());
}
+ createTableIfNeeded(context);
return super.createDynamicTableSource(context);
}
@@ -74,11 +82,28 @@ public class TableStoreConnectorFactory extends
AbstractTableStoreFactory {
context.getClassLoader(),
context.isTemporary());
}
+ createTableIfNeeded(context);
TableStoreSink sink = (TableStoreSink)
super.createDynamicTableSink(context);
sink.setLockFactory(lockFactory);
return sink;
}
+ private void createTableIfNeeded(Context context) {
+ ResolvedCatalogTable table = context.getCatalogTable();
+ Configuration options = Configuration.fromMap(table.getOptions());
+ if (options.get(AUTO_CREATE)) {
+ Path tablePath = CoreOptions.path(table.getOptions());
+ SchemaManager schemaManager = new SchemaManager(tablePath);
+ if (!schemaManager.latest().isPresent()) {
+ try {
+
schemaManager.commitNewVersion(UpdateSchema.fromCatalogTable(table));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
private boolean isFlinkTable(Context context) {
String identifier =
context.getCatalogTable().getOptions().get(FactoryUtil.CONNECTOR.key());
return identifier != null && !IDENTIFIER.equals(identifier);
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/MappingTableITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/MappingTableITCase.java
new file mode 100644
index 00000000..7f909390
--- /dev/null
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/MappingTableITCase.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** ITCase for mapping table api. */
+public class MappingTableITCase extends AbstractTestBase {
+
+ private TableEnvironment tEnv;
+ private String path;
+
+ @Before
+ public void before() throws IOException {
+ tEnv =
TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+ path = TEMPORARY_FOLDER.newFolder().toURI().toString();
+ }
+
+ @Test
+ public void testCreateEmptyMappingTable() {
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE T (i INT, j INT) WITH ("
+ + "'connector'='table-store', 'path'='%s')",
+ path));
+ assertThrows(
+ ValidationException.class,
+ () -> tEnv.executeSql("INSERT INTO T VALUES (1, 2), (3,
4)").await(),
+ "Schema file not found in location");
+ }
+
+ @Test
+ public void testCreateMappingTable() throws ExecutionException,
InterruptedException {
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE T (i INT, j INT) WITH ("
+ + "'connector'='table-store', 'path'='%s',
'auto-create'='true')",
+ path));
+ tEnv.executeSql("INSERT INTO T VALUES (1, 2), (3, 4)").await();
+
+ tEnv.executeSql("DROP TABLE T");
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE T (i INT, j INT) WITH ("
+ + "'connector'='table-store', 'path'='%s')",
+ path));
+
+ List<Row> result = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
T").collect());
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2), Row.of(3,
4));
+ }
+
+ @Test
+ public void testCreateTemporaryTableRepeat() throws Exception {
+ for (int i = 0; i < 5; i++) {
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE T (i INT, j INT) WITH ("
+ + "'connector'='table-store', 'path'='%s',
'auto-create'='true')",
+ path));
+ tEnv.executeSql("SELECT * FROM T").collect().close();
+ tEnv.executeSql("DROP TABLE T");
+ }
+ }
+
+ @Test
+ public void testCreateTemporaryTableConflict() throws Exception {
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE T (i INT, j INT) WITH ("
+ + "'connector'='table-store', 'path'='%s',
'auto-create'='true')",
+ path));
+ tEnv.executeSql("SELECT * FROM T").collect().close();
+ tEnv.executeSql("DROP TABLE T");
+
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE T (i INT, j INT, k INT) WITH ("
+ + "'connector'='table-store', 'path'='%s',
'auto-create'='true')",
+ path));
+
+ assertThrows(
+ ValidationException.class,
+ () -> tEnv.executeSql("SELECT * FROM T").collect().close(),
+ "Flink schema and store schema are not the same");
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 10c4308f..45360b54 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -362,6 +362,13 @@ public class CoreOptions implements Serializable {
.defaultValue("debezium-json")
.withDescription("Specify the message format of log
system.");
+ public static final ConfigOption<Boolean> AUTO_CREATE =
+ ConfigOptions.key("auto-create")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to create underlying storage when reading
and writing the table.");
+
private final Configuration options;
public CoreOptions(Map<String, String> options) {