This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6b4c8f3308d [HUDI-8823] Ban update from updating primary key and
partition key (#12587)
6b4c8f3308d is described below
commit 6b4c8f3308dd6f4859c286edf2ac7fdc9b12a165
Author: Davis-Zhang-Onehouse
<[email protected]>
AuthorDate: Tue Jan 14 15:35:27 2025 -0800
[HUDI-8823] Ban update from updating primary key and partition key (#12587)
---
.../hudi/command/UpdateHoodieTableCommand.scala | 58 ++++++++++++++++-
.../spark/sql/hudi/dml/TestUpdateTable.scala | 72 ++++++++++++++++++++++
2 files changed, 128 insertions(+), 2 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
index 1e0479d657f..fc10c90ce6f 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
@@ -22,17 +22,53 @@ import org.apache.hudi.SparkAdapterSupport
import org.apache.spark.sql.HoodieCatalystExpressionUtils.attributeEquals
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter,
Project, UpdateTable}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference,
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter,
LogicalPlan, Project, UpdateTable}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
+import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
case class UpdateHoodieTableCommand(ut: UpdateTable) extends
HoodieLeafRunnableCommand
with SparkAdapterSupport with ProvidesHoodieConfig {
+ private var sparkSession: SparkSession = _
+
+ private lazy val hoodieCatalogTable =
sparkAdapter.resolveHoodieTable(ut.table) match {
+ case Some(catalogTable) => HoodieCatalogTable(sparkSession, catalogTable)
+ case _ =>
+ failAnalysis(s"Failed to resolve update statement into the Hudi table.
Got instead: ${ut.table}")
+ }
+
+ /**
+ * Validate there is no assignment clause for the given attribute in the
given table.
+ *
+ * @param resolver The resolver to use
+ * @param fields The fields from the target table that should not have
any assignment clause
+ * @param tableId Table identifier (for error messages)
+ * @param fieldType Type of the attribute to be validated (for error
messages)
+ * @param assignments The assignments clause
+ *
+ * @throws AnalysisException if assignment clause for the given target table
attribute is found
+ */
+ private def validateNoAssignmentsToTargetTableAttr(resolver: Resolver,
+ fields: Seq[String],
+ tableId: String,
+ fieldType: String,
+ assignments:
Seq[(AttributeReference, Expression)]
+ ): Unit = {
+ fields.foreach(field => if (assignments.exists {
+ case (attr, _) => resolver(attr.name, field)
+ }) {
+ throw new AnalysisException(s"Detected disallowed assignment clause in
UPDATE statement for $fieldType " +
+ s"`$field` for table `$tableId`. Please remove the assignment clause
to avoid the error.")
+ })
+ }
+
override def run(sparkSession: SparkSession): Seq[Row] = {
+ this.sparkSession = sparkSession
val catalogTable = sparkAdapter.resolveHoodieTable(ut.table)
.map(HoodieCatalogTable(sparkSession, _))
.get
@@ -45,6 +81,24 @@ case class UpdateHoodieTableCommand(ut: UpdateTable) extends
HoodieLeafRunnableC
case Assignment(attr: AttributeReference, value) => attr -> value
}
+ // We don't support update queries changing partition column value.
+ validateNoAssignmentsToTargetTableAttr(
+ sparkSession.sessionState.conf.resolver,
+ hoodieCatalogTable.tableConfig.getPartitionFields.orElse(Array.empty),
+ tableId,
+ "partition field",
+ assignedAttributes
+ )
+
+ // We don't support update queries changing the primary key column value.
+ validateNoAssignmentsToTargetTableAttr(
+ sparkSession.sessionState.conf.resolver,
+ hoodieCatalogTable.tableConfig.getRecordKeyFields.orElse(Array.empty),
+ tableId,
+ "record key field",
+ assignedAttributes
+ )
+
val filteredOutput = if
(sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key()
, SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") {
ut.table.output
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala
index a7df73373a4..cdca1789fc8 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala
@@ -23,6 +23,8 @@ import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
+import org.apache.hudi.HoodieSparkUtils.gteqSpark3_4
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
import org.junit.jupiter.api.Assertions.assertEquals
@@ -364,4 +366,74 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
}
}
}
+
+ test("Test Update Table With Primary Key and Partition Key Updates error
out") {
+ withRecordType()(withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ val tableName = generateTableName
+ // create table with primary key and partition
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | pt string
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ | partitioned by (pt)
+ """.stripMargin)
+
+ // Insert initial data
+ spark.sql(s"insert into $tableName values (1, 'a1', 10.0, 1000,
'2021')")
+
+ // Verify initial state
+ checkAnswer(s"select id, name, price, ts, pt from $tableName")(
+ Seq(1, "a1", 10.0, 1000, "2021")
+ )
+
+ // Try to update primary key (should fail)
+ val e1 = intercept[AnalysisException] {
+ spark.sql(s"update $tableName set id = 2 where id = 1")
+ }
+
+ if (gteqSpark3_4) {
+ assert(e1.getMessage.contains(s"Detected disallowed assignment
clause in UPDATE statement for record key field `id`" +
+ s" for table `spark_catalog.default.$tableName`. Please remove the
assignment clause to avoid the error."))
+ } else {
+ assert(e1.getMessage.contains(s"Detected disallowed assignment
clause in UPDATE statement for record key field `id`" +
+ s" for table `default.$tableName`. Please remove the assignment
clause to avoid the error."))
+ }
+
+ // Try to update partition column (should fail)
+ val e2 = intercept[AnalysisException] {
+ spark.sql(s"update $tableName set pt = '2022' where id = 1")
+ }
+ if (gteqSpark3_4) {
+ assert(e2.getMessage.contains(s"Detected disallowed assignment
clause in UPDATE statement for partition field `pt`" +
+ s" for table `spark_catalog.default.$tableName`. Please remove the
assignment clause to avoid the error."))
+ } else {
+ assert(e2.getMessage.contains(s"Detected disallowed assignment
clause in UPDATE statement for partition field `pt`" +
+ s" for table `default.$tableName`. Please remove the assignment
clause to avoid the error."))
+ }
+
+ // Verify data remains unchanged after failed updates
+ checkAnswer(s"select id, name, price, ts, pt from $tableName")(
+ Seq(1, "a1", 10.0, 1000, "2021")
+ )
+
+ // Verify normal update still works
+ spark.sql(s"update $tableName set price = 20.0 where id = 1")
+ checkAnswer(s"select id, name, price, ts, pt from $tableName")(
+ Seq(1, "a1", 20.0, 1000, "2021")
+ )
+ }
+ })
+ }
}