This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ef83379d38a [HUDI-7949] insert into hudi table with columns specified
(#11568)
ef83379d38a is described below
commit ef83379d38a08fc87fe567cbbf321fecb47fc80b
Author: KnightChess <[email protected]>
AuthorDate: Mon Jul 8 08:31:18 2024 +0800
[HUDI-7949] insert into hudi table with columns specified (#11568)
---
.../spark/sql/HoodieCatalystPlansUtils.scala | 10 +-
.../spark/sql/hudi/analysis/HoodieAnalysis.scala | 20 ++-
.../spark/sql/hudi/dml/TestInsertTable.scala | 159 +++++++++++++++++++++
.../spark/sql/HoodieSpark2CatalystPlanUtils.scala | 6 +-
.../spark/sql/HoodieSpark3CatalystPlanUtils.scala | 12 +-
.../spark/sql/HoodieSpark30CatalystPlanUtils.scala | 11 +-
.../spark/sql/HoodieSpark31CatalystPlanUtils.scala | 12 +-
.../spark/sql/HoodieSpark32CatalystPlanUtils.scala | 18 +++
.../apache/spark/sql/ResolveInsertionBase.scala | 95 ++++++++++++
.../spark/sql/HoodieSpark33CatalystPlanUtils.scala | 18 +++
.../spark/sql/HoodieSpark34CatalystPlanUtils.scala | 32 +++++
.../spark/sql/HoodieSpark35CatalystPlanUtils.scala | 18 +++
12 files changed, 389 insertions(+), 22 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
index 40e62ddd0ef..42dcc02cca4 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
@@ -22,6 +22,7 @@ import
org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.SQLConf
trait HoodieCatalystPlansUtils {
@@ -111,8 +112,10 @@ trait HoodieCatalystPlansUtils {
/**
* Decomposes [[InsertIntoStatement]] into its arguments allowing to
accommodate for API
* changes in Spark 3.3
+ * @return an Option of a tuple with (table logical plan, userSpecifiedCols,
partitionSpec, query, overwrite, ifPartitionNotExists)
+ * userSpecifiedCols: only returned on Spark 3.2 and above;
otherwise it is empty
*/
- def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan,
Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]
+ def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan,
Seq[String], Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]
/**
* Decomposes [[CreateTableLikeCommand]] into its arguments allowing to
accommodate for API
@@ -147,4 +150,9 @@ trait HoodieCatalystPlansUtils {
* true if both plans produce the same attributes in the same order
*/
def produceSameOutput(a: LogicalPlan, b: LogicalPlan): Boolean
+
+ /**
+ * Add a project to use the table column names for INSERT INTO BY NAME with
specified cols
+ */
+ def createProjectForByNameQuery(lr: LogicalRelation, plan: LogicalPlan):
Option[LogicalPlan]
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index aef98b4e91d..e6c7b8c1ab0 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -17,10 +17,9 @@
package org.apache.spark.sql.hudi.analysis
-import org.apache.hudi.common.util.ReflectionUtils
+import org.apache.hudi.common.util.{ReflectionUtils, ValidationUtils}
import org.apache.hudi.common.util.ReflectionUtils.loadClass
import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport}
-
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat,
CatalogTable}
@@ -37,7 +36,6 @@ import
org.apache.spark.sql.hudi.command.procedures.{HoodieProcedures, Procedure
import org.apache.spark.sql.{AnalysisException, SparkSession}
import java.util
-
import scala.collection.mutable.ListBuffer
object HoodieAnalysis extends SparkAdapterSupport {
@@ -252,7 +250,7 @@ object HoodieAnalysis extends SparkAdapterSupport {
// NOTE: In case of [[InsertIntoStatement]] Hudi tables could be on
both sides -- receiving and providing
// the data, as such we have to make sure that we handle both
of these cases
- case iis @ MatchInsertIntoStatement(targetTable, _, query, _, _) =>
+ case iis @ MatchInsertIntoStatement(targetTable, _, _, query, _, _)
=>
val updatedTargetTable = targetTable match {
// In the receiving side of the IIS, we can't project meta-field
attributes out,
// and instead have to explicitly remove them
@@ -355,7 +353,7 @@ object HoodieAnalysis extends SparkAdapterSupport {
}
private[sql] object MatchInsertIntoStatement {
- def unapply(plan: LogicalPlan): Option[(LogicalPlan, Map[String,
Option[String]], LogicalPlan, Boolean, Boolean)] =
+ def unapply(plan: LogicalPlan): Option[(LogicalPlan, Seq[String],
Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] =
sparkAdapter.getCatalystPlanUtils.unapplyInsertIntoStatement(plan)
}
@@ -408,12 +406,20 @@ case class ResolveImplementationsEarly() extends
Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan match {
// Convert to InsertIntoHoodieTableCommand
- case iis @ MatchInsertIntoStatement(relation @ ResolvesToHudiTable(_),
partition, query, overwrite, _) if query.resolved =>
+ case iis @ MatchInsertIntoStatement(relation @ ResolvesToHudiTable(_),
userSpecifiedCols, partition, query, overwrite, _) if query.resolved =>
relation match {
// NOTE: In Spark >= 3.2, Hudi relations will be resolved as
[[DataSourceV2Relation]]s by default;
// However, currently, fallback will be applied downgrading
them to V1 relations, hence
// we need to check whether we could proceed here, or has to
wait until fallback rule kicks in
- case lr: LogicalRelation => new InsertIntoHoodieTableCommand(lr,
query, partition, overwrite)
+ case lr: LogicalRelation =>
+ // Create a project if this is an INSERT INTO query with specified
cols.
+ val projectByUserSpecified = if (userSpecifiedCols.nonEmpty) {
+ ValidationUtils.checkState(lr.catalogTable.isDefined, "Missing
catalog table")
+
sparkAdapter.getCatalystPlanUtils.createProjectForByNameQuery(lr, iis)
+ } else {
+ None
+ }
+ new InsertIntoHoodieTableCommand(lr,
projectByUserSpecified.getOrElse(query), partition, overwrite)
case _ => iis
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index 0b1d4ca8999..f53db8e5819 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -2826,4 +2826,163 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy")
spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
}
+
+ test("Test insert into with special cols") {
+ withTempDir { tmp =>
+ if (HoodieSparkUtils.gteqSpark3_2) {
+ val targetTableA = generateTableName
+ val tablePathA = s"${tmp.getCanonicalPath}/$targetTableA"
+ if (HoodieSparkUtils.isSpark3_4) {
+ spark.sql("set spark.sql.defaultColumn.enabled = false")
+ }
+
+ spark.sql(
+ s"""
+ |create table if not exists $targetTableA (
+ | id bigint,
+ | name string,
+ | price double
+ |) using hudi
+ |tblproperties (
+ | primaryKey = 'id',
+ | type = 'mor',
+ | preCombineField = 'name'
+ |) location '$tablePathA'
+ |""".stripMargin)
+
+ spark.sql(s"insert into $targetTableA (id, price, name) values (1,
12.1, 'aaa')")
+
+ checkAnswer(s"select id, price, name from $targetTableA")(
+ Seq(1, 12.1, "aaa")
+ )
+
+ val targetTableB = generateTableName
+ val tablePathB = s"${tmp.getCanonicalPath}/$targetTableB"
+
+ spark.sql(
+ s"""
+ |create table if not exists $targetTableB (
+ | id bigint,
+ | name string,
+ | price double,
+ | day string,
+ | hour string
+ |) using hudi
+ |tblproperties (
+ | primaryKey = 'id',
+ | type = 'mor',
+ | preCombineField = 'name'
+ |) partitioned by (day, hour)
+ |location '$tablePathB'
+ |""".stripMargin)
+
+ spark.sql(s"insert into $targetTableB (id, day, price, name, hour) " +
+ s"values (2, '01', 12.2, 'bbb', '02')")
+
+ spark.sql(s"insert into $targetTableB (id, day, price, name, hour) " +
+ s"select id, '01' as dt, price, name, '03' as hour from
$targetTableA")
+
+ spark.sql(s"insert into $targetTableB partition(day='02', hour) (id,
hour, price, name) " +
+ s"values (3, '01', 12.3, 'ccc')")
+
+ spark.sql(s"insert into $targetTableB partition(day='02', hour='02')
(id, price, name) " +
+ s"values (4, 12.4, 'ddd')")
+
+ checkAnswer(s"select id, price, name, day, hour from $targetTableB")(
+ Seq(2, 12.2, "bbb", "01", "02"),
+ Seq(1, 12.1, "aaa", "01", "03"),
+ Seq(3, 12.3, "ccc", "02", "01"),
+ Seq(4, 12.4, "ddd", "02", "02")
+ )
+
+ if (HoodieSparkUtils.isSpark3_4) {
+ spark.sql("set spark.sql.defaultColumn.enabled = true")
+ checkExceptionContain(s"insert into $targetTableB (id, day, price,
name, hour) " +
+ s"select id, '01' as dt, price, name, '03' as hour from
$targetTableA")(
+ "hudi not support specified cols when enable default columns")
+ }
+ }
+ }
+ }
+
+ test("Test insert overwrite with special cols") {
+ withTempDir { tmp =>
+ if (HoodieSparkUtils.gteqSpark3_2) {
+ val targetTableA = generateTableName
+ val tablePathA = s"${tmp.getCanonicalPath}/$targetTableA"
+ if (HoodieSparkUtils.isSpark3_4) {
+ spark.sql("set spark.sql.defaultColumn.enabled = false")
+ }
+
+ spark.sql(
+ s"""
+ |create table if not exists $targetTableA (
+ | id bigint,
+ | name string,
+ | price double
+ |) using hudi
+ |tblproperties (
+ | primaryKey = 'id',
+ | type = 'mor',
+ | preCombineField = 'name'
+ |) location '$tablePathA'
+ |""".stripMargin)
+
+ spark.sql(s"insert overwrite $targetTableA (id, price, name) values
(1, 12.1, 'aaa')")
+
+ checkAnswer(s"select id, price, name from $targetTableA")(
+ Seq(1, 12.1, "aaa")
+ )
+
+ val targetTableB = generateTableName
+ val tablePathB = s"${tmp.getCanonicalPath}/$targetTableB"
+
+ spark.sql(
+ s"""
+ |create table if not exists $targetTableB (
+ | id bigint,
+ | name string,
+ | price double,
+ | day string,
+ | hour string
+ |) using hudi
+ |tblproperties (
+ | primaryKey = 'id',
+ | type = 'mor',
+ | preCombineField = 'name'
+ |) partitioned by (day, hour)
+ |location '$tablePathB'
+ |""".stripMargin)
+
+ spark.sql(s"insert overwrite $targetTableB (id, day, price, name,
hour) " +
+ s"values (2, '01', 12.2, 'bbb', '02')")
+
+ checkAnswer(s"select id, price, name, day, hour from $targetTableB")(
+ Seq(2, 12.2, "bbb", "01", "02")
+ )
+
+ spark.sql(s"insert overwrite $targetTableB (id, day, price, name,
hour) " +
+ s"select id, '01' as dt, price, name, '03' as hour from
$targetTableA")
+
+ spark.sql(s"insert overwrite $targetTableB partition(day='02', hour)
(id, hour, price, name) " +
+ s"values (3, '01', 12.3, 'ccc')")
+
+ spark.sql(s"insert overwrite $targetTableB partition(day='02',
hour='02') (id, price, name) " +
+ s"values (4, 12.4, 'ddd')")
+
+ checkAnswer(s"select id, price, name, day, hour from $targetTableB")(
+ Seq(1, 12.1, "aaa", "01", "03"),
+ Seq(3, 12.3, "ccc", "02", "01"),
+ Seq(4, 12.4, "ddd", "02", "02")
+ )
+
+ if (HoodieSparkUtils.isSpark3_4) {
+ spark.sql("set spark.sql.defaultColumn.enabled = true")
+ checkExceptionContain(s"insert overwrite $targetTableB (id, day,
price, name, hour) " +
+ s"select id, '01' as dt, price, name, '03' as hour from
$targetTableA")(
+ "hudi not support specified cols when enable default columns")
+ }
+ }
+ }
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
index 9f3a5ce03a8..c7ed7928f05 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
@@ -61,10 +61,10 @@ object HoodieSpark2CatalystPlanUtils extends
HoodieCatalystPlansUtils {
Join(left, right, joinType, None)
}
- override def unapplyInsertIntoStatement(plan: LogicalPlan):
Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean,
Boolean)] = {
+ override def unapplyInsertIntoStatement(plan: LogicalPlan):
Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan,
Boolean, Boolean)] = {
plan match {
case InsertIntoTable(table, partition, query, overwrite,
ifPartitionNotExists) =>
- Some((table, partition, query, overwrite, ifPartitionNotExists))
+ Some((table, Seq.empty, partition, query, overwrite,
ifPartitionNotExists))
case _ => None
}
}
@@ -129,4 +129,6 @@ object HoodieSpark2CatalystPlanUtils extends
HoodieCatalystPlansUtils {
override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan,
Seq[Attribute])] = None
override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan,
String)] = None
+
+ override def createProjectForByNameQuery(lr: LogicalRelation, plan:
LogicalPlan): Option[LogicalPlan] = None
}
diff --git
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
index a0938b94671..4faf1437b52 100644
---
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
@@ -27,6 +27,7 @@ import
org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, J
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode}
import org.apache.spark.sql.execution.command.{CreateTableLikeCommand,
ExplainCommand}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
@@ -56,15 +57,6 @@ trait HoodieSpark3CatalystPlanUtils extends
HoodieCatalystPlansUtils {
Join(left, right, joinType, None, JoinHint.NONE)
}
- override def unapplyInsertIntoStatement(plan: LogicalPlan):
Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean,
Boolean)] = {
- plan match {
- case insert: InsertIntoStatement =>
- Some((insert.table, insert.partitionSpec, insert.query,
insert.overwrite, insert.ifPartitionNotExists))
- case _ =>
- None
- }
- }
-
override def unapplyCreateTableLikeCommand(plan: LogicalPlan):
Option[(TableIdentifier, TableIdentifier, CatalogStorageFormat, Option[String],
Map[String, String], Boolean)] = {
plan match {
@@ -84,6 +76,8 @@ trait HoodieSpark3CatalystPlanUtils extends
HoodieCatalystPlansUtils {
override def produceSameOutput(a: LogicalPlan, b: LogicalPlan): Boolean = {
a.sameOutput(b)
}
+
+ override def createProjectForByNameQuery(lr: LogicalRelation, plan:
LogicalPlan): Option[LogicalPlan] = None
}
object HoodieSpark3CatalystPlanUtils extends SparkAdapterSupport {
diff --git
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
index dbe68bb33e5..1ea04b512e6 100644
---
a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
Expression, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
MergeIntoTable}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement,
LogicalPlan, MergeIntoTable}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import
org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait,
ParquetFileFormat}
@@ -82,4 +82,13 @@ object HoodieSpark30CatalystPlanUtils extends
HoodieSpark3CatalystPlanUtils {
override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan,
Seq[Attribute])] = None
override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan,
String)] = None
+
+ override def unapplyInsertIntoStatement(plan: LogicalPlan):
Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan,
Boolean, Boolean)] = {
+ plan match {
+ case insert: InsertIntoStatement =>
+ Some((insert.table, Seq.empty, insert.partitionSpec, insert.query,
insert.overwrite, insert.ifPartitionNotExists))
+ case _ =>
+ None
+ }
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
index 765a9b06de5..7462d41d299 100644
---
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
@@ -18,12 +18,11 @@
package org.apache.spark.sql
-import org.apache.hudi.SparkHoodieTableFileIndex
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet,
Expression, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
MergeIntoTable, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement,
LogicalPlan, MergeIntoTable}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import
org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait,
ParquetFileFormat}
@@ -83,4 +82,13 @@ object HoodieSpark31CatalystPlanUtils extends
HoodieSpark3CatalystPlanUtils {
override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan,
Seq[Attribute])] = None
override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan,
String)] = None
+
+ override def unapplyInsertIntoStatement(plan: LogicalPlan):
Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan,
Boolean, Boolean)] = {
+ plan match {
+ case insert: InsertIntoStatement =>
+ Some((insert.table, Seq.empty, insert.partitionSpec, insert.query,
insert.overwrite, insert.ifPartitionNotExists))
+ case _ =>
+ None
+ }
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
index 8562ca14d3e..2f147b89b80 100644
---
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
@@ -127,4 +127,22 @@ object HoodieSpark32CatalystPlanUtils extends
HoodieSpark3CatalystPlanUtils {
None
}
}
+
+ override def unapplyInsertIntoStatement(plan: LogicalPlan):
Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan,
Boolean, Boolean)] = {
+ plan match {
+ case insert: InsertIntoStatement =>
+ Some((insert.table, insert.userSpecifiedCols, insert.partitionSpec,
insert.query, insert.overwrite, insert.ifPartitionNotExists))
+ case _ =>
+ None
+ }
+ }
+
+ override def createProjectForByNameQuery(lr: LogicalRelation, plan:
LogicalPlan): Option[LogicalPlan] = {
+ plan match {
+ case insert: InsertIntoStatement =>
+
Some(ResolveInsertionBase.createProjectForByNameQuery(lr.catalogTable.get.qualifiedName,
insert))
+ case _ =>
+ None
+ }
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/ResolveInsertionBase.scala
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/ResolveInsertionBase.scala
new file mode 100644
index 00000000000..868e225476d
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/ResolveInsertionBase.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement,
LogicalPlan, Project}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This class is similar to Spark's ResolveInsertionBase in Spark 3.4.
+ */
+object ResolveInsertionBase extends SparkAdapterSupport {
+ def resolver: Resolver = SQLConf.get.resolver
+
+ /** Add a project to use the table column names for INSERT INTO BY NAME */
+ def createProjectForByNameQuery(tblName: String,
+ i: InsertIntoStatement): LogicalPlan = {
+ sparkAdapter.getSchemaUtils.checkColumnNameDuplication(i.userSpecifiedCols,
+ "in the table definition of " + tblName,
SQLConf.get.caseSensitiveAnalysis)
+
+ if (i.userSpecifiedCols.size != i.query.output.size) {
+ if (i.userSpecifiedCols.size > i.query.output.size) {
+ throw cannotWriteNotEnoughColumnsToTableError(
+ tblName, i.userSpecifiedCols, i.query.output)
+ } else {
+ throw cannotWriteNotEnoughColumnsToTableError(
+ tblName, i.userSpecifiedCols, i.query.output)
+ }
+ }
+ val projectByName = i.userSpecifiedCols.zip(i.query.output)
+ .map { case (userSpecifiedCol, queryOutputCol) =>
+ val resolvedCol = i.table.resolve(Seq(userSpecifiedCol), resolver)
+ .getOrElse(
+ throw unresolvedAttributeError(
+ "UNRESOLVED_COLUMN", userSpecifiedCol,
i.table.output.map(_.name)))
+ (queryOutputCol.dataType, resolvedCol.dataType) match {
+ case (input: StructType, expected: StructType) =>
+ // Rename inner fields of the input column to pass the by-name
INSERT analysis.
+ Alias(Cast(queryOutputCol, renameFieldsInStruct(input, expected)),
resolvedCol.name)()
+ case _ =>
+ Alias(queryOutputCol, resolvedCol.name)()
+ }
+ }
+ Project(projectByName, i.query)
+ }
+
+ private def renameFieldsInStruct(input: StructType, expected: StructType):
StructType = {
+ if (input.length == expected.length) {
+ val newFields = input.zip(expected).map { case (f1, f2) =>
+ (f1.dataType, f2.dataType) match {
+ case (s1: StructType, s2: StructType) =>
+ f1.copy(name = f2.name, dataType = renameFieldsInStruct(s1, s2))
+ case _ =>
+ f1.copy(name = f2.name)
+ }
+ }
+ StructType(newFields)
+ } else {
+ input
+ }
+ }
+
+ def cannotWriteNotEnoughColumnsToTableError(tableName: String,
+ expected: Seq[String],
+ queryOutput: Seq[Attribute]):
Throwable = {
+ new AnalysisException("table: " + tableName +
"INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS, expect: " +
+ expected.mkString(",") + " queryOutput: " + queryOutput.map(attr =>
attr.name).mkString(","))
+ }
+
+ def unresolvedAttributeError(errorClass: String,
+ colName: String,
+ candidates: Seq[String]): Throwable = {
+ new AnalysisException(errorClass + " expect col is : " +
+ colName + " table output is : " + candidates.mkString(","))
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
index 2d9250bbe9e..651059212cf 100644
---
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
@@ -111,4 +111,22 @@ object HoodieSpark33CatalystPlanUtils extends
HoodieSpark3CatalystPlanUtils {
None
}
}
+
+ override def unapplyInsertIntoStatement(plan: LogicalPlan):
Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan,
Boolean, Boolean)] = {
+ plan match {
+ case insert: InsertIntoStatement =>
+ Some((insert.table, insert.userSpecifiedCols, insert.partitionSpec,
insert.query, insert.overwrite, insert.ifPartitionNotExists))
+ case _ =>
+ None
+ }
+ }
+
+ override def createProjectForByNameQuery(lr: LogicalRelation, plan:
LogicalPlan): Option[LogicalPlan] = {
+ plan match {
+ case insert: InsertIntoStatement =>
+
Some(ResolveInsertionBase.createProjectForByNameQuery(lr.catalogTable.get.qualifiedName,
insert))
+ case _ =>
+ None
+ }
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
index 2c50d21cbe5..890a7cc821d 100644
---
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier,
Table, TableCatalog}
import org.apache.spark.sql.execution.command.RepairTableCommand
import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait,
ParquetFileFormat}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
object HoodieSpark34CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
@@ -115,4 +116,35 @@ object HoodieSpark34CatalystPlanUtils extends
HoodieSpark3CatalystPlanUtils {
None
}
}
+
+ override def unapplyInsertIntoStatement(plan: LogicalPlan):
Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan,
Boolean, Boolean)] = {
+ plan match {
+ case insert: InsertIntoStatement =>
+ // https://github.com/apache/spark/pull/36077
+ // first: in this PR, Spark 3.4 supports default values for INSERT INTO; it
will regenerate the user-specified cols,
+ // so there is no need to handle them on the Hudi side
+ // second: in this PR, appending the hoodie meta fields with default
values has a bug; it looks to have been fixed
+ // in Spark 3.5 (https://github.com/apache/spark/pull/41262), so
if users want to specify cols, they need to disable the default-column feature.
+ if (SQLConf.get.enableDefaultColumns) {
+ if (insert.userSpecifiedCols.nonEmpty) {
+ throw new AnalysisException("hudi not support specified cols when
enable default columns, " +
+ "please disable 'spark.sql.defaultColumn.enabled'")
+ }
+ Some((insert.table, Seq.empty, insert.partitionSpec, insert.query,
insert.overwrite, insert.ifPartitionNotExists))
+ } else {
+ Some((insert.table, insert.userSpecifiedCols, insert.partitionSpec,
insert.query, insert.overwrite, insert.ifPartitionNotExists))
+ }
+ case _ =>
+ None
+ }
+ }
+
+ override def createProjectForByNameQuery(lr: LogicalRelation, plan:
LogicalPlan): Option[LogicalPlan] = {
+ plan match {
+ case insert: InsertIntoStatement =>
+
Some(ResolveInsertionBase.createProjectForByNameQuery(lr.catalogTable.get.qualifiedName,
insert))
+ case _ =>
+ None
+ }
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
index b95ee94e482..22316ddacac 100644
---
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
+++
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
@@ -115,4 +115,22 @@ object HoodieSpark35CatalystPlanUtils extends
HoodieSpark3CatalystPlanUtils {
None
}
}
+
+ override def unapplyInsertIntoStatement(plan: LogicalPlan):
Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan,
Boolean, Boolean)] = {
+ plan match {
+ case insert: InsertIntoStatement =>
+ Some((insert.table, insert.userSpecifiedCols, insert.partitionSpec,
insert.query, insert.overwrite, insert.ifPartitionNotExists))
+ case _ =>
+ None
+ }
+ }
+
+ override def createProjectForByNameQuery(lr: LogicalRelation, plan:
LogicalPlan): Option[LogicalPlan] = {
+ plan match {
+ case insert: InsertIntoStatement =>
+
Some(ResolveInsertionBase.createProjectForByNameQuery(lr.catalogTable.get.qualifiedName,
insert))
+ case _ =>
+ None
+ }
+ }
}