This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 42799f7 [FLINK-26899] Introduce write/query table document for table
store
42799f7 is described below
commit 42799f71ba226342a1a603002cf4b9b940d85b88
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 4 15:09:32 2022 +0800
[FLINK-26899] Introduce write/query table document for table store
This closes #72
---
docs/content/docs/development/create-table.md | 84 +++++++++-
docs/content/docs/development/distribution.md | 107 ------------
docs/content/docs/development/overview.md | 19 +++
docs/content/docs/development/query-table.md | 108 ++++++++++++
docs/content/docs/development/write-table.md | 204 +++++++++++++++++++++++
docs/content/docs/try-table-store/quick-start.md | 2 +-
docs/static/img/stream_batch_insert.svg | 1 +
7 files changed, 416 insertions(+), 109 deletions(-)
diff --git a/docs/content/docs/development/create-table.md
b/docs/content/docs/development/create-table.md
index 01185f6..106afe3 100644
--- a/docs/content/docs/development/create-table.md
+++ b/docs/content/docs/development/create-table.md
@@ -24,7 +24,7 @@ specific language governing permissions and limitations
under the License.
-->
-# CREATE statement
+# Create Table
```sql
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
@@ -151,3 +151,85 @@ Creating a table will create the corresponding physical
storage:
- If `log.system` is configured as Kafka, a Topic named
"${catalog_name}.${database_name}.${table_name}" will be created
automatically when the table is created.
+
+## Distribution
+
+The data distribution of Table Store consists of three concepts:
+Partition, Bucket, and Primary Key.
+
+```sql
+CREATE TABLE MyTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ behavior STRING,
+ dt STRING,
+ PRIMARY KEY (dt, user_id) NOT ENFORCED
+) PARTITIONED BY (dt) WITH (
+ 'bucket' = '4'
+);
+```
+
+For example, the `MyTable` table above distributes its data
+in the following order:
+- Partition: isolates data based on the partition fields.
+- Bucket: within a single partition, data is distributed into 4 different
+  buckets based on the hash value of the primary key.
+- Primary key: within a single bucket, data is sorted by primary key to
+  build the LSM structure.
+
+## Partition
+
+Table Store adopts the same partitioning concept as Apache Hive to
+separate data, so that various operations can be managed partition
+by partition.
+
+Partition filtering is the most effective way to improve performance;
+your query statements should contain partition filters whenever
+possible (see the example below).
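+
+For example, a query with a filter on the partition field `dt` only needs
+to scan the matching partition of the `MyTable` defined above (the values
+below are purely illustrative):
+
+```sql
+-- Only the dt = '2022-04-02' partition is scanned
+SELECT * FROM MyTable WHERE dt = '2022-04-02' AND behavior = 'click';
+```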
+
+## Bucket
+
+Records are hashed into different buckets according to the
+primary key, or the whole row if there is no primary key.
+
+The number of buckets is very important, as it determines the
+worst-case maximum processing parallelism. But it should not be
+too big, otherwise the system will create a lot of small files.
+
+In general, the desired file size is 128 MB, and the recommended
+amount of data to keep on disk in each bucket is about 1 GB.
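+
+As a rough sizing sketch with a hypothetical table, assuming each partition
+is expected to hold about 40 GB of data (an illustrative figure only),
+around 40 buckets keeps each bucket near the recommended 1 GB:
+
+```sql
+CREATE TABLE MySizedTable (
+    user_id BIGINT,
+    item_id BIGINT,
+    behavior STRING,
+    dt STRING,
+    PRIMARY KEY (dt, user_id) NOT ENFORCED
+) PARTITIONED BY (dt) WITH (
+    'bucket' = '40'  -- ~40 GB per partition / ~1 GB per bucket
+);
+```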
+
+## Primary Key
+
+The primary key is unique and indexed.
+
+Flink Table Store imposes an ordering on the data, which means the system
+sorts data by the primary key within each bucket. If no primary key is
+defined, all fields are used for sorting. Using this feature, you can
+achieve high performance by adding filter conditions on the primary key.
+
+The choice of primary key is critical, especially for a composite
+primary key. A rule of thumb is to put the most frequently queried
+field first. For example:
+
+```sql
+CREATE TABLE MyTable (
+ catalog_id BIGINT,
+ user_id BIGINT,
+ item_id BIGINT,
+ behavior STRING,
+ dt STRING,
+ ......
+);
+```
+
+For this table, assuming that the composite primary key consists of
+the `catalog_id` and `user_id` fields, there are two ways to
+declare the primary key:
+1. PRIMARY KEY (user_id, catalog_id)
+2. PRIMARY KEY (catalog_id, user_id)
+
+The two declarations do not behave in the same way when querying.
+Use approach one if you have a large number of queries filtering
+only on `user_id`, and use approach two if you have a large number
+of queries filtering only on `catalog_id`.
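+
+A minimal sketch of approach two, for the case where most queries filter
+only on `catalog_id`:
+
+```sql
+CREATE TABLE MyTable (
+    catalog_id BIGINT,
+    user_id BIGINT,
+    item_id BIGINT,
+    behavior STRING,
+    dt STRING,
+    PRIMARY KEY (catalog_id, user_id) NOT ENFORCED
+);
+
+-- Benefits from the sort order within each bucket,
+-- because catalog_id is the leading primary key field
+SELECT * FROM MyTable WHERE catalog_id = 1024;
+```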
diff --git a/docs/content/docs/development/distribution.md
b/docs/content/docs/development/distribution.md
deleted file mode 100644
index c09901f..0000000
--- a/docs/content/docs/development/distribution.md
+++ /dev/null
@@ -1,107 +0,0 @@
----
-title: "Distribution"
-weight: 3
-type: docs
-aliases:
-- /development/distribution.html
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-# Distribution
-
-The data distribution of Table Store consists of three concepts:
-Partition, Bucket, and Primary Key.
-
-```sql
-CREATE TABLE MyTable (
- user_id BIGINT,
- item_id BIGINT,
- behavior STRING,
- dt STRING,
- PRIMARY KEY (dt, user_id) NOT ENFORCED
-) PARTITION BY (dt) WITH (
- 'bucket' = '4'
-);
-```
-
-For example, the `MyTable` table above has its data distribution
-in the following order:
-- Partition: isolating different data based on partition fields.
-- Bucket: Within a single partition, distributed into 4 different
- buckets based on the hash value of the primary key.
-- Primary key: Within a single bucket, sorted by primary key to
- build LSM structure.
-
-## Partition
-
-Table Store adopts the same partitioning concept as Apache Hive to
-separate data, and thus various operations can be managed by partition
-as a management unit.
-
-Partitioned filtering is the most effective way to improve performance,
-your query statements should contain partition filtering conditions
-as much as possible.
-
-## Bucket
-
-The record is hashed into different buckets according to the
-primary key or the whole row (without primary key).
-
-The number of buckets is very important as it determines the
-worst-case maximum processing parallelism. But it should not be
-too big, otherwise, the system will create a lot of small files.
-
-In general, the desired file size is 128 MB, the recommended data
-to be kept on disk in each sub-bucket is about 1 GB.
-
-## Primary Key
-
-The primary key is unique and is indexed.
-
-Flink Table Store imposes an ordering of data, which means the system
-will sort the primary key within each bucket. All fields will be used
-to sort if no primary key is defined. Using this feature, you can
-achieve high performance by adding filter conditions on the primary key.
-
-The primary key's choice is critical, especially when setting the composite
-primary key. A rule of thumb is to put the most frequently queried field in
-the front. For example:
-
-```sql
-CREATE TABLE MyTable (
- catalog_id BIGINT,
- user_id BIGINT,
- item_id BIGINT,
- behavior STRING,
- dt STRING,
- ......
-);
-```
-
-For this table, assuming that the composite primary keys are
-the `catalog_id` and `user_id` fields, there are two ways to
-set the primary key:
-1. PRIMARY KEY (user_id, catalog_id)
-2. PRIMARY KEY (catalog_id, user_id)
-
-The two methods do not behave in the same way when querying.
-Use approach 1 if you have a large number of filtered queries
-with only `user_id`, and use approach 2 if you have a large
-number of filtered queries with only `catalog_id`.
diff --git a/docs/content/docs/development/overview.md
b/docs/content/docs/development/overview.md
index ffbef5e..43a6fc2 100644
--- a/docs/content/docs/development/overview.md
+++ b/docs/content/docs/development/overview.md
@@ -30,6 +30,25 @@ Flink Table Store is a unified streaming and batch store for
building dynamic
tables on Apache Flink. Flink Table Store serves as the storage engine behind
Flink SQL Managed Table.
+## Setup Table Store
+
+{{< hint info >}}
+__Note:__ Table Store is only supported since Flink 1.15.
+{{< /hint >}}
+
+You can get the bundle jar for the Table Store in one of the following ways:
+- [Download the latest bundle jar](https://flink.apache.org/downloads.html) of
+ Flink Table Store.
+- Build the bundle jar from source code under the `flink-table-store-dist` submodule.
+
+Flink Table Store has shaded all of its dependencies into the package, so you
+don't have to worry about conflicts with other connector dependencies.
+
+The setup steps are:
+- Copy the Table Store bundle jar to `flink/lib`.
+- Set the HADOOP_CLASSPATH environment variable or copy the
+  [Pre-bundled Hadoop Jar](https://flink.apache.org/downloads.html) to `flink/lib`.
+
## Managed Table
The typical usage of Flink SQL DDL is to specify the 'connector' and fill in
diff --git a/docs/content/docs/development/query-table.md
b/docs/content/docs/development/query-table.md
new file mode 100644
index 0000000..78e6564
--- /dev/null
+++ b/docs/content/docs/development/query-table.md
@@ -0,0 +1,108 @@
+---
+title: "Query Table"
+weight: 4
+type: docs
+aliases:
+- /development/query-table.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Query Table
+
+Table Store is unified for streaming and batch; you can read a full
+snapshot or incremental data depending on the runtime execution mode:
+
+```sql
+-- Batch mode, read latest snapshot
+SET 'execution.runtime-mode' = 'batch';
+SELECT * FROM MyTable;
+
+-- Streaming mode: read the snapshot first, then read incremental changes
+SET 'execution.runtime-mode' = 'streaming';
+SELECT * FROM MyTable;
+
+-- Streaming mode, read the latest incremental changes only
+SET 'execution.runtime-mode' = 'streaming';
+SELECT * FROM MyTable /*+ OPTIONS ('log.scan'='latest') */;
+```
+
+## Query Optimization
+
+It is highly recommended to include partition and primary key filters
+in your queries, which speeds up data skipping (see the example after
+the list of supported filters below).
+
+Supported filter functions are:
+- `=`
+- `<>`
+- `<`
+- `<=`
+- `>`
+- `>=`
+- `in`
+- `like` with a starts-with pattern (e.g. `LIKE 'abc%'`)
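+
+For example, assuming the `MyTable` defined on the Create Table page, with
+partition field `dt` and primary key `(dt, user_id)`, a query that benefits
+from data skipping could look like this (the values are illustrative):
+
+```sql
+SELECT * FROM MyTable
+WHERE dt = '2022-04-02'   -- partition filter
+  AND user_id = 1001;     -- primary key filter
+```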
+
+## Streaming Real-time
+
+By default, data is only visible after the checkpoint, which means
+that streaming reads have transactional consistency.
+
+If you want the data to be immediately visible, you need to set the
+following options:
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Table Option</th>
+ <th class="text-center" style="width: 5%">Default</th>
+ <th class="text-center" style="width: 60%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>`log.system` = `kafka`</h5></td>
+ <td>No</td>
+ <td>You need to enable log system because the FileStore's continuous consumption only provides checkpoint-based visibility.</td>
+ </tr>
+ <tr>
+ <td><h5>`log.consistency` = `eventual`</h5></td>
+ <td>No</td>
+ <td>This means that writes are visible without using LogSystem's transaction mechanism.</td>
+ </tr>
+ </tbody>
+</table>
+
+Note: All tables need to have a primary key defined, because only then can
+the data be de-duplicated by the normalize node of the downstream job.
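+
+A minimal sketch of a table configured this way (the schema is illustrative,
+and any Kafka connection options your environment requires are omitted):
+
+```sql
+CREATE TABLE MyRealtimeTable (
+    user_id BIGINT,
+    item_id BIGINT,
+    behavior STRING,
+    PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+    'log.system' = 'kafka',          -- enable the log system
+    'log.consistency' = 'eventual'   -- writes visible before the checkpoint
+    -- plus whatever Kafka connection options your setup requires
+);
+```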
+
+## Streaming Low Cost
+
+By default, for a table with a primary key, the records in the table store only
+contain `INSERT`, `UPDATE_AFTER`, and `DELETE`. The downstream consuming job
+generates a normalize node, which stores all processed key-values in order to
+produce the `UPDATE_BEFORE` messages, and this brings extra overhead.
+
+If you want to remove the costly downstream normalize node, or to see all
+changes of this table, you can configure:
+- 'log.changelog-mode' = 'all'
+- 'log.consistency' = 'transactional' (default)
+
+The query inserted into the table store must then contain all message types,
+including `UPDATE_BEFORE`, otherwise the planner will throw an exception. In
+other words, the planner expects the inserted query to produce a real
+changelog, otherwise the data would be wrong.
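+
+A sketch of a table configured with these options, assuming a source (such
+as a CDC table) that already produces a full changelog including
+`UPDATE_BEFORE`:
+
+```sql
+CREATE TABLE MyChangelogTable (
+    user_id BIGINT,
+    behavior STRING,
+    PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+    'log.changelog-mode' = 'all',
+    'log.consistency' = 'transactional'
+);
+
+-- The inserted query must produce a real changelog,
+-- e.g. reading from a CDC source table
+INSERT INTO MyChangelogTable SELECT user_id, behavior FROM MyCdcTable;
+```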
diff --git a/docs/content/docs/development/write-table.md
b/docs/content/docs/development/write-table.md
new file mode 100644
index 0000000..7b7c7ad
--- /dev/null
+++ b/docs/content/docs/development/write-table.md
@@ -0,0 +1,204 @@
+---
+title: "Write Table"
+weight: 3
+type: docs
+aliases:
+- /development/write-table.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Write Table
+
+```sql
+INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name
+ [PARTITION part_spec] [column_list] select_statement
+
+part_spec:
+ (part_col_name1=val1 [, part_col_name2=val2, ...])
+
+column_list:
+ (col_name1 [, col_name2, ...])
+```
+
+## Unify Streaming and Batch
+
+Flink Table Store supports reads and writes in both batch and streaming mode.
+Beyond that, different streaming and batch jobs can also write to the same
+managed table simultaneously.
+
+Suppose you have a warehouse pipeline:
+
+{{< img src="/img/stream_batch_insert.svg" alt="Unify Streaming and Batch" >}}
+
+The DWD layer has the following table:
+
+```sql
+-- a managed table ddl
+CREATE TABLE MyDwdTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ dt STRING
+) PARTITIONED BY (dt);
+```
+
+A real-time pipeline performs the data sync task, followed by
+downstream jobs that perform the rest of the ETL steps.
+
+```sql
+-- Run a streaming job that continuously writes to the table
+SET 'execution.runtime-mode' = 'streaming';
+INSERT INTO MyDwdTable SELECT user_id, item_id, dt FROM MyCdcTable WHERE some_filter;
+
+-- The downstream aggregation task
+INSERT INTO MyDwsTable
+SELECT dt, item_id, COUNT(user_id) FROM MyDwdTable GROUP BY dt, item_id;
+```
+
+Backfill tasks are often required to correct historical data. This means you
+can start a new batch job that overwrites a historical partition of the table
+without influencing the current streaming pipeline and the downstream tasks.
+
+```sql
+-- Run a batch job to revise yesterday's partition
+SET 'execution.runtime-mode' = 'batch';
+INSERT OVERWRITE MyDwdTable PARTITION ('dt'='20220402')
+SELECT user_id, item_id FROM MyCdcTable WHERE dt = '20220402' AND new_filter;
+```
+
+This way you revise yesterday's partition without suspending the streaming job.
+
+{{< hint info >}}
+__Note:__ Writing to a single partition from multiple jobs at the same time
+is not supported. This does not result in data errors, but it can lead to
+job failover.
+{{< /hint >}}
+
+## Parallelism
+
+It is recommended that the parallelism of the sink be less than or equal to
+the number of buckets, and preferably equal to it. You can control the
+parallelism of the sink with the `sink.parallelism` option.
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Option</th>
+ <th class="text-center" style="width: 5%">Required</th>
+ <th class="text-center" style="width: 5%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 60%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>sink.parallelism</h5></td>
+ <td>No</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Integer</td>
+ <td>Defines the parallelism of the sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td>
+ </tr>
+ </tbody>
+</table>
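+
+A sketch of matching the sink parallelism to the bucket count (the table
+and the values are illustrative):
+
+```sql
+CREATE TABLE MySinkTable (
+    user_id BIGINT,
+    item_id BIGINT,
+    dt STRING
+) PARTITIONED BY (dt) WITH (
+    'bucket' = '4',
+    'sink.parallelism' = '4'   -- equal to the number of buckets
+);
+```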
+
+## Expiring Snapshot
+
+Table Store generates one or two snapshots per commit. To avoid a large
+number of snapshots, which create many small files and redundant storage,
+the Table Store writer eliminates expired snapshots by default:
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Option</th>
+ <th class="text-center" style="width: 5%">Required</th>
+ <th class="text-center" style="width: 5%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 60%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>snapshot.time-retained</h5></td>
+ <td>No</td>
+ <td style="word-wrap: break-word;">1 h</td>
+ <td>Duration</td>
+ <td>The maximum time of completed snapshots to retain.</td>
+ </tr>
+ <tr>
+ <td><h5>snapshot.num-retained</h5></td>
+ <td>No</td>
+ <td style="word-wrap: break-word;">Integer.MAX_VALUE</td>
+ <td>Integer</td>
+ <td>The maximum number of completed snapshots to retain.</td>
+ </tr>
+ </tbody>
+</table>
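+
+For example, a table that keeps snapshots for at most one day and retains
+at most 100 of them (the values are illustrative):
+
+```sql
+CREATE TABLE MyRetainedTable (
+    user_id BIGINT,
+    dt STRING
+) PARTITIONED BY (dt) WITH (
+    'snapshot.time-retained' = '1 d',
+    'snapshot.num-retained' = '100'
+);
+```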
+
+Please note that a retention time that is too short, or a retained number
+that is too small, may result in:
+- Batch queries cannot find their files. For example, the table is relatively
+  large and a batch query takes 10 minutes to read, but the snapshot from
+  10 minutes ago has expired, so the batch query reads a deleted snapshot.
+- Continuous reading jobs on the FileStore (without a log system) fail to
+  restart. At the time of the job failover, the snapshot they recorded has
+  expired.
+
+## Performance
+
+Table Store uses the LSM data structure, which by itself supports a large
+number of updates. Update performance and query performance are a tradeoff;
+the following parameters control this tradeoff:
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Option</th>
+ <th class="text-center" style="width: 5%">Required</th>
+ <th class="text-center" style="width: 5%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 60%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>num-sorted-run.max</h5></td>
+ <td>No</td>
+ <td style="word-wrap: break-word;">5</td>
+ <td>Integer</td>
+ <td>The max sorted run number. Includes level0 files (one file one sorted run) and high-level runs (one level one sorted run).</td>
+ </tr>
+ </tbody>
+</table>
+
+- The larger `num-sorted-run.max` is, the lower the merge cost when updating
+  data, because many unnecessary merges can be avoided. However, if this value
+  is too large, more memory is needed when merging files, because each
+  FileReader takes up a lot of memory.
+
+- The smaller `num-sorted-run.max` is, the better the query performance,
+  because fewer files need to be merged at read time.
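+
+For example, a write-heavy table might raise this threshold to favor update
+performance over query performance (the value 10 is only an illustrative
+assumption):
+
+```sql
+CREATE TABLE MyWriteHeavyTable (
+    user_id BIGINT,
+    behavior STRING,
+    PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+    'num-sorted-run.max' = '10'   -- default is 5
+);
+```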
+
+## Memory
+
+There are three main places in the Table Store's sink writer that take up memory:
+- The MemTable's write buffer, which is occupied individually by each partition
+  and each bucket; this memory value can be adjusted with the `write-buffer-size`
+  option (default 64 MB).
+- The memory consumed by compaction for reading files; it can be adjusted with
+  the `num-sorted-run.max` option, which changes the maximum number of files to
+  be merged.
+- The memory consumed by writing files, which is not adjustable.
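+
+For example, to give each bucket's write buffer more memory than the default
+64 MB (the exact value here is an illustrative assumption):
+
+```sql
+CREATE TABLE MyBufferedTable (
+    user_id BIGINT,
+    behavior STRING,
+    PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+    'write-buffer-size' = '128 mb'
+);
+```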
diff --git a/docs/content/docs/try-table-store/quick-start.md
b/docs/content/docs/try-table-store/quick-start.md
index 6c55b8c..db7ca2a 100644
--- a/docs/content/docs/try-table-store/quick-start.md
+++ b/docs/content/docs/try-table-store/quick-start.md
@@ -32,7 +32,7 @@ document will be guided to create a simple dynamic table to
read and write it.
## Step 1: Downloading Flink
{{< hint info >}}
-__Note:__ Table Store is only supported starting from Flink 1.15.
+__Note:__ Table Store is only supported since Flink 1.15.
{{< /hint >}}
[Download the latest binary release](https://flink.apache.org/downloads.html)
of Flink,
diff --git a/docs/static/img/stream_batch_insert.svg
b/docs/static/img/stream_batch_insert.svg
new file mode 100644
index 0000000..1a13e25
--- /dev/null
+++ b/docs/static/img/stream_batch_insert.svg
@@ -0,0 +1 @@
+<svg version="1.1" viewBox="0.0 0.0 960.0 388.9133858267717" fill="none"
stroke="none" stroke-linecap="square" stroke-miterlimit="10"
xmlns:xlink="http://www.w3.org/1999/xlink"
xmlns="http://www.w3.org/2000/svg"><clipPath id="g122f03a17ab_0_1.0"><path
d="m0 0l960.0 0l0 388.9134l-960.0 0l0 -388.9134z"
clip-rule="nonzero"/></clipPath><g clip-path="url(#g122f03a17ab_0_1.0)"><path
fill="#ffffff" d="m0 0l960.0 0l0 388.9134l-960.0 0z" fill-rule="evenodd"/><path
fill="#000000" fill-opacity="0.0 [...]
\ No newline at end of file