This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2d779fb5aa1 [HUDI-6320] Fix partition parsing in Spark file index for
custom keygen (#9273)
2d779fb5aa1 is described below
commit 2d779fb5aa1ebfd33676ebf29217f25c60e17d12
Author: Sagar Sumit <[email protected]>
AuthorDate: Thu Aug 3 09:17:38 2023 +0530
[HUDI-6320] Fix partition parsing in Spark file index for custom keygen
(#9273)
---
.../scala/org/apache/hudi/HoodieFileIndex.scala | 14 ++++-
.../apache/hudi/SparkHoodieTableFileIndex.scala | 13 ++--
.../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala | 2 +-
.../org/apache/hudi/TestHoodieFileIndex.scala | 34 ++++-------
.../apache/hudi/functional/TestCOWDataSource.scala | 69 +++++++++++++++++++++-
5 files changed, 99 insertions(+), 33 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 3767b65a8ce..a7e90b2fe50 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -79,7 +79,7 @@ case class HoodieFileIndex(spark: SparkSession,
spark = spark,
metaClient = metaClient,
schemaSpec = schemaSpec,
- configProperties = getConfigProperties(spark, options),
+ configProperties = getConfigProperties(spark, options, metaClient),
queryPaths = HoodieFileIndex.getQueryPaths(options),
specifiedQueryInstant =
options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
fileStatusCache = fileStatusCache
@@ -324,7 +324,7 @@ object HoodieFileIndex extends Logging {
schema.fieldNames.filter { colName => refs.exists(r =>
resolver.apply(colName, r.name)) }
}
- def getConfigProperties(spark: SparkSession, options: Map[String, String]) =
{
+ def getConfigProperties(spark: SparkSession, options: Map[String, String],
metaClient: HoodieTableMetaClient) = {
val sqlConf: SQLConf = spark.sessionState.conf
val properties = TypedProperties.fromMap(options.filter(p => p._2 !=
null).asJava)
@@ -342,6 +342,16 @@ object HoodieFileIndex extends Logging {
if (listingModeOverride != null) {
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key,
listingModeOverride)
}
+ val partitionColumns = metaClient.getTableConfig.getPartitionFields
+ if (partitionColumns.isPresent) {
+ // NOTE: Multiple partition fields could have non-encoded slashes in the
partition value.
+ // We might not be able to properly parse partition-values from
the listed partition-paths.
+ // Fallback to eager listing in this case.
+ if (partitionColumns.get().length > 1
+ && (listingModeOverride == null ||
DataSourceReadOptions.FILE_INDEX_LISTING_MODE_LAZY.equals(listingModeOverride)))
{
+
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key,
DataSourceReadOptions.FILE_INDEX_LISTING_MODE_EAGER)
+ }
+ }
properties
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index 35ef3e9f066..b3d9e5659e8 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -29,11 +29,9 @@ import org.apache.hudi.common.model.{FileSlice,
HoodieTableQueryType}
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
-import org.apache.hudi.hadoop.CachingPath
-import org.apache.hudi.hadoop.CachingPath.createRelativePathUnsafe
import org.apache.hudi.internal.schema.Types.RecordType
import org.apache.hudi.internal.schema.utils.Conversions
-import org.apache.hudi.keygen.{StringPartitionPathFormatter,
TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
+import org.apache.hudi.keygen.{CustomAvroKeyGenerator, CustomKeyGenerator,
StringPartitionPathFormatter, TimestampBasedAvroKeyGenerator,
TimestampBasedKeyGenerator}
import org.apache.hudi.util.JFunction
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
@@ -44,7 +42,6 @@ import org.apache.spark.sql.catalyst.{InternalRow,
expressions}
import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
import java.util.Collections
import javax.annotation.concurrent.NotThreadSafe
@@ -99,7 +96,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
})
- protected lazy val shouldFastBootstrap =
configProperties.getBoolean(DATA_QUERIES_ONLY.key, false)
+ protected lazy val shouldFastBootstrap: Boolean =
configProperties.getBoolean(DATA_QUERIES_ONLY.key, false)
private lazy val sparkParsePartitionUtil =
sparkAdapter.getSparkParsePartitionUtil
@@ -115,14 +112,16 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
// Note that key generator class name could be null
val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName
if
(classOf[TimestampBasedKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
- ||
classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName))
{
+ ||
classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
+ ||
classOf[CustomKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
+ ||
classOf[CustomAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName))
{
val partitionFields = partitionColumns.get().map(column =>
StructField(column, StringType))
StructType(partitionFields)
} else {
val partitionFields = partitionColumns.get().filter(column =>
nameFieldMap.contains(column))
.map(column => nameFieldMap.apply(column))
- if (partitionFields.size != partitionColumns.get().size) {
+ if (partitionFields.length != partitionColumns.get().length) {
val isBootstrapTable = tableConfig.getBootstrapBasePath.isPresent
if (isBootstrapTable) {
// For bootstrapped tables its possible the schema does not
contain partition field when source table
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index 839b02828d0..521fb7f3a5f 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -86,7 +86,7 @@ class HoodieCDCRDD(
private val cdcSupplementalLoggingMode =
metaClient.getTableConfig.cdcSupplementalLoggingMode
- private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty)
+ private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty,
metaClient)
protected val payloadProps: Properties =
Option(metaClient.getTableConfig.getPreCombineField)
.map { preCombineField =>
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index ba5c2edb2d1..157f4fea854 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -38,7 +38,6 @@ import
org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtil
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.TimestampType
import org.apache.hudi.metadata.HoodieTableMetadata
import org.apache.hudi.testutils.HoodieSparkClientTestBase
@@ -325,28 +324,21 @@ class TestHoodieFileIndex extends
HoodieSparkClientTestBase with ScalaAssertionS
EqualTo(attribute("dt"), literal("2021/03/01")),
EqualTo(attribute("hh"), literal("10"))
)
+ val partitionAndFilesNoPruning =
fileIndex.listFiles(Seq(partitionFilter2), Seq.empty)
- // NOTE: That if file-index is in lazy-listing mode and we can't parse
partition values, there's no way
- // to recover from this since Spark by default have to inject
partition values parsed from the partition paths.
- if (listingModeOverride ==
DataSourceReadOptions.FILE_INDEX_LISTING_MODE_LAZY) {
- assertThrows(classOf[HoodieException]) {
fileIndex.listFiles(Seq(partitionFilter2), Seq.empty) }
- } else {
- val partitionAndFilesNoPruning =
fileIndex.listFiles(Seq(partitionFilter2), Seq.empty)
-
- assertEquals(1, partitionAndFilesNoPruning.size)
- // The partition prune would not work for this case, so the partition
value it
- // returns is a InternalRow.empty.
- assertTrue(partitionAndFilesNoPruning.forall(_.values.numFields == 0))
- // The returned file size should equal to the whole file size in all
the partition paths.
- assertEquals(getFileCountInPartitionPaths("2021/03/01/10",
"2021/03/02/10"),
- partitionAndFilesNoPruning.flatMap(_.files).length)
+ assertEquals(1, partitionAndFilesNoPruning.size)
+ // The partition prune would not work for this case, so the partition
value it
+ // returns is a InternalRow.empty.
+ assertTrue(partitionAndFilesNoPruning.forall(_.values.numFields == 0))
+ // The returned file size should equal to the whole file size in all the
partition paths.
+ assertEquals(getFileCountInPartitionPaths("2021/03/01/10",
"2021/03/02/10"),
+ partitionAndFilesNoPruning.flatMap(_.files).length)
- val readDF = spark.read.format("hudi").options(readerOpts).load()
+ val readDF = spark.read.format("hudi").options(readerOpts).load()
- assertEquals(10, readDF.count())
- // There are 5 rows in the dt = 2021/03/01 and hh = 10
- assertEquals(5, readDF.filter("dt = '2021/03/01' and hh
='10'").count())
- }
+ assertEquals(10, readDF.count())
+ // There are 5 rows in the dt = 2021/03/01 and hh = 10
+ assertEquals(5, readDF.filter("dt = '2021/03/01' and hh ='10'").count())
}
{
@@ -429,7 +421,7 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase
with ScalaAssertionS
val partitionAndFilesAfterPrune =
fileIndex.listFiles(Seq(partitionFilters), Seq.empty)
assertEquals(1, partitionAndFilesAfterPrune.size)
- assertEquals(fileIndex.areAllPartitionPathsCached(),
!complexExpressionPushDown)
+ assertTrue(fileIndex.areAllPartitionPathsCached())
val PartitionDirectory(partitionActualValues, filesAfterPrune) =
partitionAndFilesAfterPrune.head
val partitionExpectValues = Seq("default", "2021-03-01", "5", "CN")
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index bfbf42535d2..ad443ff87a1 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -62,6 +62,7 @@ import java.sql.{Date, Timestamp}
import java.util.function.Consumer
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
+import scala.util.matching.Regex
/**
@@ -886,8 +887,8 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
}
@ParameterizedTest
- @EnumSource(value = classOf[HoodieRecordType], names = Array("SPARK"))
- def testSparkPartitionByWithCustomKeyGenerator(recordType:
HoodieRecordType): Unit = {
+ @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO",
"SPARK"))
+ def testSparkPartitionByWithCustomKeyGeneratorWithGlobbing(recordType:
HoodieRecordType): Unit = {
val (writeOpts, readOpts) =
getWriterReaderOptsLessPartitionPath(recordType)
// Without fieldType, the default is SIMPLE
@@ -942,6 +943,70 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
}
}
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO",
"SPARK"))
+ def testSparkPartitionByWithCustomKeyGenerator(recordType:
HoodieRecordType): Unit = {
+ val (writeOpts, readOpts) =
getWriterReaderOptsLessPartitionPath(recordType)
+ // Specify fieldType as TIMESTAMP of type EPOCHMILLISECONDS and output
date format as yyyy/MM/dd
+ var writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName,
writeOpts)
+ writer.partitionBy("current_ts:TIMESTAMP")
+ .option(TIMESTAMP_TYPE_FIELD.key, "EPOCHMILLISECONDS")
+ .option(TIMESTAMP_OUTPUT_DATE_FORMAT.key, "yyyy/MM/dd")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ var recordsReadDF = spark.read.format("hudi")
+ .options(readOpts)
+ .load(basePath)
+ val udf_date_format = udf((data: Long) => new
DateTime(data).toString(DateTimeFormat.forPattern("yyyy/MM/dd")))
+
+ assertEquals(0L, recordsReadDF.filter(col("_hoodie_partition_path") =!=
udf_date_format(col("current_ts"))).count())
+
+ // Mixed fieldType with TIMESTAMP of type EPOCHMILLISECONDS and output
date format as yyyy/MM/dd
+ writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName, writeOpts)
+ writer.partitionBy("driver", "rider:SIMPLE", "current_ts:TIMESTAMP")
+ .option(TIMESTAMP_TYPE_FIELD.key, "EPOCHMILLISECONDS")
+ .option(TIMESTAMP_OUTPUT_DATE_FORMAT.key, "yyyy/MM/dd")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ recordsReadDF = spark.read.format("hudi")
+ .options(readOpts)
+ .load(basePath)
+ assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!=
+ concat(col("driver"), lit("/"), col("rider"), lit("/"),
udf_date_format(col("current_ts")))).count() == 0)
+ }
+
+ @Test
+ def testPartitionPruningForTimestampBasedKeyGenerator(): Unit = {
+ val (writeOpts, readOpts) =
getWriterReaderOptsLessPartitionPath(HoodieRecordType.AVRO, enableFileIndex =
true)
+ val writer =
getDataFrameWriter(classOf[TimestampBasedKeyGenerator].getName, writeOpts)
+ writer.partitionBy("current_ts")
+ .option(TIMESTAMP_TYPE_FIELD.key, "EPOCHMILLISECONDS")
+ .option(TIMESTAMP_OUTPUT_DATE_FORMAT.key, "yyyy/MM/dd")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ val snapshotQueryRes = spark.read.format("hudi")
+ .options(readOpts)
+ .load(basePath)
+ .where("current_ts > '1970/01/16'")
+
assertTrue(checkPartitionFilters(snapshotQueryRes.queryExecution.executedPlan.toString,
"current_ts.* > 1970/01/16"))
+ }
+
+ def checkPartitionFilters(sparkPlan: String, partitionFilter: String):
Boolean = {
+ val partitionFilterPattern: Regex = """PartitionFilters: \[(.*?)\]""".r
+ val tsPattern: Regex = (partitionFilter).r
+
+ val partitionFilterMatch =
partitionFilterPattern.findFirstMatchIn(sparkPlan)
+
+ partitionFilterMatch match {
+ case Some(m) =>
+ val filters = m.group(1)
+ tsPattern.findFirstIn(filters).isDefined
+ case None =>
+ false
+ }
+ }
+
@Test
def testSparkPartitionByWithSimpleKeyGenerator() {
val (writeOpts, readOpts) =
getWriterReaderOptsLessPartitionPath(HoodieRecordType.AVRO)