This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b773fc2d66 [HUDI-8570] Use secondary index only for snapshot queries (#12322)
7b773fc2d66 is described below

commit 7b773fc2d66d32b6a4d13ca8adaf231dea186dcd
Author: Sagar Sumit <[email protected]>
AuthorDate: Sun Nov 24 22:13:32 2024 +0530

    [HUDI-8570] Use secondary index only for snapshot queries (#12322)
    
    Use secondary index only for snapshot queries. Skip the secondary index and
    fall back to the regular query path for query types such as time travel and
    incremental.
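
    Illustrative only: a minimal sketch of the snapshot-only guard described above,
    using hypothetical names (candidateFiles, allFiles, secondaryIndexLookup) rather
    than Hudi's actual file-index API; the real pruning logic lives in the Spark
    datasource read path.

        import org.apache.hudi.DataSourceReadOptions

        // Sketch with assumed names: only snapshot queries may prune files via the
        // secondary index; other query types (time travel, incremental) fall back
        // to the full file listing.
        def candidateFiles(queryType: String,
                           allFiles: Seq[String],
                           secondaryIndexLookup: () => Seq[String]): Seq[String] = {
          if (queryType == DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) {
            secondaryIndexLookup() // snapshot query: secondary-index pruning is safe
          } else {
            allFiles               // time travel / incremental: regular query path
          }
        }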
---
 .../hudi/command/index/TestSecondaryIndex.scala    | 142 +++++++++++++++++----
 .../sql/hudi/common/HoodieSparkSqlTestBase.scala   |  14 ++
 2 files changed, 128 insertions(+), 28 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
index 99c1a53a009..4ecceee9fdb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
@@ -20,18 +20,20 @@
 package org.apache.spark.sql.hudi.command.index
 
 import org.apache.hudi.DataSourceWriteOptions.{DELETE_OPERATION_OPT_VAL, INSERT_OPERATION_OPT_VAL, OPERATION, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE, UPSERT_OPERATION_OPT_VAL}
-import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils}
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model.WriteOperationType
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeCommitMetadata
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils}
 import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieWriteConfig}
 import org.apache.hudi.metadata.HoodieMetadataPayload.SECONDARY_INDEX_RECORD_KEY_SEPARATOR
 import org.apache.hudi.metadata.SecondaryIndexKeyUtils
+import org.apache.hudi.storage.StoragePath
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
-import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 
 import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.JavaConverters._
@@ -224,18 +226,7 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
         val basePath = s"${tmp.getCanonicalPath}/$tableName"
         // Step 1: Initial Insertion of Records
         val dataGen = new HoodieTestDataGenerator()
-        val initialRecords = recordsToStrings(dataGen.generateInserts(getInstantTime, 50, true)).asScala
-        val initialDf = spark.read.json(spark.sparkContext.parallelize(initialRecords.toSeq, 2))
-        val hudiOpts = commonOpts ++ Map(TABLE_TYPE.key -> "MERGE_ON_READ", HoodieWriteConfig.TBL_NAME.key -> tableName)
-        initialDf.write.format("hudi")
-          .options(hudiOpts)
-          .option(OPERATION.key, INSERT_OPERATION_OPT_VAL)
-          .mode(SaveMode.Overwrite)
-          .save(basePath)
-
-        // Step 2: Create table and secondary index on 'rider' column
-        spark.sql(s"CREATE TABLE $tableName USING hudi LOCATION '$basePath'")
-        spark.sql(s"create index idx_rider on $tableName using 
secondary_index(rider)")
+        val hudiOpts: Map[String, String] = 
loadInitialBatchAndCreateSecondaryIndex(tableName, basePath, dataGen)
 
         // Verify initial state of secondary index
         val initialKeys = spark.sql(s"select _row_key from $tableName limit 5").collect().map(_.getString(0))
@@ -287,7 +278,7 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
         validateSecondaryIndex(basePath, tableName, updateKeys)
 
         // Step 6: Perform Deletes on Records and Validate Secondary Index
-        val deleteKeys = initialKeys.take(3) // pick a subset of keys to delete
+        val deleteKeys = initialKeys.take(1) // pick a subset of keys to delete
         val deleteDf = spark.read.format("hudi").load(basePath).filter(s"_row_key in ('${deleteKeys.mkString("','")}')")
         deleteDf.write.format("hudi")
           .options(hudiOpts)
@@ -329,18 +320,7 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
           val basePath = s"${tmp.getCanonicalPath}/$tableName"
           // Step 1: Initial Insertion of Records
           val dataGen = new HoodieTestDataGenerator()
-          val initialRecords = recordsToStrings(dataGen.generateInserts(getInstantTime, 50, true)).asScala
-          val initialDf = spark.read.json(spark.sparkContext.parallelize(initialRecords.toSeq, 2))
-          val hudiOpts = commonOpts ++ Map(TABLE_TYPE.key -> "MERGE_ON_READ", HoodieWriteConfig.TBL_NAME.key -> tableName)
-          initialDf.write.format("hudi")
-            .options(hudiOpts)
-            .option(OPERATION.key, INSERT_OPERATION_OPT_VAL)
-            .mode(SaveMode.Overwrite)
-            .save(basePath)
-
-          // Step 2: Create table and secondary index on 'rider' column
-          spark.sql(s"CREATE TABLE $tableName USING hudi LOCATION '$basePath'")
-          spark.sql(s"create index idx_rider on $tableName using 
secondary_index(rider)")
+          val hudiOpts: Map[String, String] = 
loadInitialBatchAndCreateSecondaryIndex(tableName, basePath, dataGen)
 
           // Verify initial state of secondary index
           val initialKeys = spark.sql(s"select _row_key from $tableName limit 5").collect().map(_.getString(0))
@@ -354,7 +334,7 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
             .options(hudiOpts)
             .option(OPERATION.key, operationType)
             .mode(SaveMode.Append)
-            .save(basePath)) (
+            .save(basePath))(
             "Can not perform operation " + 
WriteOperationType.fromValue(operationType) + " on secondary index")
           // disable secondary index and retry
           df.write.format("hudi")
@@ -363,11 +343,117 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
             .option(OPERATION.key, operationType)
             .mode(SaveMode.Append)
             .save(basePath)
+          dataGen.close()
         }
       }
     }
   }
 
+  test("Test Secondary Index With Time Travel Query") {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        // Step 1: Initial Insertion of Records
+        val dataGen = new HoodieTestDataGenerator()
+        val numInserts = 5
+        val hudiOpts: Map[String, String] = loadInitialBatchAndCreateSecondaryIndex(tableName, basePath, dataGen, numInserts)
+
+        // Verify initial state of secondary index
+        val initialKeys = spark.sql(s"select _row_key from $tableName limit 5").collect().map(_.getString(0))
+        validateSecondaryIndex(basePath, tableName, initialKeys)
+
+        // Step 3: Perform Update Operations on Subset of Records
+        val updateRecords = recordsToStrings(dataGen.generateUniqueUpdates(getInstantTime, 1, HoodieTestDataGenerator.TRIP_FLATTENED_SCHEMA)).asScala
+        val updateDf = spark.read.json(spark.sparkContext.parallelize(updateRecords.toSeq, 1))
+        val updateKeys = updateDf.select("_row_key").collect().map(_.getString(0))
+        val recordKeyToUpdate = updateKeys.head
+        val initialSecondaryKey = spark.sql(
+          s"SELECT key FROM hudi_metadata('$basePath') WHERE type=7 AND key 
LIKE '%$SECONDARY_INDEX_RECORD_KEY_SEPARATOR$recordKeyToUpdate'"
+        ).collect().map(indexKey => 
SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(indexKey.getString(0))).head
+        // update the record
+        updateDf.write.format("hudi")
+          .options(hudiOpts)
+          .option(OPERATION.key, UPSERT_OPERATION_OPT_VAL)
+          .mode(SaveMode.Append)
+          .save(basePath)
+        // Verify secondary index after updates
+        validateSecondaryIndex(basePath, tableName, updateKeys)
+
+        // Step 4: Perform Time Travel Query
+        // get the first instant on the timeline
+        val metaClient = HoodieTableMetaClient.builder()
+          .setBasePath(basePath)
+          .setConf(HoodieTestUtils.getDefaultStorageConf)
+          .build()
+        val firstInstant = metaClient.reloadActiveTimeline().filterCompletedInstants().firstInstant().get()
+        // do a time travel query with data skipping enabled
+        val readOpts = hudiOpts ++ Map(
+          HoodieMetadataConfig.ENABLE.key -> "true",
+          DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
+        )
+        val timeTravelDF = spark.read.format("hudi")
+          .options(readOpts)
+          .option("as.of.instant", firstInstant.requestedTime)
+          .load(basePath)
+        assertEquals(numInserts, timeTravelDF.count())
+        // updated record should still show in time travel view
+        assertEquals(1, timeTravelDF.where(s"_row_key = '$recordKeyToUpdate'").count())
+        // rider field (secondary key) should point to previous value
+        val secondaryKey = timeTravelDF.where(s"_row_key = '$recordKeyToUpdate'").select("rider").collect().head.getString(0)
+        assertEquals(initialSecondaryKey, secondaryKey)
+
+        // Perform Deletes on Records and Validate Secondary Index
+        val deleteDf = spark.read.format("hudi").load(basePath).filter(s"_row_key in ('${updateKeys.mkString("','")}')")
+        // Get fileId for the delete record
+        val deleteFileId = deleteDf.select("_hoodie_file_name").collect().head.getString(0)
+        deleteDf.write.format("hudi")
+          .options(hudiOpts)
+          .option(OPERATION.key, DELETE_OPERATION_OPT_VAL)
+          .mode(SaveMode.Append)
+          .save(basePath)
+        // Verify secondary index for deletes
+        validateSecondaryIndex(basePath, tableName, updateKeys, hasDeleteKeys = true)
+        // Corrupt the data file that was written for the delete key in the first instant
+        val firstCommitMetadata = deserializeCommitMetadata(metaClient.reloadActiveTimeline().getInstantDetails(firstInstant).get())
+        val partitionToWriteStats = firstCommitMetadata.getPartitionToWriteStats.asScala.mapValues(_.asScala.toList)
+        // Find the path for the given fileId
+        val matchingPath: Option[String] = partitionToWriteStats.values.flatten
+          .find(_.getFileId == deleteFileId)
+          .map(_.getPath)
+        assertTrue(matchingPath.isDefined)
+        // Corrupt the data file
+        val dataFile = new StoragePath(basePath, matchingPath.get)
+        val storage = metaClient.getStorage
+        storage.deleteFile(dataFile)
+        storage.createNewFile(dataFile)
+        // Time travel query should now throw an exception
+        checkExceptionContain(() => spark.read.format("hudi")
+          .options(readOpts)
+          .option("as.of.instant", firstInstant.requestedTime)
+          .load(basePath).count())(s"${dataFile.toString} is not a Parquet file")
+
+        dataGen.close()
+      }
+    }
+  }
+
+  private def loadInitialBatchAndCreateSecondaryIndex(tableName: String, basePath: String, dataGen: HoodieTestDataGenerator, numInserts: Integer = 50) = {
+    val initialRecords = recordsToStrings(dataGen.generateInserts(getInstantTime, numInserts, true)).asScala
+    val initialDf = spark.read.json(spark.sparkContext.parallelize(initialRecords.toSeq, 2))
+    val hudiOpts = commonOpts ++ Map(TABLE_TYPE.key -> "MERGE_ON_READ", HoodieWriteConfig.TBL_NAME.key -> tableName)
+    initialDf.write.format("hudi")
+      .options(hudiOpts)
+      .option(OPERATION.key, INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    // Step 2: Create table and secondary index on 'rider' column
+    spark.sql(s"CREATE TABLE $tableName USING hudi LOCATION '$basePath'")
+    spark.sql(s"create index idx_rider on $tableName using 
secondary_index(rider)")
+    hudiOpts
+  }
+
   private def validateSecondaryIndex(basePath: String, tableName: String, recordKeys: Array[String], hasDeleteKeys: Boolean = false): Unit = {
     // Check secondary index metadata for the selected keys
     recordKeys.foreach { key =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index b0b89a17188..1f4d3e42a8d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -166,6 +166,20 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
     assertResult(true)(hasException)
   }
 
+  protected def checkExceptionContain(runnable: Runnable)(errorMsg: String): Unit = {
+    var hasException = false
+    try {
+      runnable.run()
+    } catch {
+      case e: Throwable if checkMessageContains(e, errorMsg) || checkMessageContains(getRootCause(e), errorMsg) =>
+        hasException = true
+
+      case f: Throwable =>
+        fail("Exception should contain: " + errorMsg + ", error message: " + 
f.getMessage, f)
+    }
+    assertResult(true)(hasException)
+  }
+
   protected def checkExceptionContain(sql: String)(errorMsg: String): Unit = {
     var hasException = false
     try {
