This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b4c8f3308d [HUDI-8823] Ban update from updating primary key and partition key (#12587)
6b4c8f3308d is described below

commit 6b4c8f3308dd6f4859c286edf2ac7fdc9b12a165
Author: Davis-Zhang-Onehouse <[email protected]>
AuthorDate: Tue Jan 14 15:35:27 2025 -0800

    [HUDI-8823] Ban update from updating primary key and partition key (#12587)
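    In short, UPDATE statements can no longer assign to the record key or partition columns.
    A minimal sketch of the new behavior, using a hypothetical table t with primaryKey = 'id'
    and partition column 'pt' (the names are illustrative, mirroring the new test below):

        // hypothetical table t: primaryKey = 'id', partitioned by (pt)
        spark.sql("update t set id = 2 where id = 1")        // rejected: assigns to the record key field
        spark.sql("update t set pt = '2022' where id = 1")   // rejected: assigns to the partition field
        spark.sql("update t set price = 20.0 where id = 1")  // still allowed: non-key column

    Both rejected statements now fail analysis with an AnalysisException naming the offending
    column, as exercised by the new test in TestUpdateTable.scala.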
---
 .../hudi/command/UpdateHoodieTableCommand.scala    | 58 ++++++++++++++++-
 .../spark/sql/hudi/dml/TestUpdateTable.scala       | 72 ++++++++++++++++++++++
 2 files changed, 128 insertions(+), 2 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
index 1e0479d657f..fc10c90ce6f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
@@ -22,17 +22,53 @@ import org.apache.hudi.SparkAdapterSupport
 
 import org.apache.spark.sql.HoodieCatalystExpressionUtils.attributeEquals
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter, Project, UpdateTable}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter, LogicalPlan, Project, UpdateTable}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
+import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
 
 case class UpdateHoodieTableCommand(ut: UpdateTable) extends HoodieLeafRunnableCommand
   with SparkAdapterSupport with ProvidesHoodieConfig {
 
+  private var sparkSession: SparkSession = _
+
+  private lazy val hoodieCatalogTable = sparkAdapter.resolveHoodieTable(ut.table) match {
+    case Some(catalogTable) => HoodieCatalogTable(sparkSession, catalogTable)
+    case _ =>
+      failAnalysis(s"Failed to resolve update statement into the Hudi table. Got instead: ${ut.table}")
+  }
+
+  /**
+   * Validate there is no assignment clause for the given attribute in the given table.
+   *
+   * @param resolver    The resolver to use
+   * @param fields      The fields from the target table that should not have any assignment clause
+   * @param tableId     Table identifier (for error messages)
+   * @param fieldType   Type of the attribute to be validated (for error messages)
+   * @param assignments The assignments clause
+   *
+   * @throws AnalysisException if an assignment clause for the given target table attribute is found
+   */
+  private def validateNoAssignmentsToTargetTableAttr(resolver: Resolver,
+                                                     fields: Seq[String],
+                                                     tableId: String,
+                                                     fieldType: String,
+                                                     assignments: Seq[(AttributeReference, Expression)]
+                                                     ): Unit = {
+    fields.foreach(field => if (assignments.exists {
+      case (attr, _) => resolver(attr.name, field)
+    }) {
+      throw new AnalysisException(s"Detected disallowed assignment clause in UPDATE statement for $fieldType " +
+        s"`$field` for table `$tableId`. Please remove the assignment clause to avoid the error.")
+    })
+  }
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    this.sparkSession = sparkSession
     val catalogTable = sparkAdapter.resolveHoodieTable(ut.table)
       .map(HoodieCatalogTable(sparkSession, _))
       .get
@@ -45,6 +81,24 @@ case class UpdateHoodieTableCommand(ut: UpdateTable) extends HoodieLeafRunnableC
       case Assignment(attr: AttributeReference, value) => attr -> value
     }
 
+    // We don't support update queries changing partition column value.
+    validateNoAssignmentsToTargetTableAttr(
+      sparkSession.sessionState.conf.resolver,
+      hoodieCatalogTable.tableConfig.getPartitionFields.orElse(Array.empty),
+      tableId,
+      "partition field",
+      assignedAttributes
+    )
+
+    // We don't support update queries changing the primary key column value.
+    validateNoAssignmentsToTargetTableAttr(
+      sparkSession.sessionState.conf.resolver,
+      hoodieCatalogTable.tableConfig.getRecordKeyFields.orElse(Array.empty),
+      tableId,
+      "record key field",
+      assignedAttributes
+    )
+
     val filteredOutput = if (sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key()
       , SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") {
       ut.table.output
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala
index a7df73373a4..cdca1789fc8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala
@@ -23,6 +23,8 @@ import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.util.{Option => HOption}
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
+import org.apache.hudi.HoodieSparkUtils.gteqSpark3_4
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 import org.junit.jupiter.api.Assertions.assertEquals
 
@@ -364,4 +366,74 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test Update Table With Primary Key and Partition Key Updates error 
out") {
+    withRecordType()(withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName
+        // create table with primary key and partition
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long,
+             |  pt string
+             |) using hudi
+             | location '${tmp.getCanonicalPath}/$tableName'
+             | tblproperties (
+             |  type = '$tableType',
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by (pt)
+       """.stripMargin)
+
+        // Insert initial data
+        spark.sql(s"insert into $tableName values (1, 'a1', 10.0, 1000, 
'2021')")
+
+        // Verify initial state
+        checkAnswer(s"select id, name, price, ts, pt from $tableName")(
+          Seq(1, "a1", 10.0, 1000, "2021")
+        )
+
+        // Try to update primary key (should fail)
+        val e1 = intercept[AnalysisException] {
+          spark.sql(s"update $tableName set id = 2 where id = 1")
+        }
+
+        if (gteqSpark3_4) {
+          assert(e1.getMessage.contains(s"Detected disallowed assignment clause in UPDATE statement for record key field `id`" +
+            s" for table `spark_catalog.default.$tableName`. Please remove the assignment clause to avoid the error."))
+        } else {
+          assert(e1.getMessage.contains(s"Detected disallowed assignment clause in UPDATE statement for record key field `id`" +
+            s" for table `default.$tableName`. Please remove the assignment clause to avoid the error."))
+        }
+
+        // Try to update partition column (should fail)
+        val e2 = intercept[AnalysisException] {
+          spark.sql(s"update $tableName set pt = '2022' where id = 1")
+        }
+        if (gteqSpark3_4) {
+          assert(e2.getMessage.contains(s"Detected disallowed assignment clause in UPDATE statement for partition field `pt`" +
+            s" for table `spark_catalog.default.$tableName`. Please remove the assignment clause to avoid the error."))
+        } else {
+          assert(e2.getMessage.contains(s"Detected disallowed assignment clause in UPDATE statement for partition field `pt`" +
+            s" for table `default.$tableName`. Please remove the assignment clause to avoid the error."))
+        }
+
+        // Verify data remains unchanged after failed updates
+        checkAnswer(s"select id, name, price, ts, pt from $tableName")(
+          Seq(1, "a1", 10.0, 1000, "2021")
+        )
+
+        // Verify normal update still works
+        spark.sql(s"update $tableName set price = 20.0 where id = 1")
+        checkAnswer(s"select id, name, price, ts, pt from $tableName")(
+          Seq(1, "a1", 20.0, 1000, "2021")
+        )
+      }
+    })
+  }
 }
