This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cc8622ddd6 [HUDI-5216] Implement file-level stats update for index function (#12069)
5cc8622ddd6 is described below

commit 5cc8622ddd6359eebda09b4c82b446c4acdf27f6
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri Oct 11 07:06:26 2024 +0530

    [HUDI-5216] Implement file-level stats update for index function (#12069)
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |  2 +-
 .../client/utils/SparkMetadataWriterUtils.java     |  3 +-
 .../SparkHoodieBackedTableMetadataWriter.java      |  5 +-
 .../hudi/metadata/HoodieMetadataPayload.java       | 54 ++++++++++++-------
 .../hudi/command/index/TestFunctionalIndex.scala   | 60 ++++++++++++++++++++++
 5 files changed, 100 insertions(+), 24 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 9824972edbf..d1c1f8c35e0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1053,7 +1053,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
    * Update functional index from {@link HoodieCommitMetadata}.
    */
   private void updateFunctionalIndexIfPresent(HoodieCommitMetadata commitMetadata, String instantTime, Map<String, HoodieData<HoodieRecord>> partitionToRecordMap) {
-    if (!dataWriteConfig.getMetadataConfig().isFunctionalIndexEnabled()) {
+    if (!MetadataPartitionType.FUNCTIONAL_INDEX.isMetadataPartitionAvailable(dataMetaClient)) {
       return;
     }
     dataMetaClient.getTableConfig().getMetadataPartitions()
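
Note on the hunk above: the guard switches from a writer-config flag to a check against the table's own metadata partitions, so functional index stats are updated whenever the index's metadata partition actually exists for the table. A minimal before/after sketch (a hedged reading from the call site alone; both expressions appear verbatim in the hunk):

    // Old guard: trusts the writer's config flag.
    boolean oldGuard = dataWriteConfig.getMetadataConfig().isFunctionalIndexEnabled();
    // New guard: asks the table whether the FUNCTIONAL_INDEX metadata
    // partition has been initialized, independent of the current config.
    boolean newGuard = MetadataPartitionType.FUNCTIONAL_INDEX.isMetadataPartitionAvailable(dataMetaClient);
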
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 8072b34c6d3..ece6f479cd6 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -52,6 +52,7 @@ import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.metadata.HoodieMetadataPayload.createBloomFilterMetadataRecord;
 import static org.apache.hudi.metadata.HoodieMetadataPayload.createColumnStatsRecords;
+import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
 
 /**
  * Utility methods for writing metadata for functional index.
@@ -86,7 +87,7 @@ public class SparkMetadataWriterUtils {
         buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFile.getPath());
       });
     }
-    return createColumnStatsRecords(partition, columnRangeMetadataList, false).collect(Collectors.toList());
+    return createColumnStatsRecords(partition, columnRangeMetadataList, false, functionalIndex.getIndexName(), COLUMN_STATS.getRecordType()).collect(Collectors.toList());
   }
 
   public static List<HoodieRecord> getFunctionalIndexRecordsUsingBloomFilter(HoodieTableMetaClient metaClient,
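
The net effect of this change: file-level column stats computed for a functional index are now written under the index's own metadata partition (via its index name) instead of being hard-wired to COLUMN_STATS. A hedged, self-contained sketch of the call shape; the class and variable names here are illustrative, and only the two static imports and the 5-argument overload come from this diff:

    import java.util.Collection;
    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
    import org.apache.hudi.common.model.HoodieRecord;
    import static org.apache.hudi.metadata.HoodieMetadataPayload.createColumnStatsRecords;
    import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;

    // Illustrative helper, not part of the commit.
    class FunctionalIndexStatsSketch {
      static List<HoodieRecord> toIndexRecords(String partition,
                                               Collection<HoodieColumnRangeMetadata<Comparable>> ranges,
                                               String indexName) {
        return createColumnStatsRecords(
            partition,                    // data-table partition of the indexed files
            ranges,                       // per-file min/max, null/value counts, sizes
            false,                        // isDeleted
            indexName,                    // e.g. "func_index_idx_datestr"
            COLUMN_STATS.getRecordType()) // record type is reused from COLUMN_STATS
            .collect(Collectors.toList());
      }
    }
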
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 4dd46e03243..08638526de4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -163,8 +163,6 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
                                                                HoodieIndexDefinition indexDefinition,
                                                                HoodieTableMetaClient metaClient, int parallelism,
                                                                Schema readerSchema, StorageConfiguration<?> storageConf) {
-    HoodieFunctionalIndex<Column, Column> functionalIndex =
-        new HoodieSparkFunctionalIndex(indexDefinition.getIndexName(), indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(), indexDefinition.getIndexOptions());
     HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext) engineContext;
     if (indexDefinition.getSourceFields().isEmpty()) {
       // In case there are no columns to index, bail
@@ -175,11 +173,12 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
     //       HUDI-6994 will address this.
     String columnToIndex = indexDefinition.getSourceFields().get(0);
     SQLContext sqlContext = sparkEngineContext.getSqlContext();
-    String basePath = metaClient.getBasePath().toString();
 
     // Group FileSlices by partition
     Map<String, List<FileSlice>> partitionToFileSlicesMap = partitionFileSlicePairs.stream()
         .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList())));
+    HoodieFunctionalIndex<Column, Column> functionalIndex =
+        new HoodieSparkFunctionalIndex(indexDefinition.getIndexName(), indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(), indexDefinition.getIndexOptions());
     List<HoodieRecord> allRecords = new ArrayList<>();
     for (Map.Entry<String, List<FileSlice>> entry : partitionToFileSlicesMap.entrySet()) {
       String partition = entry.getKey();
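
The two hunks above are mostly code motion: the HoodieSparkFunctionalIndex is now constructed only after the empty-source-fields bail-out, so the early-return path no longer builds an object it never uses, and the unused basePath local is dropped. The general pattern, as a hedged sketch (the body and return value of the bail-out branch are elided in the diff, so they are hypothetical here):

    // Validate first, construct later: skip allocation on the early-return path.
    if (indexDefinition.getSourceFields().isEmpty()) {
      // In case there are no columns to index, bail (return value hypothetical).
      return Collections.emptyList();
    }
    HoodieFunctionalIndex<Column, Column> functionalIndex =
        new HoodieSparkFunctionalIndex(indexDefinition.getIndexName(),
            indexDefinition.getIndexFunction(), indexDefinition.getSourceFields(),
            indexDefinition.getIndexOptions());
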
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index d86f6c101ee..7e32acd0038 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -478,27 +478,43 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
   public static Stream<HoodieRecord> createColumnStatsRecords(String partitionName,
                                                               Collection<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
                                                               boolean isDeleted) {
-    return columnRangeMetadataList.stream().map(columnRangeMetadata -> {
-      HoodieKey key = new HoodieKey(getColumnStatsIndexKey(partitionName, columnRangeMetadata),
-          MetadataPartitionType.COLUMN_STATS.getPartitionPath());
+    return columnRangeMetadataList.stream().map(
+        columnRangeMetadata -> createColumnStatsRecord(partitionName, columnRangeMetadata, isDeleted,
+            MetadataPartitionType.COLUMN_STATS.getPartitionPath(), MetadataPartitionType.COLUMN_STATS.getRecordType()));
+  }
 
-      HoodieMetadataPayload payload = new HoodieMetadataPayload(
-          key.getRecordKey(),
-          HoodieMetadataColumnStats.newBuilder()
-              .setFileName(new StoragePath(columnRangeMetadata.getFilePath()).getName())
-              .setColumnName(columnRangeMetadata.getColumnName())
-              .setMinValue(wrapValueIntoAvro(columnRangeMetadata.getMinValue()))
-              .setMaxValue(wrapValueIntoAvro(columnRangeMetadata.getMaxValue()))
-              .setNullCount(columnRangeMetadata.getNullCount())
-              .setValueCount(columnRangeMetadata.getValueCount())
-              .setTotalSize(columnRangeMetadata.getTotalSize())
-              .setTotalUncompressedSize(columnRangeMetadata.getTotalUncompressedSize())
-              .setIsDeleted(isDeleted)
-              .build(),
-          MetadataPartitionType.COLUMN_STATS.getRecordType());
+  public static Stream<HoodieRecord> createColumnStatsRecords(String partitionName,
+                                                              Collection<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
+                                                              boolean isDeleted,
+                                                              String metadataPartitionName,
+                                                              int recordType) {
+    return columnRangeMetadataList.stream().map(
+        columnRangeMetadata -> createColumnStatsRecord(partitionName, columnRangeMetadata, isDeleted,
+            metadataPartitionName, recordType));
+  }
+
+  private static HoodieAvroRecord<HoodieMetadataPayload> createColumnStatsRecord(String partitionName,
+                                                                                 HoodieColumnRangeMetadata<Comparable> columnRangeMetadata,
+                                                                                 boolean isDeleted,
+                                                                                 String metadataPartitionName,
+                                                                                 int recordType) {
+    HoodieKey key = new HoodieKey(getColumnStatsIndexKey(partitionName, columnRangeMetadata), metadataPartitionName);
+    HoodieMetadataPayload payload = new HoodieMetadataPayload(
+        key.getRecordKey(),
+        HoodieMetadataColumnStats.newBuilder()
+            .setFileName(new StoragePath(columnRangeMetadata.getFilePath()).getName())
+            .setColumnName(columnRangeMetadata.getColumnName())
+            .setMinValue(wrapValueIntoAvro(columnRangeMetadata.getMinValue()))
+            .setMaxValue(wrapValueIntoAvro(columnRangeMetadata.getMaxValue()))
+            .setNullCount(columnRangeMetadata.getNullCount())
+            .setValueCount(columnRangeMetadata.getValueCount())
+            .setTotalSize(columnRangeMetadata.getTotalSize())
+            .setTotalUncompressedSize(columnRangeMetadata.getTotalUncompressedSize())
+            .setIsDeleted(isDeleted)
+            .build(),
+        recordType);
 
-      return new HoodieAvroRecord<>(key, payload);
-    });
+    return new HoodieAvroRecord<>(key, payload);
   }
 
   public static Stream<HoodieRecord> createPartitionStatsRecords(String partitionPath,
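
Worth spelling out what the refactor preserves: the original 3-argument createColumnStatsRecords keeps its exact behavior, because it now delegates to the shared private createColumnStatsRecord with the COLUMN_STATS defaults. In other words (a hedged equivalence sketch; "ranges" is an illustrative variable standing in for a Collection<HoodieColumnRangeMetadata<Comparable>>):

    // Both streams produce identical records after this commit.
    Stream<HoodieRecord> viaOldOverload =
        createColumnStatsRecords(partitionName, ranges, false);
    Stream<HoodieRecord> viaNewOverload =
        createColumnStatsRecords(partitionName, ranges, false,
            MetadataPartitionType.COLUMN_STATS.getPartitionPath(),
            MetadataPartitionType.COLUMN_STATS.getRecordType());
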
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
index 356aed35b70..5fb046faee4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
@@ -424,6 +424,66 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Functional Index File-level Stats Update") {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      withTempDir { tmp =>
+        // create a simple partitioned mor table and insert some records
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  price double,
+             |  ts long,
+             |  name string
+             |) using hudi
+             | options (
+             |  primaryKey ='id',
+             |  type = 'mor',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by(name)
+             | location '$basePath'
+       """.stripMargin)
+        // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2020-09-26
+        spark.sql(s"insert into $tableName values(1, 10, 1601098924, 'a1')")
+        // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2021-09-26
+        spark.sql(s"insert into $tableName values(2, 10, 1632634924, 'a1')")
+        // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2022-09-26
+        spark.sql(s"insert into $tableName values(3, 10, 1664170924, 'a2')")
+        // create functional index and verify
+        spark.sql(s"create index idx_datestr on $tableName using 
column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')")
+        val metaClient = createMetaClient(spark, basePath)
+        
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains("func_index_idx_datestr"))
+        assertTrue(metaClient.getIndexMetadata.isPresent)
+        assertEquals(1, 
metaClient.getIndexMetadata.get.getIndexDefinitions.size())
+
+        // verify functional index records by querying metadata table
+        val metadataSql = s"select ColumnStatsMetadata.minValue.member6.value, 
ColumnStatsMetadata.maxValue.member6.value from hudi_metadata('$tableName') 
where type=3"
+        checkAnswer(metadataSql)(
+          Seq("2020-09-26", "2021-09-26"), // for file in name=a1
+          Seq("2022-09-26", "2022-09-26") // for file in name
+        )
+
+        // do another insert after initializing the index
+        // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2024-09-26
+        spark.sql(s"insert into $tableName values(5, 10, 1727329324, 'a3')")
+        // check query result for predicates that include values inserted after the index was created
+        checkAnswer(s"select id, name from $tableName where from_unixtime(ts, 'yyyy-MM-dd') IN ('2024-09-26', '2022-09-26')")(
+          Seq(3, "a2"),
+          Seq(5, "a3")
+        )
+        // verify there are new updates to functional index
+        checkAnswer(metadataSql)(
+          Seq("2020-09-26", "2021-09-26"),
+          Seq("2022-09-26", "2022-09-26"),
+          Seq("2024-09-26", "2024-09-26") // for file in name=a3
+        )
+      }
+    }
+  }
+
   test("Test Enable and Disable Functional Index") {
     if (HoodieSparkUtils.gteqSpark3_3) {
       withTempDir { tmp =>
