This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 976840e8eb [HUDI-3812] Fixing Data Skipping configuration to respect Metadata Table configs (#5244)
976840e8eb is described below
commit 976840e8eb5fee9bfb2ed029e65aec5b7033faf2
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Sun Apr 10 10:43:47 2022 -0700
[HUDI-3812] Fixing Data Skipping configuration to respect Metadata Table configs (#5244)
Addressing the problem of Data Skipping not respecting Metadata Table configs, which may differ between the write and read paths. More details can be found in HUDI-3812. A read-path configuration sketch follows the summary bullets below.

- Fixing Data Skipping configuration to respect MT configs (on the Read path)
- Tightening up DS handling of cases when no top-level columns are in the target query
- Enhancing tests to cover all possible cases
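
For reference, a minimal read-path sketch of the configuration this change makes effective (assumes an active SparkSession named spark; the option keys are the same constants exercised in the diff below, while the table base path and the filtered column are hypothetical):

    import org.apache.hudi.DataSourceReadOptions
    import org.apache.hudi.common.config.HoodieMetadataConfig

    // Data Skipping only prunes files when the Metadata Table and its Column Stats
    // Index are enabled on the read path as well, not just at write time.
    val df = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
      .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key, "true")
      .option(HoodieMetadataConfig.ENABLE.key, "true")
      .option(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key, "true")
      .load("/tmp/hudi/hoodie_test") // hypothetical table base path

    df.filter("id = 1").show() // only base files whose column stats can match are scanned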
---
.../org/apache/hudi/ColumnStatsIndexSupport.scala | 5 +-
.../scala/org/apache/hudi/HoodieFileIndex.scala | 47 +++++++++----
.../org/apache/hudi/TestHoodieFileIndex.scala | 81 ++++++++++++++--------
.../hudi/functional/TestColumnStatsIndex.scala | 67 +++++++++---------
4 files changed, 122 insertions(+), 78 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index 7439323412..4ee5a6005e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -68,7 +68,7 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
if (targetColumns.nonEmpty) {
readColumnStatsIndexForColumnsInternal(spark, targetColumns, metadataConfig, tableBasePath)
} else {
- readFullColumnStatsIndexInternal(spark, tableBasePath)
+ readFullColumnStatsIndexInternal(spark, metadataConfig, tableBasePath)
}
}
@@ -181,10 +181,11 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
spark.createDataFrame(transposedRDD, indexSchema)
}
- private def readFullColumnStatsIndexInternal(spark: SparkSession, tableBasePath: String) = {
+ private def readFullColumnStatsIndexInternal(spark: SparkSession, metadataConfig: HoodieMetadataConfig, tableBasePath: String): DataFrame = {
val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(tableBasePath)
// Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
spark.read.format("org.apache.hudi")
+ .options(metadataConfig.getProps.asScala)
.load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
}
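
In effect, the reader-supplied Metadata Table options are now forwarded when the full Column Stats Index is loaded. A rough standalone sketch of the resulting read, assuming an active SparkSession named spark and a hypothetical table base path:

    import scala.collection.JavaConverters._
    import org.apache.hudi.common.config.HoodieMetadataConfig
    import org.apache.hudi.metadata.{HoodieTableMetadata, MetadataPartitionType}

    val tableBasePath = "/tmp/hudi/hoodie_test" // hypothetical
    // Reader-side MT options, which the fix forwards to the read below
    val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build()

    // The Column Stats partition of the Metadata Table is read with the caller's options
    val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(tableBasePath)
    val colStatsDF = spark.read.format("org.apache.hudi")
      .options(metadataConfig.getProps.asScala)
      .load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")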
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 0ea4d1cef2..08d0d722b2 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -85,11 +85,6 @@ case class HoodieFileIndex(spark: SparkSession,
override def rootPaths: Seq[Path] = queryPaths.asScala
- def isDataSkippingEnabled: Boolean = {
- options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
- spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean
- }
-
/**
* Returns the FileStatus for all the base files (excluding log files). This should be used only for
* cases where Spark directly fetches the list of files via HoodieFileIndex or for read optimized query logic
@@ -196,12 +191,20 @@ case class HoodieFileIndex(spark: SparkSession,
* @return list of pruned (data-skipped) candidate base-files' names
*/
private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
- if (!isDataSkippingEnabled || queryFilters.isEmpty || !HoodieTableMetadataUtil.getCompletedMetadataPartitions(metaClient.getTableConfig)
- .contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)) {
+ // NOTE: Data Skipping is only effective when it references columns that are indexed w/in
+ //       the Column Stats Index (CSI). Following cases could not be effectively handled by Data Skipping:
+ //          - Expressions on top-level column's fields (ie, for ex filters like "struct.field > 0", since
+ //            CSI only contains stats for top-level columns, in this case for "struct")
+ //          - Any expression not directly referencing top-level column (for ex, sub-queries, since there's
+ //            nothing CSI in particular could be applied for)
+ lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
+
+ if (!isMetadataTableEnabled || !isColumnStatsIndexAvailable || !isDataSkippingEnabled) {
+ validateConfig()
+ Option.empty
+ } else if (queryFilters.isEmpty || queryReferencedColumns.isEmpty) {
Option.empty
} else {
- val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
-
val colStatsDF: DataFrame = readColumnStatsIndex(spark, basePath, metadataConfig, queryReferencedColumns)
// Persist DF to avoid re-computing column statistics unraveling
@@ -245,13 +248,27 @@ case class HoodieFileIndex(spark: SparkSession,
override def refresh(): Unit = super.refresh()
- override def inputFiles: Array[String] = {
- val fileStatusList = allFiles
- fileStatusList.map(_.getPath.toString).toArray
- }
+ override def inputFiles: Array[String] =
+ allFiles.map(_.getPath.toString).toArray
- override def sizeInBytes: Long = {
- cachedFileSize
+ override def sizeInBytes: Long = cachedFileSize
+
+ private def isColumnStatsIndexAvailable =
+   HoodieTableMetadataUtil.getCompletedMetadataPartitions(metaClient.getTableConfig)
+     .contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
+
+ private def isDataSkippingEnabled: Boolean =
+   options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
+     spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean
+
+ private def isMetadataTableEnabled: Boolean = metadataConfig.enabled()
+ private def isColumnStatsIndexEnabled: Boolean = metadataConfig.isColumnStatsIndexEnabled
+
+ private def validateConfig(): Unit = {
+   if (isDataSkippingEnabled && (!isMetadataTableEnabled || !isColumnStatsIndexEnabled)) {
+     logWarning("Data skipping requires both Metadata Table and Column Stats Index to be enabled as well! " +
+       s"(isMetadataTableEnabled = ${isMetadataTableEnabled}, isColumnStatsIndexEnabled = ${isColumnStatsIndexEnabled}")
+   }
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index feed6fd334..1d4dbfb1ea 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -25,10 +25,9 @@ import org.apache.hudi.client.HoodieJavaWriteClient
import org.apache.hudi.client.common.HoodieJavaEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.engine.EngineType
-import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieRecord, HoodieTableQueryType, HoodieTableType}
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils}
@@ -38,17 +37,15 @@ import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.TimestampType
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
-import org.apache.hudi.metadata.{HoodieTableMetadata, MetadataPartitionType}
-import org.apache.hudi.testutils.{HoodieClientTestBase, SparkClientFunctionalTestHarness}
+import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, GreaterThanOrEqual, LessThan, Literal}
import org.apache.spark.sql.execution.datasources.{NoopCache, PartitionDirectory}
import org.apache.spark.sql.functions.{lit, struct}
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.sql.{DataFrameWriter, Row, SaveMode, SparkSession}
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{BeforeEach, Tag, Test}
+import org.junit.jupiter.api.{BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, CsvSource, MethodSource, ValueSource}
@@ -343,16 +340,19 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
import _spark.implicits._
val inputDF = tuples.toDF("id", "inv_id", "str", "rand")
+ val writeMetadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+ )
+
val opts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
RECORDKEY_FIELD.key -> "id",
PRECOMBINE_FIELD.key -> "id",
- HoodieMetadataConfig.ENABLE.key -> "true",
- HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
- )
+ ) ++ writeMetadataOpts
// If there are any failures in the Data Skipping flow, test should fail
spark.sqlContext.setConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Strict.value);
@@ -368,26 +368,46 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
metaClient = HoodieTableMetaClient.reload(metaClient)
- val props = Map[String, String](
- "path" -> basePath,
- QUERY_TYPE.key -> QUERY_TYPE_SNAPSHOT_OPT_VAL,
- DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
- // NOTE: Metadata Table has to be enabled on the read path as well
- HoodieMetadataConfig.ENABLE.key -> "true",
- HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
- )
-
- val fileIndex = HoodieFileIndex(spark, metaClient, Option.empty, props, NoopCache)
-
- val allFilesPartitions = fileIndex.listFiles(Seq(), Seq())
- assertEquals(10, allFilesPartitions.head.files.length)
-
- // We're selecting a single file that contains "id" == 1 row, which there should be
- // strictly 1. Given that 1 is minimal possible value, Data Skipping should be able to
- // truncate search space to just a single file
- val dataFilter = EqualTo(AttributeReference("id", IntegerType, nullable = false)(), Literal(1))
- val filteredPartitions = fileIndex.listFiles(Seq(), Seq(dataFilter))
- assertEquals(1, filteredPartitions.head.files.length)
+ case class TestCase(enableMetadata: Boolean,
+ enableColumnStats: Boolean,
+ enableDataSkipping: Boolean)
+
+ val testCases: Seq[TestCase] =
+ TestCase(enableMetadata = false, enableColumnStats = false, enableDataSkipping = false) ::
+ TestCase(enableMetadata = false, enableColumnStats = false, enableDataSkipping = true) ::
+ TestCase(enableMetadata = true, enableColumnStats = false, enableDataSkipping = true) ::
+ TestCase(enableMetadata = false, enableColumnStats = true, enableDataSkipping = true) ::
+ TestCase(enableMetadata = true, enableColumnStats = true, enableDataSkipping = true) ::
+ Nil
+
+ for (testCase <- testCases) {
+ val readMetadataOpts = Map(
+ // NOTE: Metadata Table has to be enabled on the read path as well
+ HoodieMetadataConfig.ENABLE.key -> testCase.enableMetadata.toString,
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> testCase.enableColumnStats.toString,
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+ )
+
+ val props = Map[String, String](
+ "path" -> basePath,
+ QUERY_TYPE.key -> QUERY_TYPE_SNAPSHOT_OPT_VAL,
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> testCase.enableDataSkipping.toString
+ ) ++ readMetadataOpts
+
+ val fileIndex = HoodieFileIndex(spark, metaClient, Option.empty, props, NoopCache)
+
+ val allFilesPartitions = fileIndex.listFiles(Seq(), Seq())
+ assertEquals(10, allFilesPartitions.head.files.length)
+
+ if (testCase.enableDataSkipping && testCase.enableMetadata) {
+ // We're selecting a single file that contains "id" == 1 row, which there should be
+ // strictly 1. Given that 1 is minimal possible value, Data Skipping should be able to
+ // truncate search space to just a single file
+ val dataFilter = EqualTo(AttributeReference("id", IntegerType, nullable = false)(), Literal(1))
+ val filteredPartitions = fileIndex.listFiles(Seq(), Seq(dataFilter))
+ assertEquals(1, filteredPartitions.head.files.length)
+ }
+ }
}
private def attribute(partition: String): AttributeReference = {
@@ -411,6 +431,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
}
object TestHoodieFileIndex {
+
def keyGeneratorParameters(): java.util.stream.Stream[Arguments] = {
java.util.stream.Stream.of(
Arguments.arguments(null.asInstanceOf[String]),
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index e3cde53951..841041e40c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -19,7 +19,7 @@
package org.apache.hudi.functional
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.{LocatedFileStatus, Path}
import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.HoodieConversionUtils.toProperties
@@ -27,6 +27,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.ParquetUtils
import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
+import org.apache.hudi.functional.TestColumnStatsIndex.ColumnStatsTestCase
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
import org.apache.spark.sql._
@@ -35,7 +36,7 @@ import org.apache.spark.sql.types._
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
import org.junit.jupiter.api._
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
import java.math.BigInteger
import java.sql.{Date, Timestamp}
@@ -72,19 +73,25 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
}
@ParameterizedTest
- @ValueSource(booleans = Array(true, false))
- def testMetadataColumnStatsIndex(forceFullLogScan: Boolean): Unit = {
+ @MethodSource(Array("testMetadataColumnStatsIndexParams"))
+ def testMetadataColumnStatsIndex(testCase: ColumnStatsTestCase): Unit = {
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+ )
+
val opts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
RECORDKEY_FIELD.key -> "c1",
PRECOMBINE_FIELD.key -> "c1",
- HoodieMetadataConfig.ENABLE.key -> "true",
- HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
- HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.key -> forceFullLogScan.toString,
+ // NOTE: Currently only this setting is used like following by different MT partitions:
+ //          - Files: using it
+ //          - Column Stats: NOT using it (defaults to doing "point-lookups")
+ HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.key -> testCase.forceFullLogScan.toString,
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
- )
+ ) ++ metadataOpts
setTableName("hoodie_test")
initMetaClient()
@@ -108,10 +115,17 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
metaClient = HoodieTableMetaClient.reload(metaClient)
val metadataConfig = HoodieMetadataConfig.newBuilder()
- .fromProperties(toProperties(opts))
+ .fromProperties(toProperties(metadataOpts))
.build()
- val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, sourceTableSchema.fieldNames)
+ val targetColumnsToRead: Seq[String] = {
+ // Providing empty seq of columns to [[readColumnStatsIndex]] will lead to the whole
+ // MT to be read, and subsequently filtered
+ if (testCase.readFullMetadataTable) Seq.empty
+ else sourceTableSchema.fieldNames
+ }
+
+ val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, targetColumnsToRead)
val transposedColStatsDF = transposeColumnStatsIndex(spark, colStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
val expectedColStatsSchema = composeIndexSchema(sourceTableSchema.fieldNames, sourceTableSchema)
@@ -151,7 +165,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
metaClient = HoodieTableMetaClient.reload(metaClient)
- val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, sourceTableSchema.fieldNames)
+ val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, targetColumnsToRead)
val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark, updatedColStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
val expectedColStatsIndexUpdatedDF =
@@ -243,26 +257,6 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
)
}
- def bootstrapParquetInputTableFromJSON(sourceJSONTablePath: String, targetParquetTablePath: String): Unit = {
- val jsonInputDF =
- // NOTE: Schema here is provided for validation that the input date is in the appropriate format
- spark.read
- .schema(sourceTableSchema)
- .json(sourceJSONTablePath)
-
- jsonInputDF
- .sort("c1")
- .repartition(4, new Column("c1"))
- .write
- .format("parquet")
- .mode("overwrite")
- .save(targetParquetTablePath)
-
- val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
- // Have to cleanup additional artefacts of Spark write
- fs.delete(new Path(targetParquetTablePath, "_SUCCESS"), false)
- }
-
private def generateRandomDataFrame(spark: SparkSession): DataFrame = {
val sourceTableSchema =
new StructType()
@@ -316,3 +310,14 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
}
}
+
+object TestColumnStatsIndex {
+
+ case class ColumnStatsTestCase(forceFullLogScan: Boolean, readFullMetadataTable: Boolean)
+
+ def testMetadataColumnStatsIndexParams: java.util.stream.Stream[Arguments] =
+ java.util.stream.Stream.of(
+ Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = false, readFullMetadataTable = false)),
+ Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = true, readFullMetadataTable = true))
+ )
+}