This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ad8f3d4e68a [HUDI-6891] Fix RO queries with RLI and record key predicate (#11975)
ad8f3d4e68a is described below
commit ad8f3d4e68a605c05e9aef7c443e56ecf4b01892
Author: Sagar Sumit <[email protected]>
AuthorDate: Wed Sep 25 17:00:58 2024 +0530
[HUDI-6891] Fix RO queries with RLI and record key predicate (#11975)
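
With the record-level index (RLI) enabled and a record key predicate pushed
down, read-optimized (RO) queries could prune files using index state that
reflects the latest snapshot, including changes that exist only in
uncompacted log files, so rows still present in the base files could be
skipped incorrectly. This change gates data skipping by query type: a new
supportsQueryType hook on SparkBaseIndexSupport defaults to true, and
RecordLevelIndexSupport overrides it to allow RLI only for snapshot
queries, and for time-travel reads only when the as-of instant is at or
after the last completed instant.

A minimal sketch of the reader path this fixes (option keys as in
DataSourceReadOptions/HoodieMetadataConfig; the table path is a
placeholder):

    // RO read with data skipping enabled: after this fix, RLI-based pruning
    // is bypassed because the index describes the snapshot, not the base
    // files alone, so records deleted only in log files still appear.
    val roDf = spark.read.format("hudi")
      .option("hoodie.metadata.enable", "true")
      .option("hoodie.enable.data.skipping", "true")
      .option("hoodie.datasource.query.type", "read_optimized")
      .load("/tmp/hudi_mor_table") // placeholder path
    roDf.where("_row_key = 'some-key'").show()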
---
.../org/apache/hudi/BaseHoodieTableFileIndex.java | 4 ++
.../org/apache/hudi/ColumnStatsIndexSupport.scala | 8 +--
.../scala/org/apache/hudi/HoodieFileIndex.scala | 2 +-
.../org/apache/hudi/RecordLevelIndexSupport.scala | 27 ++++++-
.../org/apache/hudi/SparkBaseIndexSupport.scala | 9 ++-
.../apache/hudi/functional/TestMORDataSource.scala | 82 +++++++++++++++++++++-
6 files changed, 122 insertions(+), 10 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 5a0fd79fcc4..4688c160d69 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -195,6 +195,10 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
resetTableMetadata(null);
}
+ public HoodieTableQueryType getQueryType() {
+ return queryType;
+ }
+
protected String[] getPartitionColumns() {
return partitionColumns;
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index c3b5228d195..c471649d9a6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -348,10 +348,10 @@ class ColumnStatsIndexSupport(spark: SparkSession,
val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
// NOTE: Explicit conversion is required for Scala 2.11
metadataRecords.map(JFunction.toJavaSerializableFunction(record => {
- toScalaOption(record.getData.getInsertValue(null, null))
- .map(metadataRecord => metadataRecord.asInstanceOf[HoodieMetadataRecord].getColumnStatsMetadata)
- .orNull
- }))
+ toScalaOption(record.getData.getInsertValue(null, null))
+ .map(metadataRecord => metadataRecord.asInstanceOf[HoodieMetadataRecord].getColumnStatsMetadata)
+ .orNull
+ }))
.filter(JFunction.toJavaSerializableFunction(columnStatsRecord => columnStatsRecord != null))
columnStatsRecords
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 5ea4d460714..0c645c81893 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -400,7 +400,7 @@ case class HoodieFileIndex(spark: SparkSession,
lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
if (isDataSkippingEnabled) {
for(indexSupport: SparkBaseIndexSupport <- indicesSupport) {
- if (indexSupport.isIndexAvailable) {
+ if (indexSupport.isIndexAvailable && indexSupport.supportsQueryType(options)) {
val prunedFileNames = indexSupport.computeCandidateIsStrict(spark, this, queryFilters, queryReferencedColumns, prunedPartitionsAndFileSlices, shouldPushDownFilesFilter)
if (prunedFileNames.nonEmpty) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
index f37beef0727..708e0d47a0a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
@@ -17,17 +17,20 @@
package org.apache.hudi
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, TIME_TRAVEL_AS_OF_INSTANT}
import org.apache.hudi.RecordLevelIndexSupport.getPrunedStoragePaths
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField
+import org.apache.hudi.common.model.HoodieTableQueryType.SNAPSHOT
import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieTimeline.{GREATER_THAN_OR_EQUALS, compareTimestamps}
import org.apache.hudi.metadata.HoodieTableMetadataUtil
import org.apache.hudi.storage.StoragePath
-
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, In, Literal}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import scala.collection.JavaConverters._
import scala.collection.{JavaConverters, mutable}
@@ -91,6 +94,28 @@ class RecordLevelIndexSupport(spark: SparkSession,
def isIndexAvailable: Boolean = {
metadataConfig.isEnabled && metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)
}
+
+ /**
+ * Returns true if the query type is supported by the index.
+ */
+ override def supportsQueryType(options: Map[String, String]): Boolean = {
+ if (!options.getOrElse(QUERY_TYPE.key, QUERY_TYPE.defaultValue).equalsIgnoreCase(SNAPSHOT.name)) {
+ // Disallow RLI for non-snapshot query types
+ false
+ } else {
+ // Now handle the time-travel case for snapshot queries
+ options.get(TIME_TRAVEL_AS_OF_INSTANT.key)
+ .fold {
+ // No time travel instant specified, so allow if it's a snapshot query
+ true
+ } { instant =>
+ // Check if the as.of.instant is greater than or equal to the last completed instant.
+ // We can still use RLI for data skipping for the latest snapshot.
+ compareTimestamps(HoodieSqlCommonUtils.formatQueryInstant(instant),
+ GREATER_THAN_OR_EQUALS, metaClient.getCommitsTimeline.filterCompletedInstants.lastInstant.get.getTimestamp)
+ }
+ }
+ }
}
object RecordLevelIndexSupport {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
index 2371e4b066e..32c318f1049 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
import org.apache.hudi.util.JFunction
-
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.catalyst.expressions.{And, Expression}
import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
@@ -45,6 +44,14 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
def isIndexAvailable: Boolean
+ /**
+ * Returns true if the query type is supported by the index.
+ *
+ * TODO: The default implementation should be changed to throw
+ * an exception once time travel support for metadata table is added.
+ */
+ def supportsQueryType(options: Map[String, String]): Boolean = true
+
def computeCandidateIsStrict(spark: SparkSession,
fileIndex: HoodieFileIndex,
queryFilters: Seq[Expression],
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 48fecc6cd95..a78b216cc38 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -35,9 +35,9 @@ import org.apache.hudi.storage.StoragePath
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy
import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieSparkClientTestBase}
import org.apache.hudi.util.JFunction
-import org.apache.hudi.{DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, HoodieDataSourceHelpers, DefaultSparkRecordMerger, SparkDatasetMixin}
-
+import org.apache.hudi.{DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, DefaultSparkRecordMerger, HoodieDataSourceHelpers, SparkDatasetMixin}
import org.apache.hadoop.fs.Path
+import org.apache.hudi.QuickstartUtils.convertToStringList
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
@@ -49,7 +49,6 @@ import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
import org.slf4j.LoggerFactory
import java.util.function.Consumer
-
import scala.collection.JavaConverters._
/**
@@ -1418,4 +1417,81 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
metaClient = createMetaClient(spark, basePath)
assertEquals(metaClient.getTableConfig.getRecordMergerStrategy, mergerStrategyName)
}
+
+ /**
+ * Test Read-Optimized and time travel query on MOR table with RECORD_INDEX enabled.
+ */
+ @Test
+ def testReadOptimizedAndTimeTravelWithRecordIndex(): Unit = {
+ var (writeOpts, readOpts) = getWriterReaderOpts()
+ writeOpts = writeOpts ++ Map(
+ DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+ HoodieCompactionConfig.INLINE_COMPACT.key -> "false",
+ HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key -> "0",
+ HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true",
+ HoodieIndexConfig.INDEX_TYPE.key -> IndexType.RECORD_INDEX.name()
+ )
+ readOpts = readOpts ++ Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
+ )
+ initMetaClient(HoodieTableType.MERGE_ON_READ)
+ // Create a MOR table and add three records to the table.
+ val records = recordsToStrings(dataGen.generateInserts("000", 3)).asScala.toSeq
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+ inputDF.write.format("org.apache.hudi")
+ .options(writeOpts)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ var roDf = spark.read.format("hudi")
+ .options(readOpts)
+ .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+ .load(basePath)
+ // assert count
+ assertEquals(3, roDf.count())
+
+ // choose a record to delete
+ val deleteRecord = recordsToStrings(dataGen.generateUniqueDeleteRecords("002", 1)).asScala.toSeq
+ // get the record key from the delete record
+ val recordKey = deleteRecord.head.split(",")(1).split(":")(1).trim.replace("\"", "")
+ // delete the record
+ val inputDF2 = spark.read.json(spark.sparkContext.parallelize(deleteRecord, 1))
+ inputDF2.write.format("org.apache.hudi")
+ .options(writeOpts)
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ // load RO view again with data skipping enabled
+ roDf = spark.read.format("hudi")
+ .options(readOpts)
+ .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+ .load(basePath)
+
+ // There should still be 3 records in RO view
+ assertEquals(3, roDf.count())
+ // deleted record should still show in RO view
+ assertEquals(1, roDf.where(s"_row_key = '$recordKey'").count())
+
+ // load snapshot view
+ val snapshotDF = spark.read.format("hudi")
+ .options(readOpts)
+ .load(basePath)
+ // There should be only 2 records in snapshot view
+ assertEquals(2, snapshotDF.count())
+ // deleted record should NOT show in snapshot view
+ assertEquals(0, snapshotDF.where(s"_row_key = '$recordKey'").count())
+
+ // get the first instant on the timeline
+ val firstInstant = metaClient.reloadActiveTimeline().filterCompletedInstants().firstInstant().get()
+ // do a time travel query with data skipping enabled
+ val timeTravelDF = spark.read.format("hudi")
+ .options(readOpts)
+ .option("as.of.instant", firstInstant.getTimestamp)
+ .load(basePath)
+ // there should still be 3 records in time travel view
+ assertEquals(3, timeTravelDF.count())
+ // deleted record should still show in time travel view
+ assertEquals(1, timeTravelDF.where(s"_row_key = '$recordKey'").count())
+ }
}