yihua commented on code in PR #12692:
URL: https://github.com/apache/hudi/pull/12692#discussion_r1928345547


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala:
##########
@@ -43,6 +43,122 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 class TestInsertTable extends HoodieSparkSqlTestBase {
 
+  test("Test Insert Into with subset of columns") {
+    // This is only supported by Spark 3.4 and above
+    if (HoodieSparkUtils.gteqSpark3_4) {
+      Seq("cow", "mor").foreach(tableType =>
+        Seq(true, false).foreach(isPartitioned => withTempDir { tmp =>
+          testInsertIntoWithSubsetOfColumns(
+            "hudi", tableType, s"${tmp.getCanonicalPath}/hudi_table", 
isPartitioned)
+        }))
+    }
+  }
+
+  test("Test Insert Into with subset of columns on Parquet table") {
+    // This is only supported by Spark 3.4 and above
+    if (HoodieSparkUtils.gteqSpark3_4) {
+      // Make sure parquet tables are not affected by the custom rules for
+      // INSERT INTO statements on Hudi tables
+      Seq(true, false).foreach(isPartitioned => withTempDir { tmp =>
+        testInsertIntoWithSubsetOfColumns(
+          "parquet", "", s"${tmp.getCanonicalPath}/parquet_table", 
isPartitioned)
+      })
+    }
+  }
+
+  private def testInsertIntoWithSubsetOfColumns(format: String,
+                                                tableType: String,
+                                                tablePath: String,
+                                                isPartitioned: Boolean): Unit 
= {
+    val tableName = generateTableName
+    val createTablePartitionClause = if (isPartitioned) "partitioned by (dt)" 
else ""
+    // Create the table; it is partitioned by dt only when isPartitioned is true
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  id int,
+         |  dt string,
+         |  name string,
+         |  price double,
+         |  ts long
+         |) using $format
+         | tblproperties (
+         | type = '$tableType',
+         | primaryKey = 'id'
+         | )
+         | $createTablePartitionClause
+         | location '$tablePath'
+       """.stripMargin)
+
+    // INSERT INTO with all columns
+    // Same ordering of columns as the schema
+    spark.sql(
+      s"""
+         | insert into $tableName (id, name, price, ts, dt)
+         | values (1, 'a1', 10, 1000, '2025-01-01'),
+         | (2, 'a2', 20, 2000, '2025-01-02')
+        """.stripMargin)
+    checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+      Seq(1, "a1", 10.0, 1000, "2025-01-01"),
+      Seq(2, "a2", 20.0, 2000, "2025-01-02")
+    )
+
+    // Different ordering of columns compared to the schema
+    spark.sql(
+      s"""
+         | insert into $tableName (dt, name, id, price, ts)
+         | values ('2025-01-03', 'a3', 3, 30, 3000)
+        """.stripMargin)
+    checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+      Seq(1, "a1", 10.0, 1000, "2025-01-01"),
+      Seq(2, "a2", 20.0, 2000, "2025-01-02"),
+      Seq(3, "a3", 30.0, 3000, "2025-01-03")
+    )
+
+    // INSERT INTO with a subset of columns
+    // Using different ordering of subset of columns in user-specified columns,
+    // and VALUES without column names
+    spark.sql(
+      s"""
+         | insert into $tableName (dt, ts, name, id)
+         | values ('2025-01-04', 4000, 'a4', 4)
+        """.stripMargin)
+    checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+      Seq(1, "a1", 10.0, 1000, "2025-01-01"),
+      Seq(2, "a2", 20.0, 2000, "2025-01-02"),
+      Seq(3, "a3", 30.0, 3000, "2025-01-03"),
+      Seq(4, "a4", null, 4000, "2025-01-04")

Review Comment:
   That's going to fail the write, since the values for a non-nullable column 
cannot be null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to