This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new be92be657a3 [HUDI-5786] Add a new config to specify spark write rdd 
storage level (#7941)
be92be657a3 is described below

commit be92be657a348954cc21062ca24e8a10caea17ee
Author: gaoshihang <[email protected]>
AuthorDate: Mon Feb 20 10:05:09 2023 +0800

    [HUDI-5786] Add a new config to specify spark write rdd storage level 
(#7941)
    
    * add a new config to specify spark write rdd storage level
    
    * update
    
    * update
    
    * update
    
    ---------
    
    Co-authored-by: gaoshihang <[email protected]>
---
 .../main/java/org/apache/hudi/config/HoodieWriteConfig.java    | 10 ++++++++++
 .../table/action/commit/BaseSparkCommitActionExecutor.java     |  4 ++--
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 3da856f61ac..e3b4f046bae 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -127,6 +127,12 @@ public class HoodieWriteConfig extends HoodieConfig {
       .noDefaultValue()
       .withDocumentation("Table name that will be used for registering with 
metastores like HMS. Needs to be same across runs.");
 
+  public static final ConfigProperty<String> TAGGED_RECORD_STORAGE_LEVEL_VALUE 
= ConfigProperty
+       .key("hoodie.write.tagged.record.storage.level")
+       .defaultValue("MEMORY_AND_DISK_SER")
+       .withDocumentation("Determine what level of persistence is used to 
cache write RDDs. "
+          + "Refer to org.apache.spark.storage.StorageLevel for different 
values");
+
   public static final ConfigProperty<String> PRECOMBINE_FIELD_NAME = 
ConfigProperty
       .key("hoodie.datasource.write.precombine.field")
       .defaultValue("ts")
@@ -1069,6 +1075,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getSchema();
   }
 
+  public String getTaggedRecordStorageLevel() {
+    return getString(TAGGED_RECORD_STORAGE_LEVEL_VALUE);
+  }
+
   public String getInternalSchema() {
     return getString(INTERNAL_SCHEMA_STRING);
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index ac5b8555b05..73c855d5505 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -148,10 +148,10 @@ public abstract class BaseSparkCommitActionExecutor<T> 
extends
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> 
execute(HoodieData<HoodieRecord<T>> inputRecords) {
     // Cache the tagged records, so we don't end up computing both
-    // TODO: Consistent contract in HoodieWriteClient regarding preppedRecord 
storage level handling
     JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
     if (inputRDD.getStorageLevel() == StorageLevel.NONE()) {
-      inputRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
+      String writeStorageLevel = config.getTaggedRecordStorageLevel();
+      inputRDD.persist(StorageLevel.fromString(writeStorageLevel));
     } else {
       LOG.info("RDD PreppedRecords was persisted at: " + 
inputRDD.getStorageLevel());
     }

Reply via email to