lokeshj1703 commented on code in PR #11710:
URL: https://github.com/apache/hudi/pull/11710#discussion_r1745901464
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala:
##########
@@ -18,17 +18,139 @@
package org.apache.spark.sql.execution.datasources
import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.util
+import org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType
+import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator,
CustomKeyGenerator, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{DataType, StringType, StructField,
StructType}
import java.util.TimeZone
-trait SparkParsePartitionUtil extends Serializable {
+trait SparkParsePartitionUtil extends Serializable with Logging {
def parsePartition(path: Path,
typeInference: Boolean,
basePaths: Set[Path],
userSpecifiedDataTypes: Map[String, DataType],
timeZone: TimeZone,
validatePartitionValues: Boolean = false): InternalRow
+
+ /**
+ * This function generates the partition schema for the Hudi file index. This
method is used by both HoodieFileIndex and
+ * HoodieReaderFileIndex. For HoodieReaderFileIndex, it upgrades the schema
of partition columns with the timestamp partition
+ * type to STRING, whereas for HoodieFileIndex it uses the base schema type
of such partition columns. This ensures
+ * that, with an output partition format such as DD/MM/YYYY, there are no
incompatible schema errors while reading the table.
+ */
+ def getPartitionSchema(tableConfig: HoodieTableConfig, schema: StructType,
useStringTypeForCustomTimestampPartition: Boolean): StructType = {
+ val nameFieldMap: Map[String, StructField] = generateFieldMap(schema)
+ val partitionColumns = tableConfig.getPartitionFields
+
+ def validateAndGetPartitionFieldsStruct(partitionFields:
Array[StructField]) = {
+ if (partitionFields.length != partitionColumns.get().length) {
+ val isBootstrapTable = tableConfig.getBootstrapBasePath.isPresent
+ if (isBootstrapTable) {
+ // For bootstrapped tables it's possible the schema does not contain
the partition field when the source table
+ // is Hive-style partitioned. In this case we would like to treat
the table as non-partitioned
+ // as opposed to failing.
+ new StructType()
+ } else {
+ throw new IllegalArgumentException(s"Cannot find columns: " +
+ s"'${partitionColumns.get().filter(col =>
!nameFieldMap.contains(col)).mkString(",")}' " +
+ s"in the schema[${nameFieldMap.keys.mkString(",")}]")
+ }
+ } else {
+ new StructType(partitionFields)
+ }
+ }
+
+ def getPartitionStructFields(keyGeneratorPartitionFieldsOpt:
util.Option[String], keyGeneratorClassName: String) = {
+ val partitionFields: Array[StructField] = if
(useStringTypeForCustomTimestampPartition &&
keyGeneratorPartitionFieldsOpt.isPresent
+ &&
(classOf[CustomKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
+ ||
classOf[CustomAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)))
{
+ val keyGeneratorPartitionFields =
keyGeneratorPartitionFieldsOpt.get().split(BaseKeyGenerator.FIELD_SEPARATOR)
+ keyGeneratorPartitionFields.map(field =>
CustomAvroKeyGenerator.getPartitionFieldAndKeyType(field))
+ .map(pair => {
+ val partitionField = pair.getLeft
+ val partitionKeyType = pair.getRight
+ partitionKeyType match {
+ case PartitionKeyType.SIMPLE => if
(nameFieldMap.contains(partitionField)) {
+ nameFieldMap.apply(partitionField)
+ } else {
+ null
+ }
+ case PartitionKeyType.TIMESTAMP => StructField(partitionField,
StringType)
Review Comment:
Addressed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]