This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 87dd78c80de [HUDI-8343] Fix functional index update for clean operation (#12096)
87dd78c80de is described below

commit 87dd78c80de1ebc166fa2689016f29a55cd07485
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Oct 15 19:30:44 2024 +0530

    [HUDI-8343] Fix functional index update for clean operation (#12096)
---
 .../client/utils/SparkMetadataWriterUtils.java     | 24 ++++--
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 40 +++++++++-
 .../hudi/common/testutils/HoodieTestUtils.java     | 17 ++++
 .../hudi/command/index/TestFunctionalIndex.scala   | 91 ++++++++++++++++++++--
 4 files changed, 159 insertions(+), 13 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 8e9f0696c8d..48bd0f9a64c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
 import org.apache.hudi.common.util.HoodieRecordUtils;
@@ -161,7 +162,7 @@ public class SparkMetadataWriterUtils {
                                                boolean isBaseFile) {
     List<HoodieRecord> records = isBaseFile ? getBaseFileRecords(new HoodieBaseFile(paths[0].toString()), metaClient, schema)
         : getUnmergedLogFileRecords(Arrays.stream(paths).map(StoragePath::toString).collect(Collectors.toList()), metaClient, schema);
-    return dropMetaFields(toDataset(records, schema, sqlContext));
+    return dropMetaFields(toDataset(records, schema, sqlContext, isBaseFile));
   }
 
   private static List<HoodieRecord> getUnmergedLogFileRecords(List<String> logFilePaths, HoodieTableMetaClient metaClient, Schema readerSchema) {
@@ -193,17 +194,26 @@ public class SparkMetadataWriterUtils {
     }
   }
 
-  private static Dataset<Row> toDataset(List<HoodieRecord> records, Schema schema, SQLContext sqlContext) {
+  private static Dataset<Row> toDataset(List<HoodieRecord> records, Schema schema, SQLContext sqlContext, boolean isBaseFile) {
     List<GenericRecord> avroRecords = records.stream()
-        .map(r -> (GenericRecord) r.getData())
+        .map(r -> {
+          if (isBaseFile) {
+            return (GenericRecord) r.getData();
+          }
+          HoodieRecordPayload payload = (HoodieRecordPayload) r.getData();
+          try {
+            return (GenericRecord) payload.getInsertValue(schema).get();
+          } catch (IOException e) {
+            throw new HoodieIOException("Failed to extract Avro payload", e);
+          }
+        })
         .collect(Collectors.toList());
     if (avroRecords.isEmpty()) {
       return sqlContext.emptyDataFrame().toDF();
     }
-    try (JavaSparkContext jsc = new JavaSparkContext(sqlContext.sparkContext())) {
-      JavaRDD<GenericRecord> javaRDD = jsc.parallelize(avroRecords);
-      return AvroConversionUtils.createDataFrame(javaRDD.rdd(), schema.toString(), sqlContext.sparkSession());
-    }
+    JavaSparkContext jsc = new JavaSparkContext(sqlContext.sparkContext());
+    JavaRDD<GenericRecord> javaRDD = jsc.parallelize(avroRecords);
+    return AvroConversionUtils.createDataFrame(javaRDD.rdd(), schema.toString(), sqlContext.sparkSession());
   }
 
   private static <T extends Comparable<T>> HoodieColumnRangeMetadata<Comparable> computeColumnRangeMetadata(Dataset<Row> rowDataset,
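
Note on the toDataset change above: records read from base (parquet) files arrive as Avro GenericRecords, while records scanned from log files are HoodieRecordPayload wrappers whose Avro form must be materialized against the reader schema. A minimal sketch of that extraction rule (illustrative only, not part of the commit; it assumes getInsertValue returns a non-empty Option, i.e. the record is not a delete):

    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.exception.HoodieIOException;

    class RecordExtractionSketch {
      // Base-file records are already GenericRecords; log-file records carry a
      // payload that must be converted to Avro with the reader schema.
      static GenericRecord toAvro(HoodieRecord record, Schema schema, boolean isBaseFile) {
        if (isBaseFile) {
          return (GenericRecord) record.getData();
        }
        HoodieRecordPayload payload = (HoodieRecordPayload) record.getData();
        try {
          // Assumed non-empty here, mirroring the commit; a delete payload
          // would return an empty Option.
          return (GenericRecord) payload.getInsertValue(schema).get();
        } catch (IOException e) {
          throw new HoodieIOException("Failed to extract Avro payload", e);
        }
      }
    }

The patch also drops the try-with-resources around JavaSparkContext: closing that wrapper stops the shared SparkContext, which the surrounding SQLContext still needs for subsequent operations.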
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 286187a38ed..30afca04e8c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -53,6 +53,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieDeltaWriteStat;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieIndexMetadata;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -571,10 +572,47 @@ public class HoodieTableMetadataUtil {
              dataMetaClient, isColumnStatsIndexEnabled, columnStatsIndexParallelism, targetColumnsForColumnStatsIndex);
      partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS.getPartitionPath(), metadataColumnStatsRDD);
    }
+    if (enabledPartitionTypes.contains(MetadataPartitionType.FUNCTIONAL_INDEX)) {
+      convertMetadataToFunctionalIndexRecords(engineContext, cleanMetadata, instantTime, dataMetaClient, bloomIndexParallelism, columnStatsIndexParallelism, partitionToRecordsMap);
+    }
 
     return partitionToRecordsMap;
   }
 
+  private static void convertMetadataToFunctionalIndexRecords(HoodieEngineContext engineContext, HoodieCleanMetadata cleanMetadata,
+                                                              String instantTime, HoodieTableMetaClient dataMetaClient,
+                                                              int bloomIndexParallelism, int columnStatsIndexParallelism,
+                                                              Map<String, HoodieData<HoodieRecord>> partitionToRecordsMap) {
+    Option<HoodieIndexMetadata> indexMetadata = dataMetaClient.getIndexMetadata();
+    if (indexMetadata.isPresent()) {
+      HoodieIndexMetadata metadata = indexMetadata.get();
+      Map<String, HoodieIndexDefinition> indexDefinitions = metadata.getIndexDefinitions();
+      if (indexDefinitions.isEmpty()) {
+        throw new HoodieMetadataException("Functional index metadata not 
found");
+      }
+      // iterate over each index definition and check:
+      // if it is a functional index using column_stats, then follow the same approach as column_stats
+      // if it is a functional index using bloom_filters, then follow the same approach as bloom_filters
+      // else throw an exception
+      for (Map.Entry<String, HoodieIndexDefinition> entry : indexDefinitions.entrySet()) {
+        String indexName = entry.getKey();
+        HoodieIndexDefinition indexDefinition = entry.getValue();
+        if (MetadataPartitionType.FUNCTIONAL_INDEX.equals(MetadataPartitionType.fromPartitionPath(indexDefinition.getIndexName()))) {
+          if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) {
+            partitionToRecordsMap.put(indexName, convertMetadataToBloomFilterRecords(cleanMetadata, engineContext, instantTime, bloomIndexParallelism));
+          } else if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
+            partitionToRecordsMap.put(indexName,
+                convertMetadataToColumnStatsRecords(cleanMetadata, engineContext, dataMetaClient, true, columnStatsIndexParallelism, indexDefinition.getSourceFields()));
+          } else {
+            throw new HoodieMetadataException("Unsupported functional index type");
+          }
+        }
+      }
+    } else {
+      throw new HoodieMetadataException("Functional index metadata not found");
+    }
+  }
+
   /**
    * Finds all files that were deleted as part of a clean and creates metadata table records for them.
    *
@@ -1192,7 +1230,7 @@ public class HoodieTableMetadataUtil {
                                                             List<String> columnsToIndex,
                                                             boolean isDeleted) {
     String filePartitionPath = filePath.startsWith("/") ? filePath.substring(1) : filePath;
-    String fileName = FSUtils.getFileName(filePath, partitionPath);
+    String fileName = filePartitionPath.substring(filePartitionPath.lastIndexOf("/") + 1);
 
     if (isDeleted) {
       // TODO we should delete records instead of stubbing them
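
Two fixes land in this file: the clean path now routes functional index partitions to the existing bloom-filter and column-stats converters, and the deleted file's name is derived from the path itself instead of via FSUtils.getFileName, apparently so that partitioned and non-partitioned paths are handled uniformly. A small worked example of the new extraction (path values are illustrative only):

    // Everything after the last '/' is the file name.
    String filePath = "price=1000/f1c2d3-0_1-10-100_20241015.parquet";
    String filePartitionPath = filePath.startsWith("/") ? filePath.substring(1) : filePath;
    String fileName = filePartitionPath.substring(filePartitionPath.lastIndexOf("/") + 1);
    // fileName -> "f1c2d3-0_1-10-100_20241015.parquet"
    // Non-partitioned case: no '/', lastIndexOf returns -1, and substring(0)
    // keeps the whole name unchanged.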
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index acee42be055..ef9a47e86ae 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.testutils;
 
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieAvroPayload;
@@ -29,7 +30,9 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.HoodieStorage;
@@ -373,4 +376,18 @@ public class HoodieTestUtils {
       throw new HoodieIOException("Failed to get instant file status", e);
     }
   }
+
+  /**
+   * Gets the pair of partition and cleaned file from the clean metadata.
+   */
+  public static List<Pair<String, String>> getCleanedFiles(HoodieTableMetaClient metaClient, HoodieInstant cleanInstant) throws IOException {
+    HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(metaClient, cleanInstant);
+    List<Pair<String, String>> deleteFileList = new ArrayList<>();
+    cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
+      // Files deleted from a partition
+      List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
+      deletedFiles.forEach(entry -> deleteFileList.add(Pair.of(partition, entry)));
+    });
+    return deleteFileList;
+  }
 }
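
The new helper returns (partition, deleted file) pairs read from a clean instant's metadata; the Scala test below uses it to confirm that the expected file version was cleaned. A hedged usage sketch (assumes the table's timeline already contains a completed clean):

    import java.io.IOException;

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.testutils.HoodieTestUtils;
    import org.apache.hudi.common.util.collection.Pair;

    class CleanedFilesSketch {
      static void printCleanedFiles(HoodieTableMetaClient metaClient) throws IOException {
        // Assumes the cleaner timeline has at least one completed instant.
        HoodieInstant lastClean = metaClient.reloadActiveTimeline()
            .getCleanerTimeline().lastInstant().get();
        for (Pair<String, String> p : HoodieTestUtils.getCleanedFiles(metaClient, lastClean)) {
          System.out.println(p.getKey() + " -> " + p.getValue()); // partition -> deleted file
        }
      }
    }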
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
index 5fb046faee4..de20cfb6fa2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.common.util.Option
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig}
 import org.apache.hudi.hive.HiveSyncConfigHolder._
 import org.apache.hudi.hive.testutils.HiveTestUtil
 import org.apache.hudi.hive.{HiveSyncTool, HoodieHiveSyncClient}
@@ -536,11 +537,6 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
           Seq(3, "a2"),
           Seq(4, "a2")
         )
-        // verify there are no new updates to functional index
-        checkAnswer(metadataSql)(
-          Seq("2020-09-26", "2021-09-26"),
-          Seq("2022-09-26", "2022-09-26")
-        )
 
         // enable functional index
         spark.sql(s"set 
${HoodieMetadataConfig.FUNCTIONAL_INDEX_ENABLE_PROP.key}=true")
@@ -557,12 +553,97 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
         checkAnswer(metadataSql)(
           Seq("2020-09-26", "2021-09-26"),
           Seq("2022-09-26", "2022-09-26"),
+          Seq("2022-09-26", "2022-09-26"), // for file in name=a2
           Seq("2024-09-26", "2024-09-26") // for file in name=a3
         )
       }
     }
   }
 
+  // Test functional index using column stats, then clean the older file versions, and check that the index is updated correctly.
+  test("Test Functional Index With Cleaning") {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      withTempDir { tmp =>
+        Seq("cow", "mor").foreach { tableType =>
+          Seq(true, false).foreach { isPartitioned =>
+            val tableName = generateTableName + s"_clean_$tableType$isPartitioned"
+            val partitionByClause = if (isPartitioned) "partitioned by(price)" else ""
+            val basePath = s"${tmp.getCanonicalPath}/$tableName"
+            spark.sql(
+              s"""
+                 |create table $tableName (
+                 |  id int,
+                 |  name string,
+                 |  ts long,
+                 |  price int
+                 |) using hudi
+                 | options (
+                 |  primaryKey ='id',
+                 |  type = '$tableType',
+                 |  preCombineField = 'ts',
+                 |  hoodie.clean.policy = 'KEEP_LATEST_COMMITS',
+                 |  ${HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key} = '1'
+                 | )
+                 | $partitionByClause
+                 | location '$basePath'
+       """.stripMargin)
+            if (tableType == "mor") {
+              spark.sql("set hoodie.compact.inline=true")
+              spark.sql("set hoodie.compact.inline.max.delta.commits=2")
+            }
+            if (!isPartitioned) {
+              // setting this for non-partitioned table to ensure multiple file groups are created
+              spark.sql(s"set ${HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key()}=0")
+            }
+            // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2020-09-26
+            spark.sql(s"insert into $tableName values(1, 'a1', 1601098924, 
10)")
+            // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2021-09-26
+            spark.sql(s"insert into $tableName values(2, 'a2', 1632634924, 
100)")
+            // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2022-09-26
+            spark.sql(s"insert into $tableName values(3, 'a3', 1664170924, 
1000)")
+            // create functional index
+            spark.sql(s"create index idx_datestr on $tableName using 
column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')")
+            // validate index created successfully
+            val metaClient = createMetaClient(spark, basePath)
+            assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains("func_index_idx_datestr"))
+            assertTrue(metaClient.getIndexMetadata.isPresent)
+            assertEquals(1, metaClient.getIndexMetadata.get.getIndexDefinitions.size())
+
+            // verify functional index records by querying metadata table
+            val metadataSql = s"select 
ColumnStatsMetadata.minValue.member6.value, 
ColumnStatsMetadata.maxValue.member6.value from hudi_metadata('$tableName') 
where type=3"
+            checkAnswer(metadataSql)(
+              Seq("2020-09-26", "2020-09-26"), // for file in price=10
+              Seq("2021-09-26", "2021-09-26"), // for file in price=100
+              Seq("2022-09-26", "2022-09-26") // for file in price=1000
+            )
+
+            // get file name for price=1000
+            val fileNames = spark.sql(s"select ColumnStatsMetadata.fileName from hudi_metadata('$tableName') where type=3 and ColumnStatsMetadata.minValue.member6.value='2022-09-26'").collect()
+            assertEquals(1, fileNames.length)
+            val fileName = fileNames(0).getString(0)
+
+            // update the record with id=3
+            // produce two versions so that the older version can be cleaned
+            spark.sql(s"update $tableName set ts=1695706924 where id=3")
+            spark.sql(s"update $tableName set ts=1727329324 where id=3")
+
+            // check cleaning completed
+            val lastCleanInstant = metaClient.reloadActiveTimeline().getCleanerTimeline.lastInstant()
+            assertTrue(lastCleanInstant.isPresent)
+            // verify that file for price=1000 is cleaned
+            assertTrue(HoodieTestUtils.getCleanedFiles(metaClient, lastCleanInstant.get()).get(0).getValue.equals(fileName))
+
+            // verify there are new updates to the functional index, with isDeleted true for the cleaned file
+            checkAnswer(s"select ColumnStatsMetadata.minValue.member6.value, ColumnStatsMetadata.maxValue.member6.value, ColumnStatsMetadata.isDeleted from hudi_metadata('$tableName') where type=3 and ColumnStatsMetadata.fileName='$fileName'")(
+              Seq("2022-09-26", "2022-09-26", false),
+              Seq(null, null, true) // for the cleaned file
+            )
+          }
+        }
+      }
+    }
+  }
+
   private def assertTableIdentifier(catalogTable: CatalogTable,
                                     expectedDatabaseName: String,
                                     expectedTableName: String): Unit = {
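
For reference, the epoch seconds inserted by the test line up with the dates asserted against the column-stats index. A quick standalone check (assuming UTC; Spark's from_unixtime formats in the session time zone, so the test environment is assumed to match):

    import java.time.Instant;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;

    class TsCheck {
      public static void main(String[] args) {
        // Same formatting as the index function from_unixtime(ts, 'yyyy-MM-dd').
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
        long[] ts = {1601098924L, 1632634924L, 1664170924L, 1695706924L, 1727329324L};
        for (long t : ts) {
          System.out.println(t + " -> " + f.format(Instant.ofEpochSecond(t)));
        }
        // 1601098924 -> 2020-09-26  (insert id=1)
        // 1632634924 -> 2021-09-26  (insert id=2)
        // 1664170924 -> 2022-09-26  (insert id=3; this file version gets cleaned)
        // 1695706924 -> 2023-09-26  (first update to id=3)
        // 1727329324 -> 2024-09-26  (second update to id=3)
      }
    }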
