leesf commented on a change in pull request #4270:
URL: https://github.com/apache/hudi/pull/4270#discussion_r767162488



##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala
##########
@@ -18,17 +18,25 @@
 package org.apache.spark.sql.avro
 
 import org.apache.avro.Schema
+
 import org.apache.spark.sql.types.DataType
 
 /**
  * This is to be compatible with the type returned by Spark 3.1
  * and other spark versions for AvroDeserializer
  */
-case class HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: 
DataType)
-  extends AvroDeserializer(rootAvroType, rootCatalystType) {
+case class HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: 
DataType) {
+
+  private val avroDeserializer = if 
(org.apache.spark.SPARK_VERSION.startsWith("3.2")) {
+    val constructor = 
classOf[AvroDeserializer].getConstructor(classOf[Schema], classOf[DataType], 
classOf[String])
+    constructor.newInstance(rootAvroType, rootCatalystType, "EXCEPTION")
+  } else {
+    val constructor = 
classOf[AvroDeserializer].getConstructor(classOf[Schema], classOf[DataType])
+    constructor.newInstance(rootAvroType, rootCatalystType)
+  }
 
   def deserializeData(data: Any): Any = {
-    super.deserialize(data) match {
+    avroDeserializer.deserialize(data) match {
       case Some(r) => r // spark 3.1 return type is Option, we fetch the data.

Review comment:
       Does the description here also need to be fixed?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Compaction.scala
##########
@@ -22,17 +22,37 @@ import 
org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.Compactio
 case class CompactionTable(table: LogicalPlan, operation: CompactionOperation, 
instantTimestamp: Option[Long])
   extends Command {
   override def children: Seq[LogicalPlan] = Seq(table)
+
+  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): 
CompactionTable = {

Review comment:
       Can we add the Spark JIRA link in the codebase to help users understand 
the background?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Compaction.scala
##########
@@ -22,17 +22,37 @@ import 
org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.Compactio
 case class CompactionTable(table: LogicalPlan, operation: CompactionOperation, 
instantTimestamp: Option[Long])
   extends Command {
   override def children: Seq[LogicalPlan] = Seq(table)
+
+  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): 
CompactionTable = {
+    copy(table = newChildren.head)
+  }
 }
 
 case class CompactionPath(path: String, operation: CompactionOperation, 
instantTimestamp: Option[Long])
-  extends Command
+  extends Command {
+  override def children: Seq[LogicalPlan] = Seq.empty
+
+  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): 
CompactionPath = {
+    this
+  }
+}
 
 case class CompactionShowOnTable(table: LogicalPlan, limit: Int = 20)
   extends Command {
   override def children: Seq[LogicalPlan] = Seq(table)
+
+  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): 
CompactionShowOnTable = {
+     copy(table = newChildren.head)

Review comment:
       Would you please clarify why `copy` is used here while the other one just returns `this`?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -180,11 +181,19 @@ case class HoodieResolveReferences(sparkSession: 
SparkSession) extends Rule[Logi
               .map { case (targetAttr, sourceAttr) => Assignment(targetAttr, 
sourceAttr) }
           }
         } else {
-          assignments.map(assignment => {
+          // For Spark3.2, InsertStarAction/UpdateStarAction's assignments 
will contain the meta fields.

Review comment:
       Are the meta fields here Spark's internal meta fields?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -180,11 +181,19 @@ case class HoodieResolveReferences(sparkSession: 
SparkSession) extends Rule[Logi
               .map { case (targetAttr, sourceAttr) => Assignment(targetAttr, 
sourceAttr) }
           }
         } else {
-          assignments.map(assignment => {
+          // For Spark3.2, InsertStarAction/UpdateStarAction's assignments 
will contain the meta fields.

Review comment:
       Are the meta fields here Spark's internal meta fields or Hudi's meta fields?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -180,11 +181,19 @@ case class HoodieResolveReferences(sparkSession: 
SparkSession) extends Rule[Logi
               .map { case (targetAttr, sourceAttr) => Assignment(targetAttr, 
sourceAttr) }
           }
         } else {
-          assignments.map(assignment => {
+          // For Spark3.2, InsertStarAction/UpdateStarAction's assignments 
will contain the meta fields.

Review comment:
       So the meta fields here are Hudi's meta fields?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -244,13 +253,19 @@ case class HoodieResolveReferences(sparkSession: 
SparkSession) extends Rule[Logi
         case DeleteAction(condition) =>
           val resolvedCondition = 
condition.map(resolveExpressionFrom(resolvedSource)(_))
           DeleteAction(resolvedCondition)
+        case action: MergeAction =>

Review comment:
       > SPARK-34962
   
   ditto, please add the Spark JIRA link

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -244,13 +253,19 @@ case class HoodieResolveReferences(sparkSession: 
SparkSession) extends Rule[Logi
         case DeleteAction(condition) =>
           val resolvedCondition = 
condition.map(resolveExpressionFrom(resolvedSource)(_))
           DeleteAction(resolvedCondition)
+        case action: MergeAction =>
+          // ForSpark3.2, it's UpdateStarAction
+          UpdateAction(action.condition, Seq.empty)

Review comment:
       The `UpdateAction` here does not match the description, which says `UpdateStarAction`?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -244,13 +253,19 @@ case class HoodieResolveReferences(sparkSession: 
SparkSession) extends Rule[Logi
         case DeleteAction(condition) =>
           val resolvedCondition = 
condition.map(resolveExpressionFrom(resolvedSource)(_))
           DeleteAction(resolvedCondition)
+        case action: MergeAction =>
+          // ForSpark3.2, it's UpdateStarAction
+          UpdateAction(action.condition, Seq.empty)
       }
       // Resolve the notMatchedActions
       val resolvedNotMatchedActions = notMatchedActions.map {
         case InsertAction(condition, assignments) =>
           val (resolvedCondition, resolvedAssignments) =
             resolveConditionAssignments(condition, assignments)
           InsertAction(resolvedCondition, resolvedAssignments)
+        case action: MergeAction =>
+          // ForSpark3.2, it's InsertStarAction

Review comment:
       nit: `For Spark3.2`

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##########
@@ -244,13 +253,19 @@ case class HoodieResolveReferences(sparkSession: 
SparkSession) extends Rule[Logi
         case DeleteAction(condition) =>
           val resolvedCondition = 
condition.map(resolveExpressionFrom(resolvedSource)(_))
           DeleteAction(resolvedCondition)
+        case action: MergeAction =>
+          // ForSpark3.2, it's UpdateStarAction
+          UpdateAction(action.condition, Seq.empty)
       }
       // Resolve the notMatchedActions
       val resolvedNotMatchedActions = notMatchedActions.map {
         case InsertAction(condition, assignments) =>
           val (resolvedCondition, resolvedAssignments) =
             resolveConditionAssignments(condition, assignments)
           InsertAction(resolvedCondition, resolvedAssignments)
+        case action: MergeAction =>
+          // ForSpark3.2, it's InsertStarAction

Review comment:
       nit: `For Spark3.2`, and ditto for the description

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
##########
@@ -59,7 +64,7 @@ case class AlterHoodieTableAddColumnsCommand(
           s" table columns is: 
[${hoodieCatalogTable.tableSchemaWithoutMetaFields.fieldNames.mkString(",")}]")
       }
       // Get the new schema
-      val newSqlSchema = StructType(tableSchema.fields ++ colsToAdd)
+      val newSqlSchema = StructType(hoodieCatalogTable.dataSchema.fields ++ 
colsToAdd ++ hoodieCatalogTable.partitionSchema.fields)

Review comment:
       Would you please clarify why this change is needed? Must the partition 
schema be placed at the end?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
##########
@@ -203,7 +207,11 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Runnab
 
     sourceExpression match {
       case attr: AttributeReference if sourceColumnName.find(resolver(_, 
attr.name)).get.equals(targetColumnName) => true
-      case Cast(attr: AttributeReference, _, _) if 
sourceColumnName.find(resolver(_, attr.name)).get.equals(targetColumnName) => 
true
+      case cast: Cast =>

Review comment:
       ditto, please add the Spark JIRA link and some description




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to