lokeshj1703 commented on code in PR #11710:
URL: https://github.com/apache/hudi/pull/11710#discussion_r1745901464


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala:
##########
@@ -18,17 +18,139 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.util
+import org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType
+import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, 
CustomKeyGenerator, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{DataType, StringType, StructField, 
StructType}
 
 import java.util.TimeZone
 
-trait SparkParsePartitionUtil extends Serializable {
+trait SparkParsePartitionUtil extends Serializable with Logging {
 
   def parsePartition(path: Path,
                      typeInference: Boolean,
                      basePaths: Set[Path],
                      userSpecifiedDataTypes: Map[String, DataType],
                      timeZone: TimeZone,
                      validatePartitionValues: Boolean = false): InternalRow
+
+  /**
+   * This function generates the partition schema for hoodie file index. This 
method is used by both HoodieFileIndex and
+   * HoodieReaderFileIndex. For HoodieReaderFileIndex it upgrades the schema 
of partition columns with timestamp partition
+   * type to STRING whereas for HoodieFileIndex it uses the base schema type 
of such partition columns. This makes sure
+   * that with an output partition format such as DD/MM/YYYY, there are no 
incompatible schema errors while reading the table.
+   */
+  def getPartitionSchema(tableConfig: HoodieTableConfig, schema: StructType, 
useStringTypeForCustomTimestampPartition: Boolean): StructType = {
+    val nameFieldMap: Map[String, StructField] = generateFieldMap(schema)
+    val partitionColumns = tableConfig.getPartitionFields
+
+    def validateAndGetPartitionFieldsStruct(partitionFields: 
Array[StructField]) = {
+      if (partitionFields.length != partitionColumns.get().length) {
+        val isBootstrapTable = tableConfig.getBootstrapBasePath.isPresent
+        if (isBootstrapTable) {
+          // For bootstrapped tables it's possible the schema does not contain 
the partition field when the source table
+          // is hive-style partitioned. In this case we would like to treat 
the table as non-partitioned
+          // as opposed to failing
+          new StructType()
+        } else {
+          throw new IllegalArgumentException(s"Cannot find columns: " +
+            s"'${partitionColumns.get().filter(col => 
!nameFieldMap.contains(col)).mkString(",")}' " +
+            s"in the schema[${nameFieldMap.keys.mkString(",")}]")
+        }
+      } else {
+        new StructType(partitionFields)
+      }
+    }
+
+    def getPartitionStructFields(keyGeneratorPartitionFieldsOpt: 
util.Option[String], keyGeneratorClassName: String) = {
+      val partitionFields: Array[StructField] = if 
(useStringTypeForCustomTimestampPartition && 
keyGeneratorPartitionFieldsOpt.isPresent
+        && 
(classOf[CustomKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
+        || 
classOf[CustomAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)))
 {
+        val keyGeneratorPartitionFields = 
keyGeneratorPartitionFieldsOpt.get().split(BaseKeyGenerator.FIELD_SEPARATOR)
+        keyGeneratorPartitionFields.map(field => 
CustomAvroKeyGenerator.getPartitionFieldAndKeyType(field))
+          .map(pair => {
+            val partitionField = pair.getLeft
+            val partitionKeyType = pair.getRight
+            partitionKeyType match {
+              case PartitionKeyType.SIMPLE => if 
(nameFieldMap.contains(partitionField)) {
+                nameFieldMap.apply(partitionField)
+              } else {
+                null
+              }
+              case PartitionKeyType.TIMESTAMP => StructField(partitionField, 
StringType)

Review Comment:
   Addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to