This is an automated email from the ASF dual-hosted git repository. pwason pushed a commit to branch release-0.14.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit ff6b70f545800b431a52dff23f490f3034ce7484 Author: Wechar Yu <[email protected]> AuthorDate: Wed Aug 23 08:56:53 2023 +0800 [HUDI-6729] Fix get partition values from path for non-string type partition column (#9484) * reuse HoodieSparkUtils#parsePartitionColumnValues to support multi spark versions * assert parsed partition values from path * throw exception instead of return empty InternalRow when encounter exception in HoodieBaseRelation#getPartitionColumnsAsInternalRowInternal --- .../scala/org/apache/hudi/HoodieBaseRelation.scala | 51 ++++++++++----------- .../TestGetPartitionValuesFromPath.scala | 53 ++++++++++++++++++++++ 2 files changed, 76 insertions(+), 28 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 0f7eb27fd04..9ace93ed495 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -32,8 +32,8 @@ import org.apache.hudi.common.config.{ConfigProperty, HoodieMetadataConfig, Seri import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath import org.apache.hudi.common.model.{FileSlice, HoodieFileFormat, HoodieRecord} -import org.apache.hudi.common.table.timeline.{HoodieTimeline, TimelineUtils} -import org.apache.hudi.common.table.timeline.TimelineUtils.{HollowCommitHandling, validateTimestampAsOf, handleHollowCommitIfNeeded} +import org.apache.hudi.common.table.timeline.HoodieTimeline +import org.apache.hudi.common.table.timeline.TimelineUtils.validateTimestampAsOf import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.util.StringUtils.isNullOrEmpty @@ -41,6 +41,7 @@ import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.common.util.{ConfigUtils, StringUtils} import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.CachingPath import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper} @@ -54,6 +55,7 @@ import org.apache.spark.sql.HoodieCatalystExpressionUtils.{convertToCatalystExpr import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression} +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.FileRelation import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat @@ -62,7 +64,6 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{Row, SQLContext, SparkSession} -import org.apache.spark.unsafe.types.UTF8String import java.net.URI import scala.collection.JavaConverters._ @@ -482,32 +483,26 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected def getPartitionColumnsAsInternalRowInternal(file: FileStatus, basePath: Path, extractPartitionValuesFromPartitionPath: Boolean): InternalRow = { - try { - val tableConfig = metaClient.getTableConfig - if (extractPartitionValuesFromPartitionPath) { - val tablePathWithoutScheme = CachingPath.getPathWithoutSchemeAndAuthority(basePath) - val partitionPathWithoutScheme = CachingPath.getPathWithoutSchemeAndAuthority(file.getPath.getParent) - val relativePath = new URI(tablePathWithoutScheme.toString).relativize(new URI(partitionPathWithoutScheme.toString)).toString - val hiveStylePartitioningEnabled = tableConfig.getHiveStylePartitioningEnable.toBoolean - if (hiveStylePartitioningEnabled) { - val partitionSpec = PartitioningUtils.parsePathFragment(relativePath) - InternalRow.fromSeq(partitionColumns.map(partitionSpec(_)).map(UTF8String.fromString)) - } else { - if (partitionColumns.length == 1) { - InternalRow.fromSeq(Seq(UTF8String.fromString(relativePath))) - } else { - val parts = relativePath.split("/") - assert(parts.size == partitionColumns.length) - InternalRow.fromSeq(parts.map(UTF8String.fromString)) - } - } - } else { - InternalRow.empty + if (extractPartitionValuesFromPartitionPath) { + val tablePathWithoutScheme = CachingPath.getPathWithoutSchemeAndAuthority(basePath) + val partitionPathWithoutScheme = CachingPath.getPathWithoutSchemeAndAuthority(file.getPath.getParent) + val relativePath = new URI(tablePathWithoutScheme.toString).relativize(new URI(partitionPathWithoutScheme.toString)).toString + val timeZoneId = conf.get("timeZone", sparkSession.sessionState.conf.sessionLocalTimeZone) + val rowValues = HoodieSparkUtils.parsePartitionColumnValues( + partitionColumns, + relativePath, + basePath, + tableStructSchema, + timeZoneId, + sparkAdapter.getSparkParsePartitionUtil, + conf.getBoolean("spark.sql.sources.validatePartitionColumns", true)) + if(rowValues.length != partitionColumns.length) { + throw new HoodieException("Failed to get partition column values from the partition-path:" + + s"partition column size: ${partitionColumns.length}, parsed partition value size: ${rowValues.length}") } - } catch { - case NonFatal(e) => - logWarning(s"Failed to get the right partition InternalRow for file: ${file.toString}", e) - InternalRow.empty + InternalRow.fromSeq(rowValues) + } else { + InternalRow.empty } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestGetPartitionValuesFromPath.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestGetPartitionValuesFromPath.scala new file mode 100644 index 00000000000..0b4ce12ae52 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestGetPartitionValuesFromPath.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase + +class TestGetPartitionValuesFromPath extends HoodieSparkSqlTestBase { + + Seq(true, false).foreach { hiveStylePartitioning => + Seq(true, false).foreach {readFromPath => + test(s"Get partition values from path: $readFromPath, isHivePartitioning: $hiveStylePartitioning") { + withSQLConf("hoodie.datasource.read.extract.partition.values.from.path" -> readFromPath.toString) { + withTable(generateTableName) { tableName => + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | region string, + | dt date + |) using hudi + |tblproperties ( + | primaryKey = 'id', + | type='mor', + | hoodie.datasource.write.hive_style_partitioning='$hiveStylePartitioning') + |partitioned by (region, dt)""".stripMargin) + spark.sql(s"insert into $tableName partition (region='reg1', dt='2023-08-01') select 1, 'name1'") + + checkAnswer(s"select id, name, region, cast(dt as string) from $tableName")( + Seq(1, "name1", "reg1", "2023-08-01") + ) + } + } + } + } + } +}
