This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.1 in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 716eb827b4f8487d9feff4c9dc3bebb10d443de3 Author: Jingsong Lee <[email protected]> AuthorDate: Fri Apr 1 19:39:45 2022 +0800 [FLINK-26898] Introduce creating table document for table store This closes #66 --- docs/config.toml | 1 + docs/content/docs/development/_index.md | 26 +++++ docs/content/docs/development/create-table.md | 153 ++++++++++++++++++++++++++ docs/content/docs/development/distribution.md | 107 ++++++++++++++++++ docs/content/docs/development/overview.md | 139 +++++++++++++++++++++++ docs/static/img/architecture.svg | 1 + pom.xml | 1 + 7 files changed, 428 insertions(+) diff --git a/docs/config.toml b/docs/config.toml index 1119806..274144d 100644 --- a/docs/config.toml +++ b/docs/config.toml @@ -61,6 +61,7 @@ pygmentsUseClasses = true ] PreviousDocs = [ + ["0.1", "https://nightlies.apache.org/flink/flink-table-store-docs-release-0.1"], ] [markup] diff --git a/docs/content/docs/development/_index.md b/docs/content/docs/development/_index.md new file mode 100644 index 0000000..3229296 --- /dev/null +++ b/docs/content/docs/development/_index.md @@ -0,0 +1,26 @@ +--- +title: Development +icon: <i class="fa fa-code title maindish" aria-hidden="true"></i> +bold: true +sectionBreak: true +bookCollapseSection: true +weight: 2 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. 
See the License for the +specific language governing permissions and limitations +under the License. +--> diff --git a/docs/content/docs/development/create-table.md b/docs/content/docs/development/create-table.md new file mode 100644 index 0000000..01185f6 --- /dev/null +++ b/docs/content/docs/development/create-table.md @@ -0,0 +1,153 @@ +--- +title: "Create Table" +weight: 2 +type: docs +aliases: +- /development/create-table.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# CREATE statement + +```sql +CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name + ( + { <physical_column_definition> | <computed_column_definition> }[ , ...n] + [ <watermark_definition> ] + [ <table_constraint> ][ , ...n] + ) + [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)] + WITH (key1=val1, key2=val2, ...) + +<physical_column_definition>: + column_name column_type [ <column_constraint> ] [COMMENT column_comment] + +<column_constraint>: + [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED + +<table_constraint>: + [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) 
NOT ENFORCED

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
```

{{< hint info >}}
__Note:__ To ensure the uniqueness of the primary key, the
primary key must contain the partition fields.
{{< /hint >}}

{{< hint info >}}
__Note:__ Metadata columns are not supported yet.
{{< /hint >}}

A table whose options do not contain the 'connector' key is a
managed table. Creating the table creates the corresponding
physical storage.

When the corresponding physical storage already exists,
such as a file directory or a Kafka topic:
- If you want to reuse it, use `CREATE TABLE IF NOT EXISTS`.
- If you don't want to reuse it, use `DROP TABLE IF EXISTS`
  or delete it yourself.

It is recommended that you use a persistent catalog, such as
`HiveCatalog`; otherwise, make sure you create the table with
the same options each time.

## Session Options

To create a managed table, you need to set the required
session options in advance. Session options only take effect
when creating the table; they do not affect reading or
writing the table afterwards.

You can set session options in the following two ways:
- Edit `flink-conf.yaml`.
- Via `TableEnvironment.getConfig().set`.

The difference between a session option and a table option
is that the session option needs to be prefixed with
`table-store.`. Take the `bucket` option for example:
- set at the session level: `SET 'table-store.bucket' = '4';`
- set at the per-table level: `CREATE TABLE ... 
WITH ('bucket' = '4')` + +Important options include the following: + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Option</th> + <th class="text-center" style="width: 5%">Required</th> + <th class="text-center" style="width: 5%">Default</th> + <th class="text-center" style="width: 10%">Type</th> + <th class="text-center" style="width: 60%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>file.path</h5></td> + <td>Yes</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The root file path of the table store in the filesystem.</td> + </tr> + <tr> + <td><h5>bucket</h5></td> + <td>Yes</td> + <td style="word-wrap: break-word;">1</td> + <td>Integer</td> + <td>The bucket number for table store.</td> + </tr> + <tr> + <td><h5>log.system</h5></td> + <td>No</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The log system used to keep changes of the table, supports 'kafka'.</td> + </tr> + <tr> + <td><h5>log.kafka.bootstrap.servers</h5></td> + <td>No</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Required Kafka server connection string for log store.</td> + </tr> + <tr> + <td><h5>log.retention</h5></td> + <td>No</td> + <td style="word-wrap: break-word;">(none)</td> + <td>Duration</td> + <td>The duration to keep a log file before deleting it. The default value is from the log system cluster.</td> + </tr> + </tbody> +</table> + +## Physical storage + +Creating a table will create the corresponding physical storage: +- The table's FileStore directory will be created under: + `${file.path}/${catalog_name}.catalog/${database_name}.db/${table_name}` +- If `log.system` is configured as Kafka, a Topic named + "${catalog_name}.${database_name}.${table_name}" will be created + automatically when the table is created. 
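
Putting the options above together, a minimal end-to-end sketch might look
like the following (the `file.path` value and the table schema are
illustrative, not prescriptive):

```sql
-- session options: only consulted when the table is created
SET 'table-store.file.path' = '/tmp/table_store';
SET 'table-store.bucket' = '4';

-- a managed table: no 'connector' option, so creating it
-- creates the corresponding physical storage
CREATE TABLE IF NOT EXISTS MyTable (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    PRIMARY KEY (dt, user_id) NOT ENFORCED
) PARTITIONED BY (dt);
```

With these settings, the table's FileStore directory is created under
`${file.path}`, and no Kafka topic is created because `log.system` is unset.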
diff --git a/docs/content/docs/development/distribution.md b/docs/content/docs/development/distribution.md
new file mode 100644
index 0000000..c09901f
--- /dev/null
+++ b/docs/content/docs/development/distribution.md
@@ -0,0 +1,107 @@
---
title: "Distribution"
weight: 3
type: docs
aliases:
- /development/distribution.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Distribution

The data distribution of Table Store is governed by three concepts:
Partition, Bucket, and Primary Key.

```sql
CREATE TABLE MyTable (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    PRIMARY KEY (dt, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
    'bucket' = '4'
);
```

For example, the `MyTable` table above distributes its data in the
following order:
- Partition: data is isolated into different partitions based on the
  partition fields.
- Bucket: within a single partition, records are distributed into 4
  buckets based on the hash value of the primary key.
- Primary key: within a single bucket, records are sorted by primary
  key to build the LSM structure.

## Partition

Table Store adopts the same partitioning concept as Apache Hive to
separate data, so that a partition can serve as the unit of management
for various operations.
Partition filtering is the most effective way to improve performance;
your query statements should contain partition filtering conditions
whenever possible.

## Bucket

Records are hashed into different buckets according to the primary
key, or to the whole row if no primary key is defined.

The number of buckets is very important, as it determines the
worst-case maximum processing parallelism. But it should not be too
big either; otherwise, the system will create many small files.

In general, the desired file size is 128 MB, and the recommended
amount of data kept on disk in each bucket is about 1 GB.

## Primary Key

The primary key is unique and indexed.

Flink Table Store imposes an ordering on the data, which means the
system sorts records by primary key within each bucket. If no primary
key is defined, all fields are used for sorting. Using this feature,
you can achieve high performance by adding filter conditions on the
primary key.

The choice of primary key is critical, especially for a composite
primary key. A rule of thumb is to put the most frequently queried
field first. For example:

```sql
CREATE TABLE MyTable (
    catalog_id BIGINT,
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    ......
);
```

For this table, assuming that the composite primary key consists of
the `catalog_id` and `user_id` fields, there are two ways to declare
it:
1. PRIMARY KEY (user_id, catalog_id)
2. PRIMARY KEY (catalog_id, user_id)

The two declarations do not behave the same way when querying. Use
approach 1 if most of your filtered queries use only `user_id`, and
approach 2 if most of them use only `catalog_id`.
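
As a sketch of the difference (assuming approach 2, i.e.
`PRIMARY KEY (catalog_id, user_id)`):

```sql
-- benefits from the key order (catalog_id, user_id): the filter
-- matches the leading primary key field, so it can narrow the
-- sorted range within each bucket
SELECT * FROM MyTable WHERE catalog_id = 1024;

-- does not benefit: user_id is not the leading field, so the
-- filter cannot exploit the sort order within a bucket
SELECT * FROM MyTable WHERE user_id = 10086;
```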
diff --git a/docs/content/docs/development/overview.md b/docs/content/docs/development/overview.md
new file mode 100644
index 0000000..ffbef5e
--- /dev/null
+++ b/docs/content/docs/development/overview.md
@@ -0,0 +1,139 @@
---
title: "Overview"
weight: 1
type: docs
aliases:
- /development/overview.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Overview

Flink Table Store is a unified streaming and batch store for building dynamic
tables on Apache Flink. Flink Table Store serves as the storage engine behind
the Flink SQL Managed Table.

## Managed Table

The typical usage of Flink SQL DDL is to specify a 'connector' and fill in
the complex connection information in the 'WITH' clause. Such a DDL just
establishes an implicit relationship with the external system. We call such
a table an external table.
```sql
-- an external table ddl
CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);
```

A managed table is different: the connection information is already
filled in by the session environment, so the user only needs to focus
on the business logic when writing the creation DDL. The DDL is no
longer just an implicit relationship; creating a table creates the
corresponding physical storage, and dropping a table deletes the
corresponding physical storage.

```sql
-- a managed table ddl
CREATE TABLE MyTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
);
```

## Unify Streaming and Batch

There are three typical types of connectors in Flink SQL:
- Message queues, such as Apache Kafka, used in both the source and
  intermediate stages of a pipeline to keep latency within seconds.
- OLAP systems, such as ClickHouse, which receive processed data in a
  streaming fashion and serve users' ad-hoc queries.
- Batch storage, such as Apache Hive, which supports the various
  operations of traditional batch processing, including
  `INSERT OVERWRITE`.

Flink Table Store provides a table abstraction that is used in the same
way as a traditional database:
- In Flink `batch` execution mode, it acts like a Hive table and
  supports the various operations of Batch SQL. Querying it returns
  the latest snapshot.
- In Flink `streaming` execution mode, it acts like a message queue.
  Querying it is like consuming a changelog stream from a message
  queue in which historical data never expires.

Different `log.scan` modes result in different consumption behavior
under streaming mode.
<table class="table table-bordered">
  <thead>
    <tr>
      <th class="text-left" style="width: 20%">Scan Mode</th>
      <th class="text-center" style="width: 5%">Default</th>
      <th class="text-center" style="width: 60%">Description</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td><h5>FULL</h5></td>
      <td>Yes</td>
      <td>FULL scan mode performs a hybrid reading: a snapshot scan followed by a continuous incremental scan.</td>
    </tr>
    <tr>
      <td><h5>LATEST</h5></td>
      <td>No</td>
      <td>LATEST scan mode only reads incremental data from the latest offset.</td>
    </tr>
  </tbody>
</table>

## Architecture

Flink Table Store consists of two parts, the LogStore and the FileStore.
The LogStore serves the needs of message systems, while the FileStore
plays the role of a file system with columnar formats. At any point in
time, the LogStore and the FileStore hold exactly the same data for the
latest written data (the LogStore has a TTL), but with different
physical layouts. Flink Table Store aims to bridge the storage layout
gap between batch tables and streaming changelogs, to provide a unified
experience in Flink SQL:
- LogStore: stores the latest data and supports second-level streaming
  incremental consumption; uses Kafka by default.
- FileStore: stores the latest data plus historical data and provides
  batch ad-hoc analysis.

{{< img src="/img/architecture.svg" alt="Flink Table Store Architecture" >}}

The manifest file records changes to the SST files, and multiple
manifest files make up a snapshot.

The data in the FileStore is divided into buckets; each bucket is a
separate LSM (log-structured merge-tree).

The files inside an LSM are called SSTs (Sorted Strings Tables). By
default, files are stored in a columnar format (Apache ORC) for high
analysis performance and good storage compression.
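
The unified experience described above can be sketched as follows
(a sketch assuming the Flink SQL client and an existing managed table
`MyTable`; the dynamic table options hint is standard Flink SQL):

```sql
-- batch mode: read the latest snapshot from the FileStore
SET 'execution.runtime-mode' = 'batch';
SELECT * FROM MyTable;

-- streaming mode: consume the changelog; the per-query hint
-- switches the log scan mode to read from the latest offset
SET 'execution.runtime-mode' = 'streaming';
SELECT * FROM MyTable /*+ OPTIONS('log.scan' = 'latest') */;
```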
+ diff --git a/docs/static/img/architecture.svg b/docs/static/img/architecture.svg new file mode 100644 index 0000000..d6351f9 --- /dev/null +++ b/docs/static/img/architecture.svg @@ -0,0 +1 @@ +<svg version="1.1" viewBox="0.0 0.0 960.0 540.0" fill="none" stroke="none" stroke-linecap="square" stroke-miterlimit="10" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns="http://www.w3.org/2000/svg"><clipPath id="g120690087d7_0_0.0"><path d="m0 0l960.0 0l0 540.0l-960.0 0l0 -540.0z" clip-rule="nonzero"/></clipPath><g clip-path="url(#g120690087d7_0_0.0)"><path fill="#ffffff" d="m0 0l960.0 0l0 540.0l-960.0 0z" fill-rule="evenodd"/><g filter="url(#shadowFilter-g120690087d7_0_0.1)"><use xlin [...] \ No newline at end of file diff --git a/pom.xml b/pom.xml index 3f21f49..4dbcc41 100644 --- a/pom.xml +++ b/pom.xml @@ -339,6 +339,7 @@ under the License. <exclude>docs/themes/book/**</exclude> <exclude>docs/assets/github.css</exclude> <exclude>docs/static/js/anchor.min.js</exclude> + <exclude>**/*.svg</exclude> <!-- Bundled license files --> <exclude>**/LICENSE*</exclude> <!-- artifacts created during release process -->
