This is an automated email from the ASF dual-hosted git repository.
bhavanisudha pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/asf-site by this push:
new cdcc927 organized the Flink details to move into the relevant pages
and make the quick-start short and simple (#4257)
cdcc927 is described below
commit cdcc927f4fcc7ea0cc3f4b51e37adf12c6deaed6
Author: Kyle Weller <[email protected]>
AuthorDate: Thu Dec 9 16:25:45 2021 -0800
organized the Flink details to move into the relevant pages and make the
quick-start short and simple (#4257)
---
website/docs/compaction.md | 67 ++++--
website/docs/flink-quick-start-guide.md | 415 +-------------------------------
website/docs/flink_configuration.md | 117 +++++++++
website/docs/hoodie_deltastreamer.md | 145 ++++++++++-
website/docs/query_engine_setup.md | 19 +-
website/docs/syncing_metastore.md | 100 +++++++-
website/sidebars.js | 1 +
7 files changed, 433 insertions(+), 431 deletions(-)
diff --git a/website/docs/compaction.md b/website/docs/compaction.md
index 70fba50..015d21e 100644
--- a/website/docs/compaction.md
+++ b/website/docs/compaction.md
@@ -5,18 +5,13 @@ toc: true
last_modified_at:
---
-Compaction is executed asynchronously with Hudi by default.
-
## Async Compaction
-
-Async Compaction is performed in 2 steps:
+Compaction is executed asynchronously with Hudi by default. Async Compaction
is performed in 2 steps:
1. ***Compaction Scheduling***: This is done by the ingestion job. In this
step, Hudi scans the partitions and selects **file
slices** to be compacted. A compaction plan is finally written to Hudi
timeline.
1. ***Compaction Execution***: A separate process reads the compaction plan
and performs compaction of file slices.
-## Deployment Models
-
There are a few ways by which we can execute compactions asynchronously.
### Spark Structured Streaming
@@ -68,6 +63,37 @@ spark-submit --packages
org.apache.hudi:hudi-utilities-bundle_2.11:0.6.0 \
--continous
```
+## Synchronous Compaction
+By default, compaction is run asynchronously.
+
+If ingestion latency is important to you, you are most likely using Merge-On-Read tables.
+Merge-On-Read tables store data using a combination of columnar (e.g. parquet) and row-based (e.g. avro) file formats.
+Updates are logged to delta files & later compacted to produce new versions of columnar files.
+To keep ingestion latency low, Async Compaction is the default configuration.
+
+If immediate read performance of a new commit is important to you, or you want the simplicity of not managing separate compaction jobs,
+you may want synchronous compaction, which means that as a commit is written it is also compacted by the same job.
+
+Compaction is run synchronously by passing the flag `--disable-compaction` (meaning async compaction scheduling is disabled).
+When both ingestion and compaction are running in the same Spark context, you can use resource allocation configuration
+in DeltaStreamer CLI such as (`--delta-sync-scheduling-weight`,
+`--compact-scheduling-weight`, `--delta-sync-scheduling-minshare`, and `--compact-scheduling-minshare`)
+to control executor allocation between ingestion and compaction.
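
A minimal launch sketch tying the flags above together; the jar path, table paths, and weight values are illustrative, and only the flags quoted in the text are assumed to exist:

```shell
# Sketch only: jar location, paths and values are placeholders.
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  hudi-utilities-bundle.jar \
  --table-type MERGE_ON_READ \
  --target-base-path hdfs:///tmp/hudi/my_table \
  --target-table my_table \
  --disable-compaction            # compact synchronously with each commit

# When async compaction instead shares the Spark context with ingestion,
# the scheduling flags from the text control executor allocation, e.g.:
#   --delta-sync-scheduling-weight 2 --compact-scheduling-weight 1 \
#   --delta-sync-scheduling-minshare 1 --compact-scheduling-minshare 1
```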
+
+
+## Offline Compaction
+
+The compaction of the MERGE_ON_READ table is enabled by default. The trigger strategy is to perform compaction after completing
+five commits. Because compaction consumes a lot of memory and runs in the same pipeline as the write operation, it can easily
+interfere with writes when there is a large amount of data (> 100000 records per second). In such cases, it is more stable to execute
+the compaction task using offline compaction.
+
+:::note
+The execution of a compaction task includes two parts: scheduling the compaction plan and executing it. It's recommended that
+compaction plan scheduling be triggered periodically by the write task; the write parameter `compaction.schedule.enable`
+is enabled by default.
+:::
+
### Hudi Compactor Utility
Hudi provides a standalone tool to execute specific compactions
asynchronously. Below is an example and you can read more in the [deployment
guide](/docs/deployment#compactions)
@@ -81,7 +107,7 @@ spark-submit --packages
org.apache.hudi:hudi-utilities-bundle_2.11:0.6.0 \
--instant-time <compaction_instant>
```
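
The hunk above shows only the tail of the compactor invocation; a fuller sketch for context (the class name and flag set are assumptions based on the Hudi compactor utility, and all values are placeholders):

```shell
# Sketch only: exact flag set is an assumption; values are placeholders.
spark-submit --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.6.0 \
  --class org.apache.hudi.utilities.HoodieCompactor \
  --base-path <base_path> \
  --table-name <table_name> \
  --schema-file <schema_file> \
  --instant-time <compaction_instant>   # optional; earliest plan runs if omitted
```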
-Note, the `instant-time` parameter is now optional for the Hudi Compactor
Utility. If using the utility without `--instant time`,
+Note, the `instant-time` parameter is now optional for the Hudi Compactor Utility. If using the utility without `--instant-time`,
the spark-submit will execute the earliest scheduled compaction on the Hudi
timeline.
### Hudi CLI
@@ -93,19 +119,20 @@ hudi:trips->compaction run --tableName <table_name>
--parallelism <parallelism>
...
```
-## Synchronous Compaction
-By default, compaction is run asynchronously.
+### Flink Offline Compaction
+Offline compaction requires submitting a Flink task on the command line. The program entry point, packaged in `hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar`, is
+`org.apache.hudi.sink.compact.HoodieFlinkCompactor`
-If latency of ingesting records is important for you, you are most likely
using Merge-On-Read tables.
-Merge-On-Read tables store data using a combination of columnar (e.g parquet)
+ row based (e.g avro) file formats.
-Updates are logged to delta files & later compacted to produce new versions of
columnar files.
-To improve ingestion latency, Async Compaction is the default configuration.
+```bash
+# Command line
+./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor
lib/hudi-flink-bundle_2.11-0.9.0.jar --path hdfs://xxx:9000/table
+```
-If immediate read performance of a new commit is important for you, or you
want simplicity of not managing separate compaction jobs,
-you may want Synchronous compaction, which means that as a commit is written
it is also compacted by the same job.
+#### Options
-Compaction is run synchronously by passing the flag "--disable-compaction"
(Meaning to disable async compaction scheduling).
-When both ingestion and compaction is running in the same spark context, you
can use resource allocation configuration
-in DeltaStreamer CLI such as ("--delta-sync-scheduling-weight",
-"--compact-scheduling-weight", ""--delta-sync-scheduling-minshare", and
"--compact-scheduling-minshare")
-to control executor allocation between ingestion and compaction.
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `--path` | `true` | `--` | The path where the target Hudi table is stored |
+| `--compaction-max-memory` | `false` | `100` | The index map size of log data during compaction, 100 MB by default. If you have enough memory, you can turn up this parameter |
+| `--schedule` | `false` | `false` | Whether to execute the operation of scheduling the compaction plan. If the write process is still writing, turning on this parameter has a risk of losing data. Therefore, you must ensure that there are no write tasks currently writing data to this table when this parameter is turned on |
+| `--seq` | `false` | `LIFO` | The order in which compaction plans are executed, starting from the latest plan by default. `LIFO`: execute from the latest plan. `FIFO`: execute from the oldest plan. |
\ No newline at end of file
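
The offline-compaction flags from the table above can be combined on one command line; a hypothetical invocation (the namenode host, port, and table path are placeholders):

```shell
./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor \
  lib/hudi-flink-bundle_2.11-0.9.0.jar \
  --path hdfs://namenode:9000/warehouse/my_table \
  --compaction-max-memory 1024 \
  --seq FIFO
# --compaction-max-memory raised per the table's advice when memory allows;
# FIFO drains the oldest pending compaction plan first.
```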
diff --git a/website/docs/flink-quick-start-guide.md
b/website/docs/flink-quick-start-guide.md
index 7ba0423..2feeba6 100644
--- a/website/docs/flink-quick-start-guide.md
+++ b/website/docs/flink-quick-start-guide.md
@@ -164,416 +164,19 @@ When consuming data in streaming query, Hudi Flink
source can also accepts the c
it can then applies the UPDATE and DELETE by per-row level. You can then sync
a NEAR-REAL-TIME snapshot on Hudi for all kinds
of RDBMS.
-## Flink Configuration
-
-Before using Flink, you need to set some global configurations in
`$FLINK_HOME/conf/flink-conf.yaml`
-
-### Parallelism
-
-| Option Name | Default | Type | Description |
-| ----------- | ------- | ------- | ------- |
-| `taskmanager.numberOfTaskSlots` | `1` | `Integer` | The number of parallel
operator or user function instances that a single TaskManager can run. We
recommend setting this value > 4, and the actual value needs to be set
according to the amount of data |
-| `parallelism.default` | `1` | `Integer` | The default parallelism used when
no parallelism is specified anywhere (default: 1). For example, If the value of
[`write.bucket_assign.tasks`](#parallelism-1) is not set, this value will be
used |
-
-### Memory
-
-| Option Name | Default | Type | Description |
-| ----------- | ------- | ------- | ------- |
-| `jobmanager.memory.process.size` | `(none)` | `MemorySize` | Total Process
Memory size for the JobManager. This includes all the memory that a JobManager
JVM process consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM
Overhead |
-| `taskmanager.memory.task.heap.size` | `(none)` | `MemorySize` | Task Heap
Memory size for TaskExecutors. This is the size of JVM heap memory reserved for
write cache |
-| `taskmanager.memory.managed.size` | `(none)` | `MemorySize` | Managed
Memory size for TaskExecutors. This is the size of off-heap memory managed by
the memory manager, reserved for sorting and RocksDB state backend. If you
choose RocksDB as the state backend, you need to set this memory |
-
-### Checkpoint
-
-| Option Name | Default | Type | Description |
-| ----------- | ------- | ------- | ------- |
-| `execution.checkpointing.interval` | `(none)` | `Duration` | Setting this
value as `execution.checkpointing.interval = 150000ms`, 150000ms = 2.5min.
Configuring this parameter is equivalent to enabling the checkpoint |
-| `state.backend` | `(none)` | `String` | The state backend to be used to
store state. We recommend setting store state as `rocksdb` : `state.backend:
rocksdb` |
-| `state.backend.rocksdb.localdir` | `(none)` | `String` | The local directory
(on the TaskManager) where RocksDB puts its files |
-| `state.checkpoints.dir` | `(none)` | `String` | The default directory used
for storing the data files and meta data of checkpoints in a Flink supported
filesystem. The storage path must be accessible from all participating
processes/nodes(i.e. all TaskManagers and JobManagers), like hdfs and oss path |
-| `state.backend.incremental` | `false` | `Boolean` | Option whether the
state backend should create incremental checkpoints, if possible. For an
incremental checkpoint, only a diff from the previous checkpoint is stored,
rather than the complete checkpoint state. If store state is setting as
`rocksdb`, recommending to turn on |
-
-## Table Option
-
-Flink SQL job can be configured through the options in [`WITH`](#table-option)
clause.
-The actual datasource level configs are listed below.
-
-### Memory
-
-:::note
-When optimizing memory, we need to pay attention to the [memory
configuration](#memory)
-and the number of taskManagers, parallelism of write tasks (write.tasks : 4)
first. After confirm each write task to be
-allocated with enough memory, we can try to set these memory options.
-:::
-
-| Option Name | Description | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `write.task.max.size` | Maximum memory in MB for a write task, when the
threshold hits, it flushes the max size data bucket to avoid OOM. Default
`1024MB` | `1024D` | The memory reserved for write buffer is
`write.task.max.size` - `compaction.max_memory`. When total buffer of write
tasks reach the threshold, the largest buffer in the memory will be flushed |
-| `write.batch.size` | In order to improve the efficiency of writing, Flink
write task will cache data in buffer according to the write bucket until the
memory reaches the threshold. When reached threshold, the data buffer would be
flushed out. Default `64MB` | `64D` | Recommend to use the default settings |
-| `write.log_block.size` | The log writer of Hudi will not flush the data
immediately after receiving data. The writer flush data to the disk in the unit
of `LogBlock`. Before `LogBlock` reached threshold, records will be buffered in
the writer in form of serialized bytes. Default `128MB` | `128` | Recommend
to use the default settings |
-| `write.merge.max_memory` | If write type is `COPY_ON_WRITE`, Hudi will merge
the incremental data and base file data. The incremental data will be cached
and spilled to disk. this threshold controls the max heap size that can be
used. Default `100MB` | `100` | Recommend to use the default settings |
-| `compaction.max_memory` | Same as `write.merge.max_memory`, but occurs
during compaction. Default `100MB` | `100` | If it is online compaction, it can
be turned up when resources are sufficient, such as setting as `1024MB` |
-
-### Parallelism
-
-| Option Name | Description | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `write.tasks` | The parallelism of writer tasks. Each write task writes 1
to `N` buckets in sequence. Default `4` | `4` | Increases the parallelism has
no effect on the number of small files |
-| `write.bucket_assign.tasks` | The parallelism of bucket assigner
operators. No default value, using Flink `parallelism.default` |
[`parallelism.default`](#parallelism) | Increases the parallelism also
increases the number of buckets, thus the number of small files (small buckets)
|
-| `write.index_boostrap.tasks` | The parallelism of index bootstrap.
Increasing parallelism can speed up the efficiency of the bootstrap stage. The
bootstrap stage will block checkpointing. Therefore, it is necessary to set
more checkpoint failure tolerance times. Default using Flink
`parallelism.default` | [`parallelism.default`](#parallelism) | It only take
effect when `index.bootsrap.enabled` is `true` |
-| `read.tasks` | The parallelism of read operators (batch and stream). Default
`4` | `4` | |
-| `compaction.tasks` | The parallelism of online compaction. Default `4` | `4`
| `Online compaction` will occupy the resources of the write task. It is
recommended to use [`offline compaction`](#offline-compaction) |
-
-### Compaction
-
-:::note
-These are options only for `online compaction`.
-:::
-
-:::note
-Turn off online compaction by setting `compaction.async.enabled` = `false`,
but we still recommend turning on `compaction.schedule.enable` for the writing
job. You can then execute the compaction plan by [`offline
compaction`](#offline-compaction).
-:::
-
-| Option Name | Description | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `compaction.schedule.enabled` | Whether to generate compaction plan
periodically | `true` | Recommend to turn it on, even if
`compaction.async.enabled` = `false` |
-| `compaction.async.enabled` | Async Compaction, enabled by default for MOR
| `true` | Turn off `online compaction` by turning off this option |
-| `compaction.trigger.strategy` | Strategy to trigger compaction |
`num_commits` | Options are `num_commits`: trigger compaction when reach N
delta commits; `time_elapsed`: trigger compaction when time elapsed > N seconds
since last compaction; `num_and_time`: trigger compaction when both
`NUM_COMMITS` and `TIME_ELAPSED` are satisfied; `num_or_time`: trigger
compaction when `NUM_COMMITS` or `TIME_ELAPSED` is satisfied. |
-| `compaction.delta_commits` | Max delta commits needed to trigger compaction,
default `5` commits | `5` | -- |
-| `compaction.delta_seconds` | Max delta seconds time needed to trigger
compaction, default `1` hour | `3600` | -- |
-| `compaction.max_memory` | Max memory in MB for compaction spillable map,
default `100MB` | `100` | If your have sufficient resources, recommend to
adjust to `1024MB` |
-| `compaction.target_io` | Target IO per compaction (both read and write),
default `500GB`| `512000` | -- |
-
-## Memory Optimization
-
-### MOR
-
-1. [Setting Flink state backend to `rocksdb`](#checkpoint) (the default `in
memory` state backend is very memory intensive).
-2. If there is enough memory, `compaction.max_memory` can be set larger
(`100MB` by default, and can be adjust to `1024MB`).
-3. Pay attention to the memory allocated to each write task by taskManager to
ensure that each write task can be allocated to the
-desired memory size `write.task.max.size`. For example, taskManager has `4GB`
of memory running two streamWriteFunction, so each write task
-can be allocated with `2GB` memory. Please reserve some buffers because the
network buffer and other types of tasks on taskManager (such as
bucketAssignFunction) will also consume memory.
-4. Pay attention to the memory changes of compaction. `compaction.max_memory`
controls the maximum memory that each task can be used when compaction tasks
read
-logs. `compaction.tasks` controls the parallelism of compaction tasks.
-
-### COW
-
-1. [Setting Flink state backend to `rocksdb`](#checkpoint) (the default `in
memory` state backend is very memory intensive).
-2. Increase both `write.task.max.size` and `write.merge.max_memory` (`1024MB`
and `100MB` by default, adjust to `2014MB` and `1024MB`).
-3. Pay attention to the memory allocated to each write task by taskManager to
ensure that each write task can be allocated to the
-desired memory size `write.task.max.size`. For example, taskManager has `4GB`
of memory running two write tasks, so each write task
-can be allocated with `2GB` memory. Please reserve some buffers because the
network buffer and other types of tasks on taskManager (such as
`BucketAssignFunction`) will also consume memory.
-
-## CDC Ingestion
-CDC(change data capture) keep track of the data changes evolving in a source
system so a downstream process or system can action that change.
-We recommend two ways for syncing CDC data into Hudi:
-
-
-
-1. Using the Ververica
[flink-cdc-connectors](https://github.com/ververica/flink-cdc-connectors)
directly connect to DB Server to sync the binlog data into Hudi.
-The advantage is that it does not rely on message queues, but the disadvantage
is that it puts pressure on the db server;
-2. Consume data from a message queue (for e.g, the Kafka) using the flink cdc
format, the advantage is that it is highly scalable,
-but the disadvantage is that it relies on message queues.
-
-:::note
-- If the upstream data cannot guarantee the order, you need to specify option
`write.precombine.field` explicitly;
-- The MOR table can not handle DELETEs in event time sequence now, thus
causing data loss. You better switch on the changelog mode through
-option `changelog.enabled`.
-:::
-
-## Bulk Insert
-
-For the demand of snapshot data import. If the snapshot data comes from other
data sources, use the `bulk_insert` mode to quickly
-import the snapshot data into Hudi.
-
-
-:::note
- `bulk_insert` eliminates the serialization and data merging. The data
deduplication is skipped, so the user need to guarantee the uniqueness of the
data.
-:::
-
-:::note
-`bulk_insert` is more efficient in the `batch execution mode`. By default, the
`batch execution mode` sorts the input records
-by the partition path and writes these records to Hudi, which can avoid write
performance degradation caused by
-frequent `file handle` switching.
-:::
-
-:::note
-The parallelism of `bulk_insert` is specified by `write.tasks`. The
parallelism will affect the number of small files.
-In theory, the parallelism of `bulk_insert` is the number of `bucket`s (In
particular, when each bucket writes to maximum file size, it
-will rollover to the new file handle. Finally, `the number of files` >=
[`write.bucket_assign.tasks`](#parallelism)).
-:::
-
-### Options
-
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `write.operation` | `true` | `upsert` | Setting as `bulk_insert` to open
this function |
-| `write.tasks` | `false` | `4` | The parallelism of `bulk_insert`, `the
number of files` >= [`write.bucket_assign.tasks`](#parallelism) |
-| `write.bulk_insert.shuffle_by_partition` | `false` | `true` | Whether to
shuffle data according to the partition field before writing. Enabling this
option will reduce the number of small files, but there may be a risk of data
skew |
-| `write.bulk_insert.sort_by_partition` | `false` | `true` | Whether to sort
data according to the partition field before writing. Enabling this option will
reduce the number of small files when a write task writes multiple partitions |
-| `write.sort.memory` | `false` | `128` | Available managed memory of sort
operator. default `128` MB |
-
-## Index Bootstrap
-
-For the demand of `snapshot data` + `incremental data` import. If the
`snapshot data` already insert into Hudi by [bulk insert](#bulk-insert).
-User can insert `incremental data` in real time and ensure the data is not
repeated by using the index bootstrap function.
-
-:::note
-If you think this process is very time-consuming, you can add resources to
write in streaming mode while writing `snapshot data`,
-and then reduce the resources to write `incremental data` (or open the rate
limit function).
-:::
-
-### Options
-
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `index.bootstrap.enabled` | `true` | `false` | When index bootstrap is
enabled, the remain records in Hudi table will be loaded into the Flink state
at one time |
-| `index.partition.regex` | `false` | `*` | Optimize option. Setting
regular expressions to filter partitions. By default, all partitions are loaded
into flink state |
-
-### How To Use
-
-1. `CREATE TABLE` creates a statement corresponding to the Hudi table. Note
that the `table.type` must be correct.
-2. Setting `index.bootstrap.enabled` = `true` to enable the index bootstrap
function.
-3. Setting Flink checkpoint failure tolerance in `flink-conf.yaml` :
`execution.checkpointing.tolerable-failed-checkpoints = n` (depending on Flink
checkpoint scheduling times).
-4. Waiting until the first checkpoint succeeds, indicating that the index
bootstrap completed.
-5. After the index bootstrap completed, user can exit and save the savepoint
(or directly use the externalized checkpoint).
-6. Restart the job, setting `index.bootstrap.enable` as `false`.
-
-:::note
-1. Index bootstrap is blocking, so checkpoint cannot be completed during index
bootstrap.
-2. Index bootstrap triggers by the input data. User need to ensure that there
is at least one record in each partition.
-3. Index bootstrap executes concurrently. User can search in log by `finish
loading the index under partition` and `Load record form file` to observe the
progress of index bootstrap.
-4. The first successful checkpoint indicates that the index bootstrap
completed. There is no need to load the index again when recovering from the
checkpoint.
-:::
-
-## Changelog Mode
-Hudi can keep all the intermediate changes (I / -U / U / D) of messages, then
consumes through stateful computing of flink to have a near-real-time
-data warehouse ETL pipeline (Incremental computing). Hudi MOR table stores
messages in the forms of rows, which supports the retention of all change logs
(Integration at the format level).
-All changelog records can be consumed with Flink streaming reader.
-
-### Options
-
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `changelog.enabled` | `false` | `false` | It is turned off by default, to
have the `upsert` semantics, only the merged messages are ensured to be kept,
intermediate changes may be merged. Setting to true to support consumption of
all changes |
-
-:::note
-Batch (Snapshot) read still merge all the intermediate changes, regardless of
whether the format has stored the intermediate changelog messages.
-:::
-
-:::note
-After setting `changelog.enable` as `true`, the retention of changelog records
are only best effort: the asynchronous compaction task will merge the changelog
records into one record, so if the
-stream source does not consume timely, only the merged record for each key can
be read after compaction. The solution is to reserve some buffer time for the
reader by adjusting the compaction strategy, such as
-the compaction options: [`compaction.delta_commits`](#compaction) and
[`compaction.delta_seconds`](#compaction).
-:::
-
-
-## Append Mode
-
-If INSERT operation is used for ingestion, for COW table, there is no merging
of small files by default; for MOR table, the small file strategy is applied
always: MOR appends delta records to log files.
-
-The small file strategy lead to performance degradation. If you want to apply
the behavior of file merge for COW table, turns on option
`write.insert.cluster`, there is no record key combining by the way.
-
-### Options
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `write.insert.cluster` | `false` | `false` | Whether to merge small files
while ingesting, for COW table, open the option to enable the small file
merging strategy(no deduplication for keys but the throughput will be affected)
|
-
-## Rate Limit
-There are many use cases that user put the full history data set onto the
message queue together with the realtime incremental data. Then they consume
the data from the queue into the hudi from the earliest offset using flink.
Consuming history data set has these characteristics:
-1). The instant throughput is huge 2). It has serious disorder (with random
writing partitions). It will lead to degradation of writing performance and
throughput glitches. At this time, the speed limit parameter can be turned on
to ensure smooth writing of the flow.
-
-### Options
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `write.rate.limit` | `false` | `0` | Default disable the rate limit |
-
-## Incremental Query
-There are 3 use cases for incremental query:
-1. Streaming query: specify the start commit with option `read.start-commit`;
-2. Batch query: specify the start commit with option `read.start-commit` and
end commit with option `read.end-commit`,
-the interval is a closed one: both start commit and end commit are inclusive;
-3. TimeTravel: consume as batch for an instant time, specify the
`read.end-commit` is enough because the start commit is latest by default.
-
-### Options
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `write.start-commit` | `false` | the latest commit | Specify `earliest` to
consume from the start commit |
-| `write.end-commit` | `false` | the latest commit | -- |
-
-## Hive Query
-
-### Install
-
-Now you can git clone Hudi master branch to test Flink hive sync. The first
step is to install Hudi to get `hudi-flink-bundle_2.11-0.x.jar`.
-`hudi-flink-bundle` module pom.xml sets the scope related to hive as
`provided` by default. If you want to use hive sync, you need to use the
-profile `flink-bundle-shade-hive` during packaging. Executing command below to
install:
-
-```bash
-# Maven install command
-mvn install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2
-
-# For hive1, you need to use profile -Pflink-bundle-shade-hive1
-# For hive3, you need to use profile -Pflink-bundle-shade-hive3
-```
-
-:::note
-Hive1.x can only synchronize metadata to hive, but cannot use hive query now.
If you need to query, you can use spark to query hive table.
-:::
-
-:::note
-If using hive profile, you need to modify the hive version in the profile to
your hive cluster version (Only need to modify the hive version in this
profile).
-The location of this `pom.xml` is `packaging/hudi-flink-bundle/pom.xml`, and
the corresponding profile is at the bottom of this file.
-:::
-
-### Hive Environment
-
-1. Import `hudi-hadoop-mr-bundle` into hive. Creating `auxlib/` folder under
the root directory of hive, and moving
`hudi-hadoop-mr-bundle-0.x.x-SNAPSHOT.jar` into `auxlib`.
-`hudi-hadoop-mr-bundle-0.x.x-SNAPSHOT.jar` is at
`packaging/hudi-hadoop-mr-bundle/target`.
-
-2. When Flink sql client connects hive metastore remotely, `hive metastore`
and `hiveserver2` services need to be enabled, and the port number need to
-be set correctly. Command to turn on the services:
-
-```bash
-# Enable hive metastore and hiveserver2
-nohup ./bin/hive --service metastore &
-nohup ./bin/hive --service hiveserver2 &
-
-# While modifying the jar package under auxlib, you need to restart the
service.
-```
-
-### Sync Template
-
-Flink hive sync now supports two hive sync mode, `hms` and `jdbc`. `hms` mode
only needs to configure metastore uris. For
-the `jdbc` mode, the JDBC attributes and metastore uris both need to be
configured. The options template is as below:
-
-```sql
--- hms mode template
-CREATE TABLE t1(
- uuid VARCHAR(20),
- name VARCHAR(10),
- age INT,
- ts TIMESTAMP(3),
- `partition` VARCHAR(20)
-)
-PARTITIONED BY (`partition`)
-WITH (
- 'connector' = 'hudi',
- 'path' = '${db_path}/t1',
- 'table.type' = 'COPY_ON_WRITE', --If MERGE_ON_READ, hive query will not
have output until the parquet file is generated
- 'hive_sync.enable' = 'true', -- Required. To enable hive synchronization
- 'hive_sync.mode' = 'hms' -- Required. Setting hive sync mode to hms,
default jdbc
- 'hive_sync.metastore.uris' = 'thrift://${ip}:9083' -- Required. The port
need set on hive-site.xml
-);
-
-
--- jdbc mode template
-CREATE TABLE t1(
- uuid VARCHAR(20),
- name VARCHAR(10),
- age INT,
- ts TIMESTAMP(3),
- `partition` VARCHAR(20)
-)
-PARTITIONED BY (`partition`)
-WITH (
- 'connector' = 'hudi',
- 'path' = '${db_path}/t1',
- 'table.type' = 'COPY_ON_WRITE', --If MERGE_ON_READ, hive query will not
have output until the parquet file is generated
- 'hive_sync.enable' = 'true', -- Required. To enable hive synchronization
- 'hive_sync.mode' = 'hms' -- Required. Setting hive sync mode to hms,
default jdbc
- 'hive_sync.metastore.uris' = 'thrift://${ip}:9083' -- Required. The port
need set on hive-site.xml
- 'hive_sync.jdbc_url'='jdbc:hive2://${ip}:10000', -- required, hiveServer
port
- 'hive_sync.table'='t1', -- required, hive table name
- 'hive_sync.db'='testDB', -- required, hive database
name
- 'hive_sync.username'='${user_name}', -- required, HMS
username
- 'hive_sync.password'='${password}' -- required, HMS password
-);
-```
-
-### Query
-
-While using hive beeline query, you need to enter settings:
-```bash
-set hive.input.format =
org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
-```
-
-## Presto Query
-
-### Hive Sync
-First, you need to sync Hudi table metadata to hive according to the above
steps of [Hive Query](#hive-query).
-
-### Presto Environment
-1. Configure Presto according to the [Presto configuration
document](https://prestodb.io/docs/current/installation/deployment.html).
-2. Configure hive catalog in `
/presto-server-0.2xxx/etc/catalog/hive.properties` as follows:
-
-```properties
-connector.name=hive-hadoop2
-hive.metastore.uri=thrift://xxx.xxx.xxx.xxx:9083
-hive.config.resources=.../hadoop-2.x/etc/hadoop/core-site.xml,.../hadoop-2.x/etc/hadoop/hdfs-site.xml
-```
-
-### Query
-
-Beginning query by connecting hive metastore with presto client. The presto
client connection command is as follows:
-
-```bash
-# The presto client connection command
-./presto --server xxx.xxx.xxx.xxx:9999 --catalog hive --schema default
-```
-
-:::note
-1. Presto-server-0.2445 is a lower version. When querying the `rt table` of
MERGE_ON_WRITE, there will be a package conflict, this bug is in fix.
-2. When Presto-server-xxx version < 0.233, the `hudi-presto-bundle.jar` needs
to manually import into `{presto_install_dir}/plugin/hive-hadoop2/`.
-:::
-
-
-## Offline Compaction
-
-The compaction of the MERGE_ON_READ table is enabled by default. The trigger
strategy is to perform compaction after completing
-five commits. Because compaction consumes a lot of memory and is placed in the
same pipeline with the write operation, it's easy to
-interfere with the write operation when there is a large amount of data (>
100000 per second). As this time, it is more stable to execute
-the compaction task by using offline compaction.
-
-:::note
-The execution of a compaction task includes two parts: schedule compaction
plan and execute compaction plan. It's recommended that
-the process of schedule compaction plan be triggered periodically by the write
task, and the write parameter `compaction.schedule.enable`
-is enabled by default.
-:::
-
-Offline compaction needs to submit the Flink task on the command line. The
program entry is as follows: `hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar` :
-`org.apache.hudi.sink.compact.HoodieFlinkCompactor`
-
-```bash
-# Command line
-./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor
lib/hudi-flink-bundle_2.11-0.9.0.jar --path hdfs://xxx:9000/table
-```
-
-### Options
-
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `--path` | `true` | `--` | The path where the target table is stored on Hudi |
-| `--compaction-max-memory` | `false` | `100` | The index map size of log data
during compaction, 100 MB by default. If you have enough memory, you can turn
up this parameter |
-| `--schedule` | `false` | `false` | whether to execute the operation of
scheduling compaction plan. When the write process is still writing, turning on
this parameter have a risk of losing data. Therefore, it must be ensured that
there are no write tasks currently writing data to this table when this
parameter is turned on |
-| `--seq` | `false` | `LIFO` | The order in which compaction tasks are
executed. Executing from the latest compaction plan by default. `LIFO`:
executing from the latest plan. `FIFO`: executing from the oldest plan. |
-
-## Write Rate Limit
-
-In the existing data synchronization, `snapshot data` and `incremental data`
are send to kafka first, and then streaming write
-to Hudi by Flink. Because the direct consumption of `snapshot data` will lead
to problems such as high throughput and serious
-disorder (writing partition randomly), which will lead to write performance
degradation and throughput glitches. At this time,
-the `write.rate.limit` option can be turned on to ensure smooth writing.
+## Where To Go From Here?
Check out the [Flink Setup](/docs/next/flink_configuration) how-to page for a deeper dive into configuration settings.
-### Options
+If you are relatively new to Apache Hudi, it is important to be familiar with
a few core concepts:
+ - [Hudi Timeline](/docs/next/timeline) – How Hudi manages transactions and
other table services
+ - [Hudi File Layout](/docs/next/file_layouts) - How the files are laid out
on storage
+ - [Hudi Table Types](/docs/next/table_types) – `COPY_ON_WRITE` and
`MERGE_ON_READ`
+ - [Hudi Query Types](/docs/next/table_types#query-types) – Snapshot Queries,
Incremental Queries, Read-Optimized Queries
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `write.rate.limit` | `false` | `0` | Turn off by default |
+See more in the "Concepts" section of the docs.
-## Where To Go From Here?
+Take a look at recent [blog posts](/blog) that go in depth on certain topics
or use cases.
-We used Flink here to show case the capabilities of Hudi. However, Hudi can
support multiple table types/query types and
Hudi tables can be queried from query engines like Hive, Spark, Flink, Presto
and much more. We have put together a
[demo video](https://www.youtube.com/watch?v=VhNgUsxdrD0) that show cases all
of this on a docker based setup with all
dependent systems running locally. We recommend you replicate the same setup
and run the demo yourself, by following
diff --git a/website/docs/flink_configuration.md
b/website/docs/flink_configuration.md
new file mode 100644
index 0000000..ba7853d
--- /dev/null
+++ b/website/docs/flink_configuration.md
@@ -0,0 +1,117 @@
+---
+title: Flink Setup
+toc: true
+---
+
+## Global Configurations
+When using Flink, you can set some global configurations in
`$FLINK_HOME/conf/flink-conf.yaml`
+
+### Parallelism
+
+| Option Name | Default | Type | Description |
+| ----------- | ------- | ------- | ------- |
+| `taskmanager.numberOfTaskSlots` | `1` | `Integer` | The number of parallel
operator or user function instances that a single TaskManager can run. We
recommend setting this value > 4, and the actual value needs to be set
according to the amount of data |
+| `parallelism.default` | `1` | `Integer` | The default parallelism used when no parallelism is specified anywhere (default: 1). For example, if the value of [`write.bucket_assign.tasks`](#parallelism-1) is not set, this value will be used |
+
+### Memory
+
+| Option Name | Default | Type | Description |
+| ----------- | ------- | ------- | ------- |
+| `jobmanager.memory.process.size` | `(none)` | `MemorySize` | Total Process
Memory size for the JobManager. This includes all the memory that a JobManager
JVM process consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM
Overhead |
+| `taskmanager.memory.task.heap.size` | `(none)` | `MemorySize` | Task Heap
Memory size for TaskExecutors. This is the size of JVM heap memory reserved for
write cache |
+| `taskmanager.memory.managed.size` | `(none)` | `MemorySize` | Managed
Memory size for TaskExecutors. This is the size of off-heap memory managed by
the memory manager, reserved for sorting and RocksDB state backend. If you
choose RocksDB as the state backend, you need to set this memory |
+
+### Checkpoint
+
+| Option Name | Default | Type | Description |
+| ----------- | ------- | ------- | ------- |
+| `execution.checkpointing.interval` | `(none)` | `Duration` | Set this value, e.g. `execution.checkpointing.interval = 150000ms` (150000ms = 2.5min). Configuring this parameter enables checkpointing |
+| `state.backend` | `(none)` | `String` | The state backend to be used to store state. We recommend setting the state backend to `rocksdb`: `state.backend: rocksdb` |
+| `state.backend.rocksdb.localdir` | `(none)` | `String` | The local directory
(on the TaskManager) where RocksDB puts its files |
+| `state.checkpoints.dir` | `(none)` | `String` | The default directory used
for storing the data files and meta data of checkpoints in a Flink supported
filesystem. The storage path must be accessible from all participating
processes/nodes(i.e. all TaskManagers and JobManagers), like hdfs and oss path |
+| `state.backend.incremental` | `false` | `Boolean` | Option whether the state backend should create incremental checkpoints, if possible. For an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the complete checkpoint state. If the state backend is set to `rocksdb`, we recommend turning this on |
+
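Putting the global options above together, a minimal `flink-conf.yaml` sketch for a Hudi writing job could look like the following. All values are illustrative and should be sized to your workload; the local and HDFS paths are placeholders.

```yaml
# Illustrative flink-conf.yaml fragment for a Hudi writing job
taskmanager.numberOfTaskSlots: 4
parallelism.default: 4

jobmanager.memory.process.size: 2g
taskmanager.memory.task.heap.size: 4g
taskmanager.memory.managed.size: 2g          # needed when RocksDB is the state backend

execution.checkpointing.interval: 150000ms   # 2.5min
state.backend: rocksdb
state.backend.rocksdb.localdir: /tmp/flink/rocksdb
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
state.backend.incremental: true
```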
+## Table Options
+
+Flink SQL jobs can be configured through options in the `WITH` clause.
+The actual datasource level configs are listed below.
+
+### Memory
+
+:::note
+When optimizing memory, we need to pay attention to the memory configuration,
+the number of taskManagers, and the parallelism of write tasks (`write.tasks: 4`) first. After confirming that each write task is
+allocated enough memory, we can try to set these memory options.
+:::
+
+| Option Name | Description | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `write.task.max.size` | Maximum memory in MB for a write task; when the threshold is hit, it flushes the max size data bucket to avoid OOM. Default `1024MB` | `1024D` | The memory reserved for the write buffer is `write.task.max.size` - `compaction.max_memory`. When the total buffer of the write tasks reaches the threshold, the largest buffer in memory will be flushed |
+| `write.batch.size` | In order to improve the efficiency of writing, the Flink write task caches data in a buffer per write bucket until the memory reaches the threshold; when the threshold is reached, the data buffer is flushed out. Default `64MB` | `64D` | The default settings are recommended |
+| `write.log_block.size` | The log writer of Hudi will not flush the data immediately after receiving it. The writer flushes data to disk in units of `LogBlock`. Until a `LogBlock` reaches the threshold, records are buffered in the writer as serialized bytes. Default `128MB` | `128` | The default settings are recommended |
+| `write.merge.max_memory` | If the write type is `COPY_ON_WRITE`, Hudi merges the incremental data with the base file data. The incremental data is cached and spilled to disk; this threshold controls the max heap size that can be used. Default `100MB` | `100` | The default settings are recommended |
+| `compaction.max_memory` | Same as `write.merge.max_memory`, but occurs during compaction. Default `100MB` | `100` | If it is online compaction, it can be turned up when resources are sufficient, such as setting it to `1024MB` |
+
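As a sketch of how these memory options are passed through the `WITH` clause of a Flink SQL DDL — the table schema, path, and values below are illustrative, not recommendations:

```sql
CREATE TABLE hudi_table(
  uuid VARCHAR(20),
  name VARCHAR(10),
  ts TIMESTAMP(3)
)
WITH (
  'connector' = 'hudi',
  'path' = '${base_path}/hudi_table',
  'table.type' = 'MERGE_ON_READ',
  'write.task.max.size' = '2048',    -- 2GB ceiling per write task
  'compaction.max_memory' = '1024'   -- leaves 2048 - 1024 = 1024MB for the write buffer
)
```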
+### Parallelism
+
+| Option Name | Description | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `write.tasks` | The parallelism of writer tasks. Each write task writes 1 to `N` buckets in sequence. Default `4` | `4` | Increasing the parallelism has no effect on the number of small files |
+| `write.bucket_assign.tasks` | The parallelism of bucket assigner operators. No default value; uses Flink `parallelism.default` | [`parallelism.default`](#parallelism) | Increasing the parallelism also increases the number of buckets, and thus the number of small files (small buckets) |
+| `write.index_boostrap.tasks` | The parallelism of index bootstrap. Increasing the parallelism can speed up the bootstrap stage. The bootstrap stage blocks checkpointing, so it is necessary to allow more checkpoint failure tolerance times. Defaults to Flink `parallelism.default` | [`parallelism.default`](#parallelism) | It only takes effect when `index.bootstrap.enabled` is `true` |
+| `read.tasks` | The parallelism of read operators (batch and stream). Default
`4` | `4` | |
+| `compaction.tasks` | The parallelism of online compaction. Default `4` | `4`
| `Online compaction` will occupy the resources of the write task. It is
recommended to use [`offline compaction`](#offline-compaction) |
+
+### Compaction
+
+:::note
+These are options only for `online compaction`.
+:::
+
+:::note
+Turn off online compaction by setting `compaction.async.enabled` = `false`, but we still recommend turning on `compaction.schedule.enabled` for the writing
+job. You can then execute the compaction plan by [`offline compaction`](#offline-compaction).
+:::
+
+| Option Name | Description | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `compaction.schedule.enabled` | Whether to generate compaction plan
periodically | `true` | Recommend to turn it on, even if
`compaction.async.enabled` = `false` |
+| `compaction.async.enabled` | Async Compaction, enabled by default for MOR
| `true` | Turn off `online compaction` by turning off this option |
+| `compaction.trigger.strategy` | Strategy to trigger compaction |
`num_commits` | Options are `num_commits`: trigger compaction when reach N
delta commits; `time_elapsed`: trigger compaction when time elapsed > N seconds
since last compaction; `num_and_time`: trigger compaction when both
`NUM_COMMITS` and `TIME_ELAPSED` are satisfied; `num_or_time`: trigger
compaction when `NUM_COMMITS` or `TIME_ELAPSED` is satisfied. |
+| `compaction.delta_commits` | Max delta commits needed to trigger compaction,
default `5` commits | `5` | -- |
+| `compaction.delta_seconds` | Max delta seconds time needed to trigger
compaction, default `1` hour | `3600` | -- |
+| `compaction.max_memory` | Max memory in MB for the compaction spillable map, default `100MB` | `100` | If you have sufficient resources, we recommend adjusting it to `1024MB` |
+| `compaction.target_io` | Target IO per compaction (both read and write),
default `500GB`| `512000` | -- |
+
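For example, the recommendation in the note above — async compaction off but plan scheduling on, leaving execution to an offline compaction job — could be expressed as the following sketch (schema and path are placeholders):

```sql
CREATE TABLE hudi_mor(
  uuid VARCHAR(20),
  name VARCHAR(10),
  ts TIMESTAMP(3)
)
WITH (
  'connector' = 'hudi',
  'path' = '${base_path}/hudi_mor',
  'table.type' = 'MERGE_ON_READ',
  'compaction.async.enabled' = 'false',    -- no online compaction in the write job
  'compaction.schedule.enabled' = 'true',  -- still generate compaction plans
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.delta_commits' = '5'
)
```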
+## Memory Optimization
+
+### MOR
+
+1. [Setting Flink state backend to `rocksdb`](#checkpoint) (the default `in
memory` state backend is very memory intensive).
+2. If there is enough memory, `compaction.max_memory` can be set larger (`100MB` by default, and can be adjusted to `1024MB`).
+3. Pay attention to the memory allocated to each write task by taskManager to
ensure that each write task can be allocated to the
+ desired memory size `write.task.max.size`. For example, taskManager has
`4GB` of memory running two streamWriteFunction, so each write task
+ can be allocated with `2GB` memory. Please reserve some buffers because the
network buffer and other types of tasks on taskManager (such as
bucketAssignFunction) will also consume memory.
+4. Pay attention to the memory changes of compaction. `compaction.max_memory` controls the maximum memory each task can use when compaction tasks read
+   logs. `compaction.tasks` controls the parallelism of compaction tasks.
+
+### COW
+
+1. [Setting Flink state backend to `rocksdb`](#checkpoint) (the default `in
memory` state backend is very memory intensive).
+2. Increase both `write.task.max.size` and `write.merge.max_memory` (`1024MB` and `100MB` by default; adjust to `2048MB` and `1024MB`).
+3. Pay attention to the memory allocated to each write task by taskManager to
ensure that each write task can be allocated to the
+ desired memory size `write.task.max.size`. For example, taskManager has
`4GB` of memory running two write tasks, so each write task
+ can be allocated with `2GB` memory. Please reserve some buffers because the
network buffer and other types of tasks on taskManager (such as
`BucketAssignFunction`) will also consume memory.
+
+
+## Write Rate Limit
+
+In existing data synchronization pipelines, `snapshot data` and `incremental data` are sent to Kafka first, and then streamed into
+Hudi by Flink. Direct consumption of the `snapshot data` leads to problems such as high throughput and serious
+disorder (random write partitions), which in turn lead to write performance degradation and throughput glitches. In this case,
+the `write.rate.limit` option can be turned on to ensure smooth writing.
+
+### Options
+
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `write.rate.limit` | `false` | `0` | Turn off by default |
\ No newline at end of file
diff --git a/website/docs/hoodie_deltastreamer.md
b/website/docs/hoodie_deltastreamer.md
index 615ff94..3129593 100644
--- a/website/docs/hoodie_deltastreamer.md
+++ b/website/docs/hoodie_deltastreamer.md
@@ -151,7 +151,7 @@ and then ingest it as follows.
In some cases, you may want to migrate your existing table into Hudi
beforehand. Please refer to [migration guide](/docs/migration_guide).
-## MultiTableDeltaStreamer
+### MultiTableDeltaStreamer
`HoodieMultiTableDeltaStreamer`, a wrapper on top of `HoodieDeltaStreamer`,
enables one to ingest multiple tables at a single go into hudi datasets.
Currently it only supports sequential processing of tables to be ingested and
COPY_ON_WRITE storage type. The command line options for
`HoodieMultiTableDeltaStreamer` are pretty much similar to
`HoodieDeltaStreamer` with the only exception that you are required to provide
table wise configs in separate files in a dedicated config folder. The [...]
@@ -189,7 +189,7 @@ Sample config files for table wise overridden properties
can be found under `hud
For detailed information on how to configure and use
`HoodieMultiTableDeltaStreamer`, please refer [blog
section](/blog/2020/08/22/ingest-multiple-tables-using-hudi).
-## Concurrency Control
+### Concurrency Control
The `HoodieDeltaStreamer` utility (part of hudi-utilities-bundle) provides
ways to ingest from different sources such as DFS or Kafka, with the following
capabilities.
@@ -336,7 +336,146 @@ jobs: `hoodie.write.meta.key.prefixes =
'deltastreamer.checkpoint.key'`
Spark SQL should be configured using this hoodie config:
hoodie.deltastreamer.source.sql.sql.query = 'select * from source_table'
-## Hudi Kafka Connect Sink
+## Flink Ingestion
+
+### CDC Ingestion
+CDC (change data capture) keeps track of data changes in a source system so that a downstream process or system can act on those changes.
+We recommend two ways of syncing CDC data into Hudi:
+
+
+
+1. Use the Ververica [flink-cdc-connectors](https://github.com/ververica/flink-cdc-connectors) to connect directly to the DB server and sync the binlog data into Hudi.
+   The advantage is that it does not rely on message queues, but the disadvantage is that it puts pressure on the DB server;
+2. Consume data from a message queue (e.g. Kafka) using the Flink CDC format. The advantage is that it is highly scalable,
+   but the disadvantage is that it relies on message queues.
+
+:::note
+- If the upstream data cannot guarantee the order, you need to specify option `write.precombine.field` explicitly;
+- The MOR table cannot handle DELETEs in event time sequence yet, which can cause data loss. You had better switch on the changelog mode through
+  option `changelog.enabled`.
+:::
+
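As a sketch of the first approach, assuming the Ververica `mysql-cdc` connector is on the classpath — the table names, hostnames, and credentials below are placeholders:

```sql
-- CDC source backed by the MySQL binlog
CREATE TABLE users_cdc(
  id BIGINT,
  name VARCHAR(20),
  ts TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '${db_host}',
  'port' = '3306',
  'username' = '${user_name}',
  'password' = '${password}',
  'database-name' = 'mydb',
  'table-name' = 'users'
);

-- Hudi sink; changelog mode keeps the intermediate changes
CREATE TABLE users_hudi(
  id BIGINT,
  name VARCHAR(20),
  ts TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = '${base_path}/users_hudi',
  'table.type' = 'MERGE_ON_READ',
  'changelog.enabled' = 'true'
);

INSERT INTO users_hudi SELECT * FROM users_cdc;
```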
+### Bulk Insert
+
+This addresses the need for snapshot data import. If the snapshot data comes from other data sources, use the `bulk_insert` mode to quickly
+import the snapshot data into Hudi.
+
+
+:::note
+`bulk_insert` eliminates serialization and data merging. The data deduplication is skipped, so the user needs to guarantee the uniqueness of the data.
+:::
+
+:::note
+`bulk_insert` is more efficient in the `batch execution mode`. By default, the
`batch execution mode` sorts the input records
+by the partition path and writes these records to Hudi, which can avoid write
performance degradation caused by
+frequent `file handle` switching.
+:::
+
+:::note
+The parallelism of `bulk_insert` is specified by `write.tasks`. The parallelism affects the number of small files.
+In theory, the parallelism of `bulk_insert` equals the number of `bucket`s (in particular, when each bucket writes up to the maximum file size, it
+will roll over to a new file handle; finally, `the number of files` >= [`write.bucket_assign.tasks`](#parallelism)).
+:::
+
+#### Options
+
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `write.operation` | `true` | `upsert` | Set to `bulk_insert` to enable this function |
+| `write.tasks` | `false` | `4` | The parallelism of `bulk_insert`, `the
number of files` >= [`write.bucket_assign.tasks`](#parallelism) |
+| `write.bulk_insert.shuffle_by_partition` | `false` | `true` | Whether to
shuffle data according to the partition field before writing. Enabling this
option will reduce the number of small files, but there may be a risk of data
skew |
+| `write.bulk_insert.sort_by_partition` | `false` | `true` | Whether to sort
data according to the partition field before writing. Enabling this option will
reduce the number of small files when a write task writes multiple partitions |
+| `write.sort.memory` | `false` | `128` | Available managed memory of sort
operator. default `128` MB |
+
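A minimal `bulk_insert` sketch, assuming the snapshot already exists in another Flink-readable table — `source_table`, the schema, and the path are placeholders:

```sql
CREATE TABLE hudi_snapshot(
  uuid VARCHAR(20),
  name VARCHAR(10),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = '${base_path}/hudi_snapshot',
  'write.operation' = 'bulk_insert',  -- skip dedup/merge for a fast one-shot import
  'write.tasks' = '8'                 -- parallelism of the bulk_insert
);

INSERT INTO hudi_snapshot SELECT * FROM source_table;
```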
+### Index Bootstrap
+
+This addresses the need for `snapshot data` + `incremental data` import. If the `snapshot data` was already inserted into Hudi via [bulk insert](#bulk-insert),
+users can insert `incremental data` in real time and ensure the data is not duplicated by using the index bootstrap function.
+
+:::note
+If you find this process very time-consuming, you can add resources for writing in streaming mode while writing the `snapshot data`,
+and then reduce the resources when writing the `incremental data` (or turn on the rate limit function).
+:::
+
+#### Options
+
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `index.bootstrap.enabled` | `true` | `false` | When index bootstrap is enabled, the remaining records in the Hudi table will be loaded into the Flink state at one time |
+| `index.partition.regex` | `false` | `*` | Optimization option. Set a regular expression to filter partitions. By default, all partitions are loaded into Flink state |
+
+#### How To Use
+
+1. Use `CREATE TABLE` to create a statement corresponding to the Hudi table. Note that the `table.type` must be correct.
+2. Set `index.bootstrap.enabled` = `true` to enable the index bootstrap function.
+3. Set the Flink checkpoint failure tolerance in `flink-conf.yaml`: `execution.checkpointing.tolerable-failed-checkpoints = n` (depending on Flink checkpoint scheduling times).
+4. Wait until the first checkpoint succeeds, indicating that the index bootstrap is complete.
+5. After the index bootstrap completes, users can exit and save the savepoint (or directly use the externalized checkpoint).
+6. Restart the job, setting `index.bootstrap.enabled` to `false`.
+
+:::note
+1. Index bootstrap is blocking, so a checkpoint cannot be completed during index bootstrap.
+2. Index bootstrap is triggered by the input data. Users need to ensure that there is at least one record in each partition.
+3. Index bootstrap executes concurrently. Users can search the logs for `finish loading the index under partition` and `Load record form file` to observe the progress of index bootstrap.
+4. The first successful checkpoint indicates that the index bootstrap is complete. There is no need to load the index again when recovering from the checkpoint.
+:::
+
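Steps 1–3 above could be sketched as follows; the schema, path, and the tolerance value `10` are illustrative:

```sql
-- In flink-conf.yaml: execution.checkpointing.tolerable-failed-checkpoints: 10
CREATE TABLE hudi_existing(
  uuid VARCHAR(20),
  name VARCHAR(10),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = '${base_path}/hudi_existing',  -- path of the table loaded by bulk_insert
  'table.type' = 'MERGE_ON_READ',         -- must match the existing table
  'index.bootstrap.enabled' = 'true'      -- set back to false after the first successful checkpoint
)
```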
+### Changelog Mode
+Hudi can keep all the intermediate changes (I / -U / U / D) of messages, which can then be consumed through Flink's stateful computing to build a near-real-time
+data warehouse ETL pipeline (incremental computing). Hudi MOR tables store messages as rows, which supports the retention of all change logs (integration at the format level).
+All changelog records can be consumed with the Flink streaming reader.
+
+#### Options
+
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `changelog.enabled` | `false` | `false` | It is turned off by default; with `upsert` semantics, only the merged messages are ensured to be kept, and intermediate changes may be merged. Set to `true` to support consumption of all changes |
+
+:::note
+Batch (snapshot) reads still merge all the intermediate changes, regardless of whether the format has stored the intermediate changelog messages.
+
+:::note
+After setting `changelog.enabled` to `true`, the retention of changelog records is only best-effort: the asynchronous compaction task will merge the changelog
+records into one record, so if the stream source does not consume in a timely manner, only the merged record for each key can be read after compaction. The solution is to reserve some buffer time for the reader by adjusting the compaction strategy, such as
+the compaction options: [`compaction.delta_commits`](#compaction) and [`compaction.delta_seconds`](#compaction).
+:::
+
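A sketch of consuming all changes with the Flink streaming reader; the schema, path, and option values are illustrative:

```sql
CREATE TABLE hudi_changelog(
  uuid VARCHAR(20),
  name VARCHAR(10),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = '${base_path}/hudi_changelog',
  'table.type' = 'MERGE_ON_READ',
  'changelog.enabled' = 'true',        -- retain intermediate changes (I / -U / U / D)
  'read.streaming.enabled' = 'true'    -- consume the changelog with the streaming reader
);

SELECT * FROM hudi_changelog;
```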
+
+### Append Mode
+
+If the INSERT operation is used for ingestion, there is no merging of small files for a COW table by default; for a MOR table, the small file strategy is always applied: MOR appends delta records to log files.
+
+The small file strategy leads to performance degradation. If you want to apply file-merging behavior for a COW table, turn on the option `write.insert.cluster`; note that there is no record key combining in that case.
+
+#### Options
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `write.insert.cluster` | `false` | `false` | Whether to merge small files
while ingesting, for COW table, open the option to enable the small file
merging strategy(no deduplication for keys but the throughput will be affected)
|
+
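For a COW table, turning on small-file merging for INSERT ingestion could look like this sketch (schema and path are placeholders):

```sql
CREATE TABLE hudi_append(
  uuid VARCHAR(20),
  name VARCHAR(10),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = '${base_path}/hudi_append',
  'table.type' = 'COPY_ON_WRITE',
  'write.operation' = 'insert',     -- append mode, no upsert
  'write.insert.cluster' = 'true'   -- merge small files, no key deduplication
)
```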
+### Rate Limit
+There are many use cases where users put the full history data set onto the message queue together with the realtime incremental data, and then consume
+the data from the queue into Hudi from the earliest offset using Flink. Consuming the history data set has these characteristics:
+1) the instant throughput is huge; 2) it is seriously out of order (with random write partitions). This leads to degradation of write performance and
+throughput glitches. In this case, the rate limit parameter can be turned on to ensure smooth writing of the flow.
+
+#### Options
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `write.rate.limit` | `false` | `0` | Disabled by default |
+
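Enabling the rate limit is a single option in the sink definition; a sketch with an illustrative limit value:

```sql
CREATE TABLE hudi_limited(
  uuid VARCHAR(20),
  name VARCHAR(10),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = '${base_path}/hudi_limited',
  'write.rate.limit' = '20000'   -- 0 (the default) disables the limit
)
```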
+### Incremental Query
+There are 3 use cases for incremental query:
+1. Streaming query: specify the start commit with option `read.start-commit`;
+2. Batch query: specify the start commit with option `read.start-commit` and the end commit with option `read.end-commit`;
+   the interval is a closed one: both start commit and end commit are inclusive;
+3. Time travel: consume as a batch for an instant time; specifying `read.end-commit` is enough because the start commit is latest by default.
+
+#### Options
+| Option Name | Required | Default | Remarks |
+| ----------- | ------- | ------- | ------- |
+| `read.start-commit` | `false` | the latest commit | Specify `earliest` to consume from the start commit |
+| `read.end-commit` | `false` | the latest commit | -- |
+
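The batch incremental case above can be sketched against one table definition; the commit times below are illustrative placeholders in Hudi's instant-time format:

```sql
CREATE TABLE hudi_incr(
  uuid VARCHAR(20),
  name VARCHAR(10),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = '${base_path}/hudi_incr',
  'read.start-commit' = '20211201000000',  -- inclusive start commit
  'read.end-commit' = '20211209000000'     -- inclusive end commit; omit for a streaming query
);

SELECT * FROM hudi_incr;
```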
+## Kafka Connect Sink
If you want to perform streaming ingestion into Hudi format similar to
HoodieDeltaStreamer, but you don't want to depend on Spark,
try out the new experimental release of Hudi Kafka Connect Sink. Read the
[ReadMe](https://github.com/apache/hudi/tree/master/hudi-kafka-connect)
for full documentation.
diff --git a/website/docs/query_engine_setup.md
b/website/docs/query_engine_setup.md
index 99887b5..d64a577 100644
--- a/website/docs/query_engine_setup.md
+++ b/website/docs/query_engine_setup.md
@@ -18,13 +18,30 @@ Both snapshot and read optimized queries are supported on
MERGE_ON_READ Hudi tab
instructions for PrestoDB would vary based on versions. Please check the below
table for query types supported and installation instructions
for different versions of PrestoDB.
-
| **PrestoDB Version** | **Installation description** | **Query types
supported** |
|----------------------|------------------------------|---------------------------|
| < 0.233 | Requires the `hudi-presto-bundle` jar to be placed
into `<presto_install>/plugin/hive-hadoop2/`, across the installation. |
Snapshot querying on COW tables. Read optimized querying on MOR tables. |
| >= 0.233 | No action needed. Hudi (0.5.1-incubating) is a
compile time dependency. | Snapshot querying on COW tables. Read optimized
querying on MOR tables. |
| >= 0.240 | No action needed. Hudi 0.5.3 version is a compile
time dependency. | Snapshot querying on both COW and MOR tables |
+### Presto Environment
+1. Configure Presto according to the [Presto configuration
document](https://prestodb.io/docs/current/installation/deployment.html).
+2. Configure hive catalog in `
/presto-server-0.2xxx/etc/catalog/hive.properties` as follows:
+
+```properties
+connector.name=hive-hadoop2
+hive.metastore.uri=thrift://xxx.xxx.xxx.xxx:9083
+hive.config.resources=.../hadoop-2.x/etc/hadoop/core-site.xml,.../hadoop-2.x/etc/hadoop/hdfs-site.xml
+```
+
+### Query
+Begin queries by connecting to the Hive metastore with the Presto client. The Presto client connection command is as follows:
+
+```bash
+# The presto client connection command
+./presto --server xxx.xxx.xxx.xxx:9999 --catalog hive --schema default
+```
+
## Trino
:::note
[Trino](https://trino.io/) (formerly PrestoSQL) was forked off of PrestoDB a
few years ago. Hudi supports 'Snapshot' queries for Copy-On-Write tables and
'Read Optimized' queries
diff --git a/website/docs/syncing_metastore.md
b/website/docs/syncing_metastore.md
index 90e0c40..dad767b 100644
--- a/website/docs/syncing_metastore.md
+++ b/website/docs/syncing_metastore.md
@@ -3,7 +3,7 @@ title: Syncing to Metastore
keywords: [hudi, hive, sync]
---
-## Syncing to Hive
+## Spark and DeltaStreamer
Writing data with [DataSource](/docs/writing_data) writer or
[HoodieDeltaStreamer](/docs/hoodie_deltastreamer) supports syncing of the
table's latest schema to Hive metastore, such that queries can pick up new
columns and partitions.
In case, it's preferable to run this from commandline or in an independent
jvm, Hudi provides a `HiveSyncTool`, which can be invoked as below,
@@ -21,3 +21,101 @@ cd hudi-hive
./run_sync_tool.sh
[hudi-hive]$ ./run_sync_tool.sh --help
```
+
+
+## Flink Setup
+
+### Install
+
+Now you can git clone the Hudi master branch to test Flink hive sync. The first step is to install Hudi to get `hudi-flink-bundle_2.11-0.x.jar`.
+The `hudi-flink-bundle` module pom.xml sets the hive-related scope to `provided` by default. If you want to use hive sync, you need to use the
+profile `flink-bundle-shade-hive` during packaging. Execute the command below to install:
+
+```bash
+# Maven install command
+mvn install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2
+
+# For hive1, you need to use profile -Pflink-bundle-shade-hive1
+# For hive3, you need to use profile -Pflink-bundle-shade-hive3
+```
+
+:::note
+Hive 1.x can only synchronize metadata to hive but cannot be used for hive queries yet. If you need to query, you can use spark to query the hive table.
+:::
+
+:::note
+If using a hive profile, you need to modify the hive version in the profile to your hive cluster version (you only need to modify the hive version in this profile).
+The location of this `pom.xml` is `packaging/hudi-flink-bundle/pom.xml`, and the corresponding profile is at the bottom of this file.
+:::
+
+### Hive Environment
+
+1. Import `hudi-hadoop-mr-bundle` into hive: create an `auxlib/` folder under the hive root directory, and move `hudi-hadoop-mr-bundle-0.x.x-SNAPSHOT.jar` into `auxlib`.
+   `hudi-hadoop-mr-bundle-0.x.x-SNAPSHOT.jar` is at `packaging/hudi-hadoop-mr-bundle/target`.
+
+2. When the Flink SQL client connects to the hive metastore remotely, the `hive metastore` and `hiveserver2` services need to be enabled, and the port numbers need to
+   be set correctly. Command to turn on the services:
+
+```bash
+# Enable hive metastore and hiveserver2
+nohup ./bin/hive --service metastore &
+nohup ./bin/hive --service hiveserver2 &
+
+# After modifying a jar under auxlib, you need to restart the services.
+```
+
+### Sync Template
+
+Flink hive sync now supports two hive sync modes, `hms` and `jdbc`. The `hms` mode only needs the metastore uris to be configured. For
+the `jdbc` mode, both the JDBC attributes and the metastore uris need to be configured. The options template is as below:
+
+```sql
+-- hms mode template
+CREATE TABLE t1(
+ uuid VARCHAR(20),
+ name VARCHAR(10),
+ age INT,
+ ts TIMESTAMP(3),
+ `partition` VARCHAR(20)
+)
+PARTITIONED BY (`partition`)
+WITH (
+ 'connector' = 'hudi',
+ 'path' = '${db_path}/t1',
+ 'table.type' = 'COPY_ON_WRITE', --If MERGE_ON_READ, hive query will not
have output until the parquet file is generated
+ 'hive_sync.enable' = 'true', -- Required. To enable hive synchronization
+  'hive_sync.mode' = 'hms',            -- Required. Setting hive sync mode to hms, default jdbc
+  'hive_sync.metastore.uris' = 'thrift://${ip}:9083' -- Required. The port needs to be set in hive-site.xml
+);
+
+
+-- jdbc mode template
+CREATE TABLE t1(
+ uuid VARCHAR(20),
+ name VARCHAR(10),
+ age INT,
+ ts TIMESTAMP(3),
+ `partition` VARCHAR(20)
+)
+PARTITIONED BY (`partition`)
+WITH (
+ 'connector' = 'hudi',
+ 'path' = '${db_path}/t1',
+ 'table.type' = 'COPY_ON_WRITE', --If MERGE_ON_READ, hive query will not
have output until the parquet file is generated
+ 'hive_sync.enable' = 'true', -- Required. To enable hive synchronization
+  'hive_sync.mode' = 'jdbc',           -- Required. Setting hive sync mode to jdbc for this template, default jdbc
+  'hive_sync.metastore.uris' = 'thrift://${ip}:9083', -- Required. The port needs to be set in hive-site.xml
+  'hive_sync.jdbc_url'='jdbc:hive2://${ip}:10000', -- required, hiveServer port
+  'hive_sync.table'='t1',              -- required, hive table name
+  'hive_sync.db'='testDB',             -- required, hive database name
+  'hive_sync.username'='${user_name}', -- required, HMS username
+  'hive_sync.password'='${password}'   -- required, HMS password
+);
+```
+
+### Query
+
+When querying with hive beeline, you need to enter this setting:
+```bash
+set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
+```
\ No newline at end of file
diff --git a/website/sidebars.js b/website/sidebars.js
index adc3cfb..a1dc301 100644
--- a/website/sidebars.js
+++ b/website/sidebars.js
@@ -41,6 +41,7 @@ module.exports = {
'writing_data',
'hoodie_deltastreamer',
'querying_data',
+ 'flink_configuration',
'syncing_metastore',
],
},