This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a7562a31d65d fix: Allow configurable storage level while computing
expression index update (#17737)
a7562a31d65d is described below
commit a7562a31d65dd5e066d3237006742a6ff08790f0
Author: Lokesh Jain <[email protected]>
AuthorDate: Thu Jan 29 10:35:38 2026 +0530
fix: Allow configurable storage level while computing expression index
update (#17737)
---------
Co-authored-by: Lokesh Jain <[email protected]>
---
.../apache/hudi/client/utils/SparkMetadataWriterUtils.java | 7 ++++---
.../org/apache/hudi/common/config/HoodieIndexingConfig.java | 13 +++++++++++++
.../java/org/apache/hudi/common/config/RecordMergeMode.java | 2 +-
3 files changed, 18 insertions(+), 4 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 11da57ce3f02..351512ed0940 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -154,7 +154,7 @@ public class SparkMetadataWriterUtils {
@SuppressWarnings("checkstyle:LineLength")
public static ExpressionIndexComputationMetadata
getExpressionIndexRecordsUsingColumnStats(Dataset<Row> dataset,
HoodieExpressionIndex<Column, Column> expressionIndex, String columnToIndex,
Option<Function<HoodiePairData<String,
HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>>
partitionRecordsFunctionOpt,
-
HoodieIndexVersion indexVersion) {
+
HoodieIndexVersion indexVersion, String
expressionIndexRangeMetadataStorageLevel) {
// Aggregate col stats related data for the column to index
Dataset<Row> columnRangeMetadataDataset = dataset
.select(columnToIndex,
SparkMetadataWriterUtils.getExpressionIndexColumnNames())
@@ -197,7 +197,7 @@ public class SparkMetadataWriterUtils {
if (partitionRecordsFunctionOpt.isPresent()) {
// TODO: HUDI-8848: Allow configurable storage level while computing
expression index update
- rangeMetadataHoodieJavaRDD.persist("MEMORY_AND_DISK_SER");
+
rangeMetadataHoodieJavaRDD.persist(expressionIndexRangeMetadataStorageLevel);
}
HoodieData<HoodieRecord> colStatRecords =
rangeMetadataHoodieJavaRDD.map(pair ->
createColumnStatsRecords(pair.getKey(),
Collections.singletonList(pair.getValue()), false,
expressionIndex.getIndexName(),
@@ -308,7 +308,8 @@ public class SparkMetadataWriterUtils {
// Generate expression index records
if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
- return getExpressionIndexRecordsUsingColumnStats(rowDataset,
expressionIndex, columnToIndex, partitionRecordsFunctionOpt,
indexDefinition.getVersion());
+ return getExpressionIndexRecordsUsingColumnStats(rowDataset,
expressionIndex, columnToIndex, partitionRecordsFunctionOpt,
indexDefinition.getVersion(),
+
dataWriteConfig.getIndexingConfig().getExpressionIndexRangeMetadataStorageLevel());
} else if
(indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS))
{
return getExpressionIndexRecordsUsingBloomFilter(
rowDataset, columnToIndex, dataWriteConfig.getStorageConfig(),
instantTime, indexDefinition);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieIndexingConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieIndexingConfig.java
index 4ddb9426b22a..0ab9158ad6e8 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieIndexingConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieIndexingConfig.java
@@ -35,6 +35,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Instant;
+import java.util.Locale;
import java.util.Properties;
import java.util.Set;
import java.util.function.BiConsumer;
@@ -79,6 +80,14 @@ public class HoodieIndexingConfig extends HoodieConfig {
.sinceVersion("1.0.0")
.withDocumentation("Function to be used for building the expression
index.");
+ public static final ConfigProperty<String>
EXPRESSION_INDEX_RANGE_METADATA_STORAGE_LEVEL_VALUE = ConfigProperty
+ .key("hoodie.expression.index.range.metadata.storage.level")
+ .defaultValue("MEMORY_AND_DISK_SER")
+ .markAdvanced()
+ .sinceVersion("1.2.0")
+ .withDocumentation("Determines the level of persistence used to cache
range metadata RDDs created to compute the expression index. "
+ + "Refer to org.apache.spark.storage.StorageLevel for the different
values");
+
public static final ConfigProperty<String> INDEX_DEFINITION_CHECKSUM =
ConfigProperty
.key("hoodie.table.checksum")
.noDefaultValue()
@@ -92,6 +101,10 @@ public class HoodieIndexingConfig extends HoodieConfig {
super();
}
+ public String getExpressionIndexRangeMetadataStorageLevel() {
+ return
getStringOrDefault(EXPRESSION_INDEX_RANGE_METADATA_STORAGE_LEVEL_VALUE).toUpperCase(Locale.ROOT);
+ }
+
public static void update(HoodieStorage storage, StoragePath metadataFolder,
Properties updatedProps) {
modify(storage, metadataFolder, updatedProps,
ConfigUtils::upsertProperties);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/RecordMergeMode.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/RecordMergeMode.java
index fa20d70094e7..e9a068180bfc 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/RecordMergeMode.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/RecordMergeMode.java
@@ -29,7 +29,7 @@ public enum RecordMergeMode {
@EnumFieldDescription("Using event time as the ordering to merge records,
i.e., the record "
+ "with the larger event time overwrites the record with the smaller
event time on the "
- + "same key, regardless of transaction time. The event time or
preCombine field needs "
+ + "same key, regardless of transaction time. The event time or ordering
fields need "
+ "to be specified by the user.")
EVENT_TIME_ORDERING,