yihua commented on code in PR #12692:
URL: https://github.com/apache/hudi/pull/12692#discussion_r1926241983


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -46,6 +46,11 @@ object HoodieAnalysis extends SparkAdapterSupport {
   def customResolutionRules: Seq[RuleBuilder] = {
     val rules: ListBuffer[RuleBuilder] = ListBuffer()
 
+    if (HoodieSparkUtils.gteqSpark3_5) {
+      rules += (_ => instantiateKlass(
+        "org.apache.spark.sql.hudi.analysis.HoodieSpark35ResolveColumnsForInsertInto"))

Review Comment:
   @KnightChess: If the `HoodieSpark35ResolveColumnsForInsertInto` resolution rule
handles all the projection and ordering of columns, can we deprecate the
projection logic in `ResolveImplementationsEarly` for `InsertIntoStatement`?
   ```
   // Create a project if this is an INSERT INTO query with specified cols.
   val projectByUserSpecified = if (userSpecifiedCols.nonEmpty) {
     ValidationUtils.checkState(lr.catalogTable.isDefined, "Missing catalog table")
     sparkAdapter.getCatalystPlanUtils.createProjectForByNameQuery(lr, iis)
   } else {
     None
   }
   ```
   in
   ```
   case class ResolveImplementationsEarly() extends Rule[LogicalPlan] {
   
     override def apply(plan: LogicalPlan): LogicalPlan = {
       plan match {
         // Convert to InsertIntoHoodieTableCommand
         case iis @ MatchInsertIntoStatement(relation @ ResolvesToHudiTable(_), userSpecifiedCols, partition, query, overwrite, _) if query.resolved =>
           relation match {
             // NOTE: In Spark >= 3.2, Hudi relations will be resolved as [[DataSourceV2Relation]]s by default;
             //       However, currently, fallback will be applied downgrading them to V1 relations, hence
             //       we need to check whether we could proceed here, or has to wait until fallback rule kicks in
             case lr: LogicalRelation =>
               // Create a project if this is an INSERT INTO query with specified cols.
               val projectByUserSpecified = if (userSpecifiedCols.nonEmpty) {
                 ValidationUtils.checkState(lr.catalogTable.isDefined, "Missing catalog table")
                 sparkAdapter.getCatalystPlanUtils.createProjectForByNameQuery(lr, iis)
               } else {
                 None
               }
               new InsertIntoHoodieTableCommand(lr, projectByUserSpecified.getOrElse(query), partition, overwrite)
             case _ => iis
           }
   ```
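
   To make the two cases concrete (a subset of columns versus all columns in a
   different order), here is a minimal sketch of by-name projection over plain
   Scala collections. It is not the body of `createProjectForByNameQuery`; the
   `Expr`, `ColumnRef`, `DefaultValue`, and `projectByName` names are
   hypothetical stand-ins for the Catalyst types, used only for illustration.

   ```scala
   object InsertProjectionSketch {
     // Hypothetical, simplified stand-ins for Catalyst expressions.
     sealed trait Expr
     final case class ColumnRef(name: String) extends Expr
     final case class DefaultValue(forColumn: String) extends Expr

     /**
      * Builds a projection list in table-schema order.
      *
      * @param tableCols         table columns in schema order
      * @param userSpecifiedCols columns listed in INSERT INTO t (c1, c2, ...), in user order
      * @param queryCols         query output columns, matched positionally to userSpecifiedCols
      */
     def projectByName(
         tableCols: Seq[String],
         userSpecifiedCols: Seq[String],
         queryCols: Seq[String]): Seq[Expr] = {
       require(userSpecifiedCols.length == queryCols.length,
         "query must produce one column per user-specified column")
       require(userSpecifiedCols.forall(c => tableCols.contains(c)),
         "every user-specified column must exist in the table")

       // Map each target table column to the query column that feeds it.
       val sourceByTarget: Map[String, String] = userSpecifiedCols.zip(queryCols).toMap

       tableCols.map { col =>
         sourceByTarget.get(col) match {
           case Some(source) => ColumnRef(source)   // column supplied by the query
           case None         => DefaultValue(col)   // column omitted by the user
         }
       }
     }
   }
   ```

   With table columns `(id, name, price, ts)`, an insert that lists only
   `(price, id)` yields a projection that reorders the two supplied columns
   and fills `name` and `ts` with defaults, while an insert that lists all
   four columns in another order yields a pure reordering with no defaults.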



##########
hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark35Analysis.scala:
##########
@@ -64,3 +72,132 @@ case class HoodieSpark35DataSourceV2ToV1Fallback(sparkSession: SparkSession) ext
     LogicalRelation(relation, output, catalogTable, isStreaming = false)
   }
 }
+
+/**
+ * In Spark 3.5, the following Resolution rules are removed,
+ * [[ResolveUserSpecifiedColumns]] and [[ResolveDefaultColumns]]
+ * (see code changes in [[org.apache.spark.sql.catalyst.analysis.Analyzer]]
+ * from https://github.com/apache/spark/pull/41262).
+ * The same logic of resolving the user specified columns and default values,
+ * which are required for a subset of columns as user specified compared to the table
+ * schema to work properly, are deferred to [[PreprocessTableInsertion]] for v1 INSERT.
+ *
+ * Note that [[HoodieAnalysis]] intercepts the [[InsertIntoStatement]] after Spark's built-in
+ * Resolution rules are applied, the logic of resolving the user specified columns and default
+ * values may no longer be applied. To make INSERT with a subset of columns specified by user
+ * to work, this custom resolution rule [[HoodieSpark35ResolveColumnsForInsertInto]] is added
+ * to achieve the same, before converting [[InsertIntoStatement]] into
+ * [[InsertIntoHoodieTableCommand]].
+ *
+ * Also note that, the project logic in [[ResolveImplementationsEarly]] for INSERT is still
+ * needed in the case of INSERT with all columns in a different ordering.
+ */
+case class HoodieSpark35ResolveColumnsForInsertInto() extends ResolveInsertionBase {
+  // NOTE: This is copied from [[PreprocessTableInsertion]] with additional handling of Hudi relations
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan match {
+      case i@InsertIntoStatement(table, _, _, query, _, _, _)
+        if table.resolved && query.resolved
+          && i.userSpecifiedCols.nonEmpty && i.table.isInstanceOf[LogicalRelation]
+          && sparkAdapter.isHoodieTable(i.table.asInstanceOf[LogicalRelation].catalogTable.get) =>
+        table match {

Review Comment:
   Originally, I called `PreprocessTableInsertion.apply(plan)` directly to
resolve the columns and add a projection for the columns missing from the
user-specified list. Then I noticed that for an empty table (`EmptyRelation`
in the old read path) or a MOR table with log files
(`MergeOnReadSnapshotRelation` in the old read path), the plan may not get
preprocessed, because `PreprocessTableInsertion` does not recognize Hudi
relations. So I had to copy the logic and adapt it here so that INSERT INTO a
Hudi table with a subset of columns specified works. I'm wondering whether I
should extract the projection functionality so it's easier to maintain (it
will take me more time to figure that out). @KnightChess wdyt?
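
   A toy model of the mismatch, assuming the built-in rule dispatches on a
   fixed set of relation types and leaves everything else untouched. None of
   these types are Spark or Hudi classes; `Relation`, `Insert`, `builtInRule`,
   and `hudiRule` are hypothetical names for illustration only.

   ```scala
   object RelationMatchSketch {
     // Hypothetical stand-ins for the relation types involved.
     sealed trait Relation
     case object FileBasedRelation extends Relation        // a type the built-in rule knows
     case object HudiEmptyRelation extends Relation        // empty table, old read path
     case object HudiMergeOnReadRelation extends Relation  // MOR with log files, old read path

     final case class Insert(
         relation: Relation,
         isHudiTable: Boolean,
         preprocessed: Boolean = false)

     // Models a rule that only preprocesses inserts into relation types it recognizes.
     def builtInRule(insert: Insert): Insert = insert.relation match {
       case FileBasedRelation => insert.copy(preprocessed = true)
       case _                 => insert // unrecognized relation: plan is returned unchanged
     }

     // Models a rule keyed on the catalog table being a Hudi table, not on the relation type.
     def hudiRule(insert: Insert): Insert =
       if (insert.isHudiTable) insert.copy(preprocessed = true) else insert
   }
   ```

   In this model, an insert into `HudiEmptyRelation` passes through
   `builtInRule` unchanged but is preprocessed by `hudiRule`, which is the
   gap the copied logic is meant to close.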


