This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e51b4575cb7 [HUDI-5920] Improve documentation of parallelism configs
(#8157)
e51b4575cb7 is described below
commit e51b4575cb7642eb61bcc02d95c99466dd3e8eda
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Mar 17 15:17:24 2023 -0700
[HUDI-5920] Improve documentation of parallelism configs (#8157)
This commit improves the documentation for the following parallelism
configs:
- hoodie.archive.delete.parallelism
- hoodie.bloom.index.parallelism
- hoodie.simple.index.parallelism
- hoodie.global.simple.index.parallelism
- hoodie.insert.shuffle.parallelism
- hoodie.bulkinsert.shuffle.parallelism
- hoodie.upsert.shuffle.parallelism
- hoodie.delete.shuffle.parallelism
- hoodie.rollback.parallelism
---
.../apache/hudi/config/HoodieArchivalConfig.java | 9 +++-
.../org/apache/hudi/config/HoodieIndexConfig.java | 17 ++++++--
.../org/apache/hudi/config/HoodieWriteConfig.java | 48 ++++++++++++++++++----
3 files changed, 63 insertions(+), 11 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
index 681ca20baee..fdfdab5897a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
@@ -63,7 +63,14 @@ public class HoodieArchivalConfig extends HoodieConfig {
public static final ConfigProperty<Integer>
DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE = ConfigProperty
.key("hoodie.archive.delete.parallelism")
.defaultValue(100)
- .withDocumentation("Parallelism for deleting archived hoodie commits.");
+ .withDocumentation("When performing archival operation, Hudi needs to
delete the files of "
+ + "the archived instants in the active timeline in .hoodie folder.
The file deletion "
+ + "also happens after merging small archived files into larger ones
if enabled. "
+ + "This config limits the Spark parallelism for deleting files in
both cases, i.e., "
+ + "parallelism of deleting files does not go above the configured
value and the "
+ + "parallelism is the number of files to delete if smaller than the "
+ + "configured value. If you see that the file deletion in archival
operation is slow "
+ + "because of the limited parallelism, you can increase this to tune
the performance.");
public static final ConfigProperty<String> MIN_COMMITS_TO_KEEP =
ConfigProperty
.key("hoodie.keep.min.commits")
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 366e6aa4c04..0767fc46a56 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -115,7 +115,10 @@ public class HoodieIndexConfig extends HoodieConfig {
.defaultValue("0")
.withDocumentation("Only applies if index type is BLOOM. "
+ "This is the amount of parallelism for index lookup, which
involves a shuffle. "
- + "By default, this is auto computed based on input workload
characteristics.");
+ + "By default, this is auto computed based on input workload
characteristics. "
+ + "If the parallelism is explicitly configured by the user, the
user-configured "
+ + "value is used in defining the actual parallelism. If the indexing
stage is slow "
+ + "due to the limited parallelism, you can increase this to tune the
performance.");
public static final ConfigProperty<String> BLOOM_INDEX_PRUNE_BY_RANGES =
ConfigProperty
.key("hoodie.bloom.index.prune.by.ranges")
@@ -181,13 +184,21 @@ public class HoodieIndexConfig extends HoodieConfig {
.key("hoodie.simple.index.parallelism")
.defaultValue("100")
.withDocumentation("Only applies if index type is SIMPLE. "
- + "This is the amount of parallelism for index lookup, which
involves a Spark Shuffle");
+ + "This limits the parallelism of fetching records from the base
files of affected "
+ + "partitions. The index picks the configured parallelism if the
number of base "
+ + "files is larger than this configured value; otherwise, the number
of base files "
+ + "is used as the parallelism. If the indexing stage is slow due to
the limited "
+ + "parallelism, you can increase this to tune the performance.");
public static final ConfigProperty<String> GLOBAL_SIMPLE_INDEX_PARALLELISM =
ConfigProperty
.key("hoodie.global.simple.index.parallelism")
.defaultValue("100")
.withDocumentation("Only applies if index type is GLOBAL_SIMPLE. "
- + "This is the amount of parallelism for index lookup, which
involves a Spark Shuffle");
+ + "This limits the parallelism of fetching records from the base
files of all table "
+ + "partitions. The index picks the configured parallelism if the
number of base "
+ + "files is larger than this configured value; otherwise, the number
of base files "
+ + "is used as the parallelism. If the indexing stage is slow due to
the limited "
+ + "parallelism, you can increase this to tune the performance.");
// 1B bloom filter checks happen in 250 seconds. 500ms to read a bloom
filter.
// 10M checks in 2500ms, thus amortizing the cost of reading bloom filter
across partitions.
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index b919befbbc9..04d70a85f1a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -248,13 +248,29 @@ public class HoodieWriteConfig extends HoodieConfig {
public static final ConfigProperty<String> INSERT_PARALLELISM_VALUE =
ConfigProperty
.key("hoodie.insert.shuffle.parallelism")
.defaultValue("0")
- .withDocumentation("Parallelism for inserting records into the table.
Inserts can shuffle data before writing to tune file sizes and optimize the
storage layout.");
+ .withDocumentation("Parallelism for inserting records into the table.
Inserts can shuffle "
+ + "data before writing to tune file sizes and optimize the storage
layout. Before "
+ + "0.13.0 release, if users do not configure it, Hudi would use 200
as the default "
+ + "shuffle parallelism. From 0.13.0 onwards Hudi by default
automatically uses the "
+ + "parallelism deduced by Spark based on the source data. If the
shuffle parallelism "
+ + "is explicitly configured by the user, the user-configured
parallelism is "
+ + "used in defining the actual parallelism. If you observe small
files from the insert "
+ + "operation, we suggest configuring this shuffle parallelism
explicitly, so that the "
+ + "parallelism is around total_input_data_size/120MB.");
public static final ConfigProperty<String> BULKINSERT_PARALLELISM_VALUE =
ConfigProperty
.key("hoodie.bulkinsert.shuffle.parallelism")
.defaultValue("0")
- .withDocumentation("For large initial imports using bulk_insert
operation, controls the parallelism to use for sort modes or custom
partitioning done"
- + "before writing records to the table.");
+ .withDocumentation("For large initial imports using bulk_insert
operation, controls the "
+ + "parallelism to use for sort modes or custom partitioning done
before writing records "
+ + "to the table. Before 0.13.0 release, if users do not configure
it, Hudi would use "
+ + "200 as the default shuffle parallelism. From 0.13.0 onwards Hudi
by default "
+ + "automatically uses the parallelism deduced by Spark based on the
source data or "
+ + "the parallelism based on the logical plan for row writer. If the
shuffle parallelism "
+ + "is explicitly configured by the user, the user-configured
parallelism is "
+ + "used in defining the actual parallelism. If you observe small
files from the bulk insert "
+ + "operation, we suggest configuring this shuffle parallelism
explicitly, so that the "
+ + "parallelism is around total_input_data_size/120MB.");
public static final ConfigProperty<String>
BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS = ConfigProperty
.key("hoodie.bulkinsert.user.defined.partitioner.sort.columns")
@@ -272,18 +288,36 @@ public class HoodieWriteConfig extends HoodieConfig {
public static final ConfigProperty<String> UPSERT_PARALLELISM_VALUE =
ConfigProperty
.key("hoodie.upsert.shuffle.parallelism")
.defaultValue("0")
- .withDocumentation("Parallelism to use for upsert operation on the
table. Upserts can shuffle data to perform index lookups, file sizing, bin
packing records optimally"
- + "into file groups.");
+ .withDocumentation("Parallelism to use for upsert operation on the
table. Upserts can "
+ + "shuffle data to perform index lookups, file sizing, bin packing
records optimally "
+ + "into file groups. Before 0.13.0 release, "
+ + "if users do not configure it, Hudi would use 200 as the default "
+ + "shuffle parallelism. From 0.13.0 onwards Hudi by default
automatically uses the "
+ + "parallelism deduced by Spark based on the source data. If the
shuffle parallelism "
+ + "is explicitly configured by the user, the user-configured
parallelism is "
+ + "used in defining the actual parallelism. If you observe small
files from the upsert "
+ + "operation, we suggest configuring this shuffle parallelism
explicitly, so that the "
+ + "parallelism is around total_input_data_size/120MB.");
public static final ConfigProperty<String> DELETE_PARALLELISM_VALUE =
ConfigProperty
.key("hoodie.delete.shuffle.parallelism")
.defaultValue("0")
- .withDocumentation("Parallelism used for “delete” operation. Delete
operations also performs shuffles, similar to upsert operation.");
+ .withDocumentation("Parallelism used for delete operation. Delete
operations also perform
+ + "shuffles, similar to upsert operation. Before 0.13.0 release, "
+ + "if users do not configure it, Hudi would use 200 as the default "
+ + "shuffle parallelism. From 0.13.0 onwards Hudi by default
automatically uses the "
+ + "parallelism deduced by Spark based on the source data. If the
shuffle parallelism "
+ + "is explicitly configured by the user, the user-configured
parallelism is "
+ + "used in defining the actual parallelism.");
public static final ConfigProperty<String> ROLLBACK_PARALLELISM_VALUE =
ConfigProperty
.key("hoodie.rollback.parallelism")
.defaultValue("100")
- .withDocumentation("Parallelism for rollback of commits. Rollbacks
perform delete of files or logging delete blocks to file groups on storage in
parallel.");
+ .withDocumentation("This config controls the parallelism for rollback of
commits. "
+ + "Rollbacks perform deletion of files or logging delete blocks to
file groups on "
+ + "storage in parallel. The configured value limits the parallelism
so that the number
+ + "of Spark tasks does not exceed the value. If rollback is slow due
to the limited "
+ + "parallelism, you can increase this to tune the performance.");
public static final ConfigProperty<String> WRITE_BUFFER_LIMIT_BYTES_VALUE =
ConfigProperty
.key("hoodie.write.buffer.limit.bytes")