This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c9c8ae4e879 [HUDI-7964] Error out if partition schema and table schema do not match in create table command (#11650)
c9c8ae4e879 is described below
commit c9c8ae4e879934e7f7f00a586575841625f43432
Author: Sagar Sumit <[email protected]>
AuthorDate: Wed Jul 24 19:43:46 2024 +0530
[HUDI-7964] Error out if partition schema and table schema do not match in create table command (#11650)
* [HUDI-7964] Error out if partition schema and table schema do not match in create table command
* Handle df.saveAsTable
* add fields in the log message
---
.../scala/org/apache/hudi/HoodieSchemaUtils.scala | 22 ++++++++++
.../spark/sql/hudi/analysis/HoodieAnalysis.scala | 18 +++++++-
.../spark/sql/hudi/ddl/TestCreateTable.scala | 50 ++++++++++++++++++++++
3 files changed, 89 insertions(+), 1 deletion(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
index 5ab32338706..17459ffb347 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
@@ -34,6 +34,7 @@ import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileSchemaRequirements
import org.apache.avro.Schema
+import org.apache.spark.sql.types.StructType
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
@@ -244,4 +245,25 @@ object HoodieSchemaUtils {
(newSchema, isSchemaCompatible(tableSchema, newSchema))
}
}
+
+ /**
+ * Check if the partition schema fields order matches the table schema fields order.
+ *
+ * @param tableSchema The table schema
+ * @param partitionFields The partition fields
+ */
+ def checkPartitionSchemaOrder(tableSchema: StructType, partitionFields: Seq[String]): Unit = {
+ val tableSchemaFields = tableSchema.fields.map(_.name)
+ // It is not allowed to specify partition columns when the table schema is not defined.
+ // https://spark.apache.org/docs/latest/sql-error-conditions.html#specify_partition_is_not_allowed
+ if (tableSchemaFields.isEmpty && partitionFields.nonEmpty) {
+ throw new IllegalArgumentException("It is not allowed to specify partition columns when the table schema is not defined.")
+ }
+ // Filter the table schema fields to get the partition field names in order
+ val tableSchemaPartitionFields = tableSchemaFields.filter(partitionFields.contains).toSeq
+ if (tableSchemaPartitionFields != partitionFields) {
+ throw new IllegalArgumentException(s"Partition schema fields order does not match the table schema fields order," +
+ s" tableSchemaFields: $tableSchemaPartitionFields, partitionFields: $partitionFields.")
+ }
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index a0017883c48..c2327ac2e3b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.analysis
import org.apache.hudi.common.util.{ReflectionUtils, ValidationUtils}
import org.apache.hudi.common.util.ReflectionUtils.loadClass
-import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport}
+import org.apache.hudi.{HoodieSchemaUtils, HoodieSparkUtils, SparkAdapterSupport}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
@@ -428,6 +428,22 @@ case class ResolveImplementationsEarly() extends Rule[LogicalPlan] {
if sparkAdapter.isHoodieTable(table) && ct.query.forall(_.resolved) =>
CreateHoodieTableAsSelectCommand(table, mode, query)
+ case ct: CreateTable =>
+ try {
+ // NOTE: In case of CreateTable with schema and multiple partition fields,
+ // we have to make sure that partition fields are ordered in the same way as they are in the schema.
+ val tableSchema = ct.query.map(_.schema).getOrElse(ct.tableDesc.schema)
+ HoodieSchemaUtils.checkPartitionSchemaOrder(tableSchema, ct.tableDesc.partitionColumnNames)
+ } catch {
+ case e: IllegalArgumentException =>
+ throw e
+ case _: Exception =>
+ // NOTE: This case is when query is unresolved but table is a managed table and already exists.
+ // In this case, create table will fail post-analysis (see [[HoodieCatalogTable.parseSchemaAndConfigs]]).
+ logWarning("An unexpected exception occurred while checking partition schema order. Proceeding with the plan.")
+ }
+ plan
+
case _ => plan
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
index 169c99d671c..c5c3d98856c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
@@ -1463,4 +1463,54 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
}
}
}
+
+ test("Test Create Hoodie Table With Multiple Partitions") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}"
+ // throws error if order in partition by different from that in create table
+ assertThrows[IllegalArgumentException] {
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | ts BIGINT,
+ | id STRING,
+ | rider STRING,
+ | driver STRING,
+ | fare DOUBLE,
+ | city STRING,
+ | state STRING
+ |) using hudi
+ | options(
+ | primaryKey ='id'
+ |)
+ |PARTITIONED BY (state, city)
+ |location '$tablePath';
+ """.stripMargin)
+ }
+ // otherwise successful
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | ts BIGINT,
+ | id STRING,
+ | rider STRING,
+ | driver STRING,
+ | fare DOUBLE,
+ | city STRING,
+ | state STRING
+ |) using hudi
+ | options(
+ | primaryKey ='id'
+ |)
+ |PARTITIONED BY (city, state)
+ |location '$tablePath';
+ """.stripMargin)
+ // insert and validate
+ spark.sql(s"insert into $tableName values(1695332066,'trip3','rider-E','driver-O',93.50,'austin','texas')")
+ checkAnswer(s"select ts, id, rider, driver, fare, city, state from $tableName")(
+ Seq(1695332066, "trip3", "rider-E", "driver-O", 93.50, "austin", "texas")
+ )
+ }
+ }
}