This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/asf-site by this push:
new 805a21b [HUDI-2710] Update the document for flink hudi quick start (#3943)
805a21b is described below
commit 805a21b7b28dd0ebe0ddd350226cf100d8505aa7
Author: Danny Chan <[email protected]>
AuthorDate: Tue Nov 9 12:00:45 2021 +0800
[HUDI-2710] Update the document for flink hudi quick start (#3943)
---
website/docs/flink-quick-start-guide.md | 53 +++++++++++++++++++++++-----
website/static/assets/images/cdc-2-hudi.png | Bin 0 -> 689096 bytes
2 files changed, 45 insertions(+), 8 deletions(-)
diff --git a/website/docs/flink-quick-start-guide.md b/website/docs/flink-quick-start-guide.md
index af6c2d6..3f4d4a9 100644
--- a/website/docs/flink-quick-start-guide.md
+++ b/website/docs/flink-quick-start-guide.md
@@ -223,7 +223,7 @@ allocated with enough memory, we can try to set these memory options.
| `write.bucket_assign.tasks` | The parallelism of bucket assigner operators. No default value, using Flink `parallelism.default` | [`parallelism.default`](#parallelism) | Increasing the parallelism also increases the number of buckets, and thus the number of small files (small buckets) |
| `write.index_bootstrap.tasks` | The parallelism of index bootstrap. Increasing the parallelism can speed up the bootstrap stage. The bootstrap stage blocks checkpointing, so set a higher tolerable number of checkpoint failures. Defaults to Flink `parallelism.default` | [`parallelism.default`](#parallelism) | It only takes effect when `index.bootstrap.enabled` is `true` |
| `read.tasks` | The parallelism of read operators (batch and stream). Default `4` | `4` | |
-| `compaction.tasks` | The parallelism of online compaction. Default `10` | `10` | `Online compaction` will occupy the resources of the write task. It is recommended to use [`offline compaction`](#offline-compaction) |
+| `compaction.tasks` | The parallelism of online compaction. Default `4` | `4` | `Online compaction` occupies the resources of the write tasks. It is recommended to use [`offline compaction`](#offline-compaction) |
### Compaction
@@ -243,7 +243,7 @@ Turn off online compaction by setting `compaction.async.enabled` = `false`, but
| `compaction.delta_commits` | Max delta commits needed to trigger compaction, default `5` commits | `5` | -- |
| `compaction.delta_seconds` | Max delta seconds time needed to trigger compaction, default `1` hour | `3600` | -- |
| `compaction.max_memory` | Max memory in MB for compaction spillable map, default `100MB` | `100` | If you have sufficient resources, it is recommended to increase it to `1024MB` |
-| `compaction.target_io` | Target IO per compaction (both read and write), default `5GB`| `5120` | The default value for `offline compaction` is `500GB` |
+| `compaction.target_io` | Target IO per compaction (both read and write), default `500GB`| `512000` | -- |
## Memory Optimization
@@ -265,6 +265,23 @@ logs. `compaction.tasks` controls the parallelism of compaction tasks.
desired memory size `write.task.max.size`. For example, if a TaskManager has `4GB` of memory and runs two write tasks, each write task
can be allocated `2GB` of memory. Please reserve some buffer, because the network buffers and other tasks on the TaskManager (such as `BucketAssignFunction`) also consume memory.
+## CDC Ingestion
+CDC (change data capture) keeps track of the data changes in a source system so that a downstream process or system can act on those changes.
+We recommend two ways for syncing CDC data into Hudi:
+
+
+
+1. Use the Ververica [flink-cdc-connectors](https://github.com/ververica/flink-cdc-connectors) to connect directly to the DB server and sync the binlog data into Hudi.
+The advantage is that it does not rely on message queues; the disadvantage is that it puts pressure on the DB server;
+2. Consume data from a message queue (e.g. Kafka) using the Flink CDC format. The advantage is that it is highly scalable;
+the disadvantage is that it relies on message queues.
+
+:::note
+- If the upstream data cannot guarantee ordering, specify the option `write.precombine.field` explicitly;
+- The MOR table currently cannot handle DELETEs in event-time order, which can cause data loss. It is better to switch on changelog mode through
+the option `changelog.enabled`.
+:::
+
## Bulk Insert
This mode serves snapshot data import. If the snapshot data comes from other data sources, use the `bulk_insert` mode to quickly
@@ -352,18 +369,38 @@ the compaction options: [`compaction.delta_commits`](#compaction) and [`compacti
:::
-## Insert Mode
+## Append Mode
+
+If the INSERT operation is used for ingestion, the COW table does not merge small files by default, while the MOR table always applies the small file strategy: it appends delta records to log files.
+
+The small file strategy leads to performance degradation. If you want to merge small files for the COW table, turn on the option `write.insert.cluster`; note that records with the same key are not combined.
-Hudi apply the small file strategy for the insert mode by default: MOR appends delta records to log files, COW merges the base parquet files (the incremental data set will be deduplicated).
-This strategy lead to performance degradation.
+### Options
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `write.insert.cluster` | `false` | `false` | Whether to merge small files while ingesting. For the COW table, turn on this option to enable the small file merging strategy (keys are not deduplicated, and throughput is affected) |
+
+## Rate Limit
+In many use cases, users put the full history data set onto the message queue together with the real-time incremental data, and then use Flink to consume the data from the queue into Hudi starting from the earliest offset. Consuming the history data set has these characteristics:
+1) the instantaneous throughput is huge; 2) the data is seriously out of order (writes go to random partitions). This degrades write performance and causes throughput glitches. In this case, turn on the rate limit option to keep the write flow smooth.
+
+### Options
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `write.rate.limit` | `false` | `0` | The rate limit is disabled by default (`0`) |
-If you want to forbid the behavior of file merge, sets `write.insert.deduplicate` as `false`,the deduplication is skipped.
-Each flush behavior directly writes a now parquet file (MOR table also directly write parquet file).
+## Incremental Query
+There are three use cases for incremental queries:
+1. Streaming query: specify the start commit with the option `read.start-commit`;
+2. Batch query: specify the start commit with the option `read.start-commit` and the end commit with the option `read.end-commit`;
+the interval is closed: both the start commit and the end commit are inclusive;
+3. Time travel: consume as a batch as of an instant time; specifying `read.end-commit` alone is enough because the start commit is the latest by default.
### Options
| Option Name | Required | Default | Remarks |
| ----------- | ------- | ------- | ------- |
-| `write.insert.deduplicate` | `false` | `true` | Insert mode enable deduplication by default. After this option is turned off, each flush behavior directly writes a now parquet file |
+| `read.start-commit` | `false` | the latest commit | Specify `earliest` to consume from the earliest commit |
+| `read.end-commit` | `false` | the latest commit | -- |
## Hive Query
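The closed-interval semantics of the batch incremental query in the doc above can be illustrated with a small Python sketch. It is not Hudi's implementation: `instants_in_range` is a hypothetical helper, the timeline is a plain list of instant-time strings, and treating a missing end commit as the latest commit mirrors the default listed in the options table.

```python
from typing import List, Optional


def instants_in_range(completed: List[str], start_commit: str,
                      end_commit: Optional[str] = None) -> List[str]:
    """Hypothetical helper: pick the completed instants that a batch
    incremental query would cover, using a closed interval."""
    # Instant times are equal-length timestamp strings (e.g. "20211109120045"),
    # so sorting them as strings also sorts them by time.
    timeline = sorted(completed)
    if not timeline:
        return []
    # `earliest` is resolved to the first completed instant on the timeline.
    lower = timeline[0] if start_commit == "earliest" else start_commit
    # Assumption for this sketch: a missing end commit means the latest one.
    upper = end_commit if end_commit is not None else timeline[-1]
    # Closed interval: both the start and the end commit are inclusive.
    return [t for t in timeline if lower <= t <= upper]


if __name__ == "__main__":
    timeline = ["20211101000000", "20211102000000",
                "20211103000000", "20211104000000"]
    # Batch query: both bounds are kept.
    print(instants_in_range(timeline, "20211102000000", "20211103000000"))
    # `earliest` with no end commit: the whole timeline.
    print(instants_in_range(timeline, "earliest"))
```

The string comparison only works because instant times share one fixed-width format; a real timeline would also need to skip inflight or requested instants, which this sketch ignores.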
diff --git a/website/static/assets/images/cdc-2-hudi.png b/website/static/assets/images/cdc-2-hudi.png
new file mode 100644
index 0000000..4fa68cf
Binary files /dev/null and b/website/static/assets/images/cdc-2-hudi.png differ
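The memory sizing rule from the Memory Optimization part of this diff (a `4GB` TaskManager running two write tasks gives each task `2GB`, minus some buffer) is simple arithmetic. The Python sketch below illustrates it. `per_write_task_memory_mb` and its `reserved_mb` parameter are hypothetical, and how much to reserve is an assumption left to the reader, since the document only says to reserve some buffer for network buffers and operators such as `BucketAssignFunction`.

```python
def per_write_task_memory_mb(taskmanager_mb: int, write_tasks: int,
                             reserved_mb: int = 0) -> int:
    """Hypothetical helper: split TaskManager memory evenly across write
    tasks after setting aside `reserved_mb` for other consumers."""
    if write_tasks <= 0:
        raise ValueError("write_tasks must be positive")
    # Memory left after reserving headroom for network buffers and
    # other operators running on the same TaskManager.
    usable = taskmanager_mb - reserved_mb
    if usable <= 0:
        raise ValueError("reserved_mb leaves no memory for write tasks")
    # Even split, rounded down to whole MB.
    return usable // write_tasks


if __name__ == "__main__":
    # The doc's example: 4GB TaskManager, two write tasks, no reserve.
    print(per_write_task_memory_mb(4096, 2))
    # Same TaskManager with an assumed 512MB reserve.
    print(per_write_task_memory_mb(4096, 2, reserved_mb=512))
```

The result is only a starting point for choosing `write.task.max.size`; the right reserve depends on the rest of the job.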