This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.1
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 631851f4124a0bc6696d934e72ff58762e479acd Author: Jingsong Lee <[email protected]> AuthorDate: Mon Apr 4 15:09:32 2022 +0800 [FLINK-26899] Introduce write/query table document for table store This closes #72 --- docs/content/docs/development/create-table.md | 84 +++++++++- docs/content/docs/development/distribution.md | 107 ------------ docs/content/docs/development/overview.md | 19 +++ docs/content/docs/development/query-table.md | 108 ++++++++++++ docs/content/docs/development/write-table.md | 204 +++++++++++++++++++++++ docs/content/docs/try-table-store/quick-start.md | 2 +- docs/static/img/stream_batch_insert.svg | 1 + 7 files changed, 416 insertions(+), 109 deletions(-) diff --git a/docs/content/docs/development/create-table.md b/docs/content/docs/development/create-table.md index 01185f6..106afe3 100644 --- a/docs/content/docs/development/create-table.md +++ b/docs/content/docs/development/create-table.md @@ -24,7 +24,7 @@ specific language governing permissions and limitations under the License. --> -# CREATE statement +# Create Table ```sql CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name @@ -151,3 +151,85 @@ Creating a table will create the corresponding physical storage: - If `log.system` is configured as Kafka, a Topic named "${catalog_name}.${database_name}.${table_name}" will be created automatically when the table is created. + +## Distribution + +The data distribution of Table Store consists of three concepts: +Partition, Bucket, and Primary Key. + +```sql +CREATE TABLE MyTable ( + user_id BIGINT, + item_id BIGINT, + behavior STRING, + dt STRING, + PRIMARY KEY (dt, user_id) NOT ENFORCED +) PARTITIONED BY (dt) WITH ( + 'bucket' = '4' +); +``` + +For example, the `MyTable` table above has its data distribution +in the following order: +- Partition: isolating different data based on partition fields. +- Bucket: Within a single partition, distributed into 4 different + buckets based on the hash value of the primary key. 
+- Primary key: Within a single bucket, sorted by primary key to + build LSM structure. + +## Partition + +Table Store adopts the same partitioning concept as Apache Hive to +separate data, and thus various operations can be managed by partition +as a management unit. + +Partitioned filtering is the most effective way to improve performance, +your query statements should contain partition filtering conditions +as much as possible. + +## Bucket + +The record is hashed into different buckets according to the +primary key or the whole row (without primary key). + +The number of buckets is very important as it determines the +worst-case maximum processing parallelism. But it should not be +too big, otherwise, the system will create a lot of small files. + +In general, the desired file size is 128 MB, the recommended data +to be kept on disk in each sub-bucket is about 1 GB. + +## Primary Key + +The primary key is unique and indexed. + +Flink Table Store imposes an ordering of data, which means the system +will sort the primary key within each bucket. All fields will be used +to sort if no primary key is defined. Using this feature, you can +achieve high performance by adding filter conditions on the primary key. + +The primary key's choice is critical, especially when setting the composite +primary key. A rule of thumb is to put the most frequently queried field in +the front. For example: + +```sql +CREATE TABLE MyTable ( + catalog_id BIGINT, + user_id BIGINT, + item_id BIGINT, + behavior STRING, + dt STRING, + ...... +); +``` + +For this table, assuming that the composite primary keys are +the `catalog_id` and `user_id` fields, there are two ways to +set the primary key: +1. PRIMARY KEY (user_id, catalog_id) +2. PRIMARY KEY (catalog_id, user_id) + +The two methods do not behave in the same way when querying. 
+Use approach one if you have a large number of filtered queries +with only `user_id`, and use approach two if you have a large +number of filtered queries with only `catalog_id`. diff --git a/docs/content/docs/development/distribution.md b/docs/content/docs/development/distribution.md deleted file mode 100644 index c09901f..0000000 --- a/docs/content/docs/development/distribution.md +++ /dev/null @@ -1,107 +0,0 @@ ---- -title: "Distribution" -weight: 3 -type: docs -aliases: -- /development/distribution.html ---- -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -# Distribution - -The data distribution of Table Store consists of three concepts: -Partition, Bucket, and Primary Key. - -```sql -CREATE TABLE MyTable ( - user_id BIGINT, - item_id BIGINT, - behavior STRING, - dt STRING, - PRIMARY KEY (dt, user_id) NOT ENFORCED -) PARTITION BY (dt) WITH ( - 'bucket' = '4' -); -``` - -For example, the `MyTable` table above has its data distribution -in the following order: -- Partition: isolating different data based on partition fields. -- Bucket: Within a single partition, distributed into 4 different - buckets based on the hash value of the primary key. -- Primary key: Within a single bucket, sorted by primary key to - build LSM structure. 
- -## Partition - -Table Store adopts the same partitioning concept as Apache Hive to -separate data, and thus various operations can be managed by partition -as a management unit. - -Partitioned filtering is the most effective way to improve performance, -your query statements should contain partition filtering conditions -as much as possible. - -## Bucket - -The record is hashed into different buckets according to the -primary key or the whole row (without primary key). - -The number of buckets is very important as it determines the -worst-case maximum processing parallelism. But it should not be -too big, otherwise, the system will create a lot of small files. - -In general, the desired file size is 128 MB, the recommended data -to be kept on disk in each sub-bucket is about 1 GB. - -## Primary Key - -The primary key is unique and is indexed. - -Flink Table Store imposes an ordering of data, which means the system -will sort the primary key within each bucket. All fields will be used -to sort if no primary key is defined. Using this feature, you can -achieve high performance by adding filter conditions on the primary key. - -The primary key's choice is critical, especially when setting the composite -primary key. A rule of thumb is to put the most frequently queried field in -the front. For example: - -```sql -CREATE TABLE MyTable ( - catalog_id BIGINT, - user_id BIGINT, - item_id BIGINT, - behavior STRING, - dt STRING, - ...... -); -``` - -For this table, assuming that the composite primary keys are -the `catalog_id` and `user_id` fields, there are two ways to -set the primary key: -1. PRIMARY KEY (user_id, catalog_id) -2. PRIMARY KEY (catalog_id, user_id) - -The two methods do not behave in the same way when querying. -Use approach 1 if you have a large number of filtered queries -with only `user_id`, and use approach 2 if you have a large -number of filtered queries with only `catalog_id`. 
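The primary-key ordering guidance above can be sketched as two concrete DDL variants. This is a hedged illustration, not from the patch itself: the table names `MyTable1`/`MyTable2` and the filter values are made up for this example.

```sql
-- Approach 1: user_id leads the composite key, so queries that
-- filter on user_id alone can prune effectively.
CREATE TABLE MyTable1 (
    catalog_id BIGINT,
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    PRIMARY KEY (user_id, catalog_id) NOT ENFORCED
) WITH (
    'bucket' = '4'
);

SELECT * FROM MyTable1 WHERE user_id = 42;

-- Approach 2: catalog_id leads, favoring filters on catalog_id alone.
CREATE TABLE MyTable2 (
    catalog_id BIGINT,
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    PRIMARY KEY (catalog_id, user_id) NOT ENFORCED
) WITH (
    'bucket' = '4'
);

SELECT * FROM MyTable2 WHERE catalog_id = 7;
```

Within a bucket, rows are sorted by the primary key, so a filter on the leading key column narrows the sorted range that must be read.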
diff --git a/docs/content/docs/development/overview.md b/docs/content/docs/development/overview.md index ffbef5e..43a6fc2 100644 --- a/docs/content/docs/development/overview.md +++ b/docs/content/docs/development/overview.md @@ -30,6 +30,25 @@ Flink Table Store is a unified streaming and batch store for building dynamic tables on Apache Flink. Flink Table Store serves as the storage engine behind Flink SQL Managed Table. +## Setup Table Store + +{{< hint info >}} +__Note:__ Table Store is only supported since Flink 1.15. +{{< /hint >}} + +You can get the bundle jar for the Table Store in one of the following ways: +- [Download the latest bundle jar](https://flink.apache.org/downloads.html) of + Flink Table Store. +- Build bundle jar under submodule `flink-table-store-dist` from source code. + +Flink Table Store has shaded all the dependencies in the package, so you don't have +to worry about conflicts with other connector dependencies. + +The steps to set up are: +- Copy the Table Store bundle jar to `flink/lib`. +- Setting the HADOOP_CLASSPATH environment variable or copy the + [Pre-bundled Hadoop Jar](https://flink.apache.org/downloads.html) to `flink/lib`. + ## Managed Table The typical usage of Flink SQL DDL is to specify the 'connector' and fill in diff --git a/docs/content/docs/development/query-table.md b/docs/content/docs/development/query-table.md new file mode 100644 index 0000000..78e6564 --- /dev/null +++ b/docs/content/docs/development/query-table.md @@ -0,0 +1,108 @@ +--- +title: "Query Table" +weight: 4 +type: docs +aliases: +- /development/query-table.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Query Table + +The Table Store is streaming batch unified, you can read full +and incremental data depending on the runtime execution mode: + +```sql +-- Batch mode, read latest snapshot +SET 'execution.runtime-mode' = 'batch'; +SELECT * FROM MyTable; + +-- Streaming mode, read incremental snapshot, read the snapshot first, then read the incremental +SET 'execution.runtime-mode' = 'streaming'; +SELECT * FROM MyTable; + +-- Streaming mode, read latest incremental +SET 'execution.runtime-mode' = 'streaming'; +SELECT * FROM MyTable /*+ OPTIONS ('log.scan'='latest') */; +``` + +## Query Optimization + +It is highly recommended to take partition and primary key filters +along with the query, which will speed up the data skipping of the query. + +Supported filter functions are: +- `=` +- `<>` +- `<` +- `<=` +- `>` +- `>=` +- `in` +- starts with `like` + +## Streaming Real-time + +By default, data is only visible after the checkpoint, which means +that the streaming reading has transactional consistency. 
+ +If you want the data to be immediately visible, you need to set the following options: + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Table Option</th> + <th class="text-center" style="width: 5%">Default</th> + <th class="text-center" style="width: 60%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>`log.system` = `kafka`</h5></td> + <td>No</td> + <td>You need to enable log system because the FileStore's continuous consumption only provides checkpoint-based visibility.</td> + </tr> + <tr> + <td><h5>`log.consistency` = `eventual`</h5></td> + <td>No</td> + <td>This means that writes are visible without using LogSystem's transaction mechanism.</td> + </tr> + </tbody> +</table> + +Note: All tables need to have the primary key defined because only then can the +data be de-duplicated by the normalizing node of the downstream job. + +## Streaming Low Cost + +By default, for the table with the primary key, the records in the table store only +contain `INSERT`, `UPDATE_AFTER`, and `DELETE`. The downstream consuming job will +generate a normalized node, and it stores all processed key-value to produce the +`UPDATE_BEFORE` message, which will bring extra overhead. + +If you want to remove downstream normalized node (It's costly) or see the all +changes of this table, you can configure: +- 'log.changelog-mode' = 'all' +- 'log.consistency' = 'transactional' (default) + +The inserted query written to the table store must contain all message types with +`UPDATE_BEFORE`, otherwise the planner will throw an exception. It means that Planner +expects the inserted query to produce a real changelog, otherwise the data would +be wrong. 
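The two streaming configurations that query-table.md describes above can be sketched as table options. This is a hedged example: only option keys named in the document are used, and the table schemas are illustrative.

```sql
-- Immediately visible streaming reads: Kafka log system with
-- eventual consistency. A primary key is required so the downstream
-- normalizing node can de-duplicate the data.
CREATE TABLE MyRealtimeTable (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    PRIMARY KEY (dt, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
    'bucket' = '4',
    'log.system' = 'kafka',
    'log.consistency' = 'eventual'
);

-- Full changelog so the downstream job needs no normalizing node.
-- The inserted query must then produce UPDATE_BEFORE messages,
-- otherwise the planner throws an exception.
CREATE TABLE MyChangelogTable (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    PRIMARY KEY (dt, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
    'bucket' = '4',
    'log.system' = 'kafka',
    'log.changelog-mode' = 'all',
    'log.consistency' = 'transactional'
);
```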
diff --git a/docs/content/docs/development/write-table.md b/docs/content/docs/development/write-table.md new file mode 100644 index 0000000..7b7c7ad --- /dev/null +++ b/docs/content/docs/development/write-table.md @@ -0,0 +1,204 @@ +--- +title: "Write Table" +weight: 3 +type: docs +aliases: +- /development/write-table.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Write Table + +```sql +INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name + [PARTITION part_spec] [column_list] select_statement + +part_spec: + (part_col_name1=val1 [, part_col_name2=val2, ...]) + +column_list: + (col_name1 [, column_name2, ...]) +``` + +## Unify Streaming and Batch + +Flink Table Store supports read/write under both batch and streaming mode. +Beyond that, it can also write to the same managed table simultaneously by +different streaming and batch jobs. 
+ +Suppose you have a warehouse pipeline: + +{{< img src="/img/stream_batch_insert.svg" alt="Unify Streaming and Batch" >}} + +The DWD layer has a following table: + +```sql +-- a managed table ddl +CREATE TABLE MyDwdTable ( + user_id BIGINT, + item_id BIGINT, + dt STRING +) PARTITIONED BY (dt); +``` + +And there is a real-time pipeline to perform the data sync task, followed +by the downstream jobs to perform the rest ETL steps. + +```sql +-- Run a streaming job that continuously writes to the table +SET 'execution.runtime-mode' = 'streaming'; +INSERT INTO MyDwdTable SELECT user_id, item_id, dt FROM MyCdcTable WHERE some_filter; + +-- The downstream aggregation task +INSERT INTO MyDwsTable +SELECT dt, item_id, COUNT(user_id) FROM MyDwdTable GROUP BY dt, item_id; +``` + +Some backfill tasks are often required to correct historical data, which means +you can start a new batch job overwriting the table's historical partition +without influencing the current streaming pipeline and the downstream tasks. + +```sql +-- Run a batch job to revise yesterday's partition +SET 'execution.runtime-mode' = 'batch'; +INSERT OVERWRITE MyDwdTable PARTITION ('dt'='20220402') +SELECT user_id, item_id FROM MyCdcTable WHERE dt = '20220402' AND new_filter; +``` + +This way you revise yesterday's partition without suspending the streaming job. + +{{< hint info >}} +__Note:__ Multiple jobs writing to a single partition at the same time is +not supported. The behavior does not result in data errors, but can lead +to job failover. +{{< /hint >}} + +## Parallelism + +It is recommended that the parallelism of sink should be less than or +equal to the number of buckets, preferably equal. You can control the +parallelism of the sink with the `sink.parallelism` option. 
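The parallelism recommendation above can be sketched by pinning the sink parallelism to the bucket count. The table name and schema here are illustrative, not part of the patch:

```sql
CREATE TABLE MyBucketedTable (
    user_id BIGINT,
    item_id BIGINT,
    dt STRING
) PARTITIONED BY (dt) WITH (
    'bucket' = '4',
    'sink.parallelism' = '4'  -- equal to the bucket count, as recommended
);
```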
+ +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Option</th> + <th class="text-center" style="width: 5%">Required</th> + <th class="text-center" style="width: 5%">Default</th> + <th class="text-center" style="width: 10%">Type</th> + <th class="text-center" style="width: 60%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>sink.parallelism</h5></td> + <td>No</td> + <td style="word-wrap: break-word;">(none)</td> + <td>Integer</td> + <td>Defines the parallelism of the sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td> + </tr> + </tbody> +</table> + +## Expiring Snapshot + +Table Store generates one or two snapshots per commit. To avoid too many snapshots +that create a lot of small files and redundant storage, Table Store writes defaults +to eliminate expired snapshots: + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Option</th> + <th class="text-center" style="width: 5%">Required</th> + <th class="text-center" style="width: 5%">Default</th> + <th class="text-center" style="width: 10%">Type</th> + <th class="text-center" style="width: 60%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>snapshot.time-retained</h5></td> + <td>No</td> + <td style="word-wrap: break-word;">1 h</td> + <td>Duration</td> + <td>The maximum time of completed snapshots to retain.</td> + </tr> + <tr> + <td><h5>snapshot.num-retained</h5></td> + <td>No</td> + <td style="word-wrap: break-word;">Integer.MAX_VALUE</td> + <td>Integer</td> + <td>The maximum number of completed snapshots to retain.</td> + </tr> + </tbody> +</table> + +Please note that too short retain time or too small retain number may result in: +- Batch query cannot find the file. 
For example, the table is relatively large and + the batch query takes 10 minutes to read, but the snapshot from 10 minutes ago + expires, at which point the batch query will read a deleted snapshot. +- Continuous reading jobs on FileStore (Without Log System) fail to restart. At the + time of the job failover, the snapshot it recorded has expired. + +## Performance + +Table Store uses LSM data structure, which itself has the ability to support a large +number of updates. Update performance and query performance is a tradeoff, the +following parameters control this tradeoff: + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Option</th> + <th class="text-center" style="width: 5%">Required</th> + <th class="text-center" style="width: 5%">Default</th> + <th class="text-center" style="width: 10%">Type</th> + <th class="text-center" style="width: 60%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>num-sorted-run.max</h5></td> + <td>No</td> + <td style="word-wrap: break-word;">5</td> + <td>Integer</td> + <td>The max sorted run number. Includes level0 files (one file one sorted run) and high-level runs (one level one sorted run).</td> + </tr> + </tbody> +</table> + +- The larger `num-sorted-run.max`, the less merge cost when updating data, which + can avoid many invalid merges. However, if this value is too large, more memory + will be needed when merging files because each FileReader will take up a lot of + memory. + +- The smaller `num-sorted-run.max`, the better performance when querying, fewer + files will be merged. + +## Memory + +There are three main places in the Table Store's sink writer that take up memory: +- MemTable's write buffer, which is individually occupied by each partition, each + bucket, and this memory value can be adjustable by the `write-buffer-size` + option (default 64 MB). 
+- The memory consumed by compaction for reading files, it can be adjusted by the + `num-sorted-run.max` option to change the maximum number of files to be merged. +- The memory consumed by writing file, which is not adjustable. diff --git a/docs/content/docs/try-table-store/quick-start.md b/docs/content/docs/try-table-store/quick-start.md index 6c55b8c..db7ca2a 100644 --- a/docs/content/docs/try-table-store/quick-start.md +++ b/docs/content/docs/try-table-store/quick-start.md @@ -32,7 +32,7 @@ document will be guided to create a simple dynamic table to read and write it. ## Step 1: Downloading Flink {{< hint info >}} -__Note:__ Table Store is only supported starting from Flink 1.15. +__Note:__ Table Store is only supported since Flink 1.15. {{< /hint >}} [Download the latest binary release](https://flink.apache.org/downloads.html) of Flink, diff --git a/docs/static/img/stream_batch_insert.svg b/docs/static/img/stream_batch_insert.svg new file mode 100644 index 0000000..1a13e25 --- /dev/null +++ b/docs/static/img/stream_batch_insert.svg @@ -0,0 +1 @@ +<svg version="1.1" viewBox="0.0 0.0 960.0 388.9133858267717" fill="none" stroke="none" stroke-linecap="square" stroke-miterlimit="10" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns="http://www.w3.org/2000/svg"><clipPath id="g122f03a17ab_0_1.0"><path d="m0 0l960.0 0l0 388.9134l-960.0 0l0 -388.9134z" clip-rule="nonzero"/></clipPath><g clip-path="url(#g122f03a17ab_0_1.0)"><path fill="#ffffff" d="m0 0l960.0 0l0 388.9134l-960.0 0z" fill-rule="evenodd"/><path fill="#000000" fill-opacity="0.0 [...] \ No newline at end of file