This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7b773fc2d66 [HUDI-8570] Use secondary index only for snapshot queries
(#12322)
7b773fc2d66 is described below
commit 7b773fc2d66d32b6a4d13ca8adaf231dea186dcd
Author: Sagar Sumit <[email protected]>
AuthorDate: Sun Nov 24 22:13:32 2024 +0530
[HUDI-8570] Use secondary index only for snapshot queries (#12322)
Use the secondary index only for snapshot queries. Skip the secondary index and
fall back to the regular query path for query types such as time travel and
incremental.
---
.../hudi/command/index/TestSecondaryIndex.scala | 142 +++++++++++++++++----
.../sql/hudi/common/HoodieSparkSqlTestBase.scala | 14 ++
2 files changed, 128 insertions(+), 28 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
index 99c1a53a009..4ecceee9fdb 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
@@ -20,18 +20,20 @@
package org.apache.spark.sql.hudi.command.index
import org.apache.hudi.DataSourceWriteOptions.{DELETE_OPERATION_OPT_VAL,
INSERT_OPERATION_OPT_VAL, OPERATION, PARTITIONPATH_FIELD, PRECOMBINE_FIELD,
RECORDKEY_FIELD, TABLE_TYPE, UPSERT_OPERATION_OPT_VAL}
-import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.WriteOperationType
import org.apache.hudi.common.table.HoodieTableMetaClient
+import
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeCommitMetadata
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.testutils.{HoodieTestDataGenerator,
HoodieTestUtils}
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig,
HoodieWriteConfig}
import
org.apache.hudi.metadata.HoodieMetadataPayload.SECONDARY_INDEX_RECORD_KEY_SEPARATOR
import org.apache.hudi.metadata.SecondaryIndexKeyUtils
+import org.apache.hudi.storage.StoragePath
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
-import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._
@@ -224,18 +226,7 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
val basePath = s"${tmp.getCanonicalPath}/$tableName"
// Step 1: Initial Insertion of Records
val dataGen = new HoodieTestDataGenerator()
- val initialRecords =
recordsToStrings(dataGen.generateInserts(getInstantTime, 50, true)).asScala
- val initialDf =
spark.read.json(spark.sparkContext.parallelize(initialRecords.toSeq, 2))
- val hudiOpts = commonOpts ++ Map(TABLE_TYPE.key -> "MERGE_ON_READ",
HoodieWriteConfig.TBL_NAME.key -> tableName)
- initialDf.write.format("hudi")
- .options(hudiOpts)
- .option(OPERATION.key, INSERT_OPERATION_OPT_VAL)
- .mode(SaveMode.Overwrite)
- .save(basePath)
-
- // Step 2: Create table and secondary index on 'rider' column
- spark.sql(s"CREATE TABLE $tableName USING hudi LOCATION '$basePath'")
- spark.sql(s"create index idx_rider on $tableName using
secondary_index(rider)")
+ val hudiOpts: Map[String, String] =
loadInitialBatchAndCreateSecondaryIndex(tableName, basePath, dataGen)
// Verify initial state of secondary index
val initialKeys = spark.sql(s"select _row_key from $tableName limit
5").collect().map(_.getString(0))
@@ -287,7 +278,7 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
validateSecondaryIndex(basePath, tableName, updateKeys)
// Step 6: Perform Deletes on Records and Validate Secondary Index
- val deleteKeys = initialKeys.take(3) // pick a subset of keys to delete
+ val deleteKeys = initialKeys.take(1) // pick a subset of keys to delete
val deleteDf =
spark.read.format("hudi").load(basePath).filter(s"_row_key in
('${deleteKeys.mkString("','")}')")
deleteDf.write.format("hudi")
.options(hudiOpts)
@@ -329,18 +320,7 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
val basePath = s"${tmp.getCanonicalPath}/$tableName"
// Step 1: Initial Insertion of Records
val dataGen = new HoodieTestDataGenerator()
- val initialRecords =
recordsToStrings(dataGen.generateInserts(getInstantTime, 50, true)).asScala
- val initialDf =
spark.read.json(spark.sparkContext.parallelize(initialRecords.toSeq, 2))
- val hudiOpts = commonOpts ++ Map(TABLE_TYPE.key -> "MERGE_ON_READ",
HoodieWriteConfig.TBL_NAME.key -> tableName)
- initialDf.write.format("hudi")
- .options(hudiOpts)
- .option(OPERATION.key, INSERT_OPERATION_OPT_VAL)
- .mode(SaveMode.Overwrite)
- .save(basePath)
-
- // Step 2: Create table and secondary index on 'rider' column
- spark.sql(s"CREATE TABLE $tableName USING hudi LOCATION '$basePath'")
- spark.sql(s"create index idx_rider on $tableName using
secondary_index(rider)")
+ val hudiOpts: Map[String, String] =
loadInitialBatchAndCreateSecondaryIndex(tableName, basePath, dataGen)
// Verify initial state of secondary index
val initialKeys = spark.sql(s"select _row_key from $tableName limit
5").collect().map(_.getString(0))
@@ -354,7 +334,7 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
.options(hudiOpts)
.option(OPERATION.key, operationType)
.mode(SaveMode.Append)
- .save(basePath)) (
+ .save(basePath))(
"Can not perform operation " +
WriteOperationType.fromValue(operationType) + " on secondary index")
// disable secondary index and retry
df.write.format("hudi")
@@ -363,11 +343,117 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
.option(OPERATION.key, operationType)
.mode(SaveMode.Append)
.save(basePath)
+ dataGen.close()
}
}
}
}
+ test("Test Secondary Index With Time Travel Query") {
+ if (HoodieSparkUtils.gteqSpark3_3) {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ // Step 1: Initial Insertion of Records
+ val dataGen = new HoodieTestDataGenerator()
+ val numInserts = 5
+ val hudiOpts: Map[String, String] =
loadInitialBatchAndCreateSecondaryIndex(tableName, basePath, dataGen,
numInserts)
+
+ // Verify initial state of secondary index
+ val initialKeys = spark.sql(s"select _row_key from $tableName limit
5").collect().map(_.getString(0))
+ validateSecondaryIndex(basePath, tableName, initialKeys)
+
+ // Step 3: Perform Update Operations on Subset of Records
+ val updateRecords =
recordsToStrings(dataGen.generateUniqueUpdates(getInstantTime, 1,
HoodieTestDataGenerator.TRIP_FLATTENED_SCHEMA)).asScala
+ val updateDf =
spark.read.json(spark.sparkContext.parallelize(updateRecords.toSeq, 1))
+ val updateKeys =
updateDf.select("_row_key").collect().map(_.getString(0))
+ val recordKeyToUpdate = updateKeys.head
+ val initialSecondaryKey = spark.sql(
+ s"SELECT key FROM hudi_metadata('$basePath') WHERE type=7 AND key
LIKE '%$SECONDARY_INDEX_RECORD_KEY_SEPARATOR$recordKeyToUpdate'"
+ ).collect().map(indexKey =>
SecondaryIndexKeyUtils.getSecondaryKeyFromSecondaryIndexKey(indexKey.getString(0))).head
+ // update the record
+ updateDf.write.format("hudi")
+ .options(hudiOpts)
+ .option(OPERATION.key, UPSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ // Verify secondary index after updates
+ validateSecondaryIndex(basePath, tableName, updateKeys)
+
+ // Step 4: Perform Time Travel Query
+ // get the first instant on the timeline
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .build()
+ val firstInstant =
metaClient.reloadActiveTimeline().filterCompletedInstants().firstInstant().get()
+ // do a time travel query with data skipping enabled
+ val readOpts = hudiOpts ++ Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
+ )
+ val timeTravelDF = spark.read.format("hudi")
+ .options(readOpts)
+ .option("as.of.instant", firstInstant.requestedTime)
+ .load(basePath)
+ assertEquals(numInserts, timeTravelDF.count())
+ // updated record should still show in time travel view
+ assertEquals(1, timeTravelDF.where(s"_row_key =
'$recordKeyToUpdate'").count())
+ // rider field (secondary key) should point to previous value
+ val secondaryKey = timeTravelDF.where(s"_row_key =
'$recordKeyToUpdate'").select("rider").collect().head.getString(0)
+ assertEquals(initialSecondaryKey, secondaryKey)
+
+ // Perform Deletes on Records and Validate Secondary Index
+ val deleteDf =
spark.read.format("hudi").load(basePath).filter(s"_row_key in
('${updateKeys.mkString("','")}')")
+ // Get fileId for the delete record
+ val deleteFileId =
deleteDf.select("_hoodie_file_name").collect().head.getString(0)
+ deleteDf.write.format("hudi")
+ .options(hudiOpts)
+ .option(OPERATION.key, DELETE_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ // Verify secondary index for deletes
+ validateSecondaryIndex(basePath, tableName, updateKeys, hasDeleteKeys
= true)
+ // Corrupt the data file that was written for the delete key in the
first instant
+ val firstCommitMetadata =
deserializeCommitMetadata(metaClient.reloadActiveTimeline().getInstantDetails(firstInstant).get())
+ val partitionToWriteStats =
firstCommitMetadata.getPartitionToWriteStats.asScala.mapValues(_.asScala.toList)
+ // Find the path for the given fileId
+ val matchingPath: Option[String] = partitionToWriteStats.values.flatten
+ .find(_.getFileId == deleteFileId)
+ .map(_.getPath)
+ assertTrue(matchingPath.isDefined)
+ // Corrupt the data file
+ val dataFile = new StoragePath(basePath, matchingPath.get)
+ val storage = metaClient.getStorage
+ storage.deleteFile(dataFile)
+ storage.createNewFile(dataFile)
+ // Time travel query should now throw an exception
+ checkExceptionContain(() => spark.read.format("hudi")
+ .options(readOpts)
+ .option("as.of.instant", firstInstant.requestedTime)
+ .load(basePath).count())(s"${dataFile.toString} is not a Parquet
file")
+
+ dataGen.close()
+ }
+ }
+ }
+
+ private def loadInitialBatchAndCreateSecondaryIndex(tableName: String,
basePath: String, dataGen: HoodieTestDataGenerator, numInserts: Integer = 50) =
{
+ val initialRecords =
recordsToStrings(dataGen.generateInserts(getInstantTime, numInserts,
true)).asScala
+ val initialDf =
spark.read.json(spark.sparkContext.parallelize(initialRecords.toSeq, 2))
+ val hudiOpts = commonOpts ++ Map(TABLE_TYPE.key -> "MERGE_ON_READ",
HoodieWriteConfig.TBL_NAME.key -> tableName)
+ initialDf.write.format("hudi")
+ .options(hudiOpts)
+ .option(OPERATION.key, INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ // Step 2: Create table and secondary index on 'rider' column
+ spark.sql(s"CREATE TABLE $tableName USING hudi LOCATION '$basePath'")
+ spark.sql(s"create index idx_rider on $tableName using
secondary_index(rider)")
+ hudiOpts
+ }
+
private def validateSecondaryIndex(basePath: String, tableName: String,
recordKeys: Array[String], hasDeleteKeys: Boolean = false): Unit = {
// Check secondary index metadata for the selected keys
recordKeys.foreach { key =>
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index b0b89a17188..1f4d3e42a8d 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -166,6 +166,20 @@ class HoodieSparkSqlTestBase extends FunSuite with
BeforeAndAfterAll {
assertResult(true)(hasException)
}
+ protected def checkExceptionContain(runnable: Runnable)(errorMsg: String):
Unit = {
+ var hasException = false
+ try {
+ runnable.run()
+ } catch {
+ case e: Throwable if checkMessageContains(e, errorMsg) ||
checkMessageContains(getRootCause(e), errorMsg) =>
+ hasException = true
+
+ case f: Throwable =>
+ fail("Exception should contain: " + errorMsg + ", error message: " +
f.getMessage, f)
+ }
+ assertResult(true)(hasException)
+ }
+
protected def checkExceptionContain(sql: String)(errorMsg: String): Unit = {
var hasException = false
try {