This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d1f83de4bcb [HUDI-6729] Fix get partition values from path for
non-string type partition column (#9484)
d1f83de4bcb is described below
commit d1f83de4bcb33741e3140677796b98254bfd9da1
Author: Wechar Yu <[email protected]>
AuthorDate: Wed Aug 23 08:56:53 2023 +0800
[HUDI-6729] Fix get partition values from path for non-string type
partition column (#9484)
* reuse HoodieSparkUtils#parsePartitionColumnValues to support multiple Spark
versions
* assert parsed partition values from path
* throw an exception instead of returning an empty InternalRow when encountering an
exception in HoodieBaseRelation#getPartitionColumnsAsInternalRowInternal
---
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 51 ++++++++++-----------
.../TestGetPartitionValuesFromPath.scala | 53 ++++++++++++++++++++++
2 files changed, 76 insertions(+), 28 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 0f7eb27fd04..9ace93ed495 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -32,8 +32,8 @@ import org.apache.hudi.common.config.{ConfigProperty,
HoodieMetadataConfig, Seri
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
import org.apache.hudi.common.model.{FileSlice, HoodieFileFormat, HoodieRecord}
-import org.apache.hudi.common.table.timeline.{HoodieTimeline, TimelineUtils}
-import
org.apache.hudi.common.table.timeline.TimelineUtils.{HollowCommitHandling,
validateTimestampAsOf, handleHollowCommitIfNeeded}
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import
org.apache.hudi.common.table.timeline.TimelineUtils.validateTimestampAsOf
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
@@ -41,6 +41,7 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.CachingPath
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -54,6 +55,7 @@ import
org.apache.spark.sql.HoodieCatalystExpressionUtils.{convertToCatalystExpr
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.{Expression,
SubqueryExpression}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
@@ -62,7 +64,6 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
-import org.apache.spark.unsafe.types.UTF8String
import java.net.URI
import scala.collection.JavaConverters._
@@ -482,32 +483,26 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
protected def getPartitionColumnsAsInternalRowInternal(file: FileStatus,
basePath: Path,
extractPartitionValuesFromPartitionPath: Boolean): InternalRow = {
- try {
- val tableConfig = metaClient.getTableConfig
- if (extractPartitionValuesFromPartitionPath) {
- val tablePathWithoutScheme =
CachingPath.getPathWithoutSchemeAndAuthority(basePath)
- val partitionPathWithoutScheme =
CachingPath.getPathWithoutSchemeAndAuthority(file.getPath.getParent)
- val relativePath = new
URI(tablePathWithoutScheme.toString).relativize(new
URI(partitionPathWithoutScheme.toString)).toString
- val hiveStylePartitioningEnabled =
tableConfig.getHiveStylePartitioningEnable.toBoolean
- if (hiveStylePartitioningEnabled) {
- val partitionSpec = PartitioningUtils.parsePathFragment(relativePath)
-
InternalRow.fromSeq(partitionColumns.map(partitionSpec(_)).map(UTF8String.fromString))
- } else {
- if (partitionColumns.length == 1) {
- InternalRow.fromSeq(Seq(UTF8String.fromString(relativePath)))
- } else {
- val parts = relativePath.split("/")
- assert(parts.size == partitionColumns.length)
- InternalRow.fromSeq(parts.map(UTF8String.fromString))
- }
- }
- } else {
- InternalRow.empty
+ if (extractPartitionValuesFromPartitionPath) {
+ val tablePathWithoutScheme =
CachingPath.getPathWithoutSchemeAndAuthority(basePath)
+ val partitionPathWithoutScheme =
CachingPath.getPathWithoutSchemeAndAuthority(file.getPath.getParent)
+ val relativePath = new
URI(tablePathWithoutScheme.toString).relativize(new
URI(partitionPathWithoutScheme.toString)).toString
+ val timeZoneId = conf.get("timeZone",
sparkSession.sessionState.conf.sessionLocalTimeZone)
+ val rowValues = HoodieSparkUtils.parsePartitionColumnValues(
+ partitionColumns,
+ relativePath,
+ basePath,
+ tableStructSchema,
+ timeZoneId,
+ sparkAdapter.getSparkParsePartitionUtil,
+ conf.getBoolean("spark.sql.sources.validatePartitionColumns", true))
+ if(rowValues.length != partitionColumns.length) {
+ throw new HoodieException("Failed to get partition column values from
the partition-path:"
+ + s"partition column size: ${partitionColumns.length}, parsed
partition value size: ${rowValues.length}")
}
- } catch {
- case NonFatal(e) =>
- logWarning(s"Failed to get the right partition InternalRow for file:
${file.toString}", e)
- InternalRow.empty
+ InternalRow.fromSeq(rowValues)
+ } else {
+ InternalRow.empty
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestGetPartitionValuesFromPath.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestGetPartitionValuesFromPath.scala
new file mode 100644
index 00000000000..0b4ce12ae52
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestGetPartitionValuesFromPath.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+
+class TestGetPartitionValuesFromPath extends HoodieSparkSqlTestBase {
+
+ Seq(true, false).foreach { hiveStylePartitioning =>
+ Seq(true, false).foreach {readFromPath =>
+ test(s"Get partition values from path: $readFromPath,
isHivePartitioning: $hiveStylePartitioning") {
+
withSQLConf("hoodie.datasource.read.extract.partition.values.from.path" ->
readFromPath.toString) {
+ withTable(generateTableName) { tableName =>
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | region string,
+ | dt date
+ |) using hudi
+ |tblproperties (
+ | primaryKey = 'id',
+ | type='mor',
+ |
hoodie.datasource.write.hive_style_partitioning='$hiveStylePartitioning')
+ |partitioned by (region, dt)""".stripMargin)
+ spark.sql(s"insert into $tableName partition (region='reg1',
dt='2023-08-01') select 1, 'name1'")
+
+ checkAnswer(s"select id, name, region, cast(dt as string) from
$tableName")(
+ Seq(1, "name1", "reg1", "2023-08-01")
+ )
+ }
+ }
+ }
+ }
+ }
+}