This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/asf-site by this push:
new 5f8874e03dbd docs: update clustering and compaction pages (#14328)
5f8874e03dbd is described below
commit 5f8874e03dbdbc545f974b80c86443301ec68cb9
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Nov 23 21:00:36 2025 -0600
docs: update clustering and compaction pages (#14328)
---
website/docs/clustering.md | 94 +++++++++++++-----------
website/docs/compaction.md | 149 ++++++++++++++++++++++----------------
website/releases/release-1.1.0.md | 4 +-
3 files changed, 140 insertions(+), 107 deletions(-)
diff --git a/website/docs/clustering.md b/website/docs/clustering.md
index 44f3fe9d3805..e1894e19c239 100644
--- a/website/docs/clustering.md
+++ b/website/docs/clustering.md
@@ -2,7 +2,7 @@
title: Clustering
summary: "In this page, we describe clustering in Hudi."
toc: true
-last_modified_at:
+last_modified_at: 2025-11-24T02:44:48
---
## Background
@@ -13,7 +13,7 @@ Apache Hudi brings stream processing to big data, providing
fresh data while bei
## How is compaction different from clustering?
Hudi is modeled like a log-structured storage engine with multiple versions of
the data.
-Particularly, [Merge-On-Read](table_types#merge-on-read-table)
+Particularly, [Merge-on-Read](table_types#merge-on-read-table)
tables in Hudi store data using a combination of base file in columnar format
and row-based delta logs that contain
updates. Compaction is a way to merge the delta logs with base files to
produce the latest file slices with the most
recent snapshot of data. Compaction helps to keep the query performance in
check (larger delta log files would incur
@@ -25,35 +25,41 @@ can take advantage of data locality.
At a high level, Hudi provides different operations such as
insert/upsert/bulk_insert through its write client API to be able to write
data to a Hudi table. To be able to choose a trade-off between file size and
ingestion speed, Hudi provides a knob `hoodie.parquet.small.file.limit` to be
able to configure the smallest allowable file size. Users are able to configure
the small file [soft
limit](https://hudi.apache.org/docs/configurations/#hoodieparquetsmallfilelimit)
to `0` to force new [...]
-
-
To be able to support an architecture that allows for fast ingestion without
compromising query performance, we have introduced a ‘clustering’ service to
rewrite the data to optimize Hudi data lake file layout.
The clustering table service can run asynchronously or synchronously, adding a
new action type called “REPLACE” that marks the clustering action in the Hudi
metadata timeline.
-
-
### Overall, there are 2 steps to clustering
-1. Scheduling clustering: Create a clustering plan using a pluggable
clustering strategy.
-2. Execute clustering: Process the plan using an execution strategy to create
new files and replace old files.
-
+1. Scheduling clustering: Create a clustering plan using a pluggable
clustering strategy.
+2. Execute clustering: Process the plan using an execution strategy to create
new files and replace old files.
### Schedule clustering
The following steps are involved in scheduling clustering.
-1. Identify files that are eligible for clustering: Depending on the
clustering strategy chosen, the scheduling logic will identify the files
eligible for clustering.
-2. Group files that are eligible for clustering based on specific criteria.
Each group is expected to have data size in multiples of ‘targetFileSize’.
Grouping is done as part of ‘strategy’ defined in the plan. Additionally, there
is an option to put a cap on group size to improve parallelism and avoid
shuffling large amounts of data.
-3. Finally, the clustering plan is saved to the timeline in an avro [metadata
format](https://github.com/apache/hudi/blob/master/hudi-common/src/main/avro/HoodieClusteringPlan.avsc).
+1. Identify files that are eligible for clustering: Depending on the
clustering strategy chosen, the scheduling logic will identify the files
eligible for clustering.
+2. Group files that are eligible for clustering based on specific criteria.
Each group is expected to have data size in multiples of 'targetFileSize'.
Grouping is done as part of 'strategy' defined in the plan. Additionally, there
is an option to put a cap on group size to improve parallelism and avoid
shuffling large amounts of data.
+3. Finally, the clustering plan is saved to the timeline in an avro [metadata
format](https://github.com/apache/hudi/blob/master/hudi-common/src/main/avro/HoodieClusteringPlan.avsc).
+### Incremental Scheduling
-### Execute clustering
+Hudi supports incremental scheduling for clustering operations, which
significantly improves performance on tables with a large number of partitions.
Instead of scanning all partitions during each clustering scheduling run,
incremental scheduling only processes partitions that have changed since the
last completed clustering operation.
-1. Read the clustering plan and get the ‘clusteringGroups’ that mark the file
groups that need to be clustered.
-2. For each group, we instantiate appropriate strategy class with
strategyParams (example: sortColumns) and apply that strategy to rewrite the
data.
-3. Create a “REPLACE” commit and update the metadata in
[HoodieReplaceCommitMetadata](https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceCommitMetadata.java).
+This feature is enabled by default via
`hoodie.table.services.incremental.enabled`. When enabled, clustering
scheduling will:
+1. Identify partitions that have been modified since the last completed
clustering operation by analyzing commit metadata within the time window
between the last completed clustering and the current scheduling instant
+2. Include any partitions that were marked as missing from previous scheduling
runs (e.g., due to IO limits or group size restrictions)
+3. Only scan and process those incremental partitions
+4. Fall back to scanning all partitions if the last completed clustering
instant cannot be found (e.g., due to archival) or if an exception occurs
during incremental partition retrieval
+
+For tables with many partitions, this optimization can dramatically reduce
scheduling overhead and improve overall job stability.
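+
+As a minimal sketch, the incremental scheduling behavior described above is
controlled through a single write config. The flag name is the one stated in
this section; the commented-out `false` line is only to illustrate reverting to
full partition scans:
+
+```properties
+# Incremental scheduling for table services (on by default).
+hoodie.table.services.incremental.enabled=true
+# Uncomment to always scan all partitions when scheduling clustering:
+# hoodie.table.services.incremental.enabled=false
+```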
+
+### Execute clustering
+
+1. Read the clustering plan and get the ‘clusteringGroups’ that mark the file
groups that need to be clustered.
+2. For each group, we instantiate appropriate strategy class with
strategyParams (example: sortColumns) and apply that strategy to rewrite the
data.
+3. Create a “REPLACE” commit and update the metadata in
[HoodieReplaceCommitMetadata](https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceCommitMetadata.java).
Clustering Service builds on Hudi’s MVCC-based design to allow writers to
continue to insert new data while the clustering action runs in the background to
reformat data layout, ensuring snapshot isolation between concurrent readers
and writers.
@@ -108,12 +114,11 @@ clustering, tune the file size limits, maximum number of
output groups. Please r
,
[hoodie.clustering.plan.strategy.target.file.max.bytes](https://hudi.apache.org/docs/next/configurations/#hoodieclusteringplanstrategytargetfilemaxbytes)
for more details.
| Config Name | Default
| Description
[...]
-|---------------------------------------------------------|
-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+|---------------------------------------------------------|--------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
| hoodie.clustering.plan.strategy.partition.selected | N/A **(Required)**
| Comma separated list of partitions to run clustering<br /><br />`Config
Param: PARTITION_SELECTED`<br />`Since Version: 0.11.0`
[...]
| hoodie.clustering.plan.strategy.partition.regex.pattern | N/A **(Required)**
| Filter clustering partitions that matched regex pattern<br /><br />`Config
Param: PARTITION_REGEX_PATTERN`<br />`Since Version: 0.11.0`
[...]
| hoodie.clustering.plan.partition.filter.mode | NONE (Optional)
| Partition filter mode used in the creation of clustering plan. Possible
values:<br /><ul><li>`NONE`: Do not filter partitions. The clustering plan will
include all partitions that have clustering candidates.</li><li>`RECENT_DAYS`:
This filter assumes that your data is partitioned by date. The clustering plan
will only include partitions from K days ago to N days ago, where K >= N. K
is determined by `hood [...]
-
#### SparkSingleFileSortPlanStrategy
In this strategy, clustering group for each partition is built in the same way
as `SparkSizeBasedClusteringPlanStrategy`
@@ -137,9 +142,9 @@ config
[hoodie.clustering.execution.strategy.class](configurations/#hoodiecluste
default, Hudi sorts the file groups in the plan by the specified columns,
while meeting the configured target file
sizes.
-| Config Name | Default
| Description
[...]
-| --------------------------------------------|
-----------------------------------------------------------------------------------------------
|
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
-| hoodie.clustering.execution.strategy.class |
org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
(Optional) | Config to provide a strategy class (subclass of
RunClusteringStrategy) to define how the clustering plan is executed. By
default, we sort the file groups in th plan by the specified columns, while
meeting the configured target file sizes.<br /><br />`Config Param:
EXECUTION_STRATEGY_CLASS_NAME`<br />`Since Version: 0.7.0`
[...]
+| Config Name | Default
| Description
|
+|--------------------------------------------|---------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.clustering.execution.strategy.class |
org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
(Optional) | Config to provide a strategy class (subclass of
RunClusteringStrategy) to define how the clustering plan is executed. By
default, we sort the file groups in the plan by the specified columns, while
meeting the configured target file sizes.<br /><br />`Config Param:
EXECUTION_STRATEGY_CLASS_NAME`<br />`Since Version: 0.7.0` |
The available strategies are as follows:
@@ -159,21 +164,21 @@ The available strategies are as follows:
### Update Strategy
Currently, clustering can only be scheduled for tables/partitions not
receiving any concurrent updates. By default,
-the config for update strategy -
[`hoodie.clustering.updates.strategy`](configurations/#hoodieclusteringupdatesstrategy)
is set to ***SparkRejectUpdateStrategy***. If some file group has updates
during clustering then it will reject updates and throw an
+the config for update strategy -
[`hoodie.clustering.updates.strategy`](configurations/#hoodieclusteringupdatesstrategy)
is set to **SparkRejectUpdateStrategy**. If some file group has updates during
clustering then it will reject updates and throw an
exception. However, in some use-cases updates are very sparse and do not touch
most file groups. The default strategy to
-simply reject updates does not seem fair. In such use-cases, users can set the
config to ***SparkAllowUpdateStrategy***.
+simply reject updates does not seem fair. In such use-cases, users can set the
config to **SparkAllowUpdateStrategy**.
We discussed the critical strategy configurations. All other configurations
related to clustering are
-listed [here](configurations/#Clustering-Configs). Out of this list, a few
configurations that will be very useful
+listed in [clustering configurations](configurations/#Clustering-Configs). Out of
this list, a few configurations that will be very useful
for inline or async clustering are shown below with code samples.
## Inline clustering
Inline clustering happens synchronously with the regular ingestion writer or
as part of the data ingestion pipeline. This means the next round of ingestion
cannot proceed until the clustering is complete. With inline clustering, Hudi
will schedule and plan clustering operations after each commit is completed,
and execute the clustering plans after they are created. This is the simplest
deployment model to run because it’s easier to manage than running different
asynchronous Spark jobs. This mode [...]
-For this deployment mode, please enable and set: `hoodie.clustering.inline`
+For this deployment mode, please set `hoodie.clustering.inline` to `true`.
-To choose how often clustering is triggered, also set:
`hoodie.clustering.inline.max.commits`.
+To choose how often clustering is triggered, also set:
`hoodie.clustering.inline.max.commits`.
Inline clustering can be set up easily using Spark DataFrame options.
See sample below:
@@ -185,8 +190,6 @@ import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
-
-
val df = //generate data frame
df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
@@ -206,27 +209,29 @@ df.write.format("org.apache.hudi").
## Async Clustering
-Async clustering runs the clustering table service in the background without
blocking the regular ingestions writers. There are three different ways to
deploy an asynchronous clustering process:
+Async clustering runs the clustering table service in the background without
blocking the regular ingestion writers. There are three different ways to
deploy an asynchronous clustering process:
-- **Asynchronous execution within the same process**: In this deployment mode,
Hudi will schedule and plan the clustering operations after each commit is
completed as part of the ingestion pipeline. Separately, Hudi spins up another
thread within the same job and executes the clustering table service. This is
supported by Spark Streaming, Flink and Hudi Streamer in continuous mode. For
this deployment mode, please enable `hoodie.clustering.async.enabled` and
`hoodie.clustering.async.max. [...]
-- **Asynchronous scheduling and execution by a separate process**: In this
deployment mode, the application will write data to a Hudi table as part of the
ingestion pipeline. A separate clustering job will schedule, plan and execute
the clustering operation. By running a different job for the clustering
operation, it rebalances how Hudi uses compute resources: fewer compute
resources are needed for the ingestion, which makes ingestion latency stable,
and an independent set of compute res [...]
+- **Asynchronous execution within the same process**: In this deployment mode,
Hudi will schedule and plan the clustering operations after each commit is
completed as part of the ingestion pipeline. Separately, Hudi spins up another
thread within the same job and executes the clustering table service. This is
supported by Spark Streaming, Flink and Hudi Streamer in continuous mode. For
this deployment mode, please enable `hoodie.clustering.async.enabled` and
`hoodie.clustering.async.max. [...]
+- **Asynchronous scheduling and execution by a separate process**: In this
deployment mode, the application will write data to a Hudi table as part of the
ingestion pipeline. A separate clustering job will schedule, plan and execute
the clustering operation. By running a different job for the clustering
operation, it rebalances how Hudi uses compute resources: fewer compute
resources are needed for the ingestion, which makes ingestion latency stable,
and an independent set of compute res [...]
- **Scheduling inline and executing async**: In this deployment mode, the
application ingests data and schedules the clustering in one job; in another,
the application executes the clustering plan. The supported writers (see below)
won’t be blocked from ingesting data. If the metadata table is disabled, a lock
provider is not needed. However, if the metadata table is enabled, please
ensure all jobs have the lock providers configured for concurrency control. All
writers support this deploy [...]
Hudi supports
[multi-writers](https://hudi.apache.org/docs/concurrency_control#enabling-multi-writing)
which provides
snapshot isolation between multiple table services, thus allowing writers to
continue with ingestion while clustering
runs in the background.
-| Config Name
| Default | Description
[...]
-|
---------------------------------------------------------------------------------------------------
| --------------------------------------- |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
-| hoodie.clustering.async.enabled |
false (Optional) | Enable running of clustering service,
asynchronously as inserts happen on the table.<br /><br />`Config Param:
ASYNC_CLUSTERING_ENABLE`<br />`Since Version: 0.7.0`
[...]
-| hoodie.clustering.async.max.commits
| 4 (Optional)
| Config to control frequency of async
clustering<br /><br />`Config Param: ASYNC_CLUSTERING_MAX_COMMITS`<br />`Since
Version: 0.9.0`
[...]
+| Config Name | Default | Description
|
+|-------------------------------------|------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| hoodie.clustering.async.enabled | false (Optional) | Enable running of
clustering service, asynchronously as inserts happen on the table.<br /><br
/>`Config Param: ASYNC_CLUSTERING_ENABLE`<br />`Since Version: 0.7.0` |
+| hoodie.clustering.async.max.commits | 4 (Optional) | Config to control
frequency of async clustering<br /><br />`Config Param:
ASYNC_CLUSTERING_MAX_COMMITS`<br />`Since Version: 0.9.0`
|
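+
+Putting the two configs from the table above together, a minimal write-config
sketch for enabling async clustering (the commit frequency shown is the
documented default):
+
+```properties
+# Run the clustering service asynchronously as inserts happen on the table.
+hoodie.clustering.async.enabled=true
+# Trigger clustering once every 4 commits.
+hoodie.clustering.async.max.commits=4
+```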
+
+### Setup Asynchronous Clustering
-## Setup Asynchronous Clustering
Users can leverage
[HoodieClusteringJob](https://cwiki.apache.org/confluence/display/HUDI/RFC+-+19+Clustering+data+for+freshness+and+query+performance#RFC19Clusteringdataforfreshnessandqueryperformance-SetupforAsyncclusteringJob)
to setup 2-step asynchronous clustering.
### HoodieClusteringJob
-By specifying the `scheduleAndExecute` mode both schedule as well as
clustering can be achieved in the same step.
+
+By specifying the `scheduleAndExecute` mode, both scheduling and execution of
clustering can be achieved in the same step.
The appropriate mode can be specified using the `-mode` or `-m` option. There are
three modes:
1. `schedule`: Make a clustering plan. This gives an instant which can be
passed in execute mode.
@@ -234,7 +239,8 @@ The appropriate mode can be specified using `-mode` or `-m`
option. There are th
3. `scheduleAndExecute`: Make a clustering plan first and execute that plan
immediately.
Note that to run this job while the original writer is still running, please
enable multi-writing:
-```
+
+```properties
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
```
@@ -252,8 +258,10 @@ spark-submit \
--table-name hudi_table_schedule_clustering \
--spark-memory 1g
```
+
A sample `clusteringjob.properties` file:
-```
+
+```properties
hoodie.clustering.async.enabled=true
hoodie.clustering.async.max.commits=4
hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
@@ -270,7 +278,6 @@ whose location can be passed as `--props` when starting the
Hudi Streamer (just
A sample spark-submit command to set up Hudi Streamer is as below:
-
```bash
spark-submit \
--jars
"packaging/hudi-utilities-slim-bundle/target/hudi-utilities-slim-bundle_2.12-1.0.2.jar,packaging/hudi-spark-bundle/target/hudi-spark3.5-bundle_2.12-1.0.2.jar"
\
@@ -291,6 +298,7 @@ spark-submit \
### Spark Structured Streaming
We can also enable asynchronous clustering with Spark structured streaming
sink as shown below.
+
```scala
val commonOpts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
@@ -301,8 +309,8 @@ val commonOpts = Map(
"hoodie.table.name" -> "hoodie_test"
)
-def getAsyncClusteringOpts(isAsyncClustering: String,
- clusteringNumCommit: String,
+def getAsyncClusteringOpts(isAsyncClustering: String,
+ clusteringNumCommit: String,
executionStrategy: String):Map[String, String] = {
commonOpts + (DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key ->
isAsyncClustering,
HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key ->
clusteringNumCommit,
@@ -347,5 +355,3 @@ out-of-the-box. Note that as of now only linear sort is
supported in Java execut
[Hudi Z-Order and Hilbert Space-filling
Curves](https://medium.com/apache-hudi-blogs/hudi-z-order-and-hilbert-space-filling-curves-68fa28bffaf0)
<h3>Videos</h3>
-
-* [Understanding Clustering in Apache Hudi and the Benefits of Asynchronous
Clustering](https://www.youtube.com/watch?v=R_sm4wlGXuE)
diff --git a/website/docs/compaction.md b/website/docs/compaction.md
index 4da886ef42d4..7acec30db5fe 100644
--- a/website/docs/compaction.md
+++ b/website/docs/compaction.md
@@ -4,77 +4,96 @@ summary: "In this page, we describe async compaction in Hudi."
toc: true
toc_min_heading_level: 2
toc_max_heading_level: 4
-last_modified_at:
+last_modified_at: 2025-11-24T02:44:48
---
## Background
-Compaction is a table service employed by Hudi specifically in Merge On
Read(MOR) tables to merge updates from row-based log
-files to the corresponding columnar-based base file periodically to produce a
new version of the base file. Compaction is
-not applicable to Copy On Write(COW) tables and only applies to MOR tables.
+
+Compaction is a table service employed by Hudi specifically in Merge-on-Read
(MOR) tables to merge updates from row-based log
+files to the corresponding columnar-based base file periodically to produce a
new version of the base file. Compaction is
+not applicable to Copy-on-Write (COW) tables and only applies to MOR tables.
### Why do MOR tables need compaction?
-To understand the significance of compaction in MOR tables, it is helpful to
understand the MOR table layout first. In Hudi,
-data is organized in terms of [file
groups](https://hudi.apache.org/docs/file_layouts/). Each file group in a MOR
table
-consists of a base file and one or more log files. Typically, during writes,
inserts are stored in the base file, and updates
+
+To understand the significance of compaction in MOR tables, it is helpful to
understand the MOR table layout first. In Hudi,
+data is organized in terms of [file
groups](https://hudi.apache.org/docs/file_layouts/). Each file group in a MOR
table
+consists of a base file and one or more log files. Typically, during writes,
inserts are stored in the base file, and updates
are appended to log files.

_Figure: MOR table file layout showing different file groups with base data
file and log files_
-During the compaction process, updates from the log files are merged with the
base file to form a new version of the
-base file as shown below. Since MOR is designed to be write-optimized, on new
writes, after index tagging is complete,
-Hudi appends the records pertaining to each file groups as log blocks in log
files. There is no synchronous merge
-happening during write, resulting in a lower write amplification and better
write latency. In contrast, on new writes to a
-COW table, Hudi combines the new writes with the older base file to produce a
new version of the base file resulting in
+During the compaction process, updates from the log files are merged with the
base file to form a new version of the
+base file as shown below. Since MOR is designed to be write-optimized, on new
writes, after index tagging is complete,
+Hudi appends the records pertaining to each file group as log blocks in log
files. There is no synchronous merge
+happening during write, resulting in a lower write amplification and better
write latency. In contrast, on new writes to a
+COW table, Hudi combines the new writes with the older base file to produce a
new version of the base file resulting in
a higher write amplification and higher write latencies.

_Figure: Compaction on a given file group_
-While serving the read query(snapshot read), for each file group, records in
base file and all its corresponding log
-files are merged together and served. And hence the read latency for MOR
snapshot query might be higher compared to
+While serving the read query (snapshot read), for each file group, records in
the base file and all its corresponding log
+files are merged together and served. Hence, the read latency for an MOR
snapshot query might be higher compared to a
COW table since there is no merge involved in case of COW at read time.
Compaction takes care of merging the updates from
log files with the base file at regular intervals to bound the growth of log
files and to ensure the read latencies do not
spike up.
## Compaction Architecture
-There are two steps to compaction.
-- ***Compaction Scheduling***: In this step, Hudi scans the partitions and
selects file slices to be compacted. A compaction
- plan is finally written to Hudi timeline.
-- ***Compaction Execution***: In this step the compaction plan is read and
file slices are compacted.
+
+There are two steps to compaction.
+
+- **Compaction Scheduling**: In this step, Hudi scans the partitions and
selects file slices to be compacted. A compaction plan is finally written to
Hudi timeline.
+- **Compaction Execution**: In this step, the compaction plan is read and file
slices are compacted.
### Strategies in Compaction Scheduling
+
There are two strategies involved in scheduling the compaction:
-- Trigger Strategy: Determines how often to trigger scheduling of the
compaction.
+
+- Trigger Strategy: Determines how often to trigger scheduling of the
compaction.
- Compaction Strategy: Determines which file groups to compact.
Hudi provides various options for both these strategies as discussed below.
+#### Incremental Scheduling
+
+Hudi supports incremental scheduling for compaction operations, which
significantly improves performance on tables with a large number of partitions.
Instead of scanning all partitions during each compaction scheduling run,
incremental scheduling only processes partitions that have changed since the
last completed compaction.
+
+This feature is enabled by default via
`hoodie.table.services.incremental.enabled`. When enabled, compaction
scheduling will:
+
+1. Identify partitions that have been modified since the last completed
compaction by analyzing commit metadata within the time window between the last
completed compaction and the current scheduling instant
+2. Include any partitions that were marked as missing from previous scheduling
runs (e.g., due to IO limits)
+3. Only scan and process those incremental partitions
+4. Fall back to scanning all partitions if the last completed compaction
instant cannot be found (e.g., due to archival) or if an exception occurs
during incremental partition retrieval
+
+For tables with many partitions, this optimization can dramatically reduce
scheduling overhead and improve overall job stability.
+
#### Trigger Strategies
-| Config Name | Default
| Description
|
+| Config Name | Default
| Description
|
|----------------------------------------------------|-------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------|
| hoodie.compact.inline.trigger.strategy | NUM_COMMITS (Optional)
| org.apache.hudi.table.action.compact.CompactionTriggerStrategy: Controls when
compaction is scheduled.<br />`Config Param: INLINE_COMPACT_TRIGGER_STRATEGY`
<br/>
-<ul><li>`NUM_COMMITS`: triggers compaction when there are at least N delta
commits after last completed
compaction.</li><li>`NUM_COMMITS_AFTER_LAST_REQUEST`: triggers compaction when
there are at least N delta commits after last completed or requested
compaction.</li><li>`TIME_ELAPSED`: triggers compaction after N seconds since
last compaction.</li><li>`NUM_AND_TIME`: triggers compaction when both there
are at least N delta commits and N seconds elapsed (both must be satisfied)
after las [...]
+<ul><li>`NUM_COMMITS`: triggers compaction when there are at least N delta
commits after last completed
compaction.</li><li>`NUM_COMMITS_AFTER_LAST_REQUEST`: triggers compaction when
there are at least N delta commits after last completed or requested
compaction.</li><li>`TIME_ELAPSED`: triggers compaction after N seconds since
last compaction.</li><li>`NUM_AND_TIME`: triggers compaction when both there
are at least N delta commits and N seconds elapsed (both must be satisfied)
after las [...]
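+
+As an illustration of the trigger strategies above, a sketch that schedules
compaction after a fixed number of delta commits. The `max.delta.commits` knob
is an assumption taken from Hudi's compaction configs and supplies the N that
`NUM_COMMITS` refers to:
+
+```properties
+# Schedule compaction once at least N delta commits have accumulated
+# since the last completed compaction.
+hoodie.compact.inline.trigger.strategy=NUM_COMMITS
+# N delta commits (assumed knob name: hoodie.compact.inline.max.delta.commits)
+hoodie.compact.inline.max.delta.commits=5
+```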
#### Compaction Strategies
-| Config Name | Default
| Description
[...]
-|----------------------------------------------------|-------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
-| hoodie.compaction.strategy |
org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy
(Optional) | Compaction strategy decides which file groups are picked up for
compaction during each compaction run. By default. Hudi picks the log file with
most accumulated unmerged data. <br /><br />`Config Param: COMPACTION_STRATEGY`
|
-
-Available Strategies (Provide the full package name when using the strategy):
<ul><li>`LogFileNumBasedCompactionStrategy`:
-orders the compactions based on the total log files count, filters the file
group with log files count greater than the
-threshold and limits the compactions within a configured IO
bound.</li><li>`LogFileSizeBasedCompactionStrategy`: orders
-the compactions based on the total log files size, filters the file group
which log files size is greater than the
+
+| Config Name | Default | Description |
+|----------------------------|---------------------------------------------|-------------|
+| hoodie.compaction.strategy | org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy (Optional) | Compaction strategy decides which file groups are picked up for compaction during each compaction run. By default, Hudi picks the log file with most accumulated unmerged data. <br /><br />`Config Param: COMPACTION_STRATEGY` |
+
+Available Strategies (Provide the full package name when using the strategy):
<ul><li>`LogFileNumBasedCompactionStrategy`:
+orders the compactions based on the total log files count, filters the file
group with log files count greater than the
+threshold and limits the compactions within a configured IO
bound.</li><li>`LogFileSizeBasedCompactionStrategy`: orders
+the compactions based on the total log files size, filters the file group
which log files size is greater than the
threshold and limits the compactions within a configured IO
bound.</li><li>`BoundedIOCompactionStrategy`: CompactionStrategy
-which looks at total IO to be done for the compaction (read + write) and
limits the list of compactions to be under a
-configured limit on the
IO.</li><li>`BoundedPartitionAwareCompactionStrategy`:This strategy ensures
that the last N partitions
-are picked up even if there are later partitions created for the table.
lastNPartitions is defined as the N partitions before
-the currentDate. currentDay = 2018/01/01 The table has partitions for
2018/02/02 and 2018/03/03 beyond the currentDay This
-strategy will pick up the following partitions for compaction : (2018/01/01,
allPartitionsInRange[(2018/01/01 - lastNPartitions)
-to 2018/01/01), 2018/02/02,
2018/03/03)</li><li>`DayBasedCompactionStrategy`:This strategy orders
compactions in reverse
-order of creation of Hive Partitions. It helps to compact data in latest
partitions first and then older capped at the
-Total_IO allowed.</li><li>`UnBoundedCompactionStrategy`:
UnBoundedCompactionStrategy will not change ordering or filter
-any compaction. It is a pass-through and will compact all the base files which
has a log file. This usually means
+which looks at total IO to be done for the compaction (read + write) and
limits the list of compactions to be under a
+configured limit on the
IO.</li><li>`BoundedPartitionAwareCompactionStrategy`: This strategy ensures
that the last N partitions
+are picked up even if there are later partitions created for the table.
lastNPartitions is defined as the N partitions before
+the currentDate. currentDay = 2018/01/01 The table has partitions for
2018/02/02 and 2018/03/03 beyond the currentDay This
+strategy will pick up the following partitions for compaction : (2018/01/01,
allPartitionsInRange[(2018/01/01 - lastNPartitions)
+to 2018/01/01), 2018/02/02,
2018/03/03)</li><li>`DayBasedCompactionStrategy`: This strategy orders
compactions in reverse
+order of creation of Hive Partitions. It helps to compact data in latest
partitions first and then older capped at the
+Total_IO allowed.</li><li>`UnBoundedCompactionStrategy`:
UnBoundedCompactionStrategy will not change ordering or filter
+any compaction. It is a pass-through and will compact all the base files that
have a log file. This usually means
no-intelligence on
compaction.</li><li>`UnBoundedPartitionAwareCompactionStrategy`: This is a custom
unbounded strategy. It filters all the partitions that are eligible to be
compacted by a `BoundedPartitionAwareCompactionStrategy` and returns the result.
This is done
so that a long running UnBoundedPartitionAwareCompactionStrategy does not step
over partitions in a shorter running
@@ -88,10 +107,11 @@ Please refer to [advanced
configs](https://hudi.apache.org/docs/next/configurati
## Ways to trigger Compaction
### Inline
+
By default, compaction is run asynchronously.
-If latency of ingesting records is important for you, you are most likely
using Merge-On-Read tables.
-Merge-On-Read tables store data using a combination of columnar (e.g parquet)
+ row based (e.g avro) file formats.
+If latency of ingesting records is important for you, you are most likely
using Merge-on-Read tables.
+Merge-on-Read tables store data using a combination of columnar (e.g. parquet)
+ row-based (e.g. avro) file formats.
Updates are logged to delta files & later compacted to produce new versions of
columnar files.
To improve ingestion latency, Async Compaction is the default configuration.
@@ -100,21 +120,21 @@ you may want synchronous inline compaction, which means
that as a commit is writ
For this deployment mode, please use `hoodie.compact.inline = true` for Spark
Datasource and Spark SQL writers. For
Hudi Streamer sync once mode inline compaction can be achieved by passing the
flag `--disable-compaction` (Meaning to
-disable async compaction). Further in Hudi Streamer when both
+disable async compaction). Further, in Hudi Streamer, when both
ingestion and compaction are running in the same Spark context, you can use
resource allocation configuration
in Hudi Streamer CLI such as (`--delta-sync-scheduling-weight`,
`--compact-scheduling-weight`, `--delta-sync-scheduling-minshare`, and
`--compact-scheduling-minshare`)
to control executor allocation between ingestion and compaction.
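A minimal sketch of the write options that turn on synchronous inline compaction for a Spark Datasource writer (the delta-commit frequency is illustrative):

```properties
hoodie.datasource.write.table.type=MERGE_ON_READ
hoodie.compact.inline=true
# Compact once every 5 delta commits (illustrative)
hoodie.compact.inline.max.delta.commits=5
```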
-
### Async & Offline Compaction models
There are a couple of ways to trigger compaction.
#### Async execution within the same process
-In streaming ingestion write models like Hudi Streamer
-continuous mode, Flink and Spark Streaming, async compaction is enabled by
default and runs alongside without blocking
-regular ingestion.
+
+In streaming ingestion write models like Hudi Streamer
+continuous mode, Flink and Spark Streaming, async compaction is enabled by
default and runs alongside without blocking
+regular ingestion.
##### Spark Structured Streaming
@@ -129,8 +149,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.ProcessingTime;
-
-
DataStreamWriter<Row> writer =
streamingInput.writeStream().format("org.apache.hudi")
.option("hoodie.datasource.write.operation", operationType)
.option("hoodie.datasource.write.table.type", tableType)
@@ -145,7 +163,8 @@ import org.apache.spark.sql.streaming.ProcessingTime;
```
##### Hudi Streamer Continuous Mode
-Hudi Streamer provides continuous ingestion mode where a single long running
spark application
+
+Hudi Streamer provides continuous ingestion mode where a single long running
spark application
ingests data to Hudi table continuously from upstream sources. In this mode,
Hudi supports managing asynchronous
compactions. Here is an example snippet for running in continuous mode with
async compactions
@@ -162,10 +181,11 @@ spark-submit --packages
org.apache.hudi:hudi-utilities-slim-bundle_2.12:1.0.2,or
```
#### Scheduling and Execution by a separate process
-For some use cases with long running table services, instead of having the
regular writes block, users have the option to run
-both steps of the compaction ([scheduling and
execution](#compaction-architecture)) offline in a separate process altogether.
+
+For some use cases with long running table services, instead of having the
regular writes block, users have the option to run
+both steps of the compaction ([scheduling and
execution](#compaction-architecture)) offline in a separate process altogether.
This frees regular writers from handling these compaction steps and
allows users to provide more resources for
-the compaction job as needed.
+the compaction job as needed.
:::note
This model needs a lock provider configured for all jobs - the regular writer
as well as the offline compaction job.
@@ -173,8 +193,8 @@ This model needs a lock provider configured for all jobs -
the regular writer as
#### Scheduling inline and executing async
-In this model, it is possible for a Spark Datasource writer or a Flink job to
just schedule the compaction inline ( that
-will serialize the compaction plan in the timeline but will not execute it).
And then a separate utility like
+In this model, it is possible for a Spark Datasource writer or a Flink job to
just schedule the compaction inline ( that
+will serialize the compaction plan in the timeline but will not execute it).
And then a separate utility like
HudiCompactor or HoodieFlinkCompactor can take care of periodically executing
the compaction plan.
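A sketch of the writer-side configuration for this model (assuming the standard inline-scheduling configs; verify the keys against your Hudi version):

```properties
# Writer: serialize the compaction plan to the timeline without executing it
hoodie.compact.inline=false
hoodie.compact.schedule.inline=true
```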
:::note
@@ -182,10 +202,12 @@ This model may need a lock provider **if** metadata table
is enabled.
:::
#### Hudi Compactor Utility
+
Hudi provides a standalone tool to execute specific compactions
asynchronously. Below is an example, and you can read more in the [deployment
guide](cli#compactions).
The compactor utility allows you to schedule and execute compaction.
Example:
+
```properties
spark-submit --packages
org.apache.hudi:hudi-utilities-slim-bundle_2.12:1.0.2,org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.2
\
--class org.apache.hudi.utilities.HoodieCompactor \
@@ -196,18 +218,21 @@ spark-submit --packages
org.apache.hudi:hudi-utilities-slim-bundle_2.12:1.0.2,or
```
Note, the `instant-time` parameter is now optional for the Hudi Compactor
Utility. If using the utility without `--instant-time`,
-the spark-submit will execute the earliest scheduled compaction on the Hudi
timeline.
+the spark-submit will execute the earliest scheduled compaction on the Hudi
timeline.
#### Hudi CLI
+
Hudi CLI is yet another way to execute specific compactions asynchronously.
Here is an example, and you can read more in the [deployment
guide](cli#compactions).
Example:
+
```properties
hudi:trips->compaction run --tableName <table_name> --parallelism
<parallelism> --compactionInstant <InstantTime>
...
```
#### Flink Offline Compaction
+
Offline compaction requires submitting a Flink task on the command line. The
program entry is as follows: `hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar` :
`org.apache.hudi.sink.compact.HoodieFlinkCompactor`
@@ -218,14 +243,14 @@ Offline compaction needs to submit the Flink task on the
command line. The progr
#### Options
-| Option Name | Default | Description |
-| -----------
|-------------------------------------------------------------------------------------------------------------------------------|
------- |
-| `--path` | `n/a **(Required)**` | The path where the target table is stored
on Hudi
|
-| `--compaction-max-memory` | `100` (Optional) | The index map size of log
data during compaction, 100 MB by default. If you have enough memory, you can
turn up this parameter |
-| `--schedule` | `false` (Optional) | whether to execute the operation of
scheduling compaction plan. When the write process is still writing, turning on
this parameter have a risk of losing data. Therefore, it must be ensured that
there are no write tasks currently writing data to this table when this
parameter is turned on |
-| `--seq` | `LIFO` (Optional) | The order in which compaction tasks are
executed. Executing from the latest compaction plan by default. `LIFO`:
executing from the latest plan. `FIFO`: executing from the oldest plan. |
-| `--service` | `false` (Optional) | Whether to start a monitoring service
that checks and schedules new compaction task in configured interval. |
-| `--min-compaction-interval-seconds` | `600(s)` (optional) | The checking
interval for service mode, by default 10 minutes. |
+| Option Name | Default | Description |
+|-------------------------------------|----------------------|-------------|
+| `--path` | `n/a **(Required)**` | The path where the target table is stored on Hudi |
+| `--compaction-max-memory` | `100` (Optional) | The index map size of log data during compaction, 100 MB by default. If you have enough memory, you can turn up this parameter |
+| `--schedule` | `false` (Optional) | Whether to execute the operation of scheduling a compaction plan. If the write process is still writing, turning on this parameter has a risk of losing data. Therefore, it must be ensured that no write tasks are currently writing data to this table when this parameter is turned on |
+| `--seq` | `LIFO` (Optional) | The order in which compaction tasks are executed, from the latest compaction plan by default. `LIFO`: executing from the latest plan. `FIFO`: executing from the oldest plan |
+| `--service` | `false` (Optional) | Whether to start a monitoring service that checks for and schedules new compaction tasks at the configured interval |
+| `--min-compaction-interval-seconds` | `600(s)` (Optional) | The checking interval for service mode, by default 10 minutes |
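Putting the options together, an offline compaction submission running in service mode might look like the following sketch (the bundle jar name and table path are placeholders):

```properties
./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor \
  hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar \
  --path hdfs://xxx/path/to/hudi_table \
  --service \
  --min-compaction-interval-seconds 600
```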
## Related Resources
diff --git a/website/releases/release-1.1.0.md
b/website/releases/release-1.1.0.md
index 21b95a9119a2..5e1d857deab0 100644
--- a/website/releases/release-1.1.0.md
+++ b/website/releases/release-1.1.0.md
@@ -94,7 +94,9 @@ An optimization that enables direct copying of RowGroup-level
data from Parquet
#### Incremental Table Service Scheduling
-Significantly improves performance of compaction and clustering operations on
tables with large numbers of partitions. Enabled by default via
`hoodie.table.services.incremental.enabled`.
+Significantly improves performance of compaction and clustering operations on
tables with large numbers of partitions. Instead of scanning all partitions
during each scheduling run, incremental scheduling only processes partitions
that have changed since the last completed table service operation. Enabled by
default via `hoodie.table.services.incremental.enabled`.
+
+For more details, see [Incremental Scheduling in
Compaction](/docs/compaction#incremental-scheduling) and [Incremental
Scheduling in Clustering](/docs/clustering#incremental-scheduling).
### Concurrency Control