This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b216d794c0d [HUDI-8455] Logic for partition column values parse is 
localized in `HoodieSparkUtils` (#12192)
b216d794c0d is described below

commit b216d794c0d902c10106704ddacff775c83f64bb
Author: Geser Dugarov <[email protected]>
AuthorDate: Mon Nov 4 09:42:45 2024 +0700

    [HUDI-8455] Logic for partition column values parse is localized in 
`HoodieSparkUtils` (#12192)
---
 .../hudi/client/utils/SparkPartitionUtils.java     |  2 +-
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   | 28 ++++++++++++++++++++++
 .../org/apache/hudi/BaseHoodieTableFileIndex.java  |  8 +++----
 .../hudi/keygen/constant/KeyGeneratorType.java     | 11 +++++++++
 .../hudi/hadoop/HiveHoodieTableFileIndex.java      |  2 +-
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  1 +
 .../scala/org/apache/hudi/HoodieCDCFileIndex.scala |  2 +-
 .../apache/hudi/SparkHoodieTableFileIndex.scala    | 27 ++++++++-------------
 .../org/apache/hudi/TestHoodieFileIndex.scala      |  6 ++---
 9 files changed, 60 insertions(+), 27 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
index e8db1b3515d..f2309db19b2 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
@@ -40,7 +40,7 @@ public class SparkPartitionUtils {
       return new Object[0];
     }
     SparkParsePartitionUtil sparkParsePartitionUtil = 
SparkAdapterSupport$.MODULE$.sparkAdapter().getSparkParsePartitionUtil();
-    return HoodieSparkUtils.parsePartitionColumnValues(
+    return HoodieSparkUtils.doParsePartitionColumnValues(
         partitionFields.get(),
         partitionPath,
         new StoragePath(basePath),
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 0afd19745ba..88325617df1 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -28,6 +28,9 @@ import org.apache.hudi.util.ExceptionWrappingIterator
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.config.TimestampKeyGeneratorConfig
+import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator
+import org.apache.hudi.keygen.constant.KeyGeneratorType
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
@@ -227,6 +230,31 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport wi
   }
 
   def parsePartitionColumnValues(partitionColumns: Array[String],
+                                   partitionPath: String,
+                                   tableBasePath: StoragePath,
+                                   tableSchema: StructType,
+                                   tableConfig: java.util.Map[String, String],
+                                   timeZoneId: String,
+                                   sparkParsePartitionUtil: 
SparkParsePartitionUtil,
+                                   shouldValidatePartitionColumns: Boolean): 
Array[Object] = {
+    val keyGeneratorClass = 
KeyGeneratorType.getKeyGeneratorClassName(tableConfig)
+    val timestampKeyGeneratorType = 
tableConfig.get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key())
+
+    if (null != keyGeneratorClass
+      && null != timestampKeyGeneratorType
+      && keyGeneratorClass.equals(KeyGeneratorType.TIMESTAMP.getClassName)
+      && 
!timestampKeyGeneratorType.matches(TimestampBasedAvroKeyGenerator.TimestampType.DATE_STRING.toString))
 {
+      // For TIMESTAMP key generator when TYPE is not DATE_STRING (like 
SCALAR, UNIX_TIMESTAMP, EPOCHMILLISECONDS, etc.),
+      // we couldn't reconstruct initial partition column values from 
partition paths due to lost data after formatting.
+      // But the output for these cases is in a string format, so we can pass 
partitionPath as UTF8String
+      Array.fill(partitionColumns.length)(UTF8String.fromString(partitionPath))
+    } else {
+      doParsePartitionColumnValues(partitionColumns, partitionPath, 
tableBasePath, tableSchema, timeZoneId,
+        sparkParsePartitionUtil, shouldValidatePartitionColumns)
+    }
+  }
+
+  def doParsePartitionColumnValues(partitionColumns: Array[String],
                                  partitionPath: String,
                                  basePath: StoragePath,
                                  schema: StructType,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java 
b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 7a6ed714ac9..bbd3b50b50f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -169,7 +169,7 @@ public abstract class BaseHoodieTableFileIndex implements 
AutoCloseable {
     doRefresh();
   }
 
-  protected abstract Object[] doParsePartitionColumnValues(String[] 
partitionColumns, String partitionPath);
+  protected abstract Object[] parsePartitionColumnValues(String[] 
partitionColumns, String partitionPath);
 
   /**
    * Returns latest completed instant as seen by this instance of the 
file-index
@@ -359,8 +359,8 @@ public abstract class BaseHoodieTableFileIndex implements 
AutoCloseable {
     }
   }
 
-  private Object[] parsePartitionColumnValues(String[] partitionColumns, 
String partitionPath) {
-    Object[] partitionColumnValues = 
doParsePartitionColumnValues(partitionColumns, partitionPath);
+  private Object[] getPartitionColumnValues(String[] partitionColumns, String 
partitionPath) {
+    Object[] partitionColumnValues = 
parsePartitionColumnValues(partitionColumns, partitionPath);
     if (shouldListLazily && partitionColumnValues.length != 
partitionColumns.length) {
       throw new HoodieException("Failed to parse partition column values from 
the partition-path:"
           + " likely non-encoded slashes being used in partition column's 
values. You can try to"
@@ -484,7 +484,7 @@ public abstract class BaseHoodieTableFileIndex implements 
AutoCloseable {
   }
 
   protected PartitionPath convertToPartitionPath(String partitionPath) {
-    Object[] partitionColumnValues = 
parsePartitionColumnValues(partitionColumns, partitionPath);
+    Object[] partitionColumnValues = 
getPartitionColumnValues(partitionColumns, partitionPath);
     return new PartitionPath(partitionPath, partitionColumnValues);
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorType.java
 
b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorType.java
index 8d79acd7db1..ee1b326367e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorType.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorType.java
@@ -27,6 +27,7 @@ import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 import static 
org.apache.hudi.common.table.HoodieTableConfig.KEY_GENERATOR_CLASS_NAME;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.KEY_GENERATOR_TYPE;
@@ -131,4 +132,14 @@ public enum KeyGeneratorType {
     }
     return null;
   }
+
+  @Nullable
+  public static String getKeyGeneratorClassName(Map<String, String> config) {
+    if (config.containsKey(KEY_GENERATOR_CLASS_NAME.key())) {
+      return config.get(KEY_GENERATOR_CLASS_NAME.key());
+    } else if (config.containsKey(KEY_GENERATOR_TYPE.key())) {
+      return 
KeyGeneratorType.valueOf(config.get(KEY_GENERATOR_TYPE.key())).getClassName();
+    }
+    return null;
+  }
 }
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
index 90a053a6688..f4db321ff2a 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
@@ -75,7 +75,7 @@ public class HiveHoodieTableFileIndex extends 
BaseHoodieTableFileIndex {
   }
 
   @Override
-  public Object[] doParsePartitionColumnValues(String[] partitionColumns, 
String partitionPath) {
+  public Object[] parsePartitionColumnValues(String[] partitionColumns, String 
partitionPath) {
     // NOTE: Parsing partition path into partition column values isn't 
required on Hive,
     //       since Hive does partition pruning in a different way (based on 
the input-path being
     //       fetched by the query engine)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index e0c768890dc..c4fdd678d11 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -500,6 +500,7 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
         relativePath,
         basePath,
         tableStructSchema,
+        tableConfig.propsMap,
         timeZoneId,
         sparkAdapter.getSparkParsePartitionUtil,
         conf.getBoolean("spark.sql.sources.validatePartitionColumns", true))
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
index 4b969d01416..e75081a936a 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
@@ -53,7 +53,7 @@ class HoodieCDCFileIndex (override val spark: SparkSession,
         val partitionPath = if (fileGroupId.getPartitionPath.isEmpty) 
emptyPartitionPath else fileGroupId.getPartitionPath
         val partitionFields = metaClient.getTableConfig.getPartitionFields
         val partitionValues: InternalRow = if (partitionFields.isPresent) {
-          new 
GenericInternalRow(doParsePartitionColumnValues(partitionFields.get(), 
partitionPath).asInstanceOf[Array[Any]])
+          new 
GenericInternalRow(parsePartitionColumnValues(partitionFields.get(), 
partitionPath).asInstanceOf[Array[Any]])
         } else {
           InternalRow.empty
         }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index 649323be7cf..5d2732712f1 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -376,23 +376,16 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
   /**
    * @VisibleForTesting
    */
-  def doParsePartitionColumnValues(partitionColumns: Array[String], 
partitionPath: String): Array[Object] = {
-    val tableConfig = metaClient.getTableConfig
-    if (null != tableConfig.getKeyGeneratorClassName
-      && 
tableConfig.getKeyGeneratorClassName.equals(KeyGeneratorType.TIMESTAMP.getClassName)
-      && null != 
tableConfig.propsMap.get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key())
-      && 
tableConfig.propsMap.get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key())
-      .matches("SCALAR|UNIX_TIMESTAMP|EPOCHMILLISECONDS|EPOCHMICROSECONDS")) {
-      // For TIMESTAMP key generator when TYPE is SCALAR, UNIX_TIMESTAMP,
-      // EPOCHMILLISECONDS, or EPOCHMICROSECONDS,
-      // we couldn't reconstruct initial partition column values from 
partition paths due to lost data after formatting in most cases.
-      // But the output for these cases is in a string format, so we can pass 
partitionPath as UTF8String
-      Array.fill(partitionColumns.length)(UTF8String.fromString(partitionPath))
-    } else {
-      HoodieSparkUtils.parsePartitionColumnValues(partitionColumns, 
partitionPath, getBasePath, schema,
-        configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, 
SQLConf.get.sessionLocalTimeZone),
-        sparkParsePartitionUtil, shouldValidatePartitionColumns(spark))
-    }
+  def parsePartitionColumnValues(partitionColumns: Array[String], 
partitionPath: String): Array[Object] = {
+    HoodieSparkUtils.parsePartitionColumnValues(
+      partitionColumns,
+      partitionPath,
+      getBasePath,
+      schema,
+      metaClient.getTableConfig.propsMap,
+      configProperties.getString(DateTimeUtils.TIMEZONE_OPTION, 
SQLConf.get.sessionLocalTimeZone),
+      sparkParsePartitionUtil,
+      shouldValidatePartitionColumns(spark))
   }
 
   private def arePartitionPathsUrlEncoded: Boolean =
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index 87993c1dec6..55db8ac2670 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -112,9 +112,9 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase 
with ScalaAssertionS
   }
 
   /**
-   * Unit test for `doParsePartitionColumnValues` method in `HoodieFileIndex`.
+   * Unit test for `parsePartitionColumnValues` method in 
`SparkHoodieTableFileIndex`.
    *
-   * This test verifies that the `doParsePartitionColumnValues` method 
correctly returns
+   * This test verifies that the `parsePartitionColumnValues` method correctly 
returns
    * partition values when the `propsMap` in the table configuration does not 
contain the
    * expected timestamp configuration key, simulating a `null` scenario. 
Specifically,
    * this test validates the behavior for the `TIMESTAMP` key generator type, 
ensuring
@@ -136,7 +136,7 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase 
with ScalaAssertionS
     val partitionPath = "2023/10/28"
     val fileIndex = HoodieFileIndex(spark, metaClient, Some(schema), queryOpts)
     // Create file index and validate the result
-    val result = fileIndex.doParsePartitionColumnValues(partitionColumns, 
partitionPath)
+    val result = fileIndex.parsePartitionColumnValues(partitionColumns, 
partitionPath)
     assertEquals(1, result.length)
     assertEquals(UTF8String.fromString(partitionPath), result(0))
   }

Reply via email to