nfarah86 commented on code in PR #8381:
URL: https://github.com/apache/hudi/pull/8381#discussion_r1160744549


##########
website/docs/compaction.md:
##########
@@ -5,52 +5,156 @@ toc: true
 last_modified_at:
 ---
 
-## Async Compaction
-Compaction is executed asynchronously with Hudi by default. Async Compaction 
is performed in 2 steps:
+Hudi has a table service called compaction, where files are compacted together 
to form a new file version. In Hudi, two different storage types are available, 
Copy-On-Write (COW) and Merge-On-Read (MOR). Each storage type uses distinct 
file types: COW uses base files comprised of Parquet files (columnar-base); MOR 
uses base and log files consisting of Parquet files (columnar-base) and Avro 
files (row-base), respectively. Compaction is a mandatory table service in Hudi 
that only applies to MOR tables, not COW tables. 
 
-1. ***Compaction Scheduling***: This is done by the ingestion job. In this 
step, Hudi scans the partitions and selects **file
-   slices** to be compacted. A compaction plan is finally written to Hudi 
timeline.
-1. ***Compaction Execution***: In this step the compaction plan is read and 
file slices are compacted.
+## WHY MOR TABLES NEED COMPACTION
+In Hudi, data is organized in terms of [file 
groups](https://hudi.apache.org/docs/file_layouts/). Each file group in a MOR 
table consists of a base file and one or more log files. Typically during 
writes, updates are stored in log files, and inserts are stored in base files. 
During the compaction process, log files get compacted into a new base file 
version. Since Hudi writes updates directly to log files, MOR tables are 
write-optimized and have a lower write amplification than COW tables because no 
synchronous merge occurs during writes. In contrast, COW tables have a higher 
write amplification because Hudi applies each write to a new base file version, 
where a synchronous merge combines new writes with the older base files and 
forms a new base file version. 
 
-There are few ways by which we can execute compactions asynchronously.
+![MOR table file layout](/assets/images/docs/compaction/mor-layout-2-02.png)
+ Figure: Hudi’s MOR table file layouts showing diff file groups and its 
constituents.
 
-### Spark Structured Streaming
+ However, MOR tables may have higher read amplification for snapshot reads 
than COW tables because base files with their constituent log files are merged 
and served for each file group during query time. It’s essential to have 
compaction for MOR tables at regular intervals to bind the growth of log files 
and ensure the read latencies do not spike up enormously. However, in COW 
tables, no merge operation is needed during queries because all writes are in 
base files. As a result, COW tables have a lower read amplification.
 
-Compactions are scheduled and executed asynchronously inside the
-streaming job.  Async Compactions are enabled by default for structured 
streaming jobs
-on Merge-On-Read table.
+![Compaction process](/assets/images/docs/compaction/mor-layout-1-01.png)
+ Figure: Pictorial representation of Apache Hudi’s Compaction on a given file 
group. 
 
-Here is an example snippet in java
+## Strategies for compaction 
+To execute compaction, you need to configure two strategies: 
+- schedule strategy 
+- compaction strategy
 
-```properties
+Once you configure the compaction strategy, you can deploy the compaction 
process in two different ways:
+- inline compaction
+- asynchronous or offline compaction 
+
+Below, we’ll go over all the different strategies with compaction and how you 
can schedule, create, execute and deploy it. 
+
+### Schedule strategy
+The scheduling strategy determines how often to schedule compaction. You 
schedule compaction based on the commit number or time elapsed. There are 
multiple trigger options available. For example, you can set a trigger strategy 
to occur after 5 commits or after 10 minutes. After compaction is triggered, 
Hudi prepares a compaction strategy to decide what to compact (the compaction 
strategy is described later in the document).
+
+To set a trigger strategy that's suitable for your application, use the below 
configuration:
+
+- `hoodie.compact.inline.trigger.strategy`: This configuration is used for 
both async and inline compaction. It controls how the compaction schedule is 
triggered. Triggering is based on time, the number of delta commits or a 
combination of both. The valid options are: 
`NUM_COMMITS`,`NUM_COMMITS_AFTER_LAST_REQUEST`,`TIME_ELAPSED`,`NUM_AND_TIME` 
and `NUM_OR_TIME`. The default value is: `NUM_COMMITS`.
+
+Depending on those values for `hoodie.compact.inline.trigger.strategy`, you 
may need to set additional configurations. Below are the explanation of the 
values and additional configurations that need to be set: 
+- `NUM_COMMITS`: You schedule the compaction based on the number of delta 
commits (N). Use `hoodie.compact.inline.max.delta.commits` to define the value 
for (N). 
+- `NUM_COMMITS_AFTER_LAST_REQUEST`: You schedule the compaction based on the 
number of delta commits (N) after the last compaction. Again, you can use 
`hoodie.compact.inline.max.delta.commits` to define the value for the number of 
commits. 
+- `TIME_ELAPSED`: You schedule the compaction based on the time elapsed. To 
set the elapsed time, you can use the configurations, 
`hoodie.compact.inline.max.delta.seconds`.
+- `NUM_AND_TIME`: You schedule the compaction if the number of commits and 
time elapsed occurred. Use `hoodie.compact.inline.max.delta.commits` and 
`hoodie.compact.inline.max.delta.seconds` to define the value for (N) and set 
the elapsed time, respectively. 
+- `NUM_OR_TIME`: You schedule the compaction if the number of delta commits 
occurred OR the time has elapsed. Use `hoodie.compact.inline.max.delta.commits` 
and `hoodie.compact.inline.max.delta.seconds` to define the value for (N) and 
set the elapsed time, respectively. 
+
+The additional configurations based on the strategy values are below:
+
+- `hoodie.compact.inline.max.delta.commits`: This is the number of delta 
commits after the last compaction before the new compaction is scheduled. This 
configuration takes effect only for the compaction triggering strategy based on 
the number of commits: `NUM_COMMITS`, `NUM_COMMITS_AFTER_LAST_REQUEST`, 
`NUM_AND_TIME` and `NUM_OR_TIME`.
+- `hoodie.compact.inline.max.delta.seconds`: This is the number of elapsed 
seconds after the last compaction, before scheduling a new one. This 
configuration takes effect only for the compaction triggering strategy based on 
the elapsed time:  `TIME_ELAPSED`, `NUM_AND_TIME` and `NUM_OR_TIME`.
+
+:::note 
+Corner cases with performing compaction with indexes:
+For most cases with bloom, global_bloom, simple and global_simple, at write 
time, log files will consist of updates, and base files will consist of 
inserts. However, updates can go to base files due to small file handling. In 
the case of hbase index and bucket index, inserts may go to log files. 
+:::
+
+### Compaction Strategy
+The schedule strategy we just covered will only dictate when to schedule 
compaction. What exactly will get compacted, like which file groups, will be 
determined by the compaction strategy. Below is the configuration to set the 
compaction strategy:
+
+`hoodie.compaction.strategy`: This compaction strategy decides which file 
groups are picked up for compaction during each compaction run. Hudi picks the 
log file with the most accumulated unmerged data by default. Depending on the 
values chosen, you may need to set additional configurations. Below are the 
explanation of the values and additional configurations that need to be set: 
+
+- `LogFileSizeBasedCompactionStrategy`: This orders the compaction based on 
the total log file size. In this strategy, Hudi evaluates the whole table and 
retrieves all the file groups that do not have a pending compaction and have a 
log file size equal to or greater than the 
`hoodie.compaction.logfile.size.threshold`. The file groups that meet these 
requirements will be further trimmed by the IO bound. This IO-bound 
configuration has a 500GB default value, which you can override. Hudi 
accumulates all the file groups whose total IO required for compaction has a 
maximum target IO per the configuration set. The file groups that meet the 
target IO bound will only be added to the compaction plan, and a 
`.compaction.requested` instant is created in the `.hoodie` folder with all the 
plan details. For instance, if 100 file groups are eligible per the log file 
size threshold, but the IO required to compact 10 file groups amounts to the 
target IO config set, only 10 file groups will be consid
 ered for the compaction plan/schedule of interest. 
+
+- `LogFileNumBasedCompactionStrategy`: This orders the compaction based on the 
total log file count. In this strategy, Hudi evaluates the whole table and 
retrieves all the file groups that do not have a pending compaction and have a 
log file number equal to or greater to the 
`hoodie.compaction.logfile.number.threshold`. The IO bound will further trim 
the file groups that meet these requirements. This IO-bound configuration has a 
500GB default value, which you can override. Hudi accumulates all the file 
groups whose total IO required for compaction has a maximum target IO per the 
configuration set. The file groups that meet the target IO bound will only be 
added to the compaction plan, and a `.compaction.requested` instant is created 
in the `.hoodie` folder with all the plan details.  
+
+- `BoundedIOCompactionStrategy`: This compaction strategy looks at all the 
table’s file groups, ignoring the file groups that are pending for compaction, 
and keeps accumulating the file group’s log files to either meet or be under 
the threshold set by the hoodie.compaction.target.io value. This IO-bound 
configuration has a 500GB default value, which you can override. Hudi 
accumulates all the file groups whose total IO required for compaction has a 
maximum target IO per the configuration set. The file groups that meet the 
target IO bound will only be added to the compaction plan, and a 
`.compaction.requested` instant is created in the `.hoodie` folder with all the 
plan details. 
+
+- `DayBasedCompactionStrategy`: This denotes the number of latest partitions 
to compact during a compaction run. The default value is 10. This configuration 
is used for `BoundedPartitionAwareCompactionStrategy` and 
`UnBoundedPartitionAwareCompactionStrategy`.
+
+- `BoundedPartitionAwareCompactionStrategy`: This is a partition-level 
compaction where the strategy only works when the data set has a date partition 
in the YYYY/MM/DD format. Hudi compacts the current partitions between 
hoodie.compaction.daybased.target.partitions value, starting with the most 
current partition first. For example, if 
`hoodie.compaction.daybased.target.partitions=3` and the current or latest 
partition on a Hudi table is 2023/03/15, Hudi will prep the partition from 
2023/03/15, 2023/03/14 and 2023/03/13 for compaction. In this compaction 
strategy, the process is bounded between the latest partition available and the 
value set for the `hoodie.compaction.daybased.target.partitions`.
+
+- `UnBoundedPartitionAwareCompactionStrategy`: This is a partition-level 
compaction where the strategy only works when the data set has a date partition 
in the YYYY/MM/DD format. This strategy keeps compacting all partitions before 
the latest partition is available via 
`hoodie.compaction.daybased.target.partitions`. For example, if 
`hoodie.compaction.daybased.target.partitions=3` and the current or latest 
partition on a Hudi table is 2023/03/15, Hudi will prep all the partitions from 
2023/03/12 and earlier for compaction. In this compaction strategy, the process 
is unbounded starting from the n-th latest partition available, where n is the 
value set for the `hoodie.compaction.daybased.target.partitions`, till the 
earliest partition in the dataset.
+
+- `UnBoundedCompactionStrategy`: This strategy compacts all base files with 
its respective log files. For example, if you use a schedule strategy, like 
`num_commit=5`, all file groups with updates between those 5 commits will be 
prepped for compaction. There are no further trimming or settings you need to 
take into consideration. 
+
+The additional configurations based on the strategy values are below:
+
+- `hoodie.compaction.target.io`: This is the number of MBs to spend during the 
compaction run for the `LogFileSizeBasedCompactionStrategy`, 
`LogFileNumBasedCompactionStrategy` and `BoundedIOCompactionStrategy`. This 
value helps bound ingestion latency while compaction runs. 
+
+#### Example of how a compaction plan and strategy work in Hudi: 
+
+Let’s say you set the schedule and compaction strategy to these values: 
+
+```
+hoodie.compact.inline.trigger.strategy = num_commits 
+hoodie.compact.inline.max.delta.commits = 5
+hoodie.compaction.strategy = 
org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy
+hoodie.compaction.logfile.size.threshold=104857600
+hoodie.compaction.target.io = 10240
+```
+
+After reaching 5 commits, Hudi evaluates the entire table to identify file 
groups with log files that meet or exceed the 100MB file size threshold. Let’s 
assume that out of 100 file groups, 50 file groups have log files that meet or 
exceed this threshold. Among these 50 file groups, Hudi accumulates log files 
up to 10GB, which is the specified target threshold. Once the 10GB log files 
limit is reached, Hudi finalizes the compaction plan and creates a 
`.compaction.requested` instant in the `.hoodie` directory. This results in a 
pending compaction. From there, the compaction plan is executed. 
+
+## Compaction deployment models:
+Once you have configured your compaction strategy, you need to execute it. 
Below are several options for how you can do this:
+
+### Inline Compaction
+Inline compaction refers to the process of executing the compaction process as 
part of the data ingestion pipeline, rather than running them asynchronously as 
a separate job. With inline compaction, Hudi will schedule, plan and execute 
the compaction operations after each commit is completed. This is the simplest 
deployment model to run because it’s easier to manage than running different 
asynchronus Spark jobs (see below). This mode is only supported on Spark 
Datasource, Spark-SQL and DeltaStreamer in sync-once mode. In the next section, 
we’ll go over code snippets you can use to get started with inline clustering. 
To run inline compaction with DeltaStreamer in continuous mode, you must pass 
the flag `--disable-compaction` so the async compaction is disabled (see 
below). With inline compaction, your data latency could be higher because 
you'll block any writer from ingesting data when compaction is being executed.
+
+**DeltaStreamer**: When both ingestion and compaction are running in the same 
spark context, you can use resource allocation configuration in DeltaStreamer 
CLI such as (`--delta-sync-scheduling-weight`, `--compact-scheduling-weight`, 
`--delta-sync-scheduling-minshare`, and `--compact-scheduling-minshare`) to 
control executor allocation between ingestion and compaction.
+
+### Asynchronous Compaction
+There are three ways you can execute an asynchronous clustering process:
+
+**Asynchronous execution within the same process**: In this deployment mode, 
Hudi will schedule, plan and execute the compaction operations after each 
commit is completed as part of the ingestion pipeline. Separately, Hudi spins 
up another thread within the same job and executes the compaction table 
service. This is supported by Spark Datasource, Spark streaming, Flink, 
DeltaStreamer in continuous mode and Spark-SQL.
+
+For this deployment mode, you need to enable 
`hoodie.compact.inline.trigger.strategy`.
+
+**Asynchronous scheduling and execution by a separate process**: In this 
deployment mode, you’ll write data into a Hudi table as part of the ingestion 
pipeline. In another job, you schedule and execute the compaction service. By 
running a different job for the compaction process, you rebalance how Hudi uses 
compute resources. You’ll use fewer compute resources for the writes and 
reserve more compute resources for the compaction process. Please ensure you 
have configured the lock providers for all jobs (both writer and table service 
jobs). In general, you’ll need to configure lock providers when we have two 
different jobs or two different processes occurring. All writers except the 
Spark streaming support this deployment model. For streaming ingestion use 
cases with DeltaStreamer in continuous mode, you’ll need to include the flag, 
`--disable-compaction`, so the default async compaction is disabled.
+
+For this deployment mode, you need to enable 
`hoodie.compact.inline.trigger.strategy`.
+
+**Scheduling inline and executing async**: In this deployment mode, you ingest 
data and schedule the compaction in one job; in another, you execute the 
compaction strategy. If you enable the metadata table, you do not need to 
provision lock providers. However, if you disable the metadata table, please 
ensure all jobs have the lock providers configured. Spark Datasource and Flink 
support this deployment option. 
+
+For this deployment mode, you need to enable 
`hoodie.compact.inline.trigger.strategy`.
+
+## Code Examples
+### Inline clustering example
+This is the most straightforward deployment option, where compaction will be 
triggered when the schedule compaction strategy threshold is met. The example 
below shows you how to run compaction every 4 commits: 
+
+```
+hoodie.compact.inline=true
+hoodie.compact.inline.max.delta.commits=4
+# Note: we are relying on default values for rest of the compaction configs 
like: 
+# hoodie.compact.inline.trigger.strategy=NUM_COMMITS
+# 
hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy
+```
+
+### Async clustering example
+#### Spark Structured Streaming​ with the default async deployment model

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to