This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 78ad883a067 [HUDI-6035] Make simple index parallelism auto inferred (#8468)
78ad883a067 is described below

commit 78ad883a067537bfef866dd5388faa4922efbd58
Author: clownxc <[email protected]>
AuthorDate: Sat Apr 29 22:25:07 2023 +0800

    [HUDI-6035] Make simple index parallelism auto inferred (#8468)
    
    
    ---------
    
    Co-authored-by: ClownXC <[email protected]>
    Co-authored-by: Raymond Xu <[email protected]>
---
 .../main/java/org/apache/hudi/config/HoodieIndexConfig.java    | 10 +++++-----
 .../java/org/apache/hudi/index/simple/HoodieSimpleIndex.java   |  7 ++++++-
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index fd50fdb0f6d..dc0b1cd5f4a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -189,14 +189,14 @@ public class HoodieIndexConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> SIMPLE_INDEX_PARALLELISM = 
ConfigProperty
       .key("hoodie.simple.index.parallelism")
-      .defaultValue("100")
+      .defaultValue("0")
       .markAdvanced()
       .withDocumentation("Only applies if index type is SIMPLE. "
           + "This limits the parallelism of fetching records from the base 
files of affected "
-          + "partitions. The index picks the configured parallelism if the 
number of base "
-          + "files is larger than this configured value; otherwise, the number 
of base files "
-          + "is used as the parallelism. If the indexing stage is slow due to 
the limited "
-          + "parallelism, you can increase this to tune the performance.");
+          + "partitions. By default, this is auto computed based on input 
workload characteristics. "
+          + "If the parallelism is explicitly configured by the user, the 
user-configured "
+          + "value is used in defining the actual parallelism. If the indexing 
stage is slow "
+          + "due to the limited parallelism, you can increase this to tune the 
performance.");
 
   public static final ConfigProperty<String> GLOBAL_SIMPLE_INDEX_PARALLELISM = 
ConfigProperty
       .key("hoodie.global.simple.index.parallelism")
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
index 95823ff51e3..dbc49d0655f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
@@ -107,11 +107,16 @@ public class HoodieSimpleIndex
           
.getString(HoodieIndexConfig.SIMPLE_INDEX_INPUT_STORAGE_LEVEL_VALUE));
     }
 
+    int inputParallelism = inputRecords.getNumPartitions();
+    int configuredSimpleIndexParallelism = config.getSimpleIndexParallelism();
+    // NOTE: Target parallelism could be overridden by the config
+    int targetParallelism =
+        configuredSimpleIndexParallelism > 0 ? 
configuredSimpleIndexParallelism : inputParallelism;
     HoodiePairData<HoodieKey, HoodieRecord<R>> keyedInputRecords =
         inputRecords.mapToPair(record -> new ImmutablePair<>(record.getKey(), 
record));
     HoodiePairData<HoodieKey, HoodieRecordLocation> existingLocationsOnTable =
         fetchRecordLocationsForAffectedPartitions(keyedInputRecords.keys(), 
context, hoodieTable,
-            config.getSimpleIndexParallelism());
+            targetParallelism);
 
     HoodieData<HoodieRecord<R>> taggedRecords =
         keyedInputRecords.leftOuterJoin(existingLocationsOnTable).map(entry -> 
{

Reply via email to