This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new 7db85642600 [SPARK-46273][SQL] Support INSERT INTO/OVERWRITE using DSv2 sources
7db85642600 is described below
commit 7db85642600b1e3b39ca11e41d4e3e0bf1c8962b
Author: allisonwang-db <[email protected]>
AuthorDate: Mon Dec 11 10:32:48 2023 -0800
[SPARK-46273][SQL] Support INSERT INTO/OVERWRITE using DSv2 sources

### What changes were proposed in this pull request?

This PR adds test cases for INSERT INTO and INSERT OVERWRITE queries with
DSv2 sources.
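
For illustration, the statements exercised look like the following when run against a DSv2 table, e.g. in spark-shell (a minimal sketch, not code from this commit; `MyDataSource` is a placeholder for a DSv2 source that supports external metadata). The tests also cover static and dynamic `PARTITION` clauses and the expected error cases:

```scala
// Hypothetical DSv2 source class; the real tests use test-only sources
// defined in DataSourceV2Suite.
val source = classOf[MyDataSource].getName
spark.sql(s"CREATE TABLE t (x INT, y INT) USING $source")
spark.sql("INSERT INTO t VALUES (1, 2)")         // positional append
spark.sql("INSERT INTO t (y, x) VALUES (2, 1)")  // explicit column list
spark.sql("INSERT OVERWRITE t VALUES (3, 4)")    // replace existing rows
```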

### Why are the changes needed?

To improve test coverage.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New unit tests
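(They live in DataSourceV2Suite; a typical way to run them locally is
`build/sbt "sql/testOnly org.apache.spark.sql.connector.DataSourceV2Suite"`.)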

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44213 from allisonwang-db/spark-46273-dsv2-insert.
Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../spark/sql/errors/QueryCompilationErrors.scala | 4 +-
.../datasources/v2/TableCapabilityCheck.scala | 2 +-
.../spark/sql/connector/DataSourceV2Suite.scala | 127 ++++++++++++++++++++-
3 files changed, 129 insertions(+), 4 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index b7e10dc194a..1195e9dd78d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -926,8 +926,8 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
     unsupportedTableOperationError(table.name(), "truncate in batch mode")
   }
 
-  def unsupportedOverwriteByFilterInBatchModeError(table: Table): Throwable = {
-    unsupportedTableOperationError(table.name(), "overwrite by filter in batch mode")
+  def unsupportedOverwriteByFilterInBatchModeError(name: String): Throwable = {
+    unsupportedTableOperationError(name, "overwrite by filter in batch mode")
   }
 
   def catalogOperationNotSupported(catalog: CatalogPlugin, operation: String): Throwable = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
index a3afaa36ab9..b1a93addc80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
@@ -63,7 +63,7 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) {
       case _ =>
         if (!supportsBatchWrite(r.table) || !r.table.supports(OVERWRITE_BY_FILTER)) {
           throw QueryCompilationErrors.unsupportedOverwriteByFilterInBatchModeError(
-            r.table)
+            r.name)
         }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index f2e518e8acc..6e365e1d605 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.connector
 
 import java.io.File
+import java.util
 import java.util.OptionalLong
 
 import test.org.apache.spark.sql.connector._
@@ -723,8 +724,116 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
       }
     }
   }
-}
+ test("SPARK-46273: insert into") {
+ val cls = classOf[SupportsExternalMetadataDataSource]
+ withTable("test") {
+ sql(
+ s"""
+ |CREATE TABLE test (x INT, y INT) USING ${cls.getName}
+ |""".stripMargin)
+ sql("INSERT INTO test VALUES (1, 2)")
+ checkAnswer(sql("SELECT * FROM test"), Seq(Row(1, 2)))
+ // Insert by name
+ sql("INSERT INTO test(y, x) VALUES (3, 2)")
+ checkAnswer(sql("SELECT * FROM test"), Seq(Row(1, 2), Row(2, 3)))
+ // Can be casted automatically
+ sql("INSERT INTO test(y, x) VALUES (4L, 3L)")
+ checkAnswer(sql("SELECT * FROM test"), Seq(Row(1, 2), Row(2, 3), Row(3,
4)))
+ // Insert values by name
+ sql("INSERT INTO test BY NAME VALUES (5, 4) t(y, x)")
+ checkAnswer(sql("SELECT * FROM test"), Seq(Row(1, 2), Row(2, 3), Row(3,
4), Row(4, 5)))
+ // Missing columns
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(s"INSERT INTO test VALUES (4)")
+ },
+ errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+ parameters = Map(
+ "tableName" -> "`spark_catalog`.`default`.`test`",
+ "tableColumns" -> "`x`, `y`",
+ "dataColumns" -> "`col1`"
+ )
+ )
+ // Duplicate columns
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(s"INSERT INTO test(x, x) VALUES (4, 5)")
+ },
+ errorClass = "COLUMN_ALREADY_EXISTS",
+ parameters = Map("columnName" -> "`x`"))
+ }
+ }
+
+ test("SPARK-46273: insert overwrite") {
+ val cls = classOf[SupportsExternalMetadataDataSource]
+ withTable("test") {
+ sql(
+ s"""
+ |CREATE TABLE test USING ${cls.getName}
+ |AS VALUES (0, 1), (1, 2) t(x, y)
+ |""".stripMargin)
+ sql(
+ s"""
+ |INSERT OVERWRITE test VALUES (2, 3), (3, 4)
+ |""".stripMargin)
+ checkAnswer(sql("SELECT * FROM test"), Seq(Row(2, 3), Row(3, 4)))
+ // Insert overwrite by name
+ sql("INSERT OVERWRITE test(y, x) VALUES (3, 2)")
+ checkAnswer(sql("SELECT * FROM test"), Seq(Row(2, 3)))
+ // Can be casted automatically
+ sql("INSERT OVERWRITE test(y, x) VALUES (4L, 3L)")
+ checkAnswer(sql("SELECT * FROM test"), Seq(Row(3, 4)))
+ }
+ }
+
+ test("SPARK-46273: insert into with partition") {
+ val cls = classOf[SupportsExternalMetadataDataSource]
+ withTable("test") {
+ sql(s"CREATE TABLE test(x INT, y INT) USING ${cls.getName} PARTITIONED
BY (x, y)")
+ sql("INSERT INTO test PARTITION(x = 1) VALUES (2)")
+ sql("INSERT INTO test PARTITION(x = 2, y) VALUES (3)")
+ checkAnswer(sql("SELECT * FROM test"), Seq(Row(1, 2), Row(2, 3)))
+ sql("INSERT INTO test PARTITION(y, x = 3) VALUES (4)")
+ checkAnswer(sql("SELECT * FROM test"), Seq(Row(1, 2), Row(2, 3), Row(3,
4)))
+ sql("INSERT INTO test PARTITION(x, y) VALUES (4, 5)")
+ checkAnswer(sql("SELECT * FROM test"), Seq(Row(1, 2), Row(2, 3), Row(3,
4), Row(4, 5)))
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql("INSERT INTO test PARTITION(z = 1) VALUES (2)")
+ },
+ errorClass = "NON_PARTITION_COLUMN",
+ parameters = Map("columnName" -> "`z`"))
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql("INSERT INTO test PARTITION(x, y = 1) VALUES (2, 3)")
+ },
+ errorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
+ parameters = Map(
+ "tableName" -> "`spark_catalog`.`default`.`test`",
+ "tableColumns" -> "`x`, `y`",
+ "dataColumns" -> "`col1`, `y`, `col2`")
+ )
+ }
+ }
+
+ test("SPARK-46273: insert overwrite with partition") {
+ val cls = classOf[SupportsExternalMetadataDataSource]
+ withTable("test") {
+ sql(s"CREATE TABLE test (x INT, y INT) USING ${cls.getName} PARTITIONED
BY (x, y)")
+ sql("INSERT INTO test PARTITION(x = 1) VALUES (2)")
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql("INSERT OVERWRITE test PARTITION(x = 1) VALUES (5)")
+ },
+ errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+ parameters = Map(
+ "tableName" -> "`spark_catalog`.`default`.`test`",
+ "operation" -> "overwrite by filter in batch mode")
+ )
+ }
+ }
+}
 
 case class RangeInputPartition(start: Int, end: Int) extends InputPartition
@@ -1227,6 +1336,22 @@ class SimpleWriteOnlyDataSource extends SimpleWritableDataSource {
 class SupportsExternalMetadataDataSource extends SimpleWritableDataSource {
   override def supportsExternalMetadata(): Boolean = true
+
+  class TestTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      options: CaseInsensitiveStringMap) extends MyTable(options) {
+    override def schema(): StructType = schema
+
+    override def partitioning(): Array[Transform] = partitioning
+  }
+
+  override def getTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      properties: util.Map[String, String]): Table = {
+    new TestTable(schema, partitioning, new CaseInsensitiveStringMap(properties))
+  }
 }
 
 class SupportsExternalMetadataWritableDataSource extends SimpleWritableDataSource {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]