This is an automated email from the ASF dual-hosted git repository.
mengtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7d046f9 [HUDI-3008] Fixing HoodieFileIndex partition column parsing for nested fields
new b5890cd Merge pull request #4308 from harsh1231/HUDI-3008
7d046f9 is described below
commit 7d046f914a059b2623d7f2a7627c44b15ccc0ddb
Author: harshal patil <[email protected]>
AuthorDate: Tue Dec 14 17:28:18 2021 +0530
[HUDI-3008] Fixing HoodieFileIndex partition column parsing for nested fields
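For context on the bug: HoodieFileIndex used to index only top-level field
names when resolving partition columns, so a nested partition column such as
"a.b.c" could never be found. A minimal standalone sketch of the failure
(plain Spark, not Hudi code; the schema is invented for illustration):

    import org.apache.spark.sql.types._

    // A schema with a nested leaf a.b.c, standing in for a table
    // partitioned on a nested column.
    val schema = StructType(Seq(
      StructField("a", StructType(Seq(
        StructField("b", StructType(Seq(
          StructField("c", StringType))))))),
      StructField("id", LongType)))

    // The old lookup mapped only top-level names to fields...
    val flat = schema.fields.map(f => f.name -> f).toMap

    // ...so the nested partition column "a.b.c" was never resolved.
    assert(flat.keySet == Set("a", "id"))
    assert(flat.get("a.b.c").isEmpty)

The fix below replaces this flat map with a recursive traversal keyed by full
dotted leaf paths.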
---
.../scala/org/apache/hudi/HoodieFileIndex.scala | 25 ++++++++++++++++----
.../org/apache/hudi/TestHoodieFileIndex.scala | 27 ++++++++++++++++++++--
2 files changed, 46 insertions(+), 6 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index f9b68cb..0ed1b48 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -18,7 +18,6 @@
package org.apache.hudi
import org.apache.hadoop.fs.{FileStatus, Path}
-
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
@@ -27,7 +26,6 @@ import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ
import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
-
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate}
@@ -37,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, N
import org.apache.spark.sql.hudi.DataSkippingUtils.createColumnStatsIndexFilterExpr
import org.apache.spark.sql.hudi.HoodieSqlUtils
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{AnalysisException, Column, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
@@ -108,7 +106,7 @@ case class HoodieFileIndex(
private lazy val _partitionSchemaFromProperties: StructType = {
val tableConfig = metaClient.getTableConfig
val partitionColumns = tableConfig.getPartitionFields
- val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap
+ val nameFieldMap = generateNameFieldMap(Right(schema))
if (partitionColumns.isPresent) {
val partitionFields = partitionColumns.get().map(column =>
@@ -123,6 +121,25 @@ case class HoodieFileIndex(
}
}
+ /**
+ * This method traverses StructType recursively to build map of columnName -> StructField
+ * Note : If there is nesting of columns like ["a.b.c.d", "a.b.c.e"] -> final map will have keys corresponding
+ * only to ["a.b.c.d", "a.b.c.e"] and not for subsets like ["a.b.c", "a.b"]
+ * @param structField
+ * @return map of ( columns names -> StructField )
+ */
+ private def generateNameFieldMap(structField: Either[StructField, StructType]) : Map[String, StructField] = {
+ structField match {
+ case Right(field) => field.fields.map(f => generateNameFieldMap(Left(f))).flatten.toMap
+ case Left(field) => field.dataType match {
+ case struct: StructType => generateNameFieldMap(Right(struct)).map {
+ case (key: String, sf: StructField) => (field.name + "." + key, sf)
+ }
+ case _ => Map(field.name -> field)
+ }
+ }
+ }
+
private lazy val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
private lazy val configProperties = {
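To make the new mapping concrete, here is a standalone sketch of the same
recursion (renamed flattenFields here, since generateNameFieldMap is private
to HoodieFileIndex) applied to an invented nested schema:

    import org.apache.spark.sql.types._

    // Illustrative re-statement of generateNameFieldMap, not Hudi code.
    def flattenFields(structField: Either[StructField, StructType]): Map[String, StructField] =
      structField match {
        case Right(struct) =>
          struct.fields.flatMap(f => flattenFields(Left(f))).toMap
        case Left(field) => field.dataType match {
          case struct: StructType =>
            // Prefix every nested key with this field's name.
            flattenFields(Right(struct)).map { case (key, sf) => (field.name + "." + key, sf) }
          case _ => Map(field.name -> field)
        }
      }

    val schema = StructType(Seq(
      StructField("a", StructType(Seq(
        StructField("b", StructType(Seq(
          StructField("c", StringType),
          StructField("d", IntegerType))))))),
      StructField("e", StringType)))

    // Keys are full dotted leaf paths; intermediate paths like "a" or
    // "a.b" get no entry, matching the Note in the doc comment above.
    assert(flattenFields(Right(schema)).keySet == Set("a.b.c", "a.b.d", "e"))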
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index 0c3918b..62f98cf 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -18,7 +18,6 @@
package org.apache.hudi
import java.util.Properties
-
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -31,6 +30,7 @@ import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.{Config, TimestampType}
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.functions.{lit, struct}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, GreaterThanOrEqual, LessThan, Literal}
import org.apache.spark.sql.execution.datasources.PartitionDirectory
import org.apache.spark.sql.types.StringType
@@ -38,7 +38,7 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -253,6 +253,29 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
assertEquals(5, readDF2.filter("dt = '2021/03/01' and hh ='10'").count())
}
+ @ParameterizedTest
+ @CsvSource(Array("true,a.b.c","false,a.b.c","true,c","false,c"))
+ def testQueryPartitionPathsForNestedPartition(useMetaFileList:Boolean, partitionBy:String): Unit = {
+ val inputDF = spark.range(100)
+ .withColumn("c",lit("c"))
+ .withColumn("b",struct("c"))
+ .withColumn("a",struct("b"))
+ inputDF.write.format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(RECORDKEY_FIELD.key, "id")
+ .option(PRECOMBINE_FIELD.key, "id")
+ .option(PARTITIONPATH_FIELD.key, partitionBy)
+ .option(HoodieMetadataConfig.ENABLE.key(), useMetaFileList)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ val fileIndex = HoodieFileIndex(spark, metaClient, None,
+ queryOpts ++ Map(HoodieMetadataConfig.ENABLE.key -> useMetaFileList.toString))
+ // test if table is partitioned on nested columns, getAllQueryPartitionPaths does not break
+ assert(fileIndex.getAllQueryPartitionPaths.get(0).partitionPath.equals("c"))
+ }
+
private def attribute(partition: String): AttributeReference = {
AttributeReference(partition, StringType, true)()
}
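For reference, the input DataFrame the new test builds (shape inferred from
the withColumn/struct chain above, not printed in the commit) looks like:

    // root
    //  |-- id: long
    //  |-- c: string                        constant "c" for every row
    //  |-- b: struct<c: string>
    //  |-- a: struct<b: struct<c: string>>

Because the nested column a.b.c and the top-level column c both hold the
constant "c", every row lands in the single partition path "c" for all four
CsvSource combinations, which is exactly what the assertion on
getAllQueryPartitionPaths verifies.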