This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ef83379d38a [HUDI-7949] insert into hudi table with columns specified 
(#11568)
ef83379d38a is described below

commit ef83379d38a08fc87fe567cbbf321fecb47fc80b
Author: KnightChess <[email protected]>
AuthorDate: Mon Jul 8 08:31:18 2024 +0800

    [HUDI-7949] insert into hudi table with columns specified (#11568)
---
 .../spark/sql/HoodieCatalystPlansUtils.scala       |  10 +-
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala   |  20 ++-
 .../spark/sql/hudi/dml/TestInsertTable.scala       | 159 +++++++++++++++++++++
 .../spark/sql/HoodieSpark2CatalystPlanUtils.scala  |   6 +-
 .../spark/sql/HoodieSpark3CatalystPlanUtils.scala  |  12 +-
 .../spark/sql/HoodieSpark30CatalystPlanUtils.scala |  11 +-
 .../spark/sql/HoodieSpark31CatalystPlanUtils.scala |  12 +-
 .../spark/sql/HoodieSpark32CatalystPlanUtils.scala |  18 +++
 .../apache/spark/sql/ResolveInsertionBase.scala    |  95 ++++++++++++
 .../spark/sql/HoodieSpark33CatalystPlanUtils.scala |  18 +++
 .../spark/sql/HoodieSpark34CatalystPlanUtils.scala |  32 +++++
 .../spark/sql/HoodieSpark35CatalystPlanUtils.scala |  18 +++
 12 files changed, 389 insertions(+), 22 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
index 40e62ddd0ef..42dcc02cca4 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
@@ -22,6 +22,7 @@ import 
org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.SQLConf
 
 trait HoodieCatalystPlansUtils {
@@ -111,8 +112,10 @@ trait HoodieCatalystPlansUtils {
   /**
    * Decomposes [[InsertIntoStatement]] into its arguments allowing to 
accommodate for API
    * changes in Spark 3.3
+   * @return an optional tuple of (table logical plan, userSpecifiedCols,
+   *         partitionSpec, query, overwrite, ifPartitionNotExists);
+   *         userSpecifiedCols is only populated on Spark 3.2 and later,
+   *         older versions always return an empty Seq
    */
-  def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, 
Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]
+  def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, 
Seq[String], Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]
 
   /**
    * Decomposes [[CreateTableLikeCommand]] into its arguments allowing to 
accommodate for API
@@ -147,4 +150,9 @@ trait HoodieCatalystPlansUtils {
    * true if both plans produce the same attributes in the same order
    */
   def produceSameOutput(a: LogicalPlan, b: LogicalPlan): Boolean
+
+  /**
+   * Add a project to use the table column names for INSERT INTO BY NAME with 
specified cols
+   */
+  def createProjectForByNameQuery(lr: LogicalRelation, plan: LogicalPlan): 
Option[LogicalPlan]
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index aef98b4e91d..e6c7b8c1ab0 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -17,10 +17,9 @@
 
 package org.apache.spark.sql.hudi.analysis
 
-import org.apache.hudi.common.util.ReflectionUtils
+import org.apache.hudi.common.util.{ReflectionUtils, ValidationUtils}
 import org.apache.hudi.common.util.ReflectionUtils.loadClass
 import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport}
-
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable}
@@ -37,7 +36,6 @@ import 
org.apache.spark.sql.hudi.command.procedures.{HoodieProcedures, Procedure
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 
 import java.util
-
 import scala.collection.mutable.ListBuffer
 
 object HoodieAnalysis extends SparkAdapterSupport {
@@ -252,7 +250,7 @@ object HoodieAnalysis extends SparkAdapterSupport {
 
           // NOTE: In case of [[InsertIntoStatement]] Hudi tables could be on 
both sides -- receiving and providing
           //       the data, as such we have to make sure that we handle both 
of these cases
-          case iis @ MatchInsertIntoStatement(targetTable, _, query, _, _) =>
+          case iis @ MatchInsertIntoStatement(targetTable, _, _, query, _, _) 
=>
             val updatedTargetTable = targetTable match {
               // In the receiving side of the IIS, we can't project meta-field 
attributes out,
               // and instead have to explicitly remove them
@@ -355,7 +353,7 @@ object HoodieAnalysis extends SparkAdapterSupport {
   }
 
   private[sql] object MatchInsertIntoStatement {
-    def unapply(plan: LogicalPlan): Option[(LogicalPlan, Map[String, 
Option[String]], LogicalPlan, Boolean, Boolean)] =
+    def unapply(plan: LogicalPlan): Option[(LogicalPlan, Seq[String], 
Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] =
       sparkAdapter.getCatalystPlanUtils.unapplyInsertIntoStatement(plan)
   }
 
@@ -408,12 +406,20 @@ case class ResolveImplementationsEarly() extends 
Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan match {
       // Convert to InsertIntoHoodieTableCommand
-      case iis @ MatchInsertIntoStatement(relation @ ResolvesToHudiTable(_), 
partition, query, overwrite, _) if query.resolved =>
+      case iis @ MatchInsertIntoStatement(relation @ ResolvesToHudiTable(_), 
userSpecifiedCols, partition, query, overwrite, _) if query.resolved =>
         relation match {
           // NOTE: In Spark >= 3.2, Hudi relations will be resolved as 
[[DataSourceV2Relation]]s by default;
           //       However, currently, fallback will be applied downgrading 
them to V1 relations, hence
           //       we need to check whether we could proceed here, or has to 
wait until fallback rule kicks in
-          case lr: LogicalRelation => new InsertIntoHoodieTableCommand(lr, 
query, partition, overwrite)
+          case lr: LogicalRelation =>
+            // Create a project if this is an INSERT INTO query with specified 
cols.
+            val projectByUserSpecified = if (userSpecifiedCols.nonEmpty) {
+              ValidationUtils.checkState(lr.catalogTable.isDefined, "Missing 
catalog table")
+              
sparkAdapter.getCatalystPlanUtils.createProjectForByNameQuery(lr, iis)
+            } else {
+              None
+            }
+            new InsertIntoHoodieTableCommand(lr, 
projectByUserSpecified.getOrElse(query), partition, overwrite)
           case _ => iis
         }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index 0b1d4ca8999..f53db8e5819 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -2826,4 +2826,163 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy")
     spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
   }
+
+  // Verifies INSERT INTO with an explicit column list (Spark >= 3.2 only) against both a
+  // non-partitioned and a partitioned Hudi table, including static/dynamic partition specs.
+  test("Test insert into with special cols") {
+    withTempDir { tmp =>
+      if (HoodieSparkUtils.gteqSpark3_2) {
+        try {
+          val targetTableA = generateTableName
+          val tablePathA = s"${tmp.getCanonicalPath}/$targetTableA"
+          // On Spark 3.4 a column list is rejected while default columns are enabled,
+          // so the feature is switched off for the positive cases below.
+          if (HoodieSparkUtils.isSpark3_4) {
+            spark.sql("set spark.sql.defaultColumn.enabled = false")
+          }
+
+          spark.sql(
+            s"""
+               |create table if not exists $targetTableA (
+               | id bigint,
+               | name string,
+               | price double
+               |) using hudi
+               |tblproperties (
+               | primaryKey = 'id',
+               | type = 'mor',
+               | preCombineField = 'name'
+               |) location '$tablePathA'
+               |""".stripMargin)
+
+          // Column list is given in a different order than the table definition.
+          spark.sql(s"insert into $targetTableA (id, price, name) values (1, 12.1, 'aaa')")
+
+          checkAnswer(s"select id, price, name from $targetTableA")(
+            Seq(1, 12.1, "aaa")
+          )
+
+          val targetTableB = generateTableName
+          val tablePathB = s"${tmp.getCanonicalPath}/$targetTableB"
+
+          spark.sql(
+            s"""
+               |create table if not exists $targetTableB (
+               | id bigint,
+               | name string,
+               | price double,
+               | day string,
+               | hour string
+               |) using hudi
+               |tblproperties (
+               | primaryKey = 'id',
+               | type = 'mor',
+               | preCombineField = 'name'
+               |) partitioned by (day, hour)
+               |location '$tablePathB'
+               |""".stripMargin)
+
+          // Partition columns interleaved with data columns in the column list.
+          spark.sql(s"insert into $targetTableB (id, day, price, name, hour) " +
+            s"values (2, '01', 12.2, 'bbb', '02')")
+
+          // Source is a query; the select aliases deliberately differ from the target names.
+          spark.sql(s"insert into $targetTableB (id, day, price, name, hour) " +
+            s"select id, '01' as dt, price, name, '03' as hour from $targetTableA")
+
+          // Static `day` partition, dynamic `hour` partition supplied through the column list.
+          spark.sql(s"insert into $targetTableB partition(day='02', hour) (id, hour, price, name) " +
+            s"values (3, '01', 12.3, 'ccc')")
+
+          // Fully static partition spec.
+          spark.sql(s"insert into $targetTableB partition(day='02', hour='02') (id, price, name) " +
+            s"values (4, 12.4, 'ddd')")
+
+          checkAnswer(s"select id, price, name, day, hour from $targetTableB")(
+            Seq(2, 12.2, "bbb", "01", "02"),
+            Seq(1, 12.1, "aaa", "01", "03"),
+            Seq(3, 12.3, "ccc", "02", "01"),
+            Seq(4, 12.4, "ddd", "02", "02")
+          )
+
+          // Negative case: with default columns enabled, Spark 3.4 must reject the column list.
+          if (HoodieSparkUtils.isSpark3_4) {
+            spark.sql("set spark.sql.defaultColumn.enabled = true")
+            checkExceptionContain(s"insert into $targetTableB (id, day, price, name, hour) " +
+              s"select id, '01' as dt, price, name, '03' as hour from $targetTableA")(
+              "hudi not support specified cols when enable default columns")
+          }
+        } finally {
+          // The session is shared between tests: always drop the override, even when an
+          // assertion above fails, so later tests see the default setting again.
+          if (HoodieSparkUtils.isSpark3_4) {
+            spark.sessionState.conf.unsetConf("spark.sql.defaultColumn.enabled")
+          }
+        }
+      }
+    }
+  }
+
+  // Verifies INSERT OVERWRITE with an explicit column list (Spark >= 3.2 only) against both a
+  // non-partitioned and a partitioned Hudi table, including static/dynamic partition specs.
+  test("Test insert overwrite with special cols") {
+    withTempDir { tmp =>
+      if (HoodieSparkUtils.gteqSpark3_2) {
+        try {
+          val targetTableA = generateTableName
+          val tablePathA = s"${tmp.getCanonicalPath}/$targetTableA"
+          // On Spark 3.4 a column list is rejected while default columns are enabled,
+          // so the feature is switched off for the positive cases below.
+          if (HoodieSparkUtils.isSpark3_4) {
+            spark.sql("set spark.sql.defaultColumn.enabled = false")
+          }
+
+          spark.sql(
+            s"""
+               |create table if not exists $targetTableA (
+               | id bigint,
+               | name string,
+               | price double
+               |) using hudi
+               |tblproperties (
+               | primaryKey = 'id',
+               | type = 'mor',
+               | preCombineField = 'name'
+               |) location '$tablePathA'
+               |""".stripMargin)
+
+          // Column list is given in a different order than the table definition.
+          spark.sql(s"insert overwrite $targetTableA (id, price, name) values (1, 12.1, 'aaa')")
+
+          checkAnswer(s"select id, price, name from $targetTableA")(
+            Seq(1, 12.1, "aaa")
+          )
+
+          val targetTableB = generateTableName
+          val tablePathB = s"${tmp.getCanonicalPath}/$targetTableB"
+
+          spark.sql(
+            s"""
+               |create table if not exists $targetTableB (
+               | id bigint,
+               | name string,
+               | price double,
+               | day string,
+               | hour string
+               |) using hudi
+               |tblproperties (
+               | primaryKey = 'id',
+               | type = 'mor',
+               | preCombineField = 'name'
+               |) partitioned by (day, hour)
+               |location '$tablePathB'
+               |""".stripMargin)
+
+          // Partition columns interleaved with data columns in the column list.
+          spark.sql(s"insert overwrite $targetTableB (id, day, price, name, hour) " +
+            s"values (2, '01', 12.2, 'bbb', '02')")
+
+          checkAnswer(s"select id, price, name, day, hour from $targetTableB")(
+            Seq(2, 12.2, "bbb", "01", "02")
+          )
+
+          // Source is a query; the select aliases deliberately differ from the target names.
+          spark.sql(s"insert overwrite $targetTableB (id, day, price, name, hour) " +
+            s"select id, '01' as dt, price, name, '03' as hour from $targetTableA")
+
+          // Static `day` partition, dynamic `hour` partition supplied through the column list.
+          spark.sql(s"insert overwrite $targetTableB partition(day='02', hour) (id, hour, price, name) " +
+            s"values (3, '01', 12.3, 'ccc')")
+
+          // Fully static partition spec.
+          spark.sql(s"insert overwrite $targetTableB partition(day='02', hour='02') (id, price, name) " +
+            s"values (4, 12.4, 'ddd')")
+
+          checkAnswer(s"select id, price, name, day, hour from $targetTableB")(
+            Seq(1, 12.1, "aaa", "01", "03"),
+            Seq(3, 12.3, "ccc", "02", "01"),
+            Seq(4, 12.4, "ddd", "02", "02")
+          )
+
+          // Negative case: with default columns enabled, Spark 3.4 must reject the column list.
+          if (HoodieSparkUtils.isSpark3_4) {
+            spark.sql("set spark.sql.defaultColumn.enabled = true")
+            checkExceptionContain(s"insert overwrite $targetTableB (id, day, price, name, hour) " +
+              s"select id, '01' as dt, price, name, '03' as hour from $targetTableA")(
+              "hudi not support specified cols when enable default columns")
+          }
+        } finally {
+          // The session is shared between tests: always drop the override, even when an
+          // assertion above fails, so later tests see the default setting again.
+          if (HoodieSparkUtils.isSpark3_4) {
+            spark.sessionState.conf.unsetConf("spark.sql.defaultColumn.enabled")
+          }
+        }
+      }
+    }
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
index 9f3a5ce03a8..c7ed7928f05 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
@@ -61,10 +61,10 @@ object HoodieSpark2CatalystPlanUtils extends 
HoodieCatalystPlansUtils {
     Join(left, right, joinType, None)
   }
 
-  override def unapplyInsertIntoStatement(plan: LogicalPlan): 
Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, 
Boolean)] = {
+  override def unapplyInsertIntoStatement(plan: LogicalPlan): 
Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, 
Boolean, Boolean)] = {
     plan match {
       case InsertIntoTable(table, partition, query, overwrite, 
ifPartitionNotExists) =>
-        Some((table, partition, query, overwrite, ifPartitionNotExists))
+        Some((table, Seq.empty, partition, query, overwrite, 
ifPartitionNotExists))
       case _ => None
     }
   }
@@ -129,4 +129,6 @@ object HoodieSpark2CatalystPlanUtils extends 
HoodieCatalystPlansUtils {
   override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan, 
Seq[Attribute])] = None
 
   override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan, 
String)] = None
+
+  override def createProjectForByNameQuery(lr: LogicalRelation, plan: 
LogicalPlan): Option[LogicalPlan] = None
 }
diff --git 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
index a0938b94671..4faf1437b52 100644
--- 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
@@ -27,6 +27,7 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, J
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode}
 import org.apache.spark.sql.execution.command.{CreateTableLikeCommand, 
ExplainCommand}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 
@@ -56,15 +57,6 @@ trait HoodieSpark3CatalystPlanUtils extends 
HoodieCatalystPlansUtils {
     Join(left, right, joinType, None, JoinHint.NONE)
   }
 
-  override def unapplyInsertIntoStatement(plan: LogicalPlan): 
Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, 
Boolean)] = {
-    plan match {
-      case insert: InsertIntoStatement =>
-        Some((insert.table, insert.partitionSpec, insert.query, 
insert.overwrite, insert.ifPartitionNotExists))
-      case _ =>
-        None
-    }
-  }
-
 
   override def unapplyCreateTableLikeCommand(plan: LogicalPlan): 
Option[(TableIdentifier, TableIdentifier, CatalogStorageFormat, Option[String], 
Map[String, String], Boolean)] = {
     plan match {
@@ -84,6 +76,8 @@ trait HoodieSpark3CatalystPlanUtils extends 
HoodieCatalystPlansUtils {
   override def produceSameOutput(a: LogicalPlan, b: LogicalPlan): Boolean = {
     a.sameOutput(b)
   }
+
+  override def createProjectForByNameQuery(lr: LogicalRelation, plan: 
LogicalPlan): Option[LogicalPlan] = None
 }
 
 object HoodieSpark3CatalystPlanUtils extends SparkAdapterSupport {
diff --git 
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
index dbe68bb33e5..1ea04b512e6 100644
--- 
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.ResolvedTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
MergeIntoTable}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, 
LogicalPlan, MergeIntoTable}
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import 
org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
 import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, 
ParquetFileFormat}
@@ -82,4 +82,13 @@ object HoodieSpark30CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
   override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan, 
Seq[Attribute])] = None
 
   override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan, 
String)] = None
+
+  override def unapplyInsertIntoStatement(plan: LogicalPlan): 
Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, 
Boolean, Boolean)] = {
+    plan match {
+      case insert: InsertIntoStatement =>
+        Some((insert.table, Seq.empty, insert.partitionSpec, insert.query, 
insert.overwrite, insert.ifPartitionNotExists))
+      case _ =>
+        None
+    }
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
index 765a9b06de5..7462d41d299 100644
--- 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
@@ -18,12 +18,11 @@
 
 package org.apache.spark.sql
 
-import org.apache.hudi.SparkHoodieTableFileIndex
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.ResolvedTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
MergeIntoTable, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, 
LogicalPlan, MergeIntoTable}
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import 
org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
 import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, 
ParquetFileFormat}
@@ -83,4 +82,13 @@ object HoodieSpark31CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
   override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan, 
Seq[Attribute])] = None
 
   override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan, 
String)] = None
+
+  override def unapplyInsertIntoStatement(plan: LogicalPlan): 
Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, 
Boolean, Boolean)] = {
+    plan match {
+      case insert: InsertIntoStatement =>
+        Some((insert.table, Seq.empty, insert.partitionSpec, insert.query, 
insert.overwrite, insert.ifPartitionNotExists))
+      case _ =>
+        None
+    }
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
index 8562ca14d3e..2f147b89b80 100644
--- 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
@@ -127,4 +127,22 @@ object HoodieSpark32CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
         None
     }
   }
+
+  override def unapplyInsertIntoStatement(plan: LogicalPlan): 
Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, 
Boolean, Boolean)] = {
+    plan match {
+      case insert: InsertIntoStatement =>
+        Some((insert.table, insert.userSpecifiedCols, insert.partitionSpec, 
insert.query, insert.overwrite, insert.ifPartitionNotExists))
+      case _ =>
+        None
+    }
+  }
+
+  override def createProjectForByNameQuery(lr: LogicalRelation, plan: 
LogicalPlan): Option[LogicalPlan] = {
+    plan match {
+      case insert: InsertIntoStatement =>
+        
Some(ResolveInsertionBase.createProjectForByNameQuery(lr.catalogTable.get.qualifiedName,
 insert))
+      case _ =>
+        None
+    }
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/ResolveInsertionBase.scala
 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/ResolveInsertionBase.scala
new file mode 100644
index 00000000000..868e225476d
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/ResolveInsertionBase.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, 
LogicalPlan, Project}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Hudi-side counterpart of Spark's `ResolveInsertionBase` (as found in Spark 3.4), used to
+ * rewrite the query of an INSERT statement that carries a user-specified column list.
+ */
+object ResolveInsertionBase extends SparkAdapterSupport {
+  /** Name resolver taken from the active [[SQLConf]]. */
+  def resolver: Resolver = SQLConf.get.resolver
+
+  /**
+   * Add a project to use the table column names for INSERT INTO BY NAME.
+   *
+   * Each output attribute of `i.query` is paired positionally with the entry at the same
+   * index in `i.userSpecifiedCols` and aliased to the name of the resolved table column.
+   *
+   * @param tblName table name, used only in error messages
+   * @param i       the insert statement holding the column list, target table and query
+   * @return a [[Project]] over `i.query` whose output uses the resolved table column names
+   * @throws AnalysisException if the number of specified columns differs from the number of
+   *                           query output columns, or a specified column cannot be resolved
+   *                           against the target table
+   */
+  def createProjectForByNameQuery(tblName: String,
+                                  i: InsertIntoStatement): LogicalPlan = {
+    sparkAdapter.getSchemaUtils.checkColumnNameDuplication(i.userSpecifiedCols,
+      "in the table definition of " + tblName, SQLConf.get.caseSensitiveAnalysis)
+
+    // The column list and the query output must line up one-to-one; report which side is
+    // short so the error message points in the right direction.
+    val specifiedCount = i.userSpecifiedCols.size
+    val providedCount = i.query.output.size
+    if (specifiedCount > providedCount) {
+      throw cannotWriteNotEnoughColumnsToTableError(tblName, i.userSpecifiedCols, i.query.output)
+    } else if (specifiedCount < providedCount) {
+      throw cannotWriteTooManyColumnsToTableError(tblName, i.userSpecifiedCols, i.query.output)
+    }
+
+    val projectByName = i.userSpecifiedCols.zip(i.query.output)
+      .map { case (userSpecifiedCol, queryOutputCol) =>
+        val resolvedCol = i.table.resolve(Seq(userSpecifiedCol), resolver)
+          .getOrElse(
+            throw unresolvedAttributeError(
+              "UNRESOLVED_COLUMN", userSpecifiedCol, i.table.output.map(_.name)))
+        (queryOutputCol.dataType, resolvedCol.dataType) match {
+          case (input: StructType, expected: StructType) =>
+            // Rename inner fields of the input column to pass the by-name INSERT analysis.
+            Alias(Cast(queryOutputCol, renameFieldsInStruct(input, expected)), resolvedCol.name)()
+          case _ =>
+            Alias(queryOutputCol, resolvedCol.name)()
+        }
+      }
+    Project(projectByName, i.query)
+  }
+
+  /**
+   * Renames the fields of `input` positionally to the field names of `expected`, recursing
+   * into nested structs. Field data types are kept; `input` is returned unchanged when the
+   * two structs have a different number of fields.
+   */
+  private def renameFieldsInStruct(input: StructType, expected: StructType): StructType = {
+    if (input.length == expected.length) {
+      val newFields = input.zip(expected).map { case (f1, f2) =>
+        (f1.dataType, f2.dataType) match {
+          case (s1: StructType, s2: StructType) =>
+            f1.copy(name = f2.name, dataType = renameFieldsInStruct(s1, s2))
+          case _ =>
+            f1.copy(name = f2.name)
+        }
+      }
+      StructType(newFields)
+    } else {
+      input
+    }
+  }
+
+  /**
+   * Error for a query that produces fewer columns than the user-specified column list.
+   *
+   * @param tableName   target table name
+   * @param expected    the user-specified column names
+   * @param queryOutput the output attributes of the insert query
+   */
+  def cannotWriteNotEnoughColumnsToTableError(tableName: String,
+                                              expected: Seq[String],
+                                              queryOutput: Seq[Attribute]): Throwable = {
+    columnArityMismatchError(tableName, "NOT_ENOUGH_DATA_COLUMNS", expected, queryOutput)
+  }
+
+  /**
+   * Error for a query that produces more columns than the user-specified column list.
+   *
+   * @param tableName   target table name
+   * @param expected    the user-specified column names
+   * @param queryOutput the output attributes of the insert query
+   */
+  def cannotWriteTooManyColumnsToTableError(tableName: String,
+                                            expected: Seq[String],
+                                            queryOutput: Seq[Attribute]): Throwable = {
+    columnArityMismatchError(tableName, "TOO_MANY_DATA_COLUMNS", expected, queryOutput)
+  }
+
+  /** Builds the shared INSERT_COLUMN_ARITY_MISMATCH message for the given sub-reason. */
+  private def columnArityMismatchError(tableName: String,
+                                       reason: String,
+                                       expected: Seq[String],
+                                       queryOutput: Seq[Attribute]): Throwable = {
+    new AnalysisException("table: " + tableName + " INSERT_COLUMN_ARITY_MISMATCH." + reason +
+      ", expect: " + expected.mkString(",") +
+      " queryOutput: " + queryOutput.map(attr => attr.name).mkString(","))
+  }
+
+  /**
+   * Error for a user-specified column that does not resolve against the target table.
+   *
+   * @param errorClass error class label placed at the start of the message
+   * @param colName    the column name that failed to resolve
+   * @param candidates the column names exposed by the target table
+   */
+  def unresolvedAttributeError(errorClass: String,
+                               colName: String,
+                               candidates: Seq[String]): Throwable = {
+    new AnalysisException(errorClass + " expect col is : " +
+      colName + " table output is : " + candidates.mkString(","))
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
index 2d9250bbe9e..651059212cf 100644
--- 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
@@ -111,4 +111,22 @@ object HoodieSpark33CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
         None
     }
   }
+
+  override def unapplyInsertIntoStatement(plan: LogicalPlan): 
Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, 
Boolean, Boolean)] = {
+    plan match {
+      case insert: InsertIntoStatement =>
+        Some((insert.table, insert.userSpecifiedCols, insert.partitionSpec, 
insert.query, insert.overwrite, insert.ifPartitionNotExists))
+      case _ =>
+        None
+    }
+  }
+
+  override def createProjectForByNameQuery(lr: LogicalRelation, plan: 
LogicalPlan): Option[LogicalPlan] = {
+    plan match {
+      case insert: InsertIntoStatement =>
+        
Some(ResolveInsertionBase.createProjectForByNameQuery(lr.catalogTable.get.qualifiedName,
 insert))
+      case _ =>
+        None
+    }
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
index 2c50d21cbe5..890a7cc821d 100644
--- 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, 
Table, TableCatalog}
 import org.apache.spark.sql.execution.command.RepairTableCommand
 import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, 
ParquetFileFormat}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 
 object HoodieSpark34CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
@@ -115,4 +116,35 @@ object HoodieSpark34CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
         None
     }
   }
+
+  override def unapplyInsertIntoStatement(plan: LogicalPlan): 
Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, 
Boolean, Boolean)] = {
+    plan match {
+      case insert: InsertIntoStatement =>
+        // See https://github.com/apache/spark/pull/36077
+        // First: with that PR, Spark 3.4 supports default values for INSERT INTO and
+        //        regenerates the user-specified columns itself, so Hudi does not need
+        //        to handle them.
+        // Second: that PR also appends the Hudi meta fields with default values, which
+        //         is buggy; it appears to be fixed in Spark 3.5
+        //         (https://github.com/apache/spark/pull/41262). Users who want to
+        //         specify columns on Spark 3.4 therefore have to disable the
+        //         default-column feature.
+        if (SQLConf.get.enableDefaultColumns) {
+          if (insert.userSpecifiedCols.nonEmpty) {
+            throw new AnalysisException("hudi not support specified cols when 
enable default columns, " +
+              "please disable 'spark.sql.defaultColumn.enabled'")
+          }
+          Some((insert.table, Seq.empty, insert.partitionSpec, insert.query, 
insert.overwrite, insert.ifPartitionNotExists))
+        } else {
+          Some((insert.table, insert.userSpecifiedCols, insert.partitionSpec, 
insert.query, insert.overwrite, insert.ifPartitionNotExists))
+        }
+      case _ =>
+        None
+    }
+  }
+
+  override def createProjectForByNameQuery(lr: LogicalRelation, plan: 
LogicalPlan): Option[LogicalPlan] = {
+    plan match {
+      case insert: InsertIntoStatement =>
+        
Some(ResolveInsertionBase.createProjectForByNameQuery(lr.catalogTable.get.qualifiedName,
 insert))
+      case _ =>
+        None
+    }
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
index b95ee94e482..22316ddacac 100644
--- 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
@@ -115,4 +115,22 @@ object HoodieSpark35CatalystPlanUtils extends 
HoodieSpark3CatalystPlanUtils {
         None
     }
   }
+
+  override def unapplyInsertIntoStatement(plan: LogicalPlan): 
Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, 
Boolean, Boolean)] = {
+    plan match {
+      case insert: InsertIntoStatement =>
+        Some((insert.table, insert.userSpecifiedCols, insert.partitionSpec, 
insert.query, insert.overwrite, insert.ifPartitionNotExists))
+      case _ =>
+        None
+    }
+  }
+
+  override def createProjectForByNameQuery(lr: LogicalRelation, plan: 
LogicalPlan): Option[LogicalPlan] = {
+    plan match {
+      case insert: InsertIntoStatement =>
+        
Some(ResolveInsertionBase.createProjectForByNameQuery(lr.catalogTable.get.qualifiedName,
 insert))
+      case _ =>
+        None
+    }
+  }
 }

Reply via email to