yihua opened a new pull request, #12692: URL: https://github.com/apache/hudi/pull/12692
### Change Logs

This PR fixes an issue where an INSERT SQL statement with a subset of columns fails on a Hudi table in Spark 3.5. The same statement succeeds in Spark 3.4.

In Spark 3.5, the resolution rules `ResolveUserSpecifiedColumns` and `ResolveDefaultColumns` were removed (see the code changes in `org.apache.spark.sql.catalyst.analysis.Analyzer` from https://github.com/apache/spark/pull/41262). The logic of resolving user-specified columns and default values, which is required for an INSERT that specifies a subset of the table's columns to work properly, is instead deferred to `PreprocessTableInsertion` for v1 INSERT. Because `HoodieAnalysis` intercepts the `InsertIntoStatement` after Spark's built-in resolution rules are applied, that logic may no longer be applied. To make INSERT with a user-specified subset of columns work, this PR adds the custom resolution rule `HoodieSpark35ResolveColumnsForInsertInto`, which performs the same resolution before converting `InsertIntoStatement` into `InsertIntoHoodieTableCommand`.
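The effect of this resolution step can be modeled outside Spark as a simple projection. The sketch below is plain Python, not the actual Scala rule; the function name `resolve_insert_columns` is hypothetical. It reorders the user-specified values into the table's column order and fills unspecified columns with NULL, mirroring the `null AS price` projection in the resolved plan:

```python
# Illustrative model (NOT Hudi's implementation) of resolving a
# user-specified column list against the full table schema.
from typing import Any, Dict, List, Optional


def resolve_insert_columns(
    table_columns: List[str],
    user_columns: List[str],
    values: List[Any],
) -> List[Optional[Any]]:
    """Align user-specified INSERT values with the table schema order."""
    if len(user_columns) != len(values):
        raise ValueError("column/value arity mismatch")
    unknown = set(user_columns) - set(table_columns)
    if unknown:
        raise ValueError(f"unknown columns: {sorted(unknown)}")
    supplied: Dict[str, Any] = dict(zip(user_columns, values))
    # Columns absent from the user-specified list get NULL (None here),
    # analogous to projecting `null AS <col>` for the missing columns.
    return [supplied.get(col) for col in table_columns]


# Matches the reproduction below: table columns (id, name, price, ts, dt),
# INSERT INTO ... (dt, ts, name, id) VALUES ('2025-01-04', 4000, 'a4', 4)
row = resolve_insert_columns(
    ["id", "name", "price", "ts", "dt"],
    ["dt", "ts", "name", "id"],
    ["2025-01-04", 4000, "a4", 4],
)
print(row)  # [4, 'a4', None, 4000, '2025-01-04']
```

Without such a projection, the four data columns are matched positionally against the five table columns, which is exactly the arity mismatch reported in the failure below.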
Here's the behavior difference before and after the fix on Spark 3.5, when `InsertIntoStatement` is intercepted.

Before the fix, the query in the relation for `InsertIntoStatement`:
```
LocalRelation [col1#128, col2#129, col3#130, col4#131]
```

After the fix, the query in the relation for `InsertIntoStatement`:
```
Project [id#140, name#139, price#146, ts#147L, dt#137]
+- Project [null AS _hoodie_commit_time#141, null AS _hoodie_commit_seqno#142, null AS _hoodie_record_key#143, null AS _hoodie_partition_path#144, null AS _hoodie_file_name#145, id#140, name#139, null AS price#146, cast(ts#138 as bigint) AS ts#147L, dt#137]
   +- Project [col1#133 AS dt#137, col2#134 AS ts#138, col3#135 AS name#139, col4#136 AS id#140]
      +- LocalRelation [col1#133, col2#134, col3#135, col4#136]
```

New tests are added in `TestInsertTable`: `"Test Insert Into with subset of columns"` and `"Test Insert Into with subset of columns on Parquet table"`. The test on the Hudi table fails before the fix and passes after the fix.

Reproducing the failure in Spark 3.5 (these steps are covered by the new tests above):

Create the table:
```
create table $tableName (
  id int,
  dt string,
  name string,
  price double,
  ts long
) using hudi
tblproperties (primaryKey = 'id')
location '/tmp/table'
```

INSERT INTO with a subset of columns:
```
insert into $tableName (dt, ts, name, id) values ('2025-01-04', 4000, 'a4', 4)
```

It fails with:
```
[INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS] Cannot write to `spark_catalog`.`default`.`h1`, the reason is not enough data columns:
Table columns: `id`, `name`, `price`, `ts`, `dt`.
Data columns: `dt`, `ts`, `name`, `id`.
org.apache.spark.sql.AnalysisException: [INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS] Cannot write to `spark_catalog`.`default`.`h1`, the reason is not enough data columns:
Table columns: `id`, `name`, `price`, `ts`, `dt`.
Data columns: `dt`, `ts`, `name`, `id`.
    at org.apache.spark.sql.errors.QueryCompilationErrors$.cannotWriteNotEnoughColumnsToTableError(QueryCompilationErrors.scala:2126)
    at org.apache.spark.sql.catalyst.analysis.TableOutputResolver$.resolveOutputColumns(TableOutputResolver.scala:70)
    at org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.resolveOutputColumns(HoodieSpark3CatalystPlanUtils.scala:51)
    at org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.resolveOutputColumns$(HoodieSpark3CatalystPlanUtils.scala:46)
    at org.apache.spark.sql.HoodieSpark35CatalystPlanUtils$.resolveOutputColumns(HoodieSpark35CatalystPlanUtils.scala:32)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.coerceQueryOutputColumns(InsertIntoHoodieTableCommand.scala:168)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.alignQueryOutput(InsertIntoHoodieTableCommand.scala:145)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:99)
    at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:62)
```

### Impact

_Describe any public API or user-facing feature change or any performance impact._

### Risk level (write none, low medium or high below)

_If medium or high, explain what verification was done to mitigate the risks._

### Documentation Update

_Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none"._

- _The config description must be updated if new configs are added or the default value of the configs are changed_
- _Any new feature or user-facing change requires updating the Hudi website.
Please create a Jira ticket, attach the ticket number here and follow the [instruction](https://hudi.apache.org/contribute/developer-setup#website) to make changes to the website._

### Contributor's checklist

- [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
- [ ] Change Logs and Impact were stated clearly
- [ ] Adequate tests were added if applicable
- [ ] CI passed
