This is an automated email from the ASF dual-hosted git repository.

bhavanisudha pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new cdcc927  organized the Flink details to move into the relevant pages and make the quick-start short and simple (#4257)
cdcc927 is described below

commit cdcc927f4fcc7ea0cc3f4b51e37adf12c6deaed6
Author: Kyle Weller <[email protected]>
AuthorDate: Thu Dec 9 16:25:45 2021 -0800

    organized the Flink details to move into the relevant pages and make the quick-start short and simple (#4257)
---
 website/docs/compaction.md              |  67 ++++--
 website/docs/flink-quick-start-guide.md | 415 +-------------------------------
 website/docs/flink_configuration.md     | 117 +++++++++
 website/docs/hoodie_deltastreamer.md    | 145 ++++++++++-
 website/docs/query_engine_setup.md      |  19 +-
 website/docs/syncing_metastore.md       | 100 +++++++-
 website/sidebars.js                     |   1 +
 7 files changed, 433 insertions(+), 431 deletions(-)

diff --git a/website/docs/compaction.md b/website/docs/compaction.md
index 70fba50..015d21e 100644
--- a/website/docs/compaction.md
+++ b/website/docs/compaction.md
@@ -5,18 +5,13 @@ toc: true
 last_modified_at:
 ---
 
-Compaction is executed asynchronously with Hudi by default.
-
 ## Async Compaction
-
-Async Compaction is performed in 2 steps:
+Compaction is executed asynchronously with Hudi by default. Async Compaction is performed in 2 steps:
 
 1. ***Compaction Scheduling***: This is done by the ingestion job. In this step, Hudi scans the partitions and selects **file
    slices** to be compacted. A compaction plan is finally written to Hudi timeline.
 1. ***Compaction Execution***: A separate process reads the compaction plan and performs compaction of file slices.
 
-## Deployment Models
-
 There are few ways by which we can execute compactions asynchronously.
 
 ### Spark Structured Streaming
@@ -68,6 +63,37 @@ spark-submit --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.6.0 \
 --continous
 ```
 
+## Synchronous Compaction
+By default, compaction is run asynchronously.
+
+If latency of ingesting records is important for you, you are most likely using Merge-On-Read tables.
+Merge-On-Read tables store data using a combination of columnar (e.g. parquet) and row-based (e.g. avro) file formats.
+Updates are logged to delta files and later compacted to produce new versions of columnar files.
+To improve ingestion latency, Async Compaction is the default configuration.
+
+If immediate read performance of a new commit is important for you, or you want the simplicity of not managing separate compaction jobs,
+you may want Synchronous Compaction, which means that as a commit is written it is also compacted by the same job.
+
+Compaction is run synchronously by passing the flag `--disable-compaction` (which disables async compaction scheduling).
+When both ingestion and compaction are running in the same Spark context, you can use the resource allocation configuration
+in the DeltaStreamer CLI (`--delta-sync-scheduling-weight`,
+`--compact-scheduling-weight`, `--delta-sync-scheduling-minshare`, and `--compact-scheduling-minshare`)
+to control executor allocation between ingestion and compaction.
+
+
+## Offline Compaction
+
+The compaction of the MERGE_ON_READ table is enabled by default. The trigger strategy is to perform compaction after completing
+five commits. Because compaction consumes a lot of memory and runs in the same pipeline as the write operation, it can easily
+interfere with the write operation when there is a large amount of data (> 100000 per second). In this case, it is more stable to execute
+the compaction task by using offline compaction.
+
+:::note
+The execution of a compaction task includes two parts: scheduling the compaction plan and executing the compaction plan. It's recommended that
+the write task schedule the compaction plan periodically; the write parameter `compaction.schedule.enabled` controls this
+and is enabled by default.
+:::
+
 ### Hudi Compactor Utility
 Hudi provides a standalone tool to execute specific compactions asynchronously. Below is an example and you can read more in the [deployment guide](/docs/deployment#compactions)
 
@@ -81,7 +107,7 @@ spark-submit --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.6.0 \
 --instant-time <compaction_instant>
 ```
 
-Note, the `instant-time` parameter is now optional for the Hudi Compactor 
Utility. If using the utility without `--instant time`, 
+Note, the `instant-time` parameter is now optional for the Hudi Compactor Utility. If using the utility without `--instant-time`,
 the spark-submit will execute the earliest scheduled compaction on the Hudi timeline.
 
 ### Hudi CLI
@@ -93,19 +119,20 @@ hudi:trips->compaction run --tableName <table_name> --parallelism <parallelism>
 ...
 ```
 
-## Synchronous Compaction
-By default, compaction is run asynchronously.
+### Flink Offline Compaction
+To run offline compaction, submit a Flink job from the command line. The program entry point in `hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar` is:
+`org.apache.hudi.sink.compact.HoodieFlinkCompactor`
 
-If latency of ingesting records is important for you, you are most likely 
using Merge-On-Read tables.
-Merge-On-Read tables store data using a combination of columnar (e.g parquet) 
+ row based (e.g avro) file formats.
-Updates are logged to delta files & later compacted to produce new versions of 
columnar files. 
-To improve ingestion latency, Async Compaction is the default configuration.
+```bash
+# Command line
+./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink-bundle_2.11-0.9.0.jar --path hdfs://xxx:9000/table
+```
 
-If immediate read performance of a new commit is important for you, or you 
want simplicity of not managing separate compaction jobs,
-you may want Synchronous compaction, which means that as a commit is written 
it is also compacted by the same job.
+#### Options
 
-Compaction is run synchronously by passing the flag "--disable-compaction" 
(Meaning to disable async compaction scheduling).
-When both ingestion and compaction is running in the same spark context, you 
can use resource allocation configuration 
-in DeltaStreamer CLI such as ("--delta-sync-scheduling-weight",
-"--compact-scheduling-weight", ""--delta-sync-scheduling-minshare", and 
"--compact-scheduling-minshare")
-to control executor allocation between ingestion and compaction.
+|  Option Name  | Required | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| `--path` | `true` | `--` | The path where the target table is stored on Hudi |
+| `--compaction-max-memory` | `false` | `100` | The index map size of log data during compaction, 100 MB by default. If you have enough memory, you can turn up this parameter |
+| `--schedule` | `false` | `false` | Whether to schedule a compaction plan. Turning on this parameter while the write process is still writing risks losing data, so make sure that no write tasks are currently writing data to this table when this parameter is turned on |
+| `--seq` | `false` | `LIFO` | The order in which compaction tasks are executed. Executing from the latest compaction plan by default. `LIFO`: executing from the latest plan. `FIFO`: executing from the oldest plan. |
\ No newline at end of file
diff --git a/website/docs/flink-quick-start-guide.md b/website/docs/flink-quick-start-guide.md
index 7ba0423..2feeba6 100644
--- a/website/docs/flink-quick-start-guide.md
+++ b/website/docs/flink-quick-start-guide.md
@@ -164,416 +164,19 @@ When consuming data in streaming query, Hudi Flink source can also accepts the c
 it can then applies the UPDATE and DELETE by per-row level. You can then sync a NEAR-REAL-TIME snapshot on Hudi for all kinds
 of RDBMS.
 
-## Flink Configuration
-
-Before using Flink, you need to set some global configurations in 
`$FLINK_HOME/conf/flink-conf.yaml`
-
-### Parallelism
-
-|  Option Name  | Default | Type | Description |
-|  -----------  | -------  | ------- | ------- |
-| `taskmanager.numberOfTaskSlots` | `1` | `Integer` | The number of parallel 
operator or user function instances that a single TaskManager can run. We 
recommend setting this value > 4, and the actual value needs to be set 
according to the amount of data |
-| `parallelism.default` | `1` | `Integer` | The default parallelism used when 
no parallelism is specified anywhere (default: 1). For example, If the value of 
[`write.bucket_assign.tasks`](#parallelism-1) is not set, this value will be 
used |
-
-### Memory
-
-|  Option Name  | Default | Type | Description |
-|  -----------  | -------  | ------- | ------- |
-| `jobmanager.memory.process.size` | `(none)` | `MemorySize` | Total Process 
Memory size for the JobManager. This includes all the memory that a JobManager 
JVM process consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM 
Overhead |
-| `taskmanager.memory.task.heap.size` | `(none)` | `MemorySize` | Task Heap 
Memory size for TaskExecutors. This is the size of JVM heap memory reserved for 
write cache |
-| `taskmanager.memory.managed.size`  |  `(none)`  | `MemorySize` | Managed 
Memory size for TaskExecutors. This is the size of off-heap memory managed by 
the memory manager, reserved for sorting and RocksDB state backend. If you 
choose RocksDB as the state backend, you need to set this memory |
-
-### Checkpoint
-
-|  Option Name  | Default | Type | Description |
-|  -----------  | -------  | ------- | ------- |
-| `execution.checkpointing.interval` | `(none)` | `Duration` | Setting this 
value as `execution.checkpointing.interval = 150000ms`, 150000ms = 2.5min. 
Configuring this parameter is equivalent to enabling the checkpoint |
-| `state.backend` | `(none)` | `String` | The state backend to be used to 
store state. We recommend setting store state as `rocksdb` : `state.backend: 
rocksdb`  |
-| `state.backend.rocksdb.localdir` | `(none)` | `String` | The local directory 
(on the TaskManager) where RocksDB puts its files |
-| `state.checkpoints.dir` | `(none)` | `String` | The default directory used 
for storing the data files and meta data of checkpoints in a Flink supported 
filesystem. The storage path must be accessible from all participating 
processes/nodes(i.e. all TaskManagers and JobManagers), like hdfs and oss path |
-| `state.backend.incremental`  |  `false`  | `Boolean` | Option whether the 
state backend should create incremental checkpoints, if possible. For an 
incremental checkpoint, only a diff from the previous checkpoint is stored, 
rather than the complete checkpoint state. If store state is setting as 
`rocksdb`, recommending to turn on |
-
-## Table Option
-
-Flink SQL job can be configured through the options in [`WITH`](#table-option) 
clause.
-The actual datasource level configs are listed below.
-
-### Memory
-
-:::note
-When optimizing memory, we need to pay attention to the [memory 
configuration](#memory) 
-and the number of taskManagers, parallelism of write tasks (write.tasks : 4) 
first. After confirm each write task to be
-allocated with enough memory, we can try to set these memory options.
-:::
-
-|  Option Name  | Description | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `write.task.max.size` | Maximum memory in MB for a write task, when the 
threshold hits, it flushes the max size data bucket to avoid OOM. Default 
`1024MB` | `1024D` | The memory reserved for write buffer is 
`write.task.max.size` - `compaction.max_memory`. When total buffer of write 
tasks reach the threshold, the largest buffer in the memory will be flushed |
-| `write.batch.size`  | In order to improve the efficiency of writing, Flink 
write task will cache data in buffer according to the write bucket until the 
memory reaches the threshold. When reached threshold, the data buffer would be 
flushed out. Default `64MB` | `64D` |  Recommend to use the default settings  |
-| `write.log_block.size` | The log writer of Hudi will not flush the data 
immediately after receiving data. The writer flush data to the disk in the unit 
of `LogBlock`. Before `LogBlock` reached threshold, records will be buffered in 
the writer in form of serialized bytes. Default `128MB`  | `128` |  Recommend 
to use the default settings  |
-| `write.merge.max_memory` | If write type is `COPY_ON_WRITE`, Hudi will merge 
the incremental data and base file data. The incremental data will be cached 
and spilled to disk. this threshold controls the max heap size that can be 
used. Default `100MB`  | `100` | Recommend to use the default settings |
-| `compaction.max_memory` | Same as `write.merge.max_memory`, but occurs 
during compaction. Default `100MB` | `100` | If it is online compaction, it can 
be turned up when resources are sufficient, such as setting as `1024MB` |
-
-### Parallelism
-
-|  Option Name  | Description | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `write.tasks` |  The parallelism of writer tasks. Each write task writes 1 
to `N` buckets in sequence. Default `4` | `4` | Increases the parallelism has 
no effect on the number of small files |
-| `write.bucket_assign.tasks`  |  The parallelism of bucket assigner 
operators. No default value, using Flink `parallelism.default`  | 
[`parallelism.default`](#parallelism) |  Increases the parallelism also 
increases the number of buckets, thus the number of small files (small buckets) 
 |
-| `write.index_boostrap.tasks` |  The parallelism of index bootstrap. 
Increasing parallelism can speed up the efficiency of the bootstrap stage. The 
bootstrap stage will block checkpointing. Therefore, it is necessary to set 
more checkpoint failure tolerance times. Default using Flink 
`parallelism.default` | [`parallelism.default`](#parallelism) | It only take 
effect when `index.bootsrap.enabled` is `true` |
-| `read.tasks` | The parallelism of read operators (batch and stream). Default 
`4`  | `4` |  |
-| `compaction.tasks` | The parallelism of online compaction. Default `4` | `4` 
| `Online compaction` will occupy the resources of the write task. It is 
recommended to use [`offline compaction`](#offline-compaction) |
-
-### Compaction
-
-:::note
-These are options only for `online compaction`. 
-:::
-
-:::note
-Turn off online compaction by setting `compaction.async.enabled` = `false`, 
but we still recommend turning on `compaction.schedule.enable` for the writing 
job. You can then execute the compaction plan by [`offline 
compaction`](#offline-compaction). 
-:::
-
-|  Option Name  | Description | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `compaction.schedule.enabled` | Whether to generate compaction plan 
periodically | `true` | Recommend to turn it on, even if 
`compaction.async.enabled` = `false` |
-| `compaction.async.enabled`  |  Async Compaction, enabled by default for MOR 
| `true` | Turn off `online compaction` by turning off this option |
-| `compaction.trigger.strategy`  | Strategy to trigger compaction | 
`num_commits` | Options are `num_commits`: trigger compaction when reach N 
delta commits; `time_elapsed`: trigger compaction when time elapsed > N seconds 
since last compaction; `num_and_time`: trigger compaction when both 
`NUM_COMMITS` and `TIME_ELAPSED` are satisfied; `num_or_time`: trigger 
compaction when `NUM_COMMITS` or `TIME_ELAPSED` is satisfied. |
-| `compaction.delta_commits` | Max delta commits needed to trigger compaction, 
default `5` commits | `5` | -- |
-| `compaction.delta_seconds`  |  Max delta seconds time needed to trigger 
compaction, default `1` hour | `3600` | -- |
-| `compaction.max_memory` | Max memory in MB for compaction spillable map, 
default `100MB` | `100` | If your have sufficient resources, recommend to 
adjust to `1024MB` |
-| `compaction.target_io`  |  Target IO per compaction (both read and write), 
default `500GB`| `512000` | -- |
-
-## Memory Optimization
-
-### MOR
-
-1. [Setting Flink state backend to `rocksdb`](#checkpoint) (the default `in 
memory` state backend is very memory intensive).
-2. If there is enough memory, `compaction.max_memory` can be set larger 
(`100MB` by default, and can be adjust to `1024MB`).
-3. Pay attention to the memory allocated to each write task by taskManager to 
ensure that each write task can be allocated to the 
-desired memory size `write.task.max.size`. For example, taskManager has `4GB` 
of memory running two streamWriteFunction, so each write task
-can be allocated with `2GB` memory. Please reserve some buffers because the 
network buffer and other types of tasks on taskManager (such as 
bucketAssignFunction) will also consume memory.
-4. Pay attention to the memory changes of compaction. `compaction.max_memory` 
controls the maximum memory that each task can be used when compaction tasks 
read
-logs. `compaction.tasks` controls the parallelism of compaction tasks. 
-
-### COW
-
-1. [Setting Flink state backend to `rocksdb`](#checkpoint) (the default `in 
memory` state backend is very memory intensive).
-2. Increase both `write.task.max.size` and `write.merge.max_memory` (`1024MB` 
and `100MB` by default, adjust to `2014MB` and `1024MB`).
-3. Pay attention to the memory allocated to each write task by taskManager to 
ensure that each write task can be allocated to the 
-desired memory size `write.task.max.size`. For example, taskManager has `4GB` 
of memory running two write tasks, so each write task 
-can be allocated with `2GB` memory. Please reserve some buffers because the 
network buffer and other types of tasks on taskManager (such as 
`BucketAssignFunction`) will also consume memory.
-
-## CDC Ingestion
-CDC(change data capture) keep track of the data changes evolving in a source 
system so a downstream process or system can action that change.
-We recommend two ways for syncing CDC data into Hudi:
-
-![slide1 title](/assets/images/cdc-2-hudi.png)
-
-1. Using the Ververica 
[flink-cdc-connectors](https://github.com/ververica/flink-cdc-connectors) 
directly connect to DB Server to sync the binlog data into Hudi.
-The advantage is that it does not rely on message queues, but the disadvantage 
is that it puts pressure on the db server;
-2. Consume data from a message queue (for e.g, the Kafka) using the flink cdc 
format, the advantage is that it is highly scalable,
-but the disadvantage is that it relies on message queues.
-
-:::note
-- If the upstream data cannot guarantee the order, you need to specify option 
`write.precombine.field` explicitly;
-- The MOR table can not handle DELETEs in event time sequence now, thus 
causing data loss. You better switch on the changelog mode through
-option `changelog.enabled`.
-:::
-
-## Bulk Insert
-
-For the demand of snapshot data import. If the snapshot data comes from other 
data sources, use the `bulk_insert` mode to quickly
-import the snapshot data into Hudi.
-
-
-:::note
- `bulk_insert` eliminates the serialization and data merging. The data 
deduplication is skipped, so the user need to guarantee the uniqueness of the 
data.
-:::
-
-:::note
-`bulk_insert` is more efficient in the `batch execution mode`. By default, the 
`batch execution mode` sorts the input records 
-by the partition path and writes these records to Hudi, which can avoid write 
performance degradation caused by 
-frequent `file handle` switching.  
-:::
-
-:::note  
-The parallelism of `bulk_insert` is specified by `write.tasks`. The 
parallelism will affect the number of small files.
-In theory, the parallelism of `bulk_insert` is the number of `bucket`s (In 
particular, when each bucket writes to maximum file size, it 
-will rollover to the new file handle. Finally, `the number of files` >= 
[`write.bucket_assign.tasks`](#parallelism)).
-:::
-
-### Options
-
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `write.operation` | `true` | `upsert` | Setting as `bulk_insert` to open 
this function  |
-| `write.tasks`  |  `false`  | `4` | The parallelism of `bulk_insert`, `the 
number of files` >= [`write.bucket_assign.tasks`](#parallelism) |
-| `write.bulk_insert.shuffle_by_partition` | `false` | `true` | Whether to 
shuffle data according to the partition field before writing. Enabling this 
option will reduce the number of small files, but there may be a risk of data 
skew  |
-| `write.bulk_insert.sort_by_partition` | `false`  | `true` | Whether to sort 
data according to the partition field before writing. Enabling this option will 
reduce the number of small files when a write task writes multiple partitions  |
-| `write.sort.memory` | `false` | `128` | Available managed memory of sort 
operator. default  `128` MB |
-
-## Index Bootstrap
-
-For the demand of `snapshot data` + `incremental data` import. If the 
`snapshot data` already insert into Hudi by  [bulk insert](#bulk-insert). 
-User can insert `incremental data` in real time and ensure the data is not 
repeated by using the index bootstrap function.
-
-:::note
-If you think this process is very time-consuming, you can add resources to 
write in streaming mode while writing `snapshot data`, 
-and then reduce the resources to write `incremental data` (or open the rate 
limit function).
-:::
-
-### Options
-
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `index.bootstrap.enabled` | `true` | `false` | When index bootstrap is 
enabled, the remain records in Hudi table will be loaded into the Flink state 
at one time |
-| `index.partition.regex`  |  `false`  | `*` | Optimize option. Setting 
regular expressions to filter partitions. By default, all partitions are loaded 
into flink state |
-
-### How To Use
-
-1. `CREATE TABLE` creates a statement corresponding to the Hudi table. Note 
that the `table.type` must be correct.
-2. Setting `index.bootstrap.enabled` = `true` to enable the index bootstrap 
function.
-3. Setting Flink checkpoint failure tolerance in `flink-conf.yaml` : 
`execution.checkpointing.tolerable-failed-checkpoints = n` (depending on Flink 
checkpoint scheduling times).
-4. Waiting until the first checkpoint succeeds, indicating that the index 
bootstrap completed.
-5. After the index bootstrap completed, user can exit and save the savepoint 
(or directly use the externalized checkpoint).
-6. Restart the job, setting `index.bootstrap.enable` as `false`.
-
-:::note
-1. Index bootstrap is blocking, so checkpoint cannot be completed during index 
bootstrap.
-2. Index bootstrap triggers by the input data. User need to ensure that there 
is at least one record in each partition.
-3. Index bootstrap executes concurrently. User can search in log by `finish 
loading the index under partition` and `Load record form file` to observe the 
progress of index bootstrap.
-4. The first successful checkpoint indicates that the index bootstrap 
completed. There is no need to load the index again when recovering from the 
checkpoint.
-:::
-
-## Changelog Mode
-Hudi can keep all the intermediate changes (I / -U / U / D) of messages, then 
consumes through stateful computing of flink to have a near-real-time
-data warehouse ETL pipeline (Incremental computing). Hudi MOR table stores 
messages in the forms of rows, which supports the retention of all change logs 
(Integration at the format level).
-All changelog records can be consumed with Flink streaming reader.
-
-### Options
-
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `changelog.enabled` | `false` | `false` | It is turned off by default, to 
have the `upsert` semantics, only the merged messages are ensured to be kept, 
intermediate changes may be merged. Setting to true to support consumption of 
all changes |
-
-:::note
-Batch (Snapshot) read still merge all the intermediate changes, regardless of 
whether the format has stored the intermediate changelog messages.
-:::
-
-:::note
-After setting `changelog.enable` as `true`, the retention of changelog records 
are only best effort: the asynchronous compaction task will merge the changelog 
records into one record, so if the
-stream source does not consume timely, only the merged record for each key can 
be read after compaction. The solution is to reserve some buffer time for the 
reader by adjusting the compaction strategy, such as
-the compaction options: [`compaction.delta_commits`](#compaction) and 
[`compaction.delta_seconds`](#compaction).
-:::
-
-
-## Append Mode
-
-If INSERT operation is used for ingestion, for COW table, there is no merging 
of small files by default; for MOR table, the small file strategy is applied 
always: MOR appends delta records to log files.
-
-The small file strategy lead to performance degradation. If you want to apply 
the behavior of file merge for COW table, turns on option 
`write.insert.cluster`, there is no record key combining by the way.
-
-### Options
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `write.insert.cluster` | `false` | `false` | Whether to merge small files 
while ingesting, for COW table, open the option to enable the small file 
merging strategy(no deduplication for keys but the throughput will be affected) 
|
-
-## Rate Limit
-There are many use cases that user put the full history data set onto the 
message queue together with the realtime incremental data. Then they consume 
the data from the queue into the hudi from the earliest offset using flink. 
Consuming history data set has these characteristics:
-1). The instant throughput is huge 2). It has serious disorder (with random 
writing partitions). It will lead to degradation of writing performance and 
throughput glitches. At this time, the speed limit parameter can be turned on 
to ensure smooth writing of the flow.
-
-### Options
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `write.rate.limit` | `false` | `0` | Default disable the rate limit |
-
-## Incremental Query
-There are 3 use cases for incremental query:
-1. Streaming query: specify the start commit with option `read.start-commit`;
-2. Batch query: specify the start commit with option `read.start-commit` and 
end commit with option `read.end-commit`,
-the interval is a closed one: both start commit and end commit are inclusive;
-3. TimeTravel: consume as batch for an instant time, specify the 
`read.end-commit` is enough because the start commit is latest by default.
-
-### Options
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `write.start-commit` | `false` | the latest commit | Specify `earliest` to 
consume from the start commit |
-| `write.end-commit` | `false` | the latest commit | -- |
-
-## Hive Query
-
-### Install
-
-Now you can git clone Hudi master branch to test Flink hive sync. The first 
step is to install Hudi to get `hudi-flink-bundle_2.11-0.x.jar`.
-`hudi-flink-bundle` module pom.xml sets the scope related to hive as 
`provided` by default. If you want to use hive sync, you need to use the
-profile `flink-bundle-shade-hive` during packaging. Executing command below to 
install:
-
-```bash
-# Maven install command
-mvn install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2
-
-# For hive1, you need to use profile -Pflink-bundle-shade-hive1
-# For hive3, you need to use profile -Pflink-bundle-shade-hive3 
-```
-
-:::note 
-Hive1.x can only synchronize metadata to hive, but cannot use hive query now. 
If you need to query, you can use spark to query hive table.
-:::
-
-:::note 
-If using hive profile, you need to modify the hive version in the profile to 
your hive cluster version (Only need to modify the hive version in this 
profile).
-The location of this `pom.xml` is `packaging/hudi-flink-bundle/pom.xml`, and 
the corresponding profile is at the bottom of this file.
-:::
-
-### Hive Environment
-
-1. Import `hudi-hadoop-mr-bundle` into hive. Creating `auxlib/` folder under 
the root directory of hive, and moving 
`hudi-hadoop-mr-bundle-0.x.x-SNAPSHOT.jar` into `auxlib`.
-`hudi-hadoop-mr-bundle-0.x.x-SNAPSHOT.jar` is at 
`packaging/hudi-hadoop-mr-bundle/target`.
-
-2. When Flink sql client connects hive metastore remotely, `hive metastore` 
and `hiveserver2` services need to be enabled, and the port number need to
-be set correctly. Command to turn on the services:
-
-```bash
-# Enable hive metastore and hiveserver2
-nohup ./bin/hive --service metastore &
-nohup ./bin/hive --service hiveserver2 &
-
-# While modifying the jar package under auxlib, you need to restart the 
service.
-```
-
-### Sync Template
-
-Flink hive sync now supports two hive sync mode, `hms` and `jdbc`. `hms` mode 
only needs to configure metastore uris. For 
-the `jdbc` mode, the JDBC attributes and metastore uris both need to be 
configured. The options template is as below:
-
-```sql
--- hms mode template
-CREATE TABLE t1(
-  uuid VARCHAR(20),
-  name VARCHAR(10),
-  age INT,
-  ts TIMESTAMP(3),
-  `partition` VARCHAR(20)
-)
-PARTITIONED BY (`partition`)
-WITH (
-  'connector' = 'hudi',
-  'path' = '${db_path}/t1',
-  'table.type' = 'COPY_ON_WRITE',  --If MERGE_ON_READ, hive query will not 
have output until the parquet file is generated
-  'hive_sync.enable' = 'true',     -- Required. To enable hive synchronization
-  'hive_sync.mode' = 'hms'         -- Required. Setting hive sync mode to hms, 
default jdbc
-  'hive_sync.metastore.uris' = 'thrift://${ip}:9083' -- Required. The port 
need set on hive-site.xml
-);
-
-
--- jdbc mode template
-CREATE TABLE t1(
-  uuid VARCHAR(20),
-  name VARCHAR(10),
-  age INT,
-  ts TIMESTAMP(3),
-  `partition` VARCHAR(20)
-)
-PARTITIONED BY (`partition`)
-WITH (
-  'connector' = 'hudi',
-  'path' = '${db_path}/t1',
-  'table.type' = 'COPY_ON_WRITE',  --If MERGE_ON_READ, hive query will not 
have output until the parquet file is generated
-  'hive_sync.enable' = 'true',     -- Required. To enable hive synchronization
-  'hive_sync.mode' = 'hms'         -- Required. Setting hive sync mode to hms, 
default jdbc
-  'hive_sync.metastore.uris' = 'thrift://${ip}:9083'  -- Required. The port 
need set on hive-site.xml
-  'hive_sync.jdbc_url'='jdbc:hive2://${ip}:10000',    -- required, hiveServer 
port
-  'hive_sync.table'='t1',                          -- required, hive table name
-  'hive_sync.db'='testDB',                         -- required, hive database 
name
-  'hive_sync.username'='${user_name}',                     -- required, HMS 
username
-  'hive_sync.password'='${password}'             -- required, HMS password
-);
-```
-
-### Query
-
-While using hive beeline query, you need to enter settings:
-```bash
-set hive.input.format = 
org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
-```
-
-## Presto Query
-
-### Hive Sync
-First, you need to sync Hudi table metadata to hive according to the above 
steps of [Hive Query](#hive-query).
-
-### Presto Environment
-1. Configure Presto according to the [Presto configuration 
document](https://prestodb.io/docs/current/installation/deployment.html).
-2. Configure hive catalog in ` 
/presto-server-0.2xxx/etc/catalog/hive.properties` as follows:
-
-```properties
-connector.name=hive-hadoop2
-hive.metastore.uri=thrift://xxx.xxx.xxx.xxx:9083
-hive.config.resources=.../hadoop-2.x/etc/hadoop/core-site.xml,.../hadoop-2.x/etc/hadoop/hdfs-site.xml
-```
-
-### Query
-
-Beginning query by connecting hive metastore with presto client. The presto 
client connection command is as follows:
-
-```bash
-# The presto client connection command
-./presto --server xxx.xxx.xxx.xxx:9999 --catalog hive --schema default
-```
-
-:::note
-1. Presto-server-0.2445 is a lower version. When querying the `rt table` of 
MERGE_ON_WRITE, there will be a package conflict, this bug is in fix.
-2. When Presto-server-xxx version < 0.233, the `hudi-presto-bundle.jar` needs 
to manually import into `{presto_install_dir}/plugin/hive-hadoop2/`.
-:::
-
-
-## Offline Compaction
-
-The compaction of the MERGE_ON_READ table is enabled by default. The trigger 
strategy is to perform compaction after completing
-five commits. Because compaction consumes a lot of memory and is placed in the 
same pipeline with the write operation, it's easy to
-interfere with the write operation when there is a large amount of data (> 
100000 per second). As this time, it is more stable to execute
-the compaction task by using offline compaction.
-
-:::note 
-The execution of a compaction task includes two parts: schedule compaction 
plan and execute compaction plan. It's recommended that
-the process of schedule compaction plan be triggered periodically by the write 
task, and the write parameter `compaction.schedule.enable`
-is enabled by default.
-:::
-
-Offline compaction needs to submit the Flink task on the command line. The 
program entry is as follows: `hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar` :
-`org.apache.hudi.sink.compact.HoodieFlinkCompactor`
-
-```bash
-# Command line
-./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor 
lib/hudi-flink-bundle_2.11-0.9.0.jar --path hdfs://xxx:9000/table
-```
-
-### Options
-
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `--path` | `frue` | `--` | The path where the target table is stored on Hudi 
|
-| `--compaction-max-memory` | `false` | `100` | The index map size of log data 
during compaction, 100 MB by default. If you have enough memory, you can turn 
up this parameter |
-| `--schedule` | `false` | `false` | whether to execute the operation of 
scheduling compaction plan. When the write process is still writing, turning on 
this parameter have a risk of losing data. Therefore, it must be ensured that 
there are no write tasks currently writing data to this table when this 
parameter is turned on |
-| `--seq` | `false` | `LIFO` | The order in which compaction tasks are 
executed. Executing from the latest compaction plan by default. `LIFO`: 
executing from the latest plan. `FIFO`: executing from the oldest plan. |
-
-## Write Rate Limit
-
-In the existing data synchronization, `snapshot data` and `incremental data` 
are send to kafka first, and then streaming write
-to Hudi by Flink. Because the direct consumption of `snapshot data` will lead 
to problems such as high throughput and serious 
-disorder (writing partition randomly), which will lead to write performance 
degradation and throughput glitches. At this time, 
-the `write.rate.limit` option can be turned on to ensure smooth writing.
+## Where To Go From Here?
+Check out the [Flink Setup](/docs/next/flink_configuration) how-to page for a 
deeper dive into configuration settings.
 
-### Options
+If you are relatively new to Apache Hudi, it is important to be familiar with 
a few core concepts:
+  - [Hudi Timeline](/docs/next/timeline) – How Hudi manages transactions and 
other table services
+  - [Hudi File Layout](/docs/next/file_layouts) - How the files are laid out 
on storage
+  - [Hudi Table Types](/docs/next/table_types) – `COPY_ON_WRITE` and 
`MERGE_ON_READ`
+  - [Hudi Query Types](/docs/next/table_types#query-types) – Snapshot Queries, 
Incremental Queries, Read-Optimized Queries
 
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `write.rate.limit` | `false` | `0` | Turn off by default |
+See more in the "Concepts" section of the docs.
 
-## Where To Go From Here?
+Take a look at recent [blog posts](/blog) that go in depth on certain topics 
or use cases.
 
-We used Flink here to show case the capabilities of Hudi. However, Hudi can 
support multiple table types/query types and 
 Hudi tables can be queried from query engines like Hive, Spark, Flink, Presto 
and much more. We have put together a 
 [demo video](https://www.youtube.com/watch?v=VhNgUsxdrD0) that show cases all 
of this on a docker based setup with all 
 dependent systems running locally. We recommend you replicate the same setup 
and run the demo yourself, by following 
diff --git a/website/docs/flink_configuration.md 
b/website/docs/flink_configuration.md
new file mode 100644
index 0000000..ba7853d
--- /dev/null
+++ b/website/docs/flink_configuration.md
@@ -0,0 +1,117 @@
+---
+title: Flink Setup
+toc: true
+---
+
+## Global Configurations
+When using Flink, you can set some global configurations in 
`$FLINK_HOME/conf/flink-conf.yaml`
+
+### Parallelism
+
+|  Option Name  | Default | Type | Description |
+|  -----------  | -------  | ------- | ------- |
+| `taskmanager.numberOfTaskSlots` | `1` | `Integer` | The number of parallel 
operator or user function instances that a single TaskManager can run. We 
recommend setting this value > 4, and the actual value needs to be set 
according to the amount of data |
+| `parallelism.default` | `1` | `Integer` | The default parallelism used when 
no parallelism is specified anywhere (default: 1). For example, if the value of 
[`write.bucket_assign.tasks`](#parallelism-1) is not set, this value will be 
used |
+
+### Memory
+
+|  Option Name  | Default | Type | Description |
+|  -----------  | -------  | ------- | ------- |
+| `jobmanager.memory.process.size` | `(none)` | `MemorySize` | Total Process 
Memory size for the JobManager. This includes all the memory that a JobManager 
JVM process consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM 
Overhead |
+| `taskmanager.memory.task.heap.size` | `(none)` | `MemorySize` | Task Heap 
Memory size for TaskExecutors. This is the size of JVM heap memory reserved for 
write cache |
+| `taskmanager.memory.managed.size`  |  `(none)`  | `MemorySize` | Managed 
Memory size for TaskExecutors. This is the size of off-heap memory managed by 
the memory manager, reserved for sorting and RocksDB state backend. If you 
choose RocksDB as the state backend, you need to set this memory |
+
+### Checkpoint
+
+|  Option Name  | Default | Type | Description |
+|  -----------  | -------  | ------- | ------- |
+| `execution.checkpointing.interval` | `(none)` | `Duration` | The interval 
between checkpoints, for example `execution.checkpointing.interval = 150000ms` 
(150000 ms = 2.5 min). Setting this parameter enables checkpointing |
+| `state.backend` | `(none)` | `String` | The state backend used to store 
state. We recommend `rocksdb`: `state.backend: rocksdb` |
+| `state.backend.rocksdb.localdir` | `(none)` | `String` | The local directory 
(on the TaskManager) where RocksDB puts its files |
+| `state.checkpoints.dir` | `(none)` | `String` | The default directory used 
for storing the data files and meta data of checkpoints in a Flink supported 
filesystem. The storage path must be accessible from all participating 
processes/nodes(i.e. all TaskManagers and JobManagers), like hdfs and oss path |
+| `state.backend.incremental`  |  `false`  | `Boolean` | Whether the state 
backend should create incremental checkpoints, if possible. For an incremental 
checkpoint, only a diff from the previous checkpoint is stored, rather than the 
complete checkpoint state. If the state backend is `rocksdb`, we recommend 
turning this on |
+
+## Table Options
+
+Flink SQL jobs can be configured through options in the `WITH` clause.
+The actual datasource level configs are listed below.
+
+### Memory
+
+:::note
+When optimizing memory, first check the memory configuration, the number of
+taskManagers, and the parallelism of write tasks (`write.tasks`, default 4).
+After confirming that each write task is allocated enough memory, try tuning 
these memory options.
+:::
+
+|  Option Name  | Description | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| `write.task.max.size` | Maximum memory in MB for a write task, when the 
threshold hits, it flushes the max size data bucket to avoid OOM. Default 
`1024MB` | `1024D` | The memory reserved for write buffer is 
`write.task.max.size` - `compaction.max_memory`. When total buffer of write 
tasks reach the threshold, the largest buffer in the memory will be flushed |
+| `write.batch.size`  | To improve write efficiency, the Flink write task 
caches data in a buffer per write bucket until the memory reaches the 
threshold. When the threshold is reached, the data buffer is flushed out. 
Default `64MB` | `64D` |  Recommend to use the default settings  |
+| `write.log_block.size` | The log writer of Hudi does not flush data 
immediately after receiving it. The writer flushes data to disk in units of 
`LogBlock`. Until a `LogBlock` reaches the threshold, records are buffered in 
the writer as serialized bytes. Default `128MB`  | `128` |  Recommend 
to use the default settings  |
+| `write.merge.max_memory` | If the table type is `COPY_ON_WRITE`, Hudi merges 
the incremental data with the base file data. The incremental data is cached 
and spilled to disk. This threshold controls the max heap size that can be 
used. Default `100MB`  | `100` | Recommend to use the default settings |
+| `compaction.max_memory` | Same as `write.merge.max_memory`, but occurs 
during compaction. Default `100MB` | `100` | If it is online compaction, it can 
be turned up when resources are sufficient, such as setting as `1024MB` |
+
+### Parallelism
+
+|  Option Name  | Description | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| `write.tasks` |  The parallelism of writer tasks. Each write task writes 1 
to `N` buckets in sequence. Default `4` | `4` | Increasing the parallelism has 
no effect on the number of small files |
+| `write.bucket_assign.tasks`  |  The parallelism of bucket assigner 
operators. No default value, using Flink `parallelism.default`  | 
[`parallelism.default`](#parallelism) |  Increasing the parallelism also 
increases the number of buckets, and thus the number of small files (small 
buckets) |
+| `write.index_bootstrap.tasks` |  The parallelism of index bootstrap. 
Increasing the parallelism can speed up the bootstrap stage. The bootstrap 
stage blocks checkpointing, so a higher number of tolerable checkpoint 
failures needs to be configured. Defaults to Flink `parallelism.default` | 
[`parallelism.default`](#parallelism) | Only takes effect when 
`index.bootstrap.enabled` is `true` |
+| `read.tasks` | The parallelism of read operators (batch and stream). Default 
`4`  | `4` |  |
+| `compaction.tasks` | The parallelism of online compaction. Default `4` | `4` 
| `Online compaction` will occupy the resources of the write task. It is 
recommended to use [`offline compaction`](#offline-compaction) |
+
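+A minimal sketch of how the memory and parallelism options above might be
+set in the `WITH` clause. The table name `hudi_tuned`, the schema, and the
+`${db_path}` placeholder are assumptions for illustration; the values are
+examples rather than recommendations:
+
+```sql
+-- Hypothetical table; option values are illustrative only
+CREATE TABLE hudi_tuned(
+  uuid VARCHAR(20),
+  name VARCHAR(10),
+  ts TIMESTAMP(3),
+  `partition` VARCHAR(20)
+)
+PARTITIONED BY (`partition`)
+WITH (
+  'connector' = 'hudi',
+  'path' = '${db_path}/hudi_tuned',
+  'table.type' = 'MERGE_ON_READ',
+  'write.tasks' = '4',               -- parallelism of writer tasks
+  'write.bucket_assign.tasks' = '4', -- parallelism of bucket assigners
+  'write.task.max.size' = '1024',    -- MB per write task
+  'compaction.max_memory' = '100',   -- MB for the compaction spillable map
+  'compaction.tasks' = '4'           -- parallelism of online compaction
+);
+```
+
+With these values, the memory reserved for the write buffer would be
+`1024 - 100 = 924` MB per write task, following the formula given for
+`write.task.max.size`.
+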
+### Compaction
+
+:::note
+These are options only for `online compaction`.
+:::
+
+:::note
+Turn off online compaction by setting `compaction.async.enabled` = `false`, 
but we still recommend turning on `compaction.schedule.enabled` for the writing 
job. You can then execute the compaction plan by [`offline 
compaction`](#offline-compaction).
+:::
+
+|  Option Name  | Description | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| `compaction.schedule.enabled` | Whether to generate compaction plan 
periodically | `true` | Recommend to turn it on, even if 
`compaction.async.enabled` = `false` |
+| `compaction.async.enabled`  |  Async Compaction, enabled by default for MOR 
| `true` | Turn off `online compaction` by turning off this option |
+| `compaction.trigger.strategy`  | Strategy to trigger compaction | 
`num_commits` | Options are `num_commits`: trigger compaction after N delta 
commits; `time_elapsed`: trigger compaction when more than N seconds have 
elapsed since the last compaction; `num_and_time`: trigger compaction when both 
`NUM_COMMITS` and `TIME_ELAPSED` are satisfied; `num_or_time`: trigger 
compaction when `NUM_COMMITS` or `TIME_ELAPSED` is satisfied. |
+| `compaction.delta_commits` | Max delta commits needed to trigger compaction, 
default `5` commits | `5` | -- |
+| `compaction.delta_seconds`  |  Max delta seconds time needed to trigger 
compaction, default `1` hour | `3600` | -- |
+| `compaction.max_memory` | Max memory in MB for compaction spillable map, 
default `100MB` | `100` | If you have sufficient resources, we recommend 
increasing it to `1024MB` |
+| `compaction.target_io`  |  Target IO per compaction (both read and write), 
default `500GB`| `512000` | -- |
+
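+As a sketch of the setup described in the note above (online compaction
+turned off, plan scheduling left on), the writer table could be declared as
+follows. The table name `hudi_offline_compact`, the schema, and the
+`${db_path}` placeholder are hypothetical:
+
+```sql
+-- Hypothetical writer table for use with offline compaction
+CREATE TABLE hudi_offline_compact(
+  uuid VARCHAR(20),
+  name VARCHAR(10),
+  ts TIMESTAMP(3),
+  `partition` VARCHAR(20)
+)
+PARTITIONED BY (`partition`)
+WITH (
+  'connector' = 'hudi',
+  'path' = '${db_path}/hudi_offline_compact',
+  'table.type' = 'MERGE_ON_READ',
+  'compaction.async.enabled' = 'false',    -- no online compaction
+  'compaction.schedule.enabled' = 'true',  -- still generate plans
+  'compaction.trigger.strategy' = 'num_or_time',
+  'compaction.delta_commits' = '5',        -- plan after 5 delta commits
+  'compaction.delta_seconds' = '3600'      -- or after one hour
+);
+```
+
+The write job then only generates compaction plans, which a separate
+offline compaction job can execute.
+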
+## Memory Optimization
+
+### MOR
+
+1. [Setting Flink state backend to `rocksdb`](#checkpoint) (the default `in 
memory` state backend is very memory intensive).
+2. If there is enough memory, `compaction.max_memory` can be set larger 
(`100MB` by default, and can be adjusted to `1024MB`).
+3. Pay attention to the memory allocated to each write task by taskManager to 
ensure that each write task can be allocated the
+   desired memory size `write.task.max.size`. For example, if a taskManager 
with `4GB` of memory runs two streamWriteFunction tasks, each write task
+   can be allocated `2GB` of memory. Please reserve some headroom because the 
network buffer and other types of tasks on taskManager (such as 
bucketAssignFunction) will also consume memory.
+4. Pay attention to the memory changes of compaction. `compaction.max_memory` 
controls the maximum memory that each task can use when compaction tasks 
read
+   logs. `compaction.tasks` controls the parallelism of compaction tasks.
+
+### COW
+
+1. [Setting Flink state backend to `rocksdb`](#checkpoint) (the default `in 
memory` state backend is very memory intensive).
+2. Increase both `write.task.max.size` and `write.merge.max_memory` (`1024MB` 
and `100MB` by default, adjust to `2048MB` and `1024MB`).
+3. Pay attention to the memory allocated to each write task by taskManager to 
ensure that each write task can be allocated the
+   desired memory size `write.task.max.size`. For example, if a taskManager 
with `4GB` of memory runs two write tasks, each write task
+   can be allocated `2GB` of memory. Please reserve some headroom because the 
network buffer and other types of tasks on taskManager (such as 
`BucketAssignFunction`) will also consume memory.
+
+
+## Write Rate Limit
+
+In a typical data synchronization pipeline, `snapshot data` and `incremental 
data` are sent to Kafka first, and then written
+to Hudi by Flink in streaming mode. Consuming the `snapshot data` directly 
causes problems such as high throughput and serious
+disorder (writing partitions randomly), which lead to write performance 
degradation and throughput glitches. In this case,
+the `write.rate.limit` option can be turned on to ensure smooth writing.
+
+### Options
+
+|  Option Name  | Required | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| `write.rate.limit` | `false` | `0` | Turned off by default |
\ No newline at end of file
diff --git a/website/docs/hoodie_deltastreamer.md 
b/website/docs/hoodie_deltastreamer.md
index 615ff94..3129593 100644
--- a/website/docs/hoodie_deltastreamer.md
+++ b/website/docs/hoodie_deltastreamer.md
@@ -151,7 +151,7 @@ and then ingest it as follows.
 
 In some cases, you may want to migrate your existing table into Hudi 
beforehand. Please refer to [migration guide](/docs/migration_guide).
 
-## MultiTableDeltaStreamer
+### MultiTableDeltaStreamer
 
 `HoodieMultiTableDeltaStreamer`, a wrapper on top of `HoodieDeltaStreamer`, 
enables one to ingest multiple tables at a single go into hudi datasets. 
Currently it only supports sequential processing of tables to be ingested and 
COPY_ON_WRITE storage type. The command line options for 
`HoodieMultiTableDeltaStreamer` are pretty much similar to 
`HoodieDeltaStreamer` with the only exception that you are required to provide 
table wise configs in separate files in a dedicated config folder. The [...]
 
@@ -189,7 +189,7 @@ Sample config files for table wise overridden properties 
can be found under `hud
 
 For detailed information on how to configure and use 
`HoodieMultiTableDeltaStreamer`, please refer [blog 
section](/blog/2020/08/22/ingest-multiple-tables-using-hudi).
 
-## Concurrency Control
+### Concurrency Control
 
 The `HoodieDeltaStreamer` utility (part of hudi-utilities-bundle) provides 
ways to ingest from different sources such as DFS or Kafka, with the following 
capabilities.
 
@@ -336,7 +336,146 @@ jobs: `hoodie.write.meta.key.prefixes = 
'deltastreamer.checkpoint.key'`
 Spark SQL should be configured using this hoodie config:
 hoodie.deltastreamer.source.sql.sql.query = 'select * from source_table'
 
-## Hudi Kafka Connect Sink
+## Flink Ingestion
+
+### CDC Ingestion
+CDC (change data capture) keeps track of the data changes in a source 
system so that a downstream process or system can act on those changes.
+We recommend two ways of syncing CDC data into Hudi:
+
+![slide1 title](/assets/images/cdc-2-hudi.png)
+
+1. Use the Ververica 
[flink-cdc-connectors](https://github.com/ververica/flink-cdc-connectors) 
to connect directly to the DB server and sync the binlog data into Hudi.
+   The advantage is that it does not rely on message queues, but the 
disadvantage is that it puts pressure on the DB server;
+2. Consume data from a message queue (e.g., Kafka) using the Flink CDC 
format. The advantage is that it is highly scalable,
+   but the disadvantage is that it relies on message queues.
+
+:::note
+- If the upstream data cannot guarantee the order, you need to specify option 
`write.precombine.field` explicitly;
+- The MOR table cannot currently handle DELETEs in event time sequence, which 
can cause data loss. It is better to switch on the changelog mode through
+  option `changelog.enabled`.
+  :::
+
+### Bulk Insert
+
+This mode targets snapshot data import. If the snapshot data comes from other 
data sources, use the `bulk_insert` mode to quickly
+import the snapshot data into Hudi.
+
+
+:::note
+`bulk_insert` eliminates the serialization and data merging. The data 
deduplication is skipped, so the user needs to guarantee the uniqueness of the 
data.
+:::
+
+:::note
+`bulk_insert` is more efficient in the `batch execution mode`. By default, the 
`batch execution mode` sorts the input records
+by the partition path and writes these records to Hudi, which can avoid write 
performance degradation caused by
+frequent `file handle` switching.  
+:::
+
+:::note  
+The parallelism of `bulk_insert` is specified by `write.tasks`. The 
parallelism will affect the number of small files.
+In theory, the parallelism of `bulk_insert` equals the number of `bucket`s. 
(In particular, when a bucket reaches the maximum file size, it
+rolls over to a new file handle, so in the end `the number of files` >= 
[`write.bucket_assign.tasks`](#parallelism).)
+:::
+
+#### Options
+
+|  Option Name  | Required | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| `write.operation` | `true` | `upsert` | Set to `bulk_insert` to enable 
this function  |
+| `write.tasks`  |  `false`  | `4` | The parallelism of `bulk_insert`, `the 
number of files` >= [`write.bucket_assign.tasks`](#parallelism) |
+| `write.bulk_insert.shuffle_by_partition` | `false` | `true` | Whether to 
shuffle data according to the partition field before writing. Enabling this 
option will reduce the number of small files, but there may be a risk of data 
skew  |
+| `write.bulk_insert.sort_by_partition` | `false`  | `true` | Whether to sort 
data according to the partition field before writing. Enabling this option will 
reduce the number of small files when a write task writes multiple partitions  |
+| `write.sort.memory` | `false` | `128` | Available managed memory of the sort 
operator. Default `128` MB |
+
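+A hedged sketch of a table definition that uses the bulk insert options
+above. The table name `hudi_bulk`, the schema, and the `${db_path}`
+placeholder are assumptions for illustration:
+
+```sql
+-- Hypothetical target table for a one-off snapshot import
+CREATE TABLE hudi_bulk(
+  uuid VARCHAR(20),
+  name VARCHAR(10),
+  ts TIMESTAMP(3),
+  `partition` VARCHAR(20)
+)
+PARTITIONED BY (`partition`)
+WITH (
+  'connector' = 'hudi',
+  'path' = '${db_path}/hudi_bulk',
+  'table.type' = 'COPY_ON_WRITE',
+  'write.operation' = 'bulk_insert',  -- skips deduplication
+  'write.tasks' = '4',                -- parallelism of bulk_insert
+  'write.bulk_insert.shuffle_by_partition' = 'true',
+  'write.bulk_insert.sort_by_partition' = 'true'
+);
+```
+
+Because deduplication is skipped, the source feeding this table has to
+provide unique records.
+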
+### Index Bootstrap
+
+This function targets the import of `snapshot data` + `incremental data`. If 
the `snapshot data` has already been inserted into Hudi by [bulk insert](#bulk-insert),
+the user can insert `incremental data` in real time and ensure the data is not 
duplicated by using the index bootstrap function.
+
+:::note
+If you think this process is very time-consuming, you can add resources to 
write in streaming mode while writing `snapshot data`,
+and then reduce the resources to write `incremental data` (or open the rate 
limit function).
+:::
+
+#### Options
+
+|  Option Name  | Required | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| `index.bootstrap.enabled` | `true` | `false` | When index bootstrap is 
enabled, the existing records in the Hudi table are loaded into the Flink state 
at one time |
+| `index.partition.regex`  |  `false`  | `*` | Optimization option. Set a 
regular expression to filter partitions. By default, all partitions are loaded 
into Flink state |
+
+#### How To Use
+
+1. Use a `CREATE TABLE` statement to define a table corresponding to the Hudi 
table. Note that the `table.type` must be correct.
+2. Set `index.bootstrap.enabled` = `true` to enable the index bootstrap 
function.
+3. Set the Flink checkpoint failure tolerance in `flink-conf.yaml`: 
`execution.checkpointing.tolerable-failed-checkpoints = n` (depending on Flink 
checkpoint scheduling times).
+4. Wait until the first checkpoint succeeds, indicating that the index 
bootstrap has completed.
+5. After the index bootstrap has completed, the user can exit and save the 
savepoint (or directly use the externalized checkpoint).
+6. Restart the job, setting `index.bootstrap.enabled` to `false`.
+
+:::note
+1. Index bootstrap is blocking, so checkpoints cannot complete during index 
bootstrap.
+2. Index bootstrap is triggered by the input data. The user needs to ensure 
that there is at least one record in each partition.
+3. Index bootstrap executes concurrently. The user can search the log for 
`finish loading the index under partition` and `Load record form file` to 
observe the progress of index bootstrap.
+4. The first successful checkpoint indicates that the index bootstrap has 
completed. There is no need to load the index again when recovering from the 
checkpoint.
+   :::
+
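+The first two steps of the procedure above can be sketched as the following
+table definition. The table name `hudi_bootstrap`, the schema, and the
+`${db_path}` placeholder are hypothetical; the path is assumed to point at
+a table that already holds the snapshot data:
+
+```sql
+-- Hypothetical table; 'path' points at the existing Hudi table
+CREATE TABLE hudi_bootstrap(
+  uuid VARCHAR(20),
+  name VARCHAR(10),
+  ts TIMESTAMP(3),
+  `partition` VARCHAR(20)
+)
+PARTITIONED BY (`partition`)
+WITH (
+  'connector' = 'hudi',
+  'path' = '${db_path}/hudi_bootstrap',
+  'table.type' = 'COPY_ON_WRITE',       -- must match the existing table
+  'index.bootstrap.enabled' = 'true'    -- load existing index into state
+);
+```
+
+After the first checkpoint succeeds, the job can be restarted from a
+savepoint with `index.bootstrap.enabled` set to `false`.
+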
+### Changelog Mode
+Hudi can keep all the intermediate changes (I / -U / U / D) of messages, which 
can then be consumed through stateful computing in Flink to build a near-real-time
+data warehouse ETL pipeline (incremental computing). The Hudi MOR table stores 
messages in the form of rows, which supports the retention of all change logs 
(integration at the format level).
+All changelog records can be consumed with the Flink streaming reader.
+
+#### Options
+
+|  Option Name  | Required | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| `changelog.enabled` | `false` | `false` | Turned off by default to 
provide `upsert` semantics: only the merged messages are guaranteed to be kept, 
and intermediate changes may be merged. Set to `true` to support consumption of 
all changes |
+
+:::note
+Batch (snapshot) reads still merge all the intermediate changes, regardless of 
whether the format has stored the intermediate changelog messages.
+:::
+
+:::note
+After setting `changelog.enabled` to `true`, the retention of changelog records 
is only best effort: the asynchronous compaction task merges the changelog 
records into one record, so if the
+stream source does not consume in time, only the merged record for each key can 
be read after compaction. The solution is to reserve some buffer time for the 
reader by adjusting the compaction strategy, such as
+the compaction options: [`compaction.delta_commits`](#compaction) and 
[`compaction.delta_seconds`](#compaction).
+:::
+
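+A sketch of a changelog-enabled MOR table that also delays compaction to
+give streaming readers more buffer time. The table name `hudi_changelog`,
+the schema, the `${db_path}` placeholder, and the chosen values are
+assumptions for illustration:
+
+```sql
+-- Hypothetical table; compaction values are illustrative only
+CREATE TABLE hudi_changelog(
+  uuid VARCHAR(20),
+  name VARCHAR(10),
+  ts TIMESTAMP(3),
+  `partition` VARCHAR(20)
+)
+PARTITIONED BY (`partition`)
+WITH (
+  'connector' = 'hudi',
+  'path' = '${db_path}/hudi_changelog',
+  'table.type' = 'MERGE_ON_READ',
+  'changelog.enabled' = 'true',          -- keep intermediate changes
+  'compaction.trigger.strategy' = 'num_or_time',
+  'compaction.delta_commits' = '10',     -- compact less often than default
+  'compaction.delta_seconds' = '7200'
+);
+```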
+
+### Append Mode
+
+If the INSERT operation is used for ingestion, for a COW table, there is no 
merging of small files by default; for a MOR table, the small file strategy is 
always applied: MOR appends delta records to log files.
+
+The small file strategy leads to performance degradation. If you want to apply 
file merging to a COW table, turn on the option 
`write.insert.cluster`. Note that no record key combining is performed.
+
+#### Options
+|  Option Name  | Required | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| `write.insert.cluster` | `false` | `false` | Whether to merge small files 
while ingesting. For a COW table, turn on this option to enable the small file 
merging strategy (no deduplication for keys, but the throughput will be 
affected) |
+
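+A minimal sketch of an append-mode COW table with small file merging turned
+on. The table name `hudi_append`, the schema, and the `${db_path}`
+placeholder are hypothetical, and `insert` is assumed to be the
+`write.operation` value that selects the INSERT operation:
+
+```sql
+-- Hypothetical append-only table
+CREATE TABLE hudi_append(
+  uuid VARCHAR(20),
+  name VARCHAR(10),
+  ts TIMESTAMP(3),
+  `partition` VARCHAR(20)
+)
+PARTITIONED BY (`partition`)
+WITH (
+  'connector' = 'hudi',
+  'path' = '${db_path}/hudi_append',
+  'table.type' = 'COPY_ON_WRITE',
+  'write.operation' = 'insert',      -- assumed value for INSERT
+  'write.insert.cluster' = 'true'    -- merge small files while ingesting
+);
+```
+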
+### Rate Limit
+In many use cases, users put the full history data set onto the 
message queue together with the realtime incremental data, and then consume 
the data from the queue into Hudi from the earliest offset using Flink. 
Consuming the history data set has these characteristics:
+1) the instant throughput is huge; 2) it has serious disorder (with random 
writing partitions). This leads to degradation of writing performance and 
throughput glitches. In this case, the rate limit option can be turned on 
to ensure smooth writing.
+
+#### Options
+|  Option Name  | Required | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| `write.rate.limit` | `false` | `0` | The rate limit is disabled by default |
+
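+A sketch of a table with the rate limit turned on. The table name
+`hudi_limited`, the schema, the `${db_path}` placeholder, and the value
+`20000` are assumptions; the table above does not state the unit, which is
+assumed here to be records per second:
+
+```sql
+-- Hypothetical table; the limit value is illustrative only
+CREATE TABLE hudi_limited(
+  uuid VARCHAR(20),
+  name VARCHAR(10),
+  ts TIMESTAMP(3),
+  `partition` VARCHAR(20)
+)
+PARTITIONED BY (`partition`)
+WITH (
+  'connector' = 'hudi',
+  'path' = '${db_path}/hudi_limited',
+  'table.type' = 'MERGE_ON_READ',
+  'write.rate.limit' = '20000'   -- 0 (default) disables the limit
+);
+```
+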
+### Incremental Query
+There are 3 use cases for incremental query:
+1. Streaming query: specify the start commit with option `read.start-commit`;
+2. Batch query: specify the start commit with option `read.start-commit` and 
the end commit with option `read.end-commit`.
+   The interval is closed: both start commit and end commit are 
inclusive;
+3. TimeTravel: consume as batch for an instant time. Specifying 
`read.end-commit` is enough because the start commit is the latest by default.
+
+#### Options
+|  Option Name  | Required | Default | Remarks |
+|  -----------  | -------  | ------- | ------- |
+| `read.start-commit` | `false` | the latest commit | Specify `earliest` to 
consume from the start commit |
+| `read.end-commit` | `false` | the latest commit | -- |
+
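+The batch query case above can be sketched as follows. The table name
+`hudi_incr`, the schema, and the `${db_path}` placeholder are hypothetical,
+and the two commit instants are made-up values, assuming instants are
+written in the `yyyyMMddHHmmss` format:
+
+```sql
+-- Hypothetical reader table; commit instants are placeholders
+CREATE TABLE hudi_incr(
+  uuid VARCHAR(20),
+  name VARCHAR(10),
+  ts TIMESTAMP(3),
+  `partition` VARCHAR(20)
+)
+PARTITIONED BY (`partition`)
+WITH (
+  'connector' = 'hudi',
+  'path' = '${db_path}/hudi_incr',
+  'table.type' = 'MERGE_ON_READ',
+  'read.start-commit' = '20211201000000',  -- inclusive
+  'read.end-commit' = '20211209000000'     -- inclusive
+);
+
+-- Reads only the changes committed within the closed interval
+SELECT * FROM hudi_incr;
+```
+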
+## Kafka Connect Sink
 If you want to perform streaming ingestion into Hudi format similar to 
HoodieDeltaStreamer, but you don't want to depend on Spark,
 try out the new experimental release of Hudi Kafka Connect Sink. Read the 
[ReadMe](https://github.com/apache/hudi/tree/master/hudi-kafka-connect) 
 for full documentation.
diff --git a/website/docs/query_engine_setup.md 
b/website/docs/query_engine_setup.md
index 99887b5..d64a577 100644
--- a/website/docs/query_engine_setup.md
+++ b/website/docs/query_engine_setup.md
@@ -18,13 +18,30 @@ Both snapshot and read optimized queries are supported on 
MERGE_ON_READ Hudi tab
 instructions for PrestoDB would vary based on versions. Please check the below 
table for query types supported and installation instructions
 for different versions of PrestoDB.
 
-
 | **PrestoDB Version** | **Installation description** | **Query types 
supported** |
 
|----------------------|------------------------------|---------------------------|
 | < 0.233              | Requires the `hudi-presto-bundle` jar to be placed 
into `<presto_install>/plugin/hive-hadoop2/`, across the installation. | 
Snapshot querying on COW tables. Read optimized querying on MOR tables. |
 | >= 0.233             | No action needed. Hudi (0.5.1-incubating) is a 
compile time dependency. | Snapshot querying on COW tables. Read optimized 
querying on MOR tables. |
 | >= 0.240             | No action needed. Hudi 0.5.3 version is a compile 
time dependency. | Snapshot querying on both COW and MOR tables |
 
+### Presto Environment
+1. Configure Presto according to the [Presto configuration 
document](https://prestodb.io/docs/current/installation/deployment.html).
+2. Configure hive catalog in ` 
/presto-server-0.2xxx/etc/catalog/hive.properties` as follows:
+
+```properties
+connector.name=hive-hadoop2
+hive.metastore.uri=thrift://xxx.xxx.xxx.xxx:9083
+hive.config.resources=.../hadoop-2.x/etc/hadoop/core-site.xml,.../hadoop-2.x/etc/hadoop/hdfs-site.xml
+```
+
+### Query
+Begin querying by connecting to the hive metastore with the presto client. The 
presto client connection command is as follows:
+
+```bash
+# The presto client connection command
+./presto --server xxx.xxx.xxx.xxx:9999 --catalog hive --schema default
+```
+
 ## Trino
 :::note
 [Trino](https://trino.io/) (formerly PrestoSQL) was forked off of PrestoDB a 
few years ago. Hudi supports 'Snapshot' queries for Copy-On-Write tables and 
'Read Optimized' queries
diff --git a/website/docs/syncing_metastore.md 
b/website/docs/syncing_metastore.md
index 90e0c40..dad767b 100644
--- a/website/docs/syncing_metastore.md
+++ b/website/docs/syncing_metastore.md
@@ -3,7 +3,7 @@ title: Syncing to Metastore
 keywords: [hudi, hive, sync]
 ---
 
-## Syncing to Hive
+## Spark and DeltaStreamer
 
 Writing data with [DataSource](/docs/writing_data) writer or 
[HoodieDeltaStreamer](/docs/hoodie_deltastreamer) supports syncing of the 
table's latest schema to Hive metastore, such that queries can pick up new 
columns and partitions.
 In case, it's preferable to run this from commandline or in an independent 
jvm, Hudi provides a `HiveSyncTool`, which can be invoked as below,
@@ -21,3 +21,101 @@ cd hudi-hive
 ./run_sync_tool.sh
  [hudi-hive]$ ./run_sync_tool.sh --help
 ```
+
+
+## Flink Setup
+
+### Install
+
+You can now git clone the Hudi master branch to test Flink hive sync. The first 
step is to install Hudi to get `hudi-flink-bundle_2.11-0.x.jar`.
+The `hudi-flink-bundle` module pom.xml sets the scope of hive-related 
dependencies to `provided` by default. If you want to use hive sync, you need to use the
+profile `flink-bundle-shade-hive` during packaging. Execute the command below 
to install:
+
+```bash
+# Maven install command
+mvn install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2
+
+# For hive1, you need to use profile -Pflink-bundle-shade-hive1
+# For hive3, you need to use profile -Pflink-bundle-shade-hive3 
+```
+
+:::note
+Hive 1.x can only synchronize metadata to hive; querying through hive is not 
supported yet. If you need to query, you can use spark to query the hive table.
+:::
+
+:::note
+If using hive profile, you need to modify the hive version in the profile to 
your hive cluster version (Only need to modify the hive version in this 
profile).
+The location of this `pom.xml` is `packaging/hudi-flink-bundle/pom.xml`, and 
the corresponding profile is at the bottom of this file.
+:::
+
+### Hive Environment
+
+1. Import `hudi-hadoop-mr-bundle` into hive. Create an `auxlib/` folder under 
the root directory of hive, and move 
`hudi-hadoop-mr-bundle-0.x.x-SNAPSHOT.jar` into `auxlib`.
+   `hudi-hadoop-mr-bundle-0.x.x-SNAPSHOT.jar` is at 
`packaging/hudi-hadoop-mr-bundle/target`.
+
+2. When the Flink sql client connects to the hive metastore remotely, the 
`hive metastore` and `hiveserver2` services need to be enabled, and the port 
number needs to
+   be set correctly. Command to turn on the services:
+
+```bash
+# Enable hive metastore and hiveserver2
+nohup ./bin/hive --service metastore &
+nohup ./bin/hive --service hiveserver2 &
+
+# After modifying the jar package under auxlib, restart the service.
+```
+
+### Sync Template
+
+Flink hive sync now supports two hive sync modes, `hms` and `jdbc`. The `hms` 
mode only needs the metastore uris to be configured. For
+the `jdbc` mode, both the JDBC attributes and the metastore uris need to be 
configured. The options template is as below:
+
+```sql
+-- hms mode template
+CREATE TABLE t1(
+  uuid VARCHAR(20),
+  name VARCHAR(10),
+  age INT,
+  ts TIMESTAMP(3),
+  `partition` VARCHAR(20)
+)
+PARTITIONED BY (`partition`)
+WITH (
+  'connector' = 'hudi',
+  'path' = '${db_path}/t1',
+  'table.type' = 'COPY_ON_WRITE',  -- If MERGE_ON_READ, hive query will not 
have output until the parquet file is generated
+  'hive_sync.enable' = 'true',     -- Required. To enable hive synchronization
+  'hive_sync.mode' = 'hms',        -- Required. Setting hive sync mode to hms, 
default jdbc
+  'hive_sync.metastore.uris' = 'thrift://${ip}:9083' -- Required. The port 
needs to be set in hive-site.xml
+);
+
+
+-- jdbc mode template
+CREATE TABLE t1(
+  uuid VARCHAR(20),
+  name VARCHAR(10),
+  age INT,
+  ts TIMESTAMP(3),
+  `partition` VARCHAR(20)
+)
+PARTITIONED BY (`partition`)
+WITH (
+  'connector' = 'hudi',
+  'path' = '${db_path}/t1',
+  'table.type' = 'COPY_ON_WRITE',  -- If MERGE_ON_READ, hive query will not 
have output until the parquet file is generated
+  'hive_sync.enable' = 'true',     -- Required. To enable hive synchronization
+  'hive_sync.mode' = 'jdbc',       -- Required. Hive sync mode; jdbc is the 
default
+  'hive_sync.metastore.uris' = 'thrift://${ip}:9083', -- Required. The port 
needs to be set in hive-site.xml
+  'hive_sync.jdbc_url'='jdbc:hive2://${ip}:10000',    -- required, hiveServer 
port
+  'hive_sync.table'='t1',                          -- required, hive table name
+  'hive_sync.db'='testDB',                         -- required, hive database 
name
+  'hive_sync.username'='${user_name}',                     -- required, HMS 
username
+  'hive_sync.password'='${password}'             -- required, HMS password
+);
+```
+
+### Query
+
+When querying with hive beeline, you need to apply the following setting:
+```bash
+set hive.input.format = 
org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
+```
\ No newline at end of file
diff --git a/website/sidebars.js b/website/sidebars.js
index adc3cfb..a1dc301 100644
--- a/website/sidebars.js
+++ b/website/sidebars.js
@@ -41,6 +41,7 @@ module.exports = {
                 'writing_data',
                 'hoodie_deltastreamer',
                 'querying_data',
+                'flink_configuration',
                 'syncing_metastore',
             ],
         },
