This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 34852b7db99 [HUDI-6941] Fix partition pruning for multiple partition
fields (#9863)
34852b7db99 is described below
commit 34852b7db9901c60aec07af4da742d0e42bfaf06
Author: Sagar Sumit <[email protected]>
AuthorDate: Sat Oct 14 15:50:47 2023 +0530
[HUDI-6941] Fix partition pruning for multiple partition fields (#9863)
---
.../scala/org/apache/hudi/HoodieFileIndex.scala | 14 ++-------
.../apache/hudi/SparkHoodieTableFileIndex.scala | 6 ++--
.../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala | 2 +-
.../org/apache/hudi/TestHoodieFileIndex.scala | 35 ++++++++++++++--------
.../apache/hudi/functional/TestCOWDataSource.scala | 3 +-
5 files changed, 29 insertions(+), 31 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 8a7c06b1d15..60b134a5cd3 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -81,7 +81,7 @@ case class HoodieFileIndex(spark: SparkSession,
spark = spark,
metaClient = metaClient,
schemaSpec = schemaSpec,
- configProperties = getConfigProperties(spark, options, metaClient),
+ configProperties = getConfigProperties(spark, options),
queryPaths = HoodieFileIndex.getQueryPaths(options),
specifiedQueryInstant =
options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
fileStatusCache = fileStatusCache
@@ -445,7 +445,7 @@ object HoodieFileIndex extends Logging {
schema.fieldNames.filter { colName => refs.exists(r =>
resolver.apply(colName, r.name)) }
}
- def getConfigProperties(spark: SparkSession, options: Map[String, String],
metaClient: HoodieTableMetaClient) = {
+ def getConfigProperties(spark: SparkSession, options: Map[String, String]) =
{
val sqlConf: SQLConf = spark.sessionState.conf
val properties = TypedProperties.fromMap(options.filter(p => p._2 !=
null).asJava)
@@ -463,16 +463,6 @@ object HoodieFileIndex extends Logging {
if (listingModeOverride != null) {
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key,
listingModeOverride)
}
- val partitionColumns = metaClient.getTableConfig.getPartitionFields
- if (partitionColumns.isPresent) {
- // NOTE: Multiple partition fields could have non-encoded slashes in the
partition value.
- // We might not be able to properly parse partition-values from
the listed partition-paths.
- // Fallback to eager listing in this case.
- if (partitionColumns.get().length > 1
- && (listingModeOverride == null ||
DataSourceReadOptions.FILE_INDEX_LISTING_MODE_LAZY.equals(listingModeOverride)))
{
-
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key,
DataSourceReadOptions.FILE_INDEX_LISTING_MODE_EAGER)
- }
- }
properties
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index d1b6df6619d..c9a69a5210e 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -31,7 +31,7 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.internal.schema.Types.RecordType
import org.apache.hudi.internal.schema.utils.Conversions
-import org.apache.hudi.keygen.{CustomAvroKeyGenerator, CustomKeyGenerator,
StringPartitionPathFormatter, TimestampBasedAvroKeyGenerator,
TimestampBasedKeyGenerator}
+import org.apache.hudi.keygen.{StringPartitionPathFormatter,
TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.util.JFunction
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
@@ -112,9 +112,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
// Note that key generator class name could be null
val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName
if
(classOf[TimestampBasedKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
- ||
classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
- ||
classOf[CustomKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
- ||
classOf[CustomAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName))
{
+ ||
classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName))
{
val partitionFields = partitionColumns.get().map(column =>
StructField(column, StringType))
StructType(partitionFields)
} else {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index 521fb7f3a5f..839b02828d0 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -86,7 +86,7 @@ class HoodieCDCRDD(
private val cdcSupplementalLoggingMode =
metaClient.getTableConfig.cdcSupplementalLoggingMode
- private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty,
metaClient)
+ private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty)
protected val payloadProps: Properties =
Option(metaClient.getTableConfig.getPreCombineField)
.map { preCombineField =>
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index 1ccb4081fb8..a6c9300b7d4 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -38,6 +38,7 @@ import
org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtil
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.TimestampType
import org.apache.hudi.metadata.HoodieTableMetadata
import org.apache.hudi.testutils.HoodieSparkClientTestBase
@@ -325,21 +326,29 @@ class TestHoodieFileIndex extends
HoodieSparkClientTestBase with ScalaAssertionS
EqualTo(attribute("dt"), literal("2021/03/01")),
EqualTo(attribute("hh"), literal("10"))
)
- val partitionAndFilesNoPruning =
fileIndex.listFiles(Seq(partitionFilter2), Seq.empty)
+ // NOTE: That if file-index is in lazy-listing mode and we can't parse
partition values, there's no way
+ // to recover from this since Spark by default have to inject
partition values parsed from the partition paths.
+ if (listingModeOverride ==
DataSourceReadOptions.FILE_INDEX_LISTING_MODE_LAZY) {
+ assertThrows(classOf[HoodieException]) {
+ fileIndex.listFiles(Seq(partitionFilter2), Seq.empty)
+ }
+ } else {
+ val partitionAndFilesNoPruning =
fileIndex.listFiles(Seq(partitionFilter2), Seq.empty)
- assertEquals(1, partitionAndFilesNoPruning.size)
- // The partition prune would not work for this case, so the partition
value it
- // returns is a InternalRow.empty.
- assertTrue(partitionAndFilesNoPruning.forall(_.values.numFields == 0))
- // The returned file size should equal to the whole file size in all the
partition paths.
- assertEquals(getFileCountInPartitionPaths("2021/03/01/10",
"2021/03/02/10"),
- partitionAndFilesNoPruning.flatMap(_.files).length)
+ assertEquals(1, partitionAndFilesNoPruning.size)
+ // The partition prune would not work for this case, so the partition
value it
+ // returns is a InternalRow.empty.
+ assertTrue(partitionAndFilesNoPruning.forall(_.values.numFields == 0))
+ // The returned file size should equal to the whole file size in all
the partition paths.
+ assertEquals(getFileCountInPartitionPaths("2021/03/01/10",
"2021/03/02/10"),
+ partitionAndFilesNoPruning.flatMap(_.files).length)
- val readDF = spark.read.format("hudi").options(readerOpts).load()
+ val readDF = spark.read.format("hudi").options(readerOpts).load()
- assertEquals(10, readDF.count())
- // There are 5 rows in the dt = 2021/03/01 and hh = 10
- assertEquals(5, readDF.filter("dt = '2021/03/01' and hh ='10'").count())
+ assertEquals(10, readDF.count())
+ // There are 5 rows in the dt = 2021/03/01 and hh = 10
+ assertEquals(5, readDF.filter("dt = '2021/03/01' and hh
='10'").count())
+ }
}
{
@@ -422,7 +431,7 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase
with ScalaAssertionS
val partitionAndFilesAfterPrune =
fileIndex.listFiles(Seq(partitionFilters), Seq.empty)
assertEquals(1, partitionAndFilesAfterPrune.size)
- assertTrue(fileIndex.areAllPartitionPathsCached())
+ assertEquals(fileIndex.areAllPartitionPathsCached(),
!complexExpressionPushDown)
val PartitionDirectory(partitionActualValues, filesAfterPrune) =
partitionAndFilesAfterPrune.head
val partitionExpectValues = Seq("default", "2021-03-01", "5", "CN")
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 68227ba074e..ece1deacd7a 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -54,7 +54,7 @@ import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
@@ -1006,6 +1006,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
}
}
+ @Disabled("HUDI-6320")
@ParameterizedTest
@EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO",
"SPARK"))
def testSparkPartitionByWithCustomKeyGenerator(recordType:
HoodieRecordType): Unit = {