Davis-Zhang-Onehouse commented on code in PR #12587:
URL: https://github.com/apache/hudi/pull/12587#discussion_r1908069153
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala:
##########
@@ -19,20 +19,56 @@ package org.apache.spark.sql.hudi.command
import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_OPTIMIZED_WRITES,
SPARK_SQL_WRITES_PREPPED_KEY}
import org.apache.hudi.SparkAdapterSupport
-
import org.apache.spark.sql.HoodieCatalystExpressionUtils.attributeEquals
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter,
Project, UpdateTable}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference,
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter,
LogicalPlan, Project, UpdateTable}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
+import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
case class UpdateHoodieTableCommand(ut: UpdateTable) extends
HoodieLeafRunnableCommand
with SparkAdapterSupport with ProvidesHoodieConfig {
+ private var sparkSession: SparkSession = _
+
+ private lazy val hoodieCatalogTable =
sparkAdapter.resolveHoodieTable(ut.table) match {
+ case Some(catalogTable) => HoodieCatalogTable(sparkSession, catalogTable)
+ case _ =>
+ failAnalysis(s"Failed to resolve update statement into the Hudi table.
Got instead: ${ut.table}")
+ }
+
+ /**
+ * Validate there is no assignment clause for the given attribute in the
given table.
+ *
+ * @param resolver The resolver to use
+ * @param fields The fields from the target table that should not have any
assignment clause
+ * @param tableId Table identifier (for error messages)
+ * @param fieldType Type of the attribute to be validated (for error
messages)
+ * @param assignments The assignment clauses of the update statement
+ *
+ *
+ * @throws AnalysisException if assignment clause for the given target table
attribute is found
+ */
+ private def validateNoAssignmentsToTargetTableAttr(resolver: Resolver,
+ fields: Seq[String],
+ tableId: String,
+ fieldType: String,
+ assignments:
Seq[(AttributeReference, Expression)]
+ ): Unit = {
+ fields.foreach(field => if (assignments.exists {
+ case (attr, _) => resolver(attr.name, field)
+ }) {
+ throw new AnalysisException(s"Detected update query with disallowed
assignment clause for $fieldType " +
+ s"`$field` for table `$tableId`")
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]