This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 97e0a88b1d04 [SPARK-49359][SQL] Allow StagedTableCatalog implementations to fall back to non-atomic write
97e0a88b1d04 is described below
commit 97e0a88b1d04a8e33206e5cb8384878ca4cdfc5a
Author: Wenchen Fan <[email protected]>
AuthorDate: Fri Aug 23 10:49:39 2024 +0900
[SPARK-49359][SQL] Allow StagedTableCatalog implementations to fall back to non-atomic write
### What changes were proposed in this pull request?
This PR allows `StagingTableCatalog#stageCreate/stageReplace/stageCreateOrReplace` to return null, in which case Spark falls back to a normal non-atomic write.
### Why are the changes needed?
Extending an interface is a static contract, but implementations sometimes need to make this decision dynamically. For example, a catalog may support atomic CTAS only for certain table formats, and we shouldn't force it to implement atomic writes for all other formats; a sketch of such a catalog follows.
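As an illustration, here is a minimal sketch of such a catalog, modeled on the shape of the `FakeStagedTableCatalog` test added by this patch. The class name, the `atomic-ctas` property check, and the decision to sketch only the CTAS path are assumptions invented for the example, not part of the patch:

```scala
import java.util

import org.apache.spark.sql.connector.catalog.{Column => ColumnV2, Identifier, InMemoryCatalog, StagedTable, StagingTableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType

// Hypothetical catalog that stages atomic CTAS only for some tables and
// opts out per call otherwise. Imports mirror the test file in this patch.
class PartiallyAtomicCatalog extends InMemoryCatalog with StagingTableCatalog {

  // Assumed policy for the example: only tables created with
  // TBLPROPERTIES ('atomic-ctas' = 'true') get the atomic path.
  private def supportsAtomic(properties: util.Map[String, String]): Boolean =
    "true" == properties.get("atomic-ctas")

  override def stageCreate(
      ident: Identifier,
      columns: Array[ColumnV2],
      partitions: Array[Transform],
      properties: util.Map[String, String]): StagedTable = {
    if (supportsAtomic(properties)) {
      // A real catalog would return a StagedTable whose commitStagedChanges()
      // publishes the table atomically; omitted from this sketch.
      throw new UnsupportedOperationException("atomic staging omitted from sketch")
    } else {
      // New with this patch: create the table eagerly and return null;
      // Spark then falls back to loadTable(ident) and a plain write.
      super.createTable(ident, columns, partitions, properties)
      null
    }
  }

  // The deprecated StructType overloads are still abstract and are implemented
  // here only to make the class concrete; this sketch covers only the CTAS
  // path, so the replace variants would follow the same pattern.
  override def stageCreate(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): StagedTable =
    throw new UnsupportedOperationException("shouldn't be called")

  override def stageReplace(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): StagedTable =
    throw new UnsupportedOperationException("shouldn't be called")

  override def stageCreateOrReplace(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): StagedTable =
    throw new UnsupportedOperationException("shouldn't be called")
}
```

With this patch, returning null from the non-atomic branch is enough; before it, the only options were to implement a full `StagedTable` for every format or not implement `StagingTableCatalog` at all.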
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
A new test in `DataSourceV2SQLSuite`.
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #47848 from cloud-fan/stage.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../sql/connector/catalog/StagingTableCatalog.java | 9 ++-
.../datasources/v2/WriteToDataSourceV2Exec.scala | 7 ++-
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 72 ++++++++++++++++++++++
3 files changed, 83 insertions(+), 5 deletions(-)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
index a8e1757a492d..50643b9cbe32 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
@@ -80,7 +80,8 @@ public interface StagingTableCatalog extends TableCatalog {
* @param columns the columns of the new table
* @param partitions transforms to use for partitioning data in the table
* @param properties a string map of table properties
- * @return metadata for the new table
+ * @return metadata for the new table. This can be null if the catalog does not support atomic
+ * creation for this table. Spark will call {@link #loadTable(Identifier)} later.
* @throws TableAlreadyExistsException If a table or view already exists for the identifier
* @throws UnsupportedOperationException If a requested partition transform is not supported
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
@@ -128,7 +129,8 @@ public interface StagingTableCatalog extends TableCatalog {
* @param columns the columns of the new table
* @param partitions transforms to use for partitioning data in the table
* @param properties a string map of table properties
- * @return metadata for the new table
+ * @return metadata for the new table. This can be null if the catalog does not support atomic
+ * creation for this table. Spark will call {@link #loadTable(Identifier)} later.
* @throws UnsupportedOperationException If a requested partition transform is not supported
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
* @throws NoSuchTableException If the table does not exist
@@ -176,7 +178,8 @@ public interface StagingTableCatalog extends TableCatalog {
* @param columns the columns of the new table
* @param partitions transforms to use for partitioning data in the table
* @param properties a string map of table properties
- * @return metadata for the new table
+ * @return metadata for the new table. This can be null if the catalog does not support atomic
+ * creation for this table. Spark will call {@link #loadTable(Identifier)} later.
* @throws UnsupportedOperationException If a requested partition transform is not supported
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 89372017257d..5885ec0afadc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -117,9 +117,10 @@ case class AtomicCreateTableAsSelectExec(
}
throw QueryCompilationErrors.tableAlreadyExistsError(ident)
}
- val stagedTable = catalog.stageCreate(
+ val stagedTable = Option(catalog.stageCreate(
ident, getV2Columns(query.schema, catalog.useNullableQuerySchema),
partitioning.toArray, properties.asJava)
+ ).getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
writeToTable(catalog, stagedTable, writeOptions, ident, query)
}
}
@@ -216,7 +217,9 @@ case class AtomicReplaceTableAsSelectExec(
} else {
throw QueryCompilationErrors.cannotReplaceMissingTableError(ident)
}
- writeToTable(catalog, staged, writeOptions, ident, query)
+ val table = Option(staged).getOrElse(
+ catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava))
+ writeToTable(catalog, table, writeOptions, ident, query)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index a63eaddc2206..de398c49a0eb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -3770,6 +3770,19 @@ class DataSourceV2SQLSuiteV1Filter
checkWriteOperations("read_only_cat")
}
+ test("StagingTableCatalog without atomic support") {
+ withSQLConf("spark.sql.catalog.fakeStagedCat" -> classOf[FakeStagedTableCatalog].getName) {
+ withTable("fakeStagedCat.t") {
+ sql("CREATE TABLE fakeStagedCat.t AS SELECT 1 col")
+ checkAnswer(spark.table("fakeStagedCat.t"), Row(1))
+ sql("REPLACE TABLE fakeStagedCat.t AS SELECT 2 col")
+ checkAnswer(spark.table("fakeStagedCat.t"), Row(2))
+ sql("CREATE OR REPLACE TABLE fakeStagedCat.t AS SELECT 1 c1, 2 c2")
+ checkAnswer(spark.table("fakeStagedCat.t"), Row(1, 2))
+ }
+ }
+ }
+
private def testNotSupportedV2Command(
sqlCommand: String,
sqlParams: String,
@@ -3855,3 +3868,62 @@ class ReadOnlyCatalog extends InMemoryCatalog {
writePrivileges.asScala.toSeq.map(_.toString).sorted.mkString(","))
}
}
+
+class FakeStagedTableCatalog extends InMemoryCatalog with StagingTableCatalog {
+ override def stageCreate(
+ ident: Identifier,
+ schema: StructType,
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): StagedTable = {
+ throw new RuntimeException("shouldn't be called")
+ }
+
+ override def stageCreate(
+ ident: Identifier,
+ columns: Array[ColumnV2],
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): StagedTable = {
+ super.createTable(ident, columns, partitions, properties)
+ null
+ }
+
+ override def stageReplace(
+ ident: Identifier,
+ schema: StructType,
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): StagedTable = {
+ throw new RuntimeException("shouldn't be called")
+ }
+
+ override def stageReplace(
+ ident: Identifier,
+ columns: Array[ColumnV2],
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): StagedTable = {
+ super.dropTable(ident)
+ super.createTable(ident, columns, partitions, properties)
+ null
+ }
+
+ override def stageCreateOrReplace(
+ ident: Identifier,
+ schema: StructType,
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): StagedTable = {
+ throw new RuntimeException("shouldn't be called")
+ }
+
+ override def stageCreateOrReplace(
+ ident: Identifier,
+ columns: Array[ColumnV2],
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): StagedTable = {
+ try {
+ super.dropTable(ident)
+ } catch {
+ case _: Throwable =>
+ }
+ super.createTable(ident, columns, partitions, properties)
+ null
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]