This is an automated email from the ASF dual-hosted git repository.
wombatukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 62af6583e96 [HUDI-8326] Add some more functional index tests (#12153)
62af6583e96 is described below
commit 62af6583e96e91ffeee1b5e25ff890b90e0a4feb
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Nov 5 07:30:07 2024 +0530
[HUDI-8326] Add some more functional index tests (#12153)
* base method
* [HUDI-8326] Add some more functional index tests
* fix hive sync test for func index
---
.../org/apache/hudi/FunctionalIndexSupport.scala | 10 +-
.../hudi/command/index/TestFunctionalIndex.scala | 221 ++++++++++++++++++++-
2 files changed, 216 insertions(+), 15 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
index 5f66782f733..8d7ee0bf342 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala
@@ -100,13 +100,11 @@ class FunctionalIndexSupport(spark: SparkSession,
checkState(functionToColumnNames.size == 1, "Currently, only one function with functional index in the query is supported")
val (indexFunction, targetColumnName) = functionToColumnNames.head
val indexDefinitions = metaClient.getIndexMetadata.get().getIndexDefinitions
- indexDefinitions.asScala.foreach {
- case (indexPartition, indexDefinition) =>
- if (indexDefinition.getIndexFunction.equals(indexFunction) && indexDefinition.getSourceFields.contains(targetColumnName)) {
- Option.apply(indexPartition)
- }
+ indexDefinitions.asScala.collectFirst {
+ case (indexPartition, indexDefinition)
+ if indexDefinition.getIndexFunction.equals(indexFunction) && indexDefinition.getSourceFields.contains(targetColumnName) =>
+ indexPartition
}
- Option.empty
} else {
Option.empty
}
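
Side note on the FunctionalIndexSupport hunk above: the old foreach built an Option inside its body and immediately discarded it (foreach returns Unit), so the method always fell through to Option.empty; collectFirst instead returns the first matching index partition wrapped in an Option. Below is a minimal standalone sketch of that pattern, using hypothetical stand-in data rather than Hudi's IndexDefinition type:

object CollectFirstSketch extends App {
  // Hypothetical stand-ins for the index-definition map: partition name -> (index function, source fields).
  val indexDefinitions: Map[String, (String, Seq[String])] = Map(
    "func_index_idx_datestr" -> ("from_unixtime", Seq("ts")),
    "column_stats" -> ("identity", Seq("price")))

  val indexFunction = "from_unixtime"
  val targetColumnName = "ts"

  // collectFirst yields Some(partition) for the first entry whose guard matches, or None otherwise.
  val matched: Option[String] = indexDefinitions.collectFirst {
    case (indexPartition, (function, sourceFields))
      if function == indexFunction && sourceFields.contains(targetColumnName) =>
      indexPartition
  }

  println(matched) // prints Some(func_index_idx_datestr)
}
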
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
index c4b76a6de88..7a25a442f95 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
@@ -19,26 +19,34 @@
package org.apache.spark.sql.hudi.command.index
-import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.{DataSourceReadOptions, HoodieFileIndex, HoodieSparkUtils}
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.common.util.Option
-import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig}
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, HoodieWriteConfig}
import org.apache.hudi.hive.HiveSyncConfigHolder._
import org.apache.hudi.hive.testutils.HiveTestUtil
import org.apache.hudi.hive.{HiveSyncTool, HoodieHiveSyncClient}
-import org.apache.hudi.metadata.MetadataPartitionType
+import org.apache.hudi.metadata.{HoodieMetadataFileSystemView, MetadataPartitionType}
import org.apache.hudi.sync.common.HoodieSyncConfig.{META_SYNC_BASE_PATH, META_SYNC_DATABASE_NAME, META_SYNC_NO_PARTITION_METADATA, META_SYNC_TABLE_NAME}
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
-import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.hudi.util.JFunction
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, FromUnixTime, Literal}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.hudi.command.{CreateIndexCommand, ShowIndexesCommand}
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.scalatest.Ignore
+import scala.collection.JavaConverters
+
@Ignore
class TestFunctionalIndex extends HoodieSparkSqlTestBase {
@@ -100,11 +108,13 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
tool.syncHoodieTable()
// assert table created and no partition metadata
- val hiveClient = new HoodieHiveSyncClient(HiveTestUtil.getHiveSyncConfig, metaClient)
- assertTrue(hiveClient.tableExists("h0_ro"))
- assertTrue(hiveClient.tableExists("h0_rt"))
- assertEquals(0, hiveClient.getAllPartitions("h0_ro").size())
- assertEquals(0, hiveClient.getAllPartitions("h0_rt").size())
+ val hiveClient = new HoodieHiveSyncClient(HiveTestUtil.getHiveSyncConfig, HoodieTableMetaClient.reload(metaClient))
+ val roTable = tableName + "_ro"
+ val rtTable = tableName + "_rt"
+ assertTrue(hiveClient.tableExists(roTable))
+ assertTrue(hiveClient.tableExists(rtTable))
+ assertEquals(0, hiveClient.getAllPartitions(roTable).size())
+ assertEquals(0, hiveClient.getAllPartitions(rtTable).size())
// check query result
checkAnswer(s"select id, name from $tableName where
from_unixtime(ts, 'yyyy-MM-dd') = '1970-01-01'")(
@@ -648,4 +658,197 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
assertResult(Some(expectedDatabaseName))(catalogTable.identifier.database)
assertResult(expectedTableName)(catalogTable.identifier.table)
}
+
+ test("Test Functional Index Insert after Initialization") {
+ if (HoodieSparkUtils.gteqSpark3_3) {
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ val isPartitioned = true
+ val tableName = generateTableName + s"_init_$tableType$isPartitioned"
+ val partitionByClause = if (isPartitioned) "partitioned by(price)" else ""
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | ts long,
+ | price int
+ |) using hudi
+ | options (
+ | primaryKey ='id',
+ | type = '$tableType',
+ | preCombineField = 'ts'
+ | )
+ | $partitionByClause
+ | location '$basePath'
+ """.stripMargin)
+
+ writeRecordsAndValidateFunctionalIndex(tableName, basePath, "update", isDelete = false, shouldCompact = false, shouldCluster = false, shouldRollback = false)
+ }
+ }
+ }
+ }
+
+ test("Test Functional Index Rollback") {
+ if (HoodieSparkUtils.gteqSpark3_3) {
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ val isPartitioned = true
+ val tableName = generateTableName + s"_rollback_$tableType$isPartitioned"
+ val partitionByClause = if (isPartitioned) "partitioned by(price)" else ""
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | ts long,
+ | price int
+ |) using hudi
+ | options (
+ | primaryKey ='id',
+ | type = '$tableType',
+ | preCombineField = 'ts'
+ | )
+ | $partitionByClause
+ | location '$basePath'
+ """.stripMargin)
+
+ writeRecordsAndValidateFunctionalIndex(tableName, basePath, "update", isDelete = false, shouldCompact = false, shouldCluster = false, shouldRollback = true)
+ }
+ }
+ }
+ }
+
+ /**
+ * Write records to the table with the given operation type, do updates or deletes, and then validate the functional index.
+ */
+ private def writeRecordsAndValidateFunctionalIndex(tableName: String,
+ basePath: String,
+ operationType: String,
+ isDelete: Boolean,
+ shouldCompact: Boolean,
+ shouldCluster: Boolean,
+ shouldRollback: Boolean,
+ shouldValidate: Boolean = true): Unit = {
+ // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2020-09-26
+ spark.sql(s"insert into $tableName values(1, 'a1', 1601098924, 10)")
+ // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2021-09-26
+ spark.sql(s"insert into $tableName values(2, 'a2', 1632634924, 100)")
+ // a record with from_unixtime(ts, 'yyyy-MM-dd') = 2022-09-26
+ spark.sql(s"insert into $tableName values(3, 'a3', 1664170924, 1000)")
+ // create functional index
+ spark.sql(s"create index idx_datestr on $tableName using column_stats(ts)
options(func='from_unixtime', format='yyyy-MM-dd')")
+ val metaClient = createMetaClient(spark, basePath)
+ // verify file pruning with filter on from_unixtime(ts, 'yyyy-MM-dd') = '2024-03-26'
+ val opts = Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", HoodieMetadataConfig.ENABLE.key -> "true")
+ val dataFilter = {
+ val tsColumn = UnresolvedAttribute("ts")
+
+ // Define the format "yyyy-MM-dd" as a literal
+ val format = Literal("yyyy-MM-dd")
+
+ // Create the from_unixtime(ts, 'yyyy-MM-dd') expression
+ val fromUnixTimeExpr = FromUnixTime(tsColumn, format)
+
+ // Define the date to compare against as a literal
+ val targetDate = Literal("2024-03-26")
+
+ // Create the equality expression from_unixtime(ts, 'yyyy-MM-dd') = '2024-03-26'
+ EqualTo(fromUnixTimeExpr, targetDate)
+ }
+ verifyFilePruning(opts, dataFilter, metaClient)
+
+ // do the operation
+ if (isDelete) {
+ spark.sql(s"delete from $tableName where id=1")
+ } else {
+ spark.sql(s"insert into $tableName values(4, 'a4', 1727329324, 10000)")
+ }
+
+ // validate the functional index
+ val metadataSql = s"select ColumnStatsMetadata.minValue.member6.value, ColumnStatsMetadata.maxValue.member6.value, ColumnStatsMetadata.isDeleted from hudi_metadata('$tableName') where type=3"
+ // validate the functional index
+ checkAnswer(metadataSql)(
+ Seq("2020-09-26", "2020-09-26", false),
+ Seq("2021-09-26", "2021-09-26", false),
+ Seq("2022-09-26", "2022-09-26", false),
+ Seq("2024-09-26", "2024-09-26", false)
+ )
+
+ if (shouldRollback) {
+ // rollback the operation
+ val lastCompletedInstant = metaClient.reloadActiveTimeline().getCommitsTimeline.filterCompletedInstants().lastInstant()
+ val writeClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)), getWriteConfig(Map.empty, metaClient.getBasePath.toString))
+ writeClient.rollback(lastCompletedInstant.get().getTimestamp)
+ // validate the functional index
+ checkAnswer(metadataSql)(
+ // the last commit is rolled back so no records for that
+ Seq("2020-09-26", "2020-09-26", false),
+ Seq("2021-09-26", "2021-09-26", false),
+ Seq("2022-09-26", "2022-09-26", false)
+ )
+ }
+ }
+
+ private def verifyFilePruning(opts: Map[String, String], dataFilter: Expression, metaClient: HoodieTableMetaClient, isDataSkippingExpected: Boolean = false, isNoScanExpected: Boolean = false): Unit = {
+ // with data skipping
+ val commonOpts = opts + ("path" -> metaClient.getBasePath.toString)
+ var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts, includeLogFiles = true)
+ try {
+ val filteredPartitionDirectories = fileIndex.listFiles(Seq(), Seq(dataFilter))
+ val filteredFilesCount = filteredPartitionDirectories.flatMap(s => s.files).size
+ val latestDataFilesCount = getLatestDataFilesCount(metaClient = metaClient)
+ if (isDataSkippingExpected) {
+ assertTrue(filteredFilesCount < latestDataFilesCount)
+ if (isNoScanExpected) {
+ assertTrue(filteredFilesCount == 0)
+ } else {
+ assertTrue(filteredFilesCount > 0)
+ }
+ } else {
+ assertTrue(filteredFilesCount == latestDataFilesCount)
+ }
+
+ // with no data skipping
+ fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts + (DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "false"), includeLogFiles = true)
+ val filesCountWithNoSkipping = fileIndex.listFiles(Seq(), Seq(dataFilter)).flatMap(s => s.files).size
+ assertTrue(filesCountWithNoSkipping == latestDataFilesCount)
+ } finally {
+ fileIndex.close()
+ }
+ }
+
+ private def getLatestDataFilesCount(includeLogFiles: Boolean = true, metaClient: HoodieTableMetaClient) = {
+ var totalLatestDataFiles = 0L
+ val fsView: HoodieMetadataFileSystemView = getTableFileSystemView(metaClient)
+ try {
+ fsView.getAllLatestFileSlicesBeforeOrOn(metaClient.getActiveTimeline.lastInstant().get().getTimestamp)
+ .values()
+ .forEach(JFunction.toJavaConsumer[java.util.stream.Stream[FileSlice]]
+ (slices => slices.forEach(JFunction.toJavaConsumer[FileSlice](
+ slice => totalLatestDataFiles += (if (includeLogFiles) slice.getLogFiles.count() else 0)
+ + (if (slice.getBaseFile.isPresent) 1 else 0)))))
+ } finally {
+ fsView.close()
+ }
+ totalLatestDataFiles
+ }
+
+ private def getTableFileSystemView(metaClient: HoodieTableMetaClient): HoodieMetadataFileSystemView = {
+ new HoodieMetadataFileSystemView(
+ new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)),
+ metaClient,
+ metaClient.getActiveTimeline,
+ HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexPartitionStats(true).build())
+ }
+
+ private def getWriteConfig(hudiOpts: Map[String, String], basePath: String): HoodieWriteConfig = {
+ val props = TypedProperties.fromMap(JavaConverters.mapAsJavaMapConverter(hudiOpts).asJava)
+ HoodieWriteConfig.newBuilder()
+ .withProps(props)
+ .withPath(basePath)
+ .build()
+ }
}