This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5cc8622ddd6 [HUDI-5216] Implement file-level stats update for index function (#12069)
5cc8622ddd6 is described below
commit 5cc8622ddd6359eebda09b4c82b446c4acdf27f6
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri Oct 11 07:06:26 2024 +0530
[HUDI-5216] Implement file-level stats update for index function (#12069)
---
.../metadata/HoodieBackedTableMetadataWriter.java | 2 +-
.../client/utils/SparkMetadataWriterUtils.java | 3 +-
.../SparkHoodieBackedTableMetadataWriter.java | 5 +-
.../hudi/metadata/HoodieMetadataPayload.java | 54 ++++++++++++-------
.../hudi/command/index/TestFunctionalIndex.scala | 60 ++++++++++++++++++++++
5 files changed, 100 insertions(+), 24 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 9824972edbf..d1c1f8c35e0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1053,7 +1053,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
    * Update functional index from {@link HoodieCommitMetadata}.
    */
   private void updateFunctionalIndexIfPresent(HoodieCommitMetadata commitMetadata, String instantTime, Map<String, HoodieData<HoodieRecord>> partitionToRecordMap) {
-    if (!dataWriteConfig.getMetadataConfig().isFunctionalIndexEnabled()) {
+    if (!MetadataPartitionType.FUNCTIONAL_INDEX.isMetadataPartitionAvailable(dataMetaClient)) {
       return;
     }
     dataMetaClient.getTableConfig().getMetadataPartitions()
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 8072b34c6d3..ece6f479cd6 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -52,6 +52,7 @@ import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.metadata.HoodieMetadataPayload.createBloomFilterMetadataRecord;
 import static org.apache.hudi.metadata.HoodieMetadataPayload.createColumnStatsRecords;
+import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;

 /**
  * Utility methods for writing metadata for functional index.
@@ -86,7 +87,7 @@ public class SparkMetadataWriterUtils {
           buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFile.getPath());
         });
     }
-    return createColumnStatsRecords(partition, columnRangeMetadataList, false).collect(Collectors.toList());
+    return createColumnStatsRecords(partition, columnRangeMetadataList, false, functionalIndex.getIndexName(), COLUMN_STATS.getRecordType()).collect(Collectors.toList());
   }

   public static List<HoodieRecord> getFunctionalIndexRecordsUsingBloomFilter(HoodieTableMetaClient metaClient,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 4dd46e03243..08638526de4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -163,8 +163,6 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
                                                                        HoodieIndexDefinition indexDefinition,
                                                                        HoodieTableMetaClient metaClient, int parallelism,
                                                                        Schema readerSchema, StorageConfiguration<?> storageConf) {
-    HoodieFunctionalIndex<Column, Column> functionalIndex =
-        new HoodieSparkFunctionalIndex(indexDefinition.getIndexName(), indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(), indexDefinition.getIndexOptions());
     HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) engineContext;
     if (indexDefinition.getSourceFields().isEmpty()) {
       // In case there are no columns to index, bail
@@ -175,11 +173,12 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
     // HUDI-6994 will address this.
     String columnToIndex = indexDefinition.getSourceFields().get(0);
     SQLContext sqlContext = sparkEngineContext.getSqlContext();
-    String basePath = metaClient.getBasePath().toString();
     // Group FileSlices by partition
     Map<String, List<FileSlice>> partitionToFileSlicesMap = partitionFileSlicePairs.stream()
         .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList())));
+    HoodieFunctionalIndex<Column, Column> functionalIndex =
+        new HoodieSparkFunctionalIndex(indexDefinition.getIndexName(), indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(), indexDefinition.getIndexOptions());
     List<HoodieRecord> allRecords = new ArrayList<>();
     for (Map.Entry<String, List<FileSlice>> entry : partitionToFileSlicesMap.entrySet()) {
       String partition = entry.getKey();
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index d86f6c101ee..7e32acd0038 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -478,27 +478,43 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
   public static Stream<HoodieRecord> createColumnStatsRecords(String partitionName,
                                                               Collection<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
                                                               boolean isDeleted) {
-    return columnRangeMetadataList.stream().map(columnRangeMetadata -> {
-      HoodieKey key = new HoodieKey(getColumnStatsIndexKey(partitionName, columnRangeMetadata),
-          MetadataPartitionType.COLUMN_STATS.getPartitionPath());
+    return columnRangeMetadataList.stream().map(
+        columnRangeMetadata -> createColumnStatsRecord(partitionName, columnRangeMetadata, isDeleted,
+            MetadataPartitionType.COLUMN_STATS.getPartitionPath(), MetadataPartitionType.COLUMN_STATS.getRecordType()));
+  }

-      HoodieMetadataPayload payload = new HoodieMetadataPayload(
-          key.getRecordKey(),
-          HoodieMetadataColumnStats.newBuilder()
-              .setFileName(new StoragePath(columnRangeMetadata.getFilePath()).getName())
-              .setColumnName(columnRangeMetadata.getColumnName())
-              .setMinValue(wrapValueIntoAvro(columnRangeMetadata.getMinValue()))
-              .setMaxValue(wrapValueIntoAvro(columnRangeMetadata.getMaxValue()))
-              .setNullCount(columnRangeMetadata.getNullCount())
-              .setValueCount(columnRangeMetadata.getValueCount())
-              .setTotalSize(columnRangeMetadata.getTotalSize())
-              .setTotalUncompressedSize(columnRangeMetadata.getTotalUncompressedSize())
-              .setIsDeleted(isDeleted)
-              .build(),
-          MetadataPartitionType.COLUMN_STATS.getRecordType());
+  public static Stream<HoodieRecord> createColumnStatsRecords(String partitionName,
+                                                              Collection<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
+                                                              boolean isDeleted,
+                                                              String metadataPartitionName,
+                                                              int recordType) {
+    return columnRangeMetadataList.stream().map(
+        columnRangeMetadata -> createColumnStatsRecord(partitionName, columnRangeMetadata, isDeleted,
+            metadataPartitionName, recordType));
+  }
+
+  private static HoodieAvroRecord<HoodieMetadataPayload> createColumnStatsRecord(String partitionName,
+                                                                                 HoodieColumnRangeMetadata<Comparable> columnRangeMetadata,
+                                                                                 boolean isDeleted,
+                                                                                 String metadataPartitionName,
+                                                                                 int recordType) {
+    HoodieKey key = new HoodieKey(getColumnStatsIndexKey(partitionName, columnRangeMetadata), metadataPartitionName);
+    HoodieMetadataPayload payload = new HoodieMetadataPayload(
+        key.getRecordKey(),
+        HoodieMetadataColumnStats.newBuilder()
+            .setFileName(new StoragePath(columnRangeMetadata.getFilePath()).getName())
+            .setColumnName(columnRangeMetadata.getColumnName())
+            .setMinValue(wrapValueIntoAvro(columnRangeMetadata.getMinValue()))
+            .setMaxValue(wrapValueIntoAvro(columnRangeMetadata.getMaxValue()))
+            .setNullCount(columnRangeMetadata.getNullCount())
+            .setValueCount(columnRangeMetadata.getValueCount())
+            .setTotalSize(columnRangeMetadata.getTotalSize())
+            .setTotalUncompressedSize(columnRangeMetadata.getTotalUncompressedSize())
+            .setIsDeleted(isDeleted)
+            .build(),
+        recordType);
-      return new HoodieAvroRecord<>(key, payload);
-    });
+    return new HoodieAvroRecord<>(key, payload);
   }

   public static Stream<HoodieRecord> createPartitionStatsRecords(String partitionPath,
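
For context (not part of this commit): the new overload above lets callers direct column-stats records at any metadata partition rather than only the default column_stats path. Below is a minimal sketch of how a functional index writer could use it; the helper class/method names and the index partition name are illustrative assumptions, while the overload signature and record type come from the diff above.

    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.metadata.HoodieMetadataPayload;
    import org.apache.hudi.metadata.MetadataPartitionType;

    final class FunctionalIndexStatsSketch {
      // Wraps per-file min/max ranges of the indexed expression into metadata
      // records that target a functional index partition (e.g. the
      // "func_index_idx_datestr" partition created in the test below).
      static List<HoodieRecord> toFunctionalIndexStats(String dataPartition,
                                                       List<HoodieColumnRangeMetadata<Comparable>> ranges,
                                                       String indexPartitionName) {
        return HoodieMetadataPayload.createColumnStatsRecords(
                dataPartition, ranges, false /* isDeleted */, indexPartitionName,
                MetadataPartitionType.COLUMN_STATS.getRecordType())
            .collect(Collectors.toList());
      }
    }
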
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
index 356aed35b70..5fb046faee4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
@@ -424,6 +424,66 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
     }
   }

+  test("Test Functional Index File-level Stats Update") {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      withTempDir { tmp =>
+        // create a simple partitioned mor table and insert some records
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  price double,
+             |  ts long,
+             |  name string
+             |) using hudi
+             | options (
+             |  primaryKey ='id',
+             |  type = 'mor',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by(name)
+             | location '$basePath'
+       """.stripMargin)
+        // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2020-09-26
+        spark.sql(s"insert into $tableName values(1, 10, 1601098924, 'a1')")
+        // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2021-09-26
+        spark.sql(s"insert into $tableName values(2, 10, 1632634924, 'a1')")
+        // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2022-09-26
+        spark.sql(s"insert into $tableName values(3, 10, 1664170924, 'a2')")
+        // create functional index and verify
+        spark.sql(s"create index idx_datestr on $tableName using column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')")
+        val metaClient = createMetaClient(spark, basePath)
+        assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains("func_index_idx_datestr"))
+        assertTrue(metaClient.getIndexMetadata.isPresent)
+        assertEquals(1, metaClient.getIndexMetadata.get.getIndexDefinitions.size())
+
+        // verify functional index records by querying metadata table
+        val metadataSql = s"select ColumnStatsMetadata.minValue.member6.value, ColumnStatsMetadata.maxValue.member6.value from hudi_metadata('$tableName') where type=3"
+        checkAnswer(metadataSql)(
+          Seq("2020-09-26", "2021-09-26"), // for file in name=a1
+          Seq("2022-09-26", "2022-09-26") // for file in name=a2
+        )
+
+        // do another insert after initializing the index
+        // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2024-09-26
+        spark.sql(s"insert into $tableName values(5, 10, 1727329324, 'a3')")
+        // check query results for predicates covering values inserted both before and after index creation
+        checkAnswer(s"select id, name from $tableName where from_unixtime(ts, 'yyyy-MM-dd') IN ('2024-09-26', '2022-09-26')")(
+          Seq(3, "a2"),
+          Seq(5, "a3")
+        )
+        // verify there are new updates to functional index
+        checkAnswer(metadataSql)(
+          Seq("2020-09-26", "2021-09-26"),
+          Seq("2022-09-26", "2022-09-26"),
+          Seq("2024-09-26", "2024-09-26") // for file in name=a3
+        )
+      }
+    }
+  }
+
   test("Test Enable and Disable Functional Index") {
     if (HoodieSparkUtils.gteqSpark3_3) {
       withTempDir { tmp =>