jonvex commented on code in PR #12692:
URL: https://github.com/apache/hudi/pull/12692#discussion_r1927043420
##########
hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark35Analysis.scala:
##########
@@ -64,3 +72,132 @@ case class
HoodieSpark35DataSourceV2ToV1Fallback(sparkSession: SparkSession) ext
LogicalRelation(relation, output, catalogTable, isStreaming = false)
}
}
+
+/**
+ * In Spark 3.5, the following Resolution rules are removed,
+ * [[ResolveUserSpecifiedColumns]] and [[ResolveDefaultColumns]]
+ * (see code changes in [[org.apache.spark.sql.catalyst.analysis.Analyzer]]
+ * from https://github.com/apache/spark/pull/41262).
+ * The same logic of resolving the user specified columns and default values,
+ * which are required for a subset of columns as user specified compared to
the table
+ * schema to work properly, are deferred to [[PreprocessTableInsertion]] for
v1 INSERT.
+ *
+ * Note that [[HoodieAnalysis]] intercepts the [[InsertIntoStatement]] after
Spark's built-in
+ * Resolution rules are applies, the logic of resolving the user specified
columns and default
+ * values may no longer be applied. To make INSERT with a subset of columns
specified by user
+ * to work, this custom resolution rule
[[HoodieSpark35ResolveColumnsForInsertInto]] is added
+ * to achieve the same, before converting [[InsertIntoStatement]] into
+ * [[InsertIntoHoodieTableCommand]].
+ *
+ * Also note that, the project logic in [[ResolveImplementationsEarly]] for
INSERT is still
+ * needed in the case of INSERT with all columns in a different ordering.
+ */
+case class HoodieSpark35ResolveColumnsForInsertInto() extends
ResolveInsertionBase {
+ // NOTE: This is copied from [[PreprocessTableInsertion]] with additional
handling of Hudi relations
Review Comment:
Permalink for future reference:
https://github.com/apache/spark/blob/d061aadf25fd258d2d3e7332a489c9c24a2b5530/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala#L373
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala:
##########
@@ -43,6 +43,122 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
class TestInsertTable extends HoodieSparkSqlTestBase {
+ test("Test Insert Into with subset of columns") {
+ // This is only supported by Spark 3.4 and above
+ if (HoodieSparkUtils.gteqSpark3_4) {
+ Seq("cow", "mor").foreach(tableType =>
+ Seq(true, false).foreach(isPartitioned => withTempDir { tmp =>
+ testInsertIntoWithSubsetOfColumns(
+ "hudi", tableType, s"${tmp.getCanonicalPath}/hudi_table",
isPartitioned)
+ }))
+ }
+ }
+
+ test("Test Insert Into with subset of columns on Parquet table") {
+ // This is only supported by Spark 3.4 and above
+ if (HoodieSparkUtils.gteqSpark3_4) {
+ // Make sure parquet tables are not affected by the custom rules for
+ // INSERT INTO statements on Hudi tables
+ Seq(true, false).foreach(isPartitioned => withTempDir { tmp =>
+ testInsertIntoWithSubsetOfColumns(
+ "parquet", "", s"${tmp.getCanonicalPath}/parquet_table",
isPartitioned)
+ })
+ }
+ }
+
+ private def testInsertIntoWithSubsetOfColumns(format: String,
+ tableType: String,
+ tablePath: String,
+ isPartitioned: Boolean): Unit
= {
+ val tableName = generateTableName
+ val createTablePartitionClause = if (isPartitioned) "partitioned by (dt)"
else ""
+ // Create a partitioned table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | dt string,
+ | name string,
+ | price double,
+ | ts long
+ |) using $format
+ | tblproperties (
+ | type = '$tableType',
+ | primaryKey = 'id'
+ | )
+ | $createTablePartitionClause
+ | location '$tablePath'
+ """.stripMargin)
+
+ // INSERT INTO with all columns
+ // Same ordering of columns as the schema
+ spark.sql(
+ s"""
+ | insert into $tableName (id, name, price, ts, dt)
+ | values (1, 'a1', 10, 1000, '2025-01-01'),
+ | (2, 'a2', 20, 2000, '2025-01-02')
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1", 10.0, 1000, "2025-01-01"),
+ Seq(2, "a2", 20.0, 2000, "2025-01-02")
+ )
+
+ // Different ordering of columns compared to the schema
+ spark.sql(
+ s"""
+ | insert into $tableName (dt, name, id, price, ts)
+ | values ('2025-01-03', 'a3', 3, 30, 3000)
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1", 10.0, 1000, "2025-01-01"),
+ Seq(2, "a2", 20.0, 2000, "2025-01-02"),
+ Seq(3, "a3", 30.0, 3000, "2025-01-03")
+ )
+
+ // INSERT INTO with a subset of columns
+ // Using different ordering of subset of columns in user-specified columns,
+ // and VALUES without column names
+ spark.sql(
+ s"""
+ | insert into $tableName (dt, ts, name, id)
+ | values ('2025-01-04', 4000, 'a4', 4)
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1", 10.0, 1000, "2025-01-01"),
+ Seq(2, "a2", 20.0, 2000, "2025-01-02"),
+ Seq(3, "a3", 30.0, 3000, "2025-01-03"),
+ Seq(4, "a4", null, 4000, "2025-01-04")
Review Comment:
What happens if the column is non-nullable?
##########
hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark35Analysis.scala:
##########
@@ -64,3 +72,132 @@ case class
HoodieSpark35DataSourceV2ToV1Fallback(sparkSession: SparkSession) ext
LogicalRelation(relation, output, catalogTable, isStreaming = false)
}
}
+
+/**
+ * In Spark 3.5, the following Resolution rules are removed,
+ * [[ResolveUserSpecifiedColumns]] and [[ResolveDefaultColumns]]
+ * (see code changes in [[org.apache.spark.sql.catalyst.analysis.Analyzer]]
+ * from https://github.com/apache/spark/pull/41262).
+ * The same logic of resolving the user specified columns and default values,
+ * which are required for a subset of columns as user specified compared to
the table
+ * schema to work properly, are deferred to [[PreprocessTableInsertion]] for
v1 INSERT.
+ *
+ * Note that [[HoodieAnalysis]] intercepts the [[InsertIntoStatement]] after
Spark's built-in
+ * Resolution rules are applies, the logic of resolving the user specified
columns and default
+ * values may no longer be applied. To make INSERT with a subset of columns
specified by user
+ * to work, this custom resolution rule
[[HoodieSpark35ResolveColumnsForInsertInto]] is added
+ * to achieve the same, before converting [[InsertIntoStatement]] into
+ * [[InsertIntoHoodieTableCommand]].
+ *
+ * Also note that, the project logic in [[ResolveImplementationsEarly]] for
INSERT is still
+ * needed in the case of INSERT with all columns in a different ordering.
+ */
+case class HoodieSpark35ResolveColumnsForInsertInto() extends
ResolveInsertionBase {
+ // NOTE: This is copied from [[PreprocessTableInsertion]] with additional
handling of Hudi relations
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan match {
+ case i@InsertIntoStatement(table, _, _, query, _, _, _)
+ if table.resolved && query.resolved
+ && i.userSpecifiedCols.nonEmpty &&
i.table.isInstanceOf[LogicalRelation]
+ &&
sparkAdapter.isHoodieTable(i.table.asInstanceOf[LogicalRelation].catalogTable.get)
=>
+ table match {
+ case relation: HiveTableRelation =>
+ val metadata = relation.tableMeta
+ preprocess(i, metadata.identifier.quotedString,
metadata.partitionSchema,
+ Some(metadata))
+ case LogicalRelation(h: HadoopFsRelation, _, catalogTable, _) =>
+ val tblName =
catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
+ preprocess(i, tblName, h.partitionSchema, catalogTable)
+ case LogicalRelation(_: InsertableRelation, _, catalogTable, _) =>
+ preprocess(i, catalogTable)
+ // The two conditions below are adapted to Hudi relations
+ case LogicalRelation(_: EmptyRelation, _, catalogTable, _) =>
+ preprocess(i, catalogTable)
+ case LogicalRelation(_: MergeOnReadSnapshotRelation, _,
catalogTable, _) =>
Review Comment:
would it be better to do HoodieBaseRelation?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]