This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c9c8ae4e879 [HUDI-7964] Error out if partition schema and table schema 
do not match in create table command (#11650)
c9c8ae4e879 is described below

commit c9c8ae4e879934e7f7f00a586575841625f43432
Author: Sagar Sumit <[email protected]>
AuthorDate: Wed Jul 24 19:43:46 2024 +0530

    [HUDI-7964] Error out if partition schema and table schema do not match in 
create table command (#11650)
    
    * [HUDI-7964] Error out if partition schema and table schema do not match 
in create table command
    
    * Handle df.saveAsTable
    
    * add fields in the log message
---
 .../scala/org/apache/hudi/HoodieSchemaUtils.scala  | 22 ++++++++++
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala   | 18 +++++++-
 .../spark/sql/hudi/ddl/TestCreateTable.scala       | 50 ++++++++++++++++++++++
 3 files changed, 89 insertions(+), 1 deletion(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
index 5ab32338706..17459ffb347 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
@@ -34,6 +34,7 @@ import 
org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils
 import 
org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileSchemaRequirements
 
 import org.apache.avro.Schema
+import org.apache.spark.sql.types.StructType
 import org.slf4j.LoggerFactory
 
 import scala.collection.JavaConverters._
@@ -244,4 +245,25 @@ object HoodieSchemaUtils {
       (newSchema, isSchemaCompatible(tableSchema, newSchema))
     }
   }
+
+  /**
+   * Check if the partition schema fields order matches the table schema 
fields order.
+   *
+   * @param tableSchema      The table schema
+   * @param partitionFields  The partition fields
+   */
+  def checkPartitionSchemaOrder(tableSchema: StructType, partitionFields: 
Seq[String]): Unit = {
+    val tableSchemaFields = tableSchema.fields.map(_.name)
+    // It is not allowed to specify partition columns when the table schema is 
not defined.
+    // 
https://spark.apache.org/docs/latest/sql-error-conditions.html#specify_partition_is_not_allowed
+    if (tableSchemaFields.isEmpty && partitionFields.nonEmpty) {
+      throw new IllegalArgumentException("It is not allowed to specify 
partition columns when the table schema is not defined.")
+    }
+    // Filter the table schema fields to get the partition field names in order
+    val tableSchemaPartitionFields = 
tableSchemaFields.filter(partitionFields.contains).toSeq
+    if (tableSchemaPartitionFields != partitionFields) {
+      throw new IllegalArgumentException(s"Partition schema fields order does 
not match the table schema fields order," +
+        s" tableSchemaFields: $tableSchemaPartitionFields, partitionFields: 
$partitionFields.")
+    }
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index a0017883c48..c2327ac2e3b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.analysis
 
 import org.apache.hudi.common.util.{ReflectionUtils, ValidationUtils}
 import org.apache.hudi.common.util.ReflectionUtils.loadClass
-import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport}
+import org.apache.hudi.{HoodieSchemaUtils, HoodieSparkUtils, 
SparkAdapterSupport}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable}
@@ -428,6 +428,22 @@ case class ResolveImplementationsEarly() extends 
Rule[LogicalPlan] {
         if sparkAdapter.isHoodieTable(table) && ct.query.forall(_.resolved) =>
         CreateHoodieTableAsSelectCommand(table, mode, query)
 
+      case ct: CreateTable =>
+        try {
+          // NOTE: In case of CreateTable with schema and multiple partition 
fields,
+          // we have to make sure that partition fields are ordered in the 
same way as they are in the schema.
+          val tableSchema = 
ct.query.map(_.schema).getOrElse(ct.tableDesc.schema)
+          HoodieSchemaUtils.checkPartitionSchemaOrder(tableSchema, 
ct.tableDesc.partitionColumnNames)
+        } catch {
+          case e: IllegalArgumentException =>
+            throw e
+          case _: Exception =>
+            // NOTE: This case is when query is unresolved but table is a 
managed table and already exists.
+            // In this case, create table will fail post-analysis (see 
[[HoodieCatalogTable.parseSchemaAndConfigs]]).
+            logWarning("An unexpected exception occurred while checking 
partition schema order. Proceeding with the plan.")
+        }
+        plan
+
       case _ => plan
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
index 169c99d671c..c5c3d98856c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
@@ -1463,4 +1463,54 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test Create Hoodie Table With Multiple Partitions") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}"
+      // throws error if order in partition by different from that in create 
table
+      assertThrows[IllegalArgumentException] {
+        spark.sql(
+          s"""
+             | create table $tableName (
+             |    ts BIGINT,
+             |    id STRING,
+             |    rider STRING,
+             |    driver STRING,
+             |    fare DOUBLE,
+             |    city STRING,
+             |    state STRING
+             |) using hudi
+             | options(
+             |    primaryKey ='id'
+             |)
+             |PARTITIONED BY (state, city)
+             |location '$tablePath';
+       """.stripMargin)
+      }
+      // otherwise successful
+      spark.sql(
+        s"""
+           | create table $tableName (
+           |    ts BIGINT,
+           |    id STRING,
+           |    rider STRING,
+           |    driver STRING,
+           |    fare DOUBLE,
+           |    city STRING,
+           |    state STRING
+           |) using hudi
+           | options(
+           |    primaryKey ='id'
+           |)
+           |PARTITIONED BY (city, state)
+           |location '$tablePath';
+       """.stripMargin)
+      // insert and validate
+      spark.sql(s"insert into $tableName 
values(1695332066,'trip3','rider-E','driver-O',93.50,'austin','texas')")
+      checkAnswer(s"select ts, id, rider, driver, fare, city, state from 
$tableName")(
+        Seq(1695332066, "trip3", "rider-E", "driver-O", 93.50, "austin", 
"texas")
+      )
+    }
+  }
 }

Reply via email to