This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 78ad883a067 [HUDI-6035] Make simple index parallelism auto inferred (#8468)
78ad883a067 is described below
commit 78ad883a067537bfef866dd5388faa4922efbd58
Author: clownxc <[email protected]>
AuthorDate: Sat Apr 29 22:25:07 2023 +0800
[HUDI-6035] Make simple index parallelism auto inferred (#8468)
---------
Co-authored-by: ClownXC <[email protected]>
Co-authored-by: Raymond Xu <[email protected]>
---
.../main/java/org/apache/hudi/config/HoodieIndexConfig.java | 10 +++++-----
.../java/org/apache/hudi/index/simple/HoodieSimpleIndex.java | 7 ++++++-
2 files changed, 11 insertions(+), 6 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index fd50fdb0f6d..dc0b1cd5f4a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -189,14 +189,14 @@ public class HoodieIndexConfig extends HoodieConfig {
public static final ConfigProperty<String> SIMPLE_INDEX_PARALLELISM =
ConfigProperty
.key("hoodie.simple.index.parallelism")
- .defaultValue("100")
+ .defaultValue("0")
.markAdvanced()
.withDocumentation("Only applies if index type is SIMPLE. "
+ "This limits the parallelism of fetching records from the base
files of affected "
- + "partitions. The index picks the configured parallelism if the
number of base "
- + "files is larger than this configured value; otherwise, the number
of base files "
- + "is used as the parallelism. If the indexing stage is slow due to
the limited "
- + "parallelism, you can increase this to tune the performance.");
+ + "partitions. By default, this is auto computed based on input
workload characteristics. "
+ + "If the parallelism is explicitly configured by the user, the
user-configured "
+ + "value is used in defining the actual parallelism. If the indexing
stage is slow "
+ + "due to the limited parallelism, you can increase this to tune the
performance.");
public static final ConfigProperty<String> GLOBAL_SIMPLE_INDEX_PARALLELISM =
ConfigProperty
.key("hoodie.global.simple.index.parallelism")
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
index 95823ff51e3..dbc49d0655f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
@@ -107,11 +107,16 @@ public class HoodieSimpleIndex
.getString(HoodieIndexConfig.SIMPLE_INDEX_INPUT_STORAGE_LEVEL_VALUE));
}
+ int inputParallelism = inputRecords.getNumPartitions();
+ int configuredSimpleIndexParallelism = config.getSimpleIndexParallelism();
+ // NOTE: Target parallelism could be overridden by the config
+ int targetParallelism =
+ configuredSimpleIndexParallelism > 0 ?
configuredSimpleIndexParallelism : inputParallelism;
HoodiePairData<HoodieKey, HoodieRecord<R>> keyedInputRecords =
inputRecords.mapToPair(record -> new ImmutablePair<>(record.getKey(),
record));
HoodiePairData<HoodieKey, HoodieRecordLocation> existingLocationsOnTable =
fetchRecordLocationsForAffectedPartitions(keyedInputRecords.keys(),
context, hoodieTable,
- config.getSimpleIndexParallelism());
+ targetParallelism);
HoodieData<HoodieRecord<R>> taggedRecords =
keyedInputRecords.leftOuterJoin(existingLocationsOnTable).map(entry ->
{