yihua commented on code in PR #12692:
URL: https://github.com/apache/hudi/pull/12692#discussion_r1926241983
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -46,6 +46,11 @@ object HoodieAnalysis extends SparkAdapterSupport {
def customResolutionRules: Seq[RuleBuilder] = {
val rules: ListBuffer[RuleBuilder] = ListBuffer()
+ if (HoodieSparkUtils.gteqSpark3_5) {
+      rules += (_ => instantiateKlass(
+        "org.apache.spark.sql.hudi.analysis.HoodieSpark35ResolveColumnsForInsertInto"))
Review Comment:
@KnightChess: If all the projection and ordering of columns is handled in the
`HoodieSpark35ResolveColumnsForInsertInto` resolution rule, can we deprecate
the projection logic in `ResolveImplementationsEarly` for
`InsertIntoStatement`?
```
// Create a project if this is an INSERT INTO query with specified cols.
val projectByUserSpecified = if (userSpecifiedCols.nonEmpty) {
  ValidationUtils.checkState(lr.catalogTable.isDefined, "Missing catalog table")
  sparkAdapter.getCatalystPlanUtils.createProjectForByNameQuery(lr, iis)
} else {
  None
}
```
in
```
case class ResolveImplementationsEarly() extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan match {
      // Convert to InsertIntoHoodieTableCommand
      case iis @ MatchInsertIntoStatement(relation @ ResolvesToHudiTable(_),
          userSpecifiedCols, partition, query, overwrite, _) if query.resolved =>
        relation match {
          // NOTE: In Spark >= 3.2, Hudi relations will be resolved as [[DataSourceV2Relation]]s by default;
          //       However, currently, fallback will be applied downgrading them to V1 relations, hence
          //       we need to check whether we could proceed here, or has to wait until fallback rule kicks in
          case lr: LogicalRelation =>
            // Create a project if this is an INSERT INTO query with specified cols.
            val projectByUserSpecified = if (userSpecifiedCols.nonEmpty) {
              ValidationUtils.checkState(lr.catalogTable.isDefined, "Missing catalog table")
              sparkAdapter.getCatalystPlanUtils.createProjectForByNameQuery(lr, iis)
            } else {
              None
            }
            new InsertIntoHoodieTableCommand(lr,
              projectByUserSpecified.getOrElse(query), partition, overwrite)
          case _ => iis
        }
```
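Conceptually, the projection that `createProjectForByNameQuery` builds for an INSERT INTO with user-specified columns just reorders the query output back into the table schema's column order. A minimal, Spark-free sketch of that reordering (object and method names here are hypothetical, for illustration only; the real rule builds Catalyst expressions, not plain values):

```scala
// Hypothetical illustration of the by-name projection: given the table's
// column order and the user-specified column order, map each query output
// value back to its position in the table schema.
object ByNameProjectionSketch {
  // Reorders the values of `row` (aligned with userCols) to follow tableCols.
  // Assumes userCols is a permutation of tableCols -- the "all columns,
  // different ordering" case that ResolveImplementationsEarly handles.
  def reorderByName(tableCols: Seq[String],
                    userCols: Seq[String],
                    row: Seq[Any]): Seq[Any] = {
    require(userCols.sorted == tableCols.sorted,
      "expected the INSERT column list to be a permutation of the table columns")
    val valueByName = userCols.zip(row).toMap
    tableCols.map(valueByName)
  }
}
```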
##########
hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark35Analysis.scala:
##########
@@ -64,3 +72,132 @@ case class HoodieSpark35DataSourceV2ToV1Fallback(sparkSession: SparkSession) ext
LogicalRelation(relation, output, catalogTable, isStreaming = false)
}
}
+
+/**
+ * In Spark 3.5, the resolution rules [[ResolveUserSpecifiedColumns]] and
+ * [[ResolveDefaultColumns]] are removed (see the code changes in
+ * [[org.apache.spark.sql.catalyst.analysis.Analyzer]] from
+ * https://github.com/apache/spark/pull/41262). The logic of resolving the
+ * user-specified columns and default values, which is required for INSERT with
+ * a subset of columns compared to the table schema to work properly, is
+ * deferred to [[PreprocessTableInsertion]] for v1 INSERT.
+ *
+ * Note that [[HoodieAnalysis]] intercepts the [[InsertIntoStatement]] after
+ * Spark's built-in resolution rules are applied, so the logic of resolving the
+ * user-specified columns and default values may no longer be applied. To make
+ * INSERT with a subset of user-specified columns work, this custom resolution
+ * rule [[HoodieSpark35ResolveColumnsForInsertInto]] is added to achieve the
+ * same, before [[InsertIntoStatement]] is converted into
+ * [[InsertIntoHoodieTableCommand]].
+ *
+ * Also note that the projection logic in [[ResolveImplementationsEarly]] for
+ * INSERT is still needed for the case of INSERT with all columns in a
+ * different ordering.
+ */
+case class HoodieSpark35ResolveColumnsForInsertInto() extends ResolveInsertionBase {
+  // NOTE: This is copied from [[PreprocessTableInsertion]] with additional handling of Hudi relations
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan match {
+      case i @ InsertIntoStatement(table, _, _, query, _, _, _)
+        if table.resolved && query.resolved
+          && i.userSpecifiedCols.nonEmpty && i.table.isInstanceOf[LogicalRelation]
+          && sparkAdapter.isHoodieTable(i.table.asInstanceOf[LogicalRelation].catalogTable.get) =>
+        table match {
Review Comment:
Originally, I directly called `PreprocessTableInsertion.apply(plan)` to
resolve the columns and add a projection for the columns not present in the
user-specified column list. Then I noticed that for an empty table
(`EmptyRelation` in the old read path) or a MOR table with log files
(`MergeOnReadSnapshotRelation` in the old read path), the plan may not get
preprocessed, as `PreprocessTableInsertion` does not recognize Hudi relations.
So I had to copy the logic and adapt it here so that INSERT INTO a Hudi table
with a subset of columns specified works. But I'm wondering if I should
extract the projection functionality so it's easier to maintain (that'll take
more time for me to figure out). @KnightChess wdyt?
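If the projection logic were extracted into a shared helper as suggested, its core could look like this sketch: align the user-specified subset of columns against the full table schema, filling unspecified columns with their defaults, which is roughly what `ResolveUserSpecifiedColumns` plus `ResolveDefaultColumns` did before Spark 3.5. All names below are hypothetical illustrations; the real rule would build Catalyst `Alias`/`Literal` expressions rather than plain values:

```scala
// Hypothetical sketch of an extracted projection helper for INSERT with a
// subset of user-specified columns.
object InsertProjectionSketch {
  final case class Column(name: String, default: Any)

  // For each table column, take the user-supplied value if the column was
  // specified in the INSERT column list, otherwise fall back to the column's
  // default value.
  def projectWithDefaults(schema: Seq[Column],
                          userCols: Seq[String],
                          row: Seq[Any]): Seq[Any] = {
    require(userCols.distinct.size == userCols.size,
      "duplicate columns in the INSERT column list")
    val supplied = userCols.zip(row).toMap
    schema.map(c => supplied.getOrElse(c.name, c.default))
  }
}
```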
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]