This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b216d794c0d [HUDI-8455] Logic for partition column values parse is
localized in `HoodieSparkUtils` (#12192)
b216d794c0d is described below
commit b216d794c0d902c10106704ddacff775c83f64bb
Author: Geser Dugarov <[email protected]>
AuthorDate: Mon Nov 4 09:42:45 2024 +0700
[HUDI-8455] Logic for partition column values parse is localized in
`HoodieSparkUtils` (#12192)
---
.../hudi/client/utils/SparkPartitionUtils.java | 2 +-
.../scala/org/apache/hudi/HoodieSparkUtils.scala | 28 ++++++++++++++++++++++
.../org/apache/hudi/BaseHoodieTableFileIndex.java | 8 +++----
.../hudi/keygen/constant/KeyGeneratorType.java | 11 +++++++++
.../hudi/hadoop/HiveHoodieTableFileIndex.java | 2 +-
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 1 +
.../scala/org/apache/hudi/HoodieCDCFileIndex.scala | 2 +-
.../apache/hudi/SparkHoodieTableFileIndex.scala | 27 ++++++++-------------
.../org/apache/hudi/TestHoodieFileIndex.scala | 6 ++---
9 files changed, 60 insertions(+), 27 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
index e8db1b3515d..f2309db19b2 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
@@ -40,7 +40,7 @@ public class SparkPartitionUtils {
return new Object[0];
}
SparkParsePartitionUtil sparkParsePartitionUtil =
SparkAdapterSupport$.MODULE$.sparkAdapter().getSparkParsePartitionUtil();
- return HoodieSparkUtils.parsePartitionColumnValues(
+ return HoodieSparkUtils.doParsePartitionColumnValues(
partitionFields.get(),
partitionPath,
new StoragePath(basePath),
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 0afd19745ba..88325617df1 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -28,6 +28,9 @@ import org.apache.hudi.util.ExceptionWrappingIterator
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.config.TimestampKeyGeneratorConfig
+import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator
+import org.apache.hudi.keygen.constant.KeyGeneratorType
import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
@@ -227,6 +230,31 @@ object HoodieSparkUtils extends SparkAdapterSupport with
SparkVersionsSupport wi
}
def parsePartitionColumnValues(partitionColumns: Array[String],
+ partitionPath: String,
+ tableBasePath: StoragePath,
+ tableSchema: StructType,
+ tableConfig: java.util.Map[String, String],
+ timeZoneId: String,
+ sparkParsePartitionUtil:
SparkParsePartitionUtil,
+ shouldValidatePartitionColumns: Boolean):
Array[Object] = {
+ val keyGeneratorClass =
KeyGeneratorType.getKeyGeneratorClassName(tableConfig)
+ val timestampKeyGeneratorType =
tableConfig.get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key())
+
+ if (null != keyGeneratorClass
+ && null != timestampKeyGeneratorType
+ && keyGeneratorClass.equals(KeyGeneratorType.TIMESTAMP.getClassName)
+ &&
!timestampKeyGeneratorType.matches(TimestampBasedAvroKeyGenerator.TimestampType.DATE_STRING.toString))
{
+ // For TIMESTAMP key generator when TYPE is not DATE_STRING (like SCALAR, UNIX_TIMESTAMP, EPOCHMILLISECONDS, etc.),
+ // we couldn't reconstruct initial partition column values from partition paths due to lost data after formatting.
+ // But the output for these cases is in a string format, so we can pass partitionPath as UTF8String
+ Array.fill(partitionColumns.length)(UTF8String.fromString(partitionPath))
+ } else {
+ doParsePartitionColumnValues(partitionColumns, partitionPath,
tableBasePath, tableSchema, timeZoneId,
+ sparkParsePartitionUtil, shouldValidatePartitionColumns)
+ }
+ }
+
+ def doParsePartitionColumnValues(partitionColumns: Array[String],
partitionPath: String,
basePath: StoragePath,
schema: StructType,
diff --git
a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 7a6ed714ac9..bbd3b50b50f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -169,7 +169,7 @@ public abstract class BaseHoodieTableFileIndex implements
AutoCloseable {
doRefresh();
}
- protected abstract Object[] doParsePartitionColumnValues(String[]
partitionColumns, String partitionPath);
+ protected abstract Object[] parsePartitionColumnValues(String[]
partitionColumns, String partitionPath);
/**
* Returns latest completed instant as seen by this instance of the file-index
@@ -359,8 +359,8 @@ public abstract class BaseHoodieTableFileIndex implements
AutoCloseable {
}
}
- private Object[] parsePartitionColumnValues(String[] partitionColumns,
String partitionPath) {
- Object[] partitionColumnValues =
doParsePartitionColumnValues(partitionColumns, partitionPath);
+ private Object[] getPartitionColumnValues(String[] partitionColumns, String
partitionPath) {
+ Object[] partitionColumnValues =
parsePartitionColumnValues(partitionColumns, partitionPath);
if (shouldListLazily && partitionColumnValues.length !=
partitionColumns.length) {
throw new HoodieException("Failed to parse partition column values from
the partition-path:"
+ " likely non-encoded slashes being used in partition column's
values. You can try to"
@@ -484,7 +484,7 @@ public abstract class BaseHoodieTableFileIndex implements
AutoCloseable {
}
protected PartitionPath convertToPartitionPath(String partitionPath) {
- Object[] partitionColumnValues =
parsePartitionColumnValues(partitionColumns, partitionPath);
+ Object[] partitionColumnValues =
getPartitionColumnValues(partitionColumns, partitionPath);
return new PartitionPath(partitionPath, partitionColumnValues);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorType.java
b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorType.java
index 8d79acd7db1..ee1b326367e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorType.java
+++
b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorType.java
@@ -27,6 +27,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import static
org.apache.hudi.common.table.HoodieTableConfig.KEY_GENERATOR_CLASS_NAME;
import static
org.apache.hudi.common.table.HoodieTableConfig.KEY_GENERATOR_TYPE;
@@ -131,4 +132,14 @@ public enum KeyGeneratorType {
}
return null;
}
+
+ @Nullable
+ public static String getKeyGeneratorClassName(Map<String, String> config) {
+ if (config.containsKey(KEY_GENERATOR_CLASS_NAME.key())) {
+ return config.get(KEY_GENERATOR_CLASS_NAME.key());
+ } else if (config.containsKey(KEY_GENERATOR_TYPE.key())) {
+ return
KeyGeneratorType.valueOf(config.get(KEY_GENERATOR_TYPE.key())).getClassName();
+ }
+ return null;
+ }
}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
index 90a053a6688..f4db321ff2a 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
@@ -75,7 +75,7 @@ public class HiveHoodieTableFileIndex extends
BaseHoodieTableFileIndex {
}
@Override
- public Object[] doParsePartitionColumnValues(String[] partitionColumns,
String partitionPath) {
+ public Object[] parsePartitionColumnValues(String[] partitionColumns, String
partitionPath) {
// NOTE: Parsing partition path into partition column values isn't required on Hive,
// since Hive does partition pruning in a different way (based on the input-path being
// fetched by the query engine)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index e0c768890dc..c4fdd678d11 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -500,6 +500,7 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
relativePath,
basePath,
tableStructSchema,
+ tableConfig.propsMap,
timeZoneId,
sparkAdapter.getSparkParsePartitionUtil,
conf.getBoolean("spark.sql.sources.validatePartitionColumns", true))
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
index 4b969d01416..e75081a936a 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
@@ -53,7 +53,7 @@ class HoodieCDCFileIndex (override val spark: SparkSession,
val partitionPath = if (fileGroupId.getPartitionPath.isEmpty)
emptyPartitionPath else fileGroupId.getPartitionPath
val partitionFields = metaClient.getTableConfig.getPartitionFields
val partitionValues: InternalRow = if (partitionFields.isPresent) {
- new
GenericInternalRow(doParsePartitionColumnValues(partitionFields.get(),
partitionPath).asInstanceOf[Array[Any]])
+ new
GenericInternalRow(parsePartitionColumnValues(partitionFields.get(),
partitionPath).asInstanceOf[Array[Any]])
} else {
InternalRow.empty
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index 649323be7cf..5d2732712f1 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -376,23 +376,16 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
/**
* @VisibleForTesting
*/
- def doParsePartitionColumnValues(partitionColumns: Array[String],
partitionPath: String): Array[Object] = {
- val tableConfig = metaClient.getTableConfig
- if (null != tableConfig.getKeyGeneratorClassName
- &&
tableConfig.getKeyGeneratorClassName.equals(KeyGeneratorType.TIMESTAMP.getClassName)
- && null !=
tableConfig.propsMap.get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key())
- &&
tableConfig.propsMap.get(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key())
- .matches("SCALAR|UNIX_TIMESTAMP|EPOCHMILLISECONDS|EPOCHMICROSECONDS")) {
- // For TIMESTAMP key generator when TYPE is SCALAR, UNIX_TIMESTAMP,
- // EPOCHMILLISECONDS, or EPOCHMICROSECONDS,
- // we couldn't reconstruct initial partition column values from partition paths due to lost data after formatting in most cases.
- // But the output for these cases is in a string format, so we can pass partitionPath as UTF8String
- Array.fill(partitionColumns.length)(UTF8String.fromString(partitionPath))
- } else {
- HoodieSparkUtils.parsePartitionColumnValues(partitionColumns,
partitionPath, getBasePath, schema,
- configProperties.getString(DateTimeUtils.TIMEZONE_OPTION,
SQLConf.get.sessionLocalTimeZone),
- sparkParsePartitionUtil, shouldValidatePartitionColumns(spark))
- }
+ def parsePartitionColumnValues(partitionColumns: Array[String],
partitionPath: String): Array[Object] = {
+ HoodieSparkUtils.parsePartitionColumnValues(
+ partitionColumns,
+ partitionPath,
+ getBasePath,
+ schema,
+ metaClient.getTableConfig.propsMap,
+ configProperties.getString(DateTimeUtils.TIMEZONE_OPTION,
SQLConf.get.sessionLocalTimeZone),
+ sparkParsePartitionUtil,
+ shouldValidatePartitionColumns(spark))
}
private def arePartitionPathsUrlEncoded: Boolean =
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index 87993c1dec6..55db8ac2670 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -112,9 +112,9 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase
with ScalaAssertionS
}
/**
- * Unit test for `doParsePartitionColumnValues` method in `HoodieFileIndex`.
+ * Unit test for `parsePartitionColumnValues` method in `SparkHoodieTableFileIndex`.
*
- * This test verifies that the `doParsePartitionColumnValues` method correctly returns
+ * This test verifies that the `parsePartitionColumnValues` method correctly returns
* partition values when the `propsMap` in the table configuration does not contain the
* expected timestamp configuration key, simulating a `null` scenario. Specifically,
* this test validates the behavior for the `TIMESTAMP` key generator type, ensuring
@@ -136,7 +136,7 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase
with ScalaAssertionS
val partitionPath = "2023/10/28"
val fileIndex = HoodieFileIndex(spark, metaClient, Some(schema), queryOpts)
// Create file index and validate the result
- val result = fileIndex.doParsePartitionColumnValues(partitionColumns,
partitionPath)
+ val result = fileIndex.parsePartitionColumnValues(partitionColumns,
partitionPath)
assertEquals(1, result.length)
assertEquals(UTF8String.fromString(partitionPath), result(0))
}