This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/asf-site by this push:
new 735bfb16597 [DOCS] Update compaction page (#9428)
735bfb16597 is described below
commit 735bfb165972508e2f40d98c3d94010ec3319913
Author: Bhavani Sudha Saktheeswaran <[email protected]>
AuthorDate: Fri Aug 11 10:17:25 2023 -0700
[DOCS] Update compaction page (#9428)
Summary:
- Add high level context
- Add inline configs with compaction strategies explained
- Restructure flow
---
website/docs/compaction.md | 193 +++++++++++++++------
website/src/theme/DocPage/index.js | 2 +-
.../static/assets/images/hudi_mor_file_layout.jpg | Bin 0 -> 331563 bytes
.../hudi_mor_file_layout_post_compaction.jpg | Bin 0 -> 385402 bytes
4 files changed, 144 insertions(+), 51 deletions(-)
diff --git a/website/docs/compaction.md b/website/docs/compaction.md
index 9f7b119db43..49126fd2108 100644
--- a/website/docs/compaction.md
+++ b/website/docs/compaction.md
@@ -2,25 +2,129 @@
title: Compaction
summary: "In this page, we describe async compaction in Hudi."
toc: true
+toc_min_heading_level: 2
+toc_max_heading_level: 4
last_modified_at:
---
+## Background
+Compaction is a table service employed by Hudi specifically in Merge On Read (MOR) tables to periodically merge updates from row-based log files into the corresponding columnar base file, producing a new version of the base file. Compaction applies only to MOR tables; it is not applicable to Copy On Write (COW) tables.
+
+### Why do MOR tables need compaction?
+To understand the significance of compaction in MOR tables, it is helpful to
understand the MOR table layout first. In Hudi,
+data is organized in terms of [file
groups](https://hudi.apache.org/docs/file_layouts/). Each file group in a MOR
table
+consists of a base file and one or more log files. Typically, during writes,
inserts are stored in the base file, and updates
+are appended to log files.
+
+
+_Figure: MOR table file layout showing different file groups with base data
file and log files_
+
+During the compaction process, updates from the log files are merged with the
base file to form a new version of the
+base file as shown below. Since MOR is designed to be write-optimized, on new
writes, after index tagging is complete,
+Hudi appends the records pertaining to each file group as log blocks in log
files. There is no synchronous merge
+happening during write, resulting in a lower write amplification and better
write latency. In contrast, on new writes to a
+COW table, Hudi combines the new writes with the older base file to produce a
new version of the base file resulting in
+a higher write amplification and higher write latencies.
+
+
+_Figure: Compaction on a given file group_
+
+While serving a read query (snapshot read), for each file group, records in the base file and all its corresponding log
+files are merged together and served. Hence, the read latency of a MOR snapshot query may be higher than that of a
+COW table, since no merge is involved for COW at read time. Compaction takes care of merging the updates from
+log files with the base file at regular intervals to bound the growth of log files and to ensure that read latencies do not
+spike up.
+
+## Compaction Architecture
+There are two steps to compaction.
+- ***Compaction Scheduling***: In this step, Hudi scans the partitions and
selects file slices to be compacted. A compaction
+ plan is finally written to Hudi timeline.
+- ***Compaction Execution***: In this step, the compaction plan is read and the selected file slices are compacted.
+
+### Strategies in Compaction Scheduling
+There are two strategies involved in scheduling the compaction:
+- Trigger Strategy: Determines how often to trigger scheduling of the
compaction.
+- Compaction Strategy: Determines which file groups to compact.
+
+Hudi provides various options for both these strategies as discussed below.
+
+#### Trigger Strategies
+
+| Config Name | Default | Description |
+|-------------|---------|-------------|
+| hoodie.compact.inline.trigger.strategy | NUM_COMMITS (Optional) | org.apache.hudi.table.action.compact.CompactionTriggerStrategy: Controls when compaction is scheduled.<br />`Config Param: INLINE_COMPACT_TRIGGER_STRATEGY` |
+Possible values: <br/><ul><li>`NUM_COMMITS`: triggers compaction when there are at least N delta commits after the last
+completed compaction.</li><li>`NUM_COMMITS_AFTER_LAST_REQUEST`: triggers compaction when there are at least N delta commits
+after the last completed or requested compaction.</li><li>`TIME_ELAPSED`: triggers compaction after N seconds have elapsed since the last
+compaction.</li><li>`NUM_AND_TIME`: triggers compaction when there are at least N delta commits and N seconds have
+elapsed (both must be satisfied) after the last completed compaction.</li><li>`NUM_OR_TIME`: triggers compaction when
+there are at least N delta commits or N seconds have elapsed (either condition is satisfied) after the last completed compaction.</li></ul>
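+The trigger strategy above can be sketched as a writer config (values are illustrative; the commit-count threshold comes from a separate config, `hoodie.compact.inline.max.delta.commits`):

```properties
# Schedule compaction once at least 4 delta commits have accumulated
hoodie.compact.inline.trigger.strategy=NUM_COMMITS
hoodie.compact.inline.max.delta.commits=4
```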
+
+#### Compaction Strategies
+| Config Name | Default | Description |
+|-------------|---------|-------------|
+| hoodie.compaction.strategy | org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy (Optional) | Compaction strategy decides which file groups are picked up for compaction during each compaction run. By default, Hudi picks the log file with the most accumulated unmerged data. <br /><br />`Config Param: COMPACTION_STRATEGY` |
+
+Available Strategies (provide the full package name when using the strategy): <ul><li>`LogFileNumBasedCompactionStrategy`:
+orders the compactions based on the total log file count, filters the file groups with a log file count greater than the
+threshold, and limits the compactions within a configured IO bound.</li><li>`LogFileSizeBasedCompactionStrategy`: orders
+the compactions based on the total log file size, filters the file groups whose total log file size is greater than the
+threshold, and limits the compactions within a configured IO bound.</li><li>`BoundedIOCompactionStrategy`: looks at the
+total IO to be done for the compaction (read + write) and limits the list of compactions to be under a
+configured IO limit.</li><li>`BoundedPartitionAwareCompactionStrategy`: this strategy ensures that the last N partitions
+are picked up even if there are later partitions created for the table. lastNPartitions is defined as the N partitions before
+the currentDate. For example, if currentDay = 2018/01/01 and the table has partitions 2018/02/02 and 2018/03/03 beyond the currentDay, this
+strategy will pick up the following partitions for compaction: (2018/01/01, allPartitionsInRange[(2018/01/01 - lastNPartitions)
+to 2018/01/01), 2018/02/02, 2018/03/03).</li><li>`DayBasedCompactionStrategy`: this strategy orders compactions in reverse
+order of creation of Hive partitions. It helps to compact data in the latest partitions first and then older ones, capped at the
+total IO allowed.</li><li>`UnBoundedCompactionStrategy`: does not change ordering or filter
+any compactions. It is a pass-through that compacts all base files which have a log file. This usually means
+no intelligence in compaction.</li><li>`UnBoundedPartitionAwareCompactionStrategy`: a custom unbounded strategy
+that filters out all the partitions that
+are eligible to be compacted by a `BoundedPartitionAwareCompactionStrategy` and returns the result. This is done
+so that a long-running `UnBoundedPartitionAwareCompactionStrategy` does not step over partitions handled by a shorter-running
+`BoundedPartitionAwareCompactionStrategy`. Essentially, this is the inverse of the partitions chosen in
+`BoundedPartitionAwareCompactionStrategy`.</li></ul>
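+As a hedged sketch, selecting one of these strategies looks like the following (values are illustrative; `hoodie.compaction.target.io` caps the amortized IO, in MB, for the IO-bounded strategies):

```properties
# Pick file groups by accumulated log-file size, bounded by ~500 GB of IO
hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy
hoodie.compaction.target.io=512000
```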
-## Async Compaction
-Compaction is executed asynchronously with Hudi by default. Async Compaction
is performed in 2 steps:
+:::note
+Please refer to [advanced
configs](https://hudi.apache.org/docs/next/configurations#Compaction-Configs)
for more details.
+:::
-1. ***Compaction Scheduling***: This is done by the ingestion job. In this
step, Hudi scans the partitions and selects **file
- slices** to be compacted. A compaction plan is finally written to Hudi
timeline.
-1. ***Compaction Execution***: In this step the compaction plan is read and
file slices are compacted.
+## Ways to trigger Compaction
-There are few ways by which we can execute compactions asynchronously.
+### Inline
+By default, compaction is run asynchronously.
-### Spark Structured Streaming
+If latency of ingesting records is important for you, you are most likely
using Merge-On-Read tables.
+Merge-On-Read tables store data using a combination of columnar (e.g. parquet) + row-based (e.g. avro) file formats.
+Updates are logged to delta files & later compacted to produce new versions of
columnar files.
+To improve ingestion latency, Async Compaction is the default configuration.
-Compactions are scheduled and executed asynchronously inside the
-streaming job. Async Compactions are enabled by default for structured
streaming jobs
-on Merge-On-Read table.
+If immediate read performance of a new commit is important for you, or you
want simplicity of not managing separate compaction jobs,
+you may want synchronous inline compaction, which means that as a commit is
written it is also compacted by the same job.
+
+For this deployment mode, please use `hoodie.compact.inline = true` for Spark Datasource and Spark SQL writers. For
+HoodieStreamer in sync-once mode, inline compaction can be achieved by passing the flag `--disable-compaction` (meaning
+disable async compaction). Further, in HoodieStreamer, when both
+ingestion and compaction are running in the same Spark context, you can use resource allocation configurations
+in the Hudi Streamer CLI, such as `--delta-sync-scheduling-weight`,
+`--compact-scheduling-weight`, `--delta-sync-scheduling-minshare`, and `--compact-scheduling-minshare`,
+to control executor allocation between ingestion and compaction.
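+A minimal sketch of such an inline-compaction writer config (the trigger count is illustrative):

```properties
hoodie.datasource.write.table.type=MERGE_ON_READ
# Compact synchronously in the writer job
hoodie.compact.inline=true
# Illustrative: compact after every 4 delta commits
hoodie.compact.inline.max.delta.commits=4
```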
+
+
+### Async & Offline Compaction models
+
+There are a couple of ways to trigger compaction.
+
+#### Async execution within the same process
+In streaming ingestion write models, like HoodieStreamer
+continuous mode, Flink, and Spark Streaming, async compaction is enabled by default and runs alongside
+regular ingestion without blocking it.
-Here is an example snippet in java
+##### Spark Structured Streaming
+
+Compactions are scheduled and executed asynchronously inside the
+streaming job. Here is an example snippet in Java:
```java
import org.apache.hudi.DataSourceWriteOptions;
@@ -45,7 +149,7 @@ import org.apache.spark.sql.streaming.ProcessingTime;
writer.trigger(new ProcessingTime(30000)).start(tablePath);
```
-### Hudi Streamer Continuous Mode
+##### Hudi Streamer Continuous Mode
Hudi Streamer provides continuous ingestion mode where a single long running
spark application
ingests data to Hudi table continuously from upstream sources. In this mode,
Hudi supports managing asynchronous
compactions. Here is an example snippet for running in continuous mode with
async compactions
@@ -58,44 +162,33 @@ spark-submit --packages
org.apache.hudi:hudi-utilities-bundle_2.11:0.6.0 \
--target-table <hudi_table> \
--source-class org.apache.hudi.utilities.sources.JsonDFSSource \
--source-ordering-field ts \
---schemaprovider-class
org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--props /path/to/source.properties \
--continuous
```
-## Synchronous Compaction
-By default, compaction is run asynchronously.
-
-If latency of ingesting records is important for you, you are most likely
using Merge-On-Read tables.
-Merge-On-Read tables store data using a combination of columnar (e.g parquet)
+ row based (e.g avro) file formats.
-Updates are logged to delta files & later compacted to produce new versions of
columnar files.
-To improve ingestion latency, Async Compaction is the default configuration.
-
-If immediate read performance of a new commit is important for you, or you
want simplicity of not managing separate compaction jobs,
-you may want Synchronous compaction, which means that as a commit is written
it is also compacted by the same job.
-
-Compaction is run synchronously by passing the flag "--disable-compaction"
(Meaning to disable async compaction scheduling).
-When both ingestion and compaction is running in the same spark context, you
can use resource allocation configuration
-in Hudi Streamer CLI such as ("--delta-sync-scheduling-weight",
-"--compact-scheduling-weight", ""--delta-sync-scheduling-minshare", and
"--compact-scheduling-minshare")
-to control executor allocation between ingestion and compaction.
+#### Scheduling and Execution by a separate process
+For some use cases with long-running table services, instead of blocking regular writes, users have the option to run
+both steps of the compaction ([scheduling and execution](#compaction-architecture)) offline in a separate process altogether.
+This frees regular writers from having to bother about these compaction steps and allows users to provide more resources for
+the compaction job as needed.
+:::note
+This model needs a lock provider configured for all jobs - the regular writer
as well as the offline compaction job.
+:::
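+As a sketch, assuming a ZooKeeper-based lock provider (the host, port, and paths below are placeholders), both the regular writer and the offline compaction job would carry configs along these lines:

```properties
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url=zk-host
hoodie.write.lock.zookeeper.port=2181
hoodie.write.lock.zookeeper.lock_key=my_table
hoodie.write.lock.zookeeper.base_path=/hudi/locks
```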
-## Offline Compaction
+#### Scheduling inline and executing async
-The compaction of the MERGE_ON_READ table is enabled by default. The trigger
strategy is to perform compaction after completing
-five commits. Because compaction consumes a lot of memory and is placed in the
same pipeline with the write operation, it's easy to
-interfere with the write operation when there is a large amount of data (>
100000 per second). As this time, it is more stable to execute
-the compaction task by using offline compaction.
+In this model, it is possible for a Spark Datasource writer or a Flink job to just schedule the compaction inline (this
+serializes the compaction plan to the timeline but does not execute it). A separate utility, like the
+HoodieCompactor or HoodieFlinkCompactor, can then take care of periodically executing the compaction plan.
:::note
-The execution of a compaction task includes two parts: schedule compaction
plan and execute compaction plan. It's recommended that
-the process of schedule compaction plan be triggered periodically by the write
task, and the write parameter `compaction.schedule.enable`
-is enabled by default.
+This model may need a lock provider **if** metadata table is enabled.
:::
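+A minimal sketch of the writer side of this split (assuming `hoodie.compact.schedule.inline` is available in your Hudi version; execution is left to the separate compactor job):

```properties
# Writer only schedules compaction plans; it never executes them
hoodie.compact.inline=false
hoodie.compact.schedule.inline=true
```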
-### Hudi Compactor Utility
-Hudi provides a standalone tool to execute specific compactions
asynchronously. Below is an example and you can read more in the [deployment
guide](/docs/deployment#compactions)
+#### Hudi Compactor Utility
+Hudi provides a standalone tool to execute specific compactions
asynchronously. Below is an example and you can read more in the [deployment
guide](/docs/cli#compactions)
+The compactor utility allows scheduling and execution of compaction.
Example:
```sh
@@ -108,9 +201,9 @@ spark-submit --packages
org.apache.hudi:hudi-utilities-bundle_2.11:0.6.0 \
```
Note, the `instant-time` parameter is now optional for the Hudi Compactor
Utility. If using the utility without `--instant-time`,
-the spark-submit will execute the earliest scheduled compaction on the Hudi
timeline.
+the spark-submit will execute the earliest scheduled compaction on the Hudi
timeline.
-### Hudi CLI
+#### Hudi CLI
Hudi CLI is yet another way to execute specific compactions asynchronously.
Here is an example and you can read more in the [deployment
guide](/docs/cli#compactions)
Example:
@@ -119,7 +212,7 @@ hudi:trips->compaction run --tableName <table_name>
--parallelism <parallelism>
...
```
-### Flink Offline Compaction
+#### Flink Offline Compaction
Offline compaction needs to submit the Flink task on the command line. The
program entry is as follows: `hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar` :
`org.apache.hudi.sink.compact.HoodieFlinkCompactor`
@@ -130,11 +223,11 @@ Offline compaction needs to submit the Flink task on the
command line. The progr
#### Options
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `--path` | `true` | `--` | The path where the target table is stored on Hudi
|
-| `--compaction-max-memory` | `false` | `100` | The index map size of log data
during compaction, 100 MB by default. If you have enough memory, you can turn
up this parameter |
-| `--schedule` | `false` | `false` | whether to execute the operation of
scheduling compaction plan. When the write process is still writing, turning on
this parameter have a risk of losing data. Therefore, it must be ensured that
there are no write tasks currently writing data to this table when this
parameter is turned on |
-| `--seq` | `false` | `LIFO` | The order in which compaction tasks are
executed. Executing from the latest compaction plan by default. `LIFO`:
executing from the latest plan. `FIFO`: executing from the oldest plan. |
-| `--service` | `false` | `false` | Whether to start a monitoring service that
checks and schedules new compaction task in configured interval. |
-| `--min-compaction-interval-seconds` | `false` | `600(s)` | The checking
interval for service mode, by default 10 minutes. |
+| Option Name | Default | Description |
+| -----------
|-------------------------------------------------------------------------------------------------------------------------------|
------- |
+| `--path` | `n/a **(Required)**` | The path where the target table is stored
on Hudi
|
+| `--compaction-max-memory` | `100` (Optional) | The index map size of log
data during compaction, 100 MB by default. If you have enough memory, you can
turn up this parameter |
+| `--schedule` | `false` (Optional) | Whether to execute the operation of scheduling the compaction plan. When the write process is still writing, turning on this parameter carries a risk of losing data. Therefore, ensure that there are no write tasks currently writing data to this table when this parameter is turned on |
+| `--seq` | `LIFO` (Optional) | The order in which compaction tasks are
executed. Executing from the latest compaction plan by default. `LIFO`:
executing from the latest plan. `FIFO`: executing from the oldest plan. |
+| `--service` | `false` (Optional) | Whether to start a monitoring service that checks and schedules new compaction tasks at a configured interval. |
+| `--min-compaction-interval-seconds` | `600(s)` (Optional) | The checking interval for service mode, by default 10 minutes. |
diff --git a/website/src/theme/DocPage/index.js
b/website/src/theme/DocPage/index.js
index 3e4e22077c4..24b764ea044 100644
--- a/website/src/theme/DocPage/index.js
+++ b/website/src/theme/DocPage/index.js
@@ -128,7 +128,7 @@ function DocPageContent({
);
}
-const arrayOfPages = (matchPath) => [`${matchPath}/configurations`,
`${matchPath}/basic_configurations`, `${matchPath}/timeline`,
`${matchPath}/table_types`, `${matchPath}/migration_guide`];
+const arrayOfPages = (matchPath) => [`${matchPath}/configurations`,
`${matchPath}/basic_configurations`, `${matchPath}/timeline`,
`${matchPath}/table_types`, `${matchPath}/migration_guide`,
`${matchPath}/compaction`];
const showCustomStylesForDocs = (matchPath, pathname) =>
arrayOfPages(matchPath).includes(pathname);
function DocPage(props) {
const {
diff --git a/website/static/assets/images/hudi_mor_file_layout.jpg
b/website/static/assets/images/hudi_mor_file_layout.jpg
new file mode 100644
index 00000000000..e3bb5505d74
Binary files /dev/null and
b/website/static/assets/images/hudi_mor_file_layout.jpg differ
diff --git
a/website/static/assets/images/hudi_mor_file_layout_post_compaction.jpg
b/website/static/assets/images/hudi_mor_file_layout_post_compaction.jpg
new file mode 100644
index 00000000000..75fbe59a5c3
Binary files /dev/null and
b/website/static/assets/images/hudi_mor_file_layout_post_compaction.jpg differ