This is an automated email from the ASF dual-hosted git repository.
zhangyue19921010 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new be92be657a3 [HUDI-5786] Add a new config to specify the Spark write RDD
storage level (#7941)
be92be657a3 is described below
commit be92be657a348954cc21062ca24e8a10caea17ee
Author: gaoshihang <[email protected]>
AuthorDate: Mon Feb 20 10:05:09 2023 +0800
[HUDI-5786] Add a new config to specify the Spark write RDD storage level
(#7941)
* add a new config to specify the Spark write RDD storage level
* update
* update
* update
---------
Co-authored-by: gaoshihang <[email protected]>
---
.../main/java/org/apache/hudi/config/HoodieWriteConfig.java | 10 ++++++++++
.../table/action/commit/BaseSparkCommitActionExecutor.java | 4 ++--
2 files changed, 12 insertions(+), 2 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 3da856f61ac..e3b4f046bae 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -127,6 +127,12 @@ public class HoodieWriteConfig extends HoodieConfig {
.noDefaultValue()
.withDocumentation("Table name that will be used for registering with
metastores like HMS. Needs to be same across runs.");
+ public static final ConfigProperty<String> TAGGED_RECORD_STORAGE_LEVEL_VALUE
= ConfigProperty
+ .key("hoodie.write.tagged.record.storage.level")
+ .defaultValue("MEMORY_AND_DISK_SER")
+ .withDocumentation("Determine what level of persistence is used to
cache write RDDs. "
+ + "Refer to org.apache.spark.storage.StorageLevel for different
values");
+
public static final ConfigProperty<String> PRECOMBINE_FIELD_NAME =
ConfigProperty
.key("hoodie.datasource.write.precombine.field")
.defaultValue("ts")
@@ -1069,6 +1075,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getSchema();
}
+ public String getTaggedRecordStorageLevel() {
+ return getString(TAGGED_RECORD_STORAGE_LEVEL_VALUE);
+ }
+
public String getInternalSchema() {
return getString(INTERNAL_SCHEMA_STRING);
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index ac5b8555b05..73c855d5505 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -148,10 +148,10 @@ public abstract class BaseSparkCommitActionExecutor<T>
extends
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>>
execute(HoodieData<HoodieRecord<T>> inputRecords) {
// Cache the tagged records, so we don't end up computing both
- // TODO: Consistent contract in HoodieWriteClient regarding preppedRecord
storage level handling
JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
if (inputRDD.getStorageLevel() == StorageLevel.NONE()) {
- inputRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
+ String writeStorageLevel = config.getTaggedRecordStorageLevel();
+ inputRDD.persist(StorageLevel.fromString(writeStorageLevel));
} else {
LOG.info("RDD PreppedRecords was persisted at: " +
inputRDD.getStorageLevel());
}