This is an automated email from the ASF dual-hosted git repository.

voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ba8c4c7b82c7 fix(spark): align CTAS partition fields by table 
partition order (#18899)
ba8c4c7b82c7 is described below

commit ba8c4c7b82c7c94405f712e5d1964226412f4e13
Author: fhan <[email protected]>
AuthorDate: Wed Jun 3 18:44:39 2026 +0800

    fix(spark): align CTAS partition fields by table partition order (#18899)
    
    * fix(spark): fix CTAS partition field order
    
    * fix(spark): add a short comment
    
    ---------
    
    Co-authored-by: fhan <[email protected]>
---
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala   | 34 +++++++++++++++++-
 .../spark/sql/hudi/ddl/TestCreateTable.scala       | 40 ++++++++++++++++++++++
 2 files changed, 73 insertions(+), 1 deletion(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 2b790e427647..0065c910480c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -410,7 +410,8 @@ case class ResolveImplementationsEarly(spark: SparkSession) 
extends Rule[Logical
       // Convert to CreateHoodieTableAsSelectCommand
       case ct @ CreateTable(table, mode, Some(query))
         if sparkAdapter.isHoodieTable(table) && ct.query.forall(_.resolved) =>
-        val alignedQuery = stripMetaFieldAttributes(query)
+        val alignedQuery = alignCtasQueryByPartitionOrder(
+          stripMetaFieldAttributes(query), table.partitionColumnNames)
         CreateHoodieTableAsSelectCommand(table, mode, alignedQuery)
 
       case ct: CreateTable =>
@@ -432,6 +433,37 @@ case class ResolveImplementationsEarly(spark: 
SparkSession) extends Rule[Logical
       case _ => plan
     }
   }
+
+  private def alignCtasQueryByPartitionOrder(query: LogicalPlan, 
partitionColumns: Seq[String]): LogicalPlan = {
+    if (partitionColumns.isEmpty) {
+      query
+    } else {
+      val resolver = spark.sessionState.conf.resolver
+      val (dataAttrs, partitionAttrs) = query.output.partition { attr =>
+        !partitionColumns.exists(partition => resolver(partition, attr.name))
+      }
+
+      if (partitionAttrs.size != partitionColumns.size) {
+        throw new HoodieAnalysisException(s"Partition columns 
${partitionColumns.mkString("[", ", ", "]")} " +
+          s"do not match query output ${query.output.map(_.name).mkString("[", 
", ", "]")}")
+      }
+
+      val alreadyAligned = partitionColumns.zip(partitionAttrs).forall {
+        case (partition, attr) => resolver(partition, attr.name)
+      }
+      // Avoid adding a redundant Project when partition columns are already 
in the table-defined order.
+      if (alreadyAligned) {
+        query
+      } else {
+        val orderedPartitionAttrs = partitionColumns.map { partition =>
+          partitionAttrs.find(attr => resolver(partition, 
attr.name)).getOrElse {
+            throw new HoodieAnalysisException(s"Cannot resolve partition 
column $partition in CTAS query output")
+          }
+        }
+        Project(dataAttrs ++ orderedPartitionAttrs, query)
+      }
+    }
+  }
 }
 
 /**
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
index ead994342162..88ec4b7fb4c3 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
@@ -416,6 +416,46 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
           Seq(1, "a1", 10, "2021-04-01")
         )
 
+        // Create table with multi-level partition
+        val tableNameMultiLevelPartition = generateTableName
+        spark.sql(
+          s"""
+             | create table $tableNameMultiLevelPartition using hudi
+             | partitioned by (year, month, day)
+             | tblproperties(
+             |    primaryKey = 'id',
+             |    type = '$tableType'
+             | )
+             | location '${tmp.getCanonicalPath}/$tableNameMultiLevelPartition'
+             | AS
+             | select 1 as id, 'a1' as name, 10 as price, '2021' as year, '04' 
as month, '01' as day
+         """.stripMargin
+        )
+
+        checkAnswer(s"select id, name, price, year, month, day from 
$tableNameMultiLevelPartition")(
+          Seq(1, "a1", 10, "2021", "04", "01")
+        )
+
+        // Create table with multi-level partition and out-of-order partition 
columns
+        val tableNameMultiLevelPartitionDisorder = generateTableName
+        spark.sql(
+          s"""
+             | create table $tableNameMultiLevelPartitionDisorder using hudi
+             | partitioned by (year, month, day)
+             | tblproperties(
+             |    primaryKey = 'id',
+             |    type = '$tableType'
+             | )
+             | location 
'${tmp.getCanonicalPath}/$tableNameMultiLevelPartitionDisorder'
+             | AS
+             | select 1 as id, 'a1' as name, 10 as price, '04' as month, '01' 
as day, '2021' as year
+         """.stripMargin
+        )
+
+        checkAnswer(s"select id, name, price, year, month, day from 
$tableNameMultiLevelPartitionDisorder")(
+          Seq(1, "a1", 10, "2021", "04", "01")
+        )
+
         // Create Partitioned table with timestamp data type
         val tableName3 = generateTableName
         // CTAS failed with null primaryKey

Reply via email to