This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d1f83de4bcb [HUDI-6729] Fix get partition values from path for 
non-string type partition column (#9484)
d1f83de4bcb is described below

commit d1f83de4bcb33741e3140677796b98254bfd9da1
Author: Wechar Yu <[email protected]>
AuthorDate: Wed Aug 23 08:56:53 2023 +0800

    [HUDI-6729] Fix get partition values from path for non-string type 
partition column (#9484)
    
    * reuse HoodieSparkUtils#parsePartitionColumnValues to support multi spark 
versions
    * assert parsed partition values from path
    * throw exception instead of return empty InternalRow when encounter 
exception in HoodieBaseRelation#getPartitionColumnsAsInternalRowInternal
---
 .../scala/org/apache/hudi/HoodieBaseRelation.scala | 51 ++++++++++-----------
 .../TestGetPartitionValuesFromPath.scala           | 53 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 28 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 0f7eb27fd04..9ace93ed495 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -32,8 +32,8 @@ import org.apache.hudi.common.config.{ConfigProperty, 
HoodieMetadataConfig, Seri
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
 import org.apache.hudi.common.model.{FileSlice, HoodieFileFormat, HoodieRecord}
-import org.apache.hudi.common.table.timeline.{HoodieTimeline, TimelineUtils}
-import 
org.apache.hudi.common.table.timeline.TimelineUtils.{HollowCommitHandling, 
validateTimestampAsOf, handleHollowCommitIfNeeded}
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import 
org.apache.hudi.common.table.timeline.TimelineUtils.validateTimestampAsOf
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
@@ -41,6 +41,7 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
 import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.CachingPath
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -54,6 +55,7 @@ import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.{convertToCatalystExpr
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.{Expression, 
SubqueryExpression}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
@@ -62,7 +64,6 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Row, SQLContext, SparkSession}
-import org.apache.spark.unsafe.types.UTF8String
 
 import java.net.URI
 import scala.collection.JavaConverters._
@@ -482,32 +483,26 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
 
   protected def getPartitionColumnsAsInternalRowInternal(file: FileStatus, 
basePath: Path,
                                                          
extractPartitionValuesFromPartitionPath: Boolean): InternalRow = {
-    try {
-      val tableConfig = metaClient.getTableConfig
-      if (extractPartitionValuesFromPartitionPath) {
-        val tablePathWithoutScheme = 
CachingPath.getPathWithoutSchemeAndAuthority(basePath)
-        val partitionPathWithoutScheme = 
CachingPath.getPathWithoutSchemeAndAuthority(file.getPath.getParent)
-        val relativePath = new 
URI(tablePathWithoutScheme.toString).relativize(new 
URI(partitionPathWithoutScheme.toString)).toString
-        val hiveStylePartitioningEnabled = 
tableConfig.getHiveStylePartitioningEnable.toBoolean
-        if (hiveStylePartitioningEnabled) {
-          val partitionSpec = PartitioningUtils.parsePathFragment(relativePath)
-          
InternalRow.fromSeq(partitionColumns.map(partitionSpec(_)).map(UTF8String.fromString))
-        } else {
-          if (partitionColumns.length == 1) {
-            InternalRow.fromSeq(Seq(UTF8String.fromString(relativePath)))
-          } else {
-            val parts = relativePath.split("/")
-            assert(parts.size == partitionColumns.length)
-            InternalRow.fromSeq(parts.map(UTF8String.fromString))
-          }
-        }
-      } else {
-        InternalRow.empty
+    if (extractPartitionValuesFromPartitionPath) {
+      val tablePathWithoutScheme = 
CachingPath.getPathWithoutSchemeAndAuthority(basePath)
+      val partitionPathWithoutScheme = 
CachingPath.getPathWithoutSchemeAndAuthority(file.getPath.getParent)
+      val relativePath = new 
URI(tablePathWithoutScheme.toString).relativize(new 
URI(partitionPathWithoutScheme.toString)).toString
+      val timeZoneId = conf.get("timeZone", 
sparkSession.sessionState.conf.sessionLocalTimeZone)
+      val rowValues = HoodieSparkUtils.parsePartitionColumnValues(
+        partitionColumns,
+        relativePath,
+        basePath,
+        tableStructSchema,
+        timeZoneId,
+        sparkAdapter.getSparkParsePartitionUtil,
+        conf.getBoolean("spark.sql.sources.validatePartitionColumns", true))
+      if(rowValues.length != partitionColumns.length) {
+        throw new HoodieException("Failed to get partition column values from 
the partition-path:"
+            + s"partition column size: ${partitionColumns.length}, parsed 
partition value size: ${rowValues.length}")
       }
-    } catch {
-      case NonFatal(e) =>
-        logWarning(s"Failed to get the right partition InternalRow for file: 
${file.toString}", e)
-        InternalRow.empty
+      InternalRow.fromSeq(rowValues)
+    } else {
+      InternalRow.empty
     }
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestGetPartitionValuesFromPath.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestGetPartitionValuesFromPath.scala
new file mode 100644
index 00000000000..0b4ce12ae52
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestGetPartitionValuesFromPath.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+
+class TestGetPartitionValuesFromPath extends HoodieSparkSqlTestBase {
+
+  Seq(true, false).foreach { hiveStylePartitioning =>
+    Seq(true, false).foreach {readFromPath =>
+      test(s"Get partition values from path: $readFromPath, 
isHivePartitioning: $hiveStylePartitioning") {
+        
withSQLConf("hoodie.datasource.read.extract.partition.values.from.path" -> 
readFromPath.toString) {
+          withTable(generateTableName) { tableName =>
+            spark.sql(
+              s"""
+                 |create table $tableName (
+                 | id int,
+                 | name string,
+                 | region string,
+                 | dt date
+                 |) using hudi
+                 |tblproperties (
+                 | primaryKey = 'id',
+                 | type='mor',
+                 | 
hoodie.datasource.write.hive_style_partitioning='$hiveStylePartitioning')
+                 |partitioned by (region, dt)""".stripMargin)
+            spark.sql(s"insert into $tableName partition (region='reg1', 
dt='2023-08-01') select 1, 'name1'")
+
+            checkAnswer(s"select id, name, region, cast(dt as string) from 
$tableName")(
+              Seq(1, "name1", "reg1", "2023-08-01")
+            )
+          }
+        }
+      }
+    }
+  }
+}

Reply via email to