This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 42db15229144 [SPARK-56695][SQL][DML] Remove Path Based Table support
in SQL
42db15229144 is described below
commit 42db152291444138d720525931ee82013314a81f
Author: Andreas Chatzistergiou <[email protected]>
AuthorDate: Thu Jun 4 11:03:28 2026 -0700
[SPARK-56695][SQL][DML] Remove Path Based Table support in SQL
### What changes were proposed in this pull request?
The current implementation of path-based tables in SQL is incomplete. We are
removing it for Spark 4.2.
### Why are the changes needed?
The previous [attempt](https://github.com/apache/spark/pull/56039) to add
path-based table support was halted due to concerns. For Spark 4.2, we are
leaving path-based table handling to the connector.
### Does this PR introduce _any_ user-facing change?
No. Path-based table support in SQL has not been released yet.
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
Claude Opus 4.7.
Closes #56316 from andreaschat-db/dsv2TransactionRemoveSQLPathBasedSupport.
Authored-by: Andreas Chatzistergiou <[email protected]>
Signed-off-by: Anton Okolnychyi <[email protected]>
---
.../spark/sql/execution/QueryExecution.scala | 50 +----
...org.apache.spark.sql.sources.DataSourceRegister | 2 -
.../connector/PathBasedTableTransactionSuite.scala | 249 ---------------------
3 files changed, 6 insertions(+), 295 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 65bc57de907b..3ca3032be485 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -22,7 +22,6 @@ import java.util.UUID
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import javax.annotation.concurrent.GuardedBy
-import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal
import org.apache.hadoop.fs.Path
@@ -32,22 +31,21 @@ import
org.apache.spark.internal.LogKeys.EXTENDED_EXPLAIN_GENERATOR
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, ExtendedExplainGenerator, Row}
import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
-import org.apache.spark.sql.catalyst.analysis.{Analyzer,
EliminateSubqueryAliases, LazyExpression, NameParameterizedQuery,
UnresolvedRelation, UnsupportedOperationChecker}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, LazyExpression,
NameParameterizedQuery, UnsupportedOperationChecker}
import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats
import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command,
CommandResult, CompoundBody, CreateTableAsSelect, LogicalPlan,
OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect,
ReturnAnswer, TransactionalWrite => TransactionalWritePlan, Union,
UnresolvedWith, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command,
CommandResult, CompoundBody, CreateTableAsSelect, LogicalPlan,
OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect,
ReturnAnswer, Union, UnresolvedWith, WithCTE}
import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
import org.apache.spark.sql.catalyst.transactions.TransactionUtils
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.classic.SparkSession
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, LookupCatalog,
SupportsCatalogOptions, TableCatalog, TransactionalCatalogPlugin}
+import org.apache.spark.sql.connector.catalog.LookupCatalog
import org.apache.spark.sql.connector.catalog.transactions.Transaction
import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ROOT_ID_KEY
import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext,
InsertAdaptiveSparkPlan}
import org.apache.spark.sql.execution.bucketing.{CoalesceBucketsInJoin,
DisableUnnecessaryBucketedScan}
-import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils,
TransactionalExec, V2TableRefreshUtil}
+import org.apache.spark.sql.execution.datasources.v2.{TransactionalExec,
V2TableRefreshUtil}
import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
import org.apache.spark.sql.execution.exchange.EnsureRequirements
import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery
@@ -56,7 +54,6 @@ import
org.apache.spark.sql.execution.streaming.runtime.{IncrementalExecution, W
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.scripting.SqlScriptingExecution
import org.apache.spark.sql.streaming.OutputMode
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.{LazyTry, Utils, UUIDv7Generator}
import org.apache.spark.util.ArrayImplicits._
@@ -119,16 +116,9 @@ class QueryExecution(
analyzerOpt.flatMap(_.catalogManager.transaction).orElse {
// Only begin a new transaction for outer QEs that lead to execution.
if (mode != CommandExecutionMode.SKIP) {
- def resolve(w: TransactionalWritePlan):
Option[TransactionalCatalogPlugin] =
- pathBased(w) match {
- case Some(c: TransactionalCatalogPlugin) => Some(c)
- case Some(_) => None
- // If the path is not data source based, fallback to catalog based
resolution.
- case None => TransactionalWrite.unapply(w)
- }
val catalog = logical match {
- case UnresolvedWith(w: TransactionalWritePlan, _, _) => resolve(w)
- case w: TransactionalWritePlan => resolve(w)
+ case UnresolvedWith(TransactionalWrite(c), _, _) => Some(c)
+ case TransactionalWrite(c) => Some(c)
case _ => None
}
catalog.map(TransactionUtils.beginTransaction)
@@ -139,34 +129,6 @@ class QueryExecution(
}
private def transactionOpt: Option[Transaction] = lazyTransactionOpt.get
- // For path-based tables (e.g. `format.`/path/to/table``) the first
identifier part is a
- // connector name. SupportsCatalogOptions on the connector tells us which
catalog actually
- // owns the table. Returns Some(catalog) if parts.head is a recognized
SupportsCatalogOptions
- // data source (caller decides whether the catalog is transactional), or
None to fall through
- // to the catalog-based extractor.
- private def pathBased(write: TransactionalWritePlan): Option[TableCatalog] =
- EliminateSubqueryAliases(write.table) match {
- case UnresolvedRelation(parts, _, _) if parts.length > 1 =>
- try {
- DataSource.lookupDataSourceV2(parts.head,
sparkSession.sessionState.conf)
- .collect { case sco: SupportsCatalogOptions => sco }
- .map { sco =>
- val sessionConfigs = DataSourceV2Utils.extractSessionConfigs(
- sco, sparkSession.sessionState.conf)
- // Pass the entire identifier as option. The connector can
decide how to parse it
- // if needed.
- val options = sessionConfigs + ("identifier" ->
parts.mkString("."))
- CatalogV2Util.getTableProviderCatalog(
- sco, catalogManager, new
CaseInsensitiveStringMap(options.asJava))
- }
- } catch {
- // The head of the multipart identifier is not a registered data
source.
- // Fallback to catalog-based detection.
- case _: ClassNotFoundException => None
- }
- case _ => None
- }
-
// Per-query analyzer: uses a transaction-aware CatalogManager when a
transaction is active,
// so that all catalog lookups and rule applications during analysis see the
correct state
// without relying on thread-local context. Any nested QueryExecution that
is created during
diff --git
a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 0354e545aa90..c1fc7234d7c1 100644
---
a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++
b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -18,8 +18,6 @@
org.apache.spark.sql.sources.FakeSourceOne
org.apache.spark.sql.sources.FakeSourceTwo
org.apache.spark.sql.sources.FakeSourceThree
-org.apache.spark.sql.connector.FakePathBasedSource
-org.apache.spark.sql.connector.FakePathBasedSourceWithSessionConfig
org.apache.spark.sql.sources.FakeSourceFour
org.apache.fakesource.FakeExternalSourceOne
org.apache.fakesource.FakeExternalSourceTwo
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala
deleted file mode 100644
index c81f53673af3..000000000000
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.connector
-
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.connector.catalog.{Aborted, Committed, Identifier,
InMemoryRowLevelOperationTableCatalog, InMemoryTableCatalog,
SessionConfigSupport, SharedTablesInMemoryRowLevelOperationTableCatalog,
SupportsCatalogOptions}
-import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream,
StreamingQueryWrapper}
-import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
-import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.streaming.StreamingQuery
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-
-/**
- * Tests for transactional writes to path-based tables, where the table is
identified by a
- * bare path with no catalog prefix (e.g. `/path/to/t`), or a
connector-prefixed path
- * (e.g. `pathformat.`/path/to/t``). The transactional catalog is registered
as the session
- * catalog (`spark_catalog`).
- */
-class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase {
-
- import testImplicits._
-
- private val tablePath = "`/path/to/t`"
- private val tablePathWithFormat = "pathformat.`/path/to/t`"
-
- override def beforeEach(): Unit = {
- super.beforeEach()
- spark.conf.set(
- V2_SESSION_CATALOG_IMPLEMENTATION.key,
- classOf[SharedTablesInMemoryRowLevelOperationTableCatalog].getName)
- }
-
- override def afterEach(): Unit = {
- SharedTablesInMemoryRowLevelOperationTableCatalog.reset()
- spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
- super.afterEach()
- }
-
- override protected def catalog: InMemoryRowLevelOperationTableCatalog = {
- spark.sessionState.catalogManager.v2SessionCatalog
- .asInstanceOf[InMemoryRowLevelOperationTableCatalog]
- }
-
- private def streamSessionCatalog(query: StreamingQuery):
InMemoryRowLevelOperationTableCatalog = {
- val session =
query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sparkSessionForStream
- session.sessionState.catalogManager.v2SessionCatalog
- .asInstanceOf[InMemoryRowLevelOperationTableCatalog]
- }
-
- private def createPathTable(name: String): Unit = {
- sql(s"CREATE TABLE $name (id INT, data STRING)")
- }
-
- test("SQL insert into bare path-based table participates in transaction") {
- createPathTable(tablePath)
- val (txn, _) = executeTransaction {
- sql(s"INSERT INTO $tablePath VALUES (1, 'a'), (2, 'b')")
- }
- assert(txn.currentState === Committed)
- assert(txn.isClosed)
- checkAnswer(spark.table(tablePath), Row(1, "a") :: Row(2, "b") :: Nil)
- }
-
- test("SQL insert with connector-prefixed path participates in transaction") {
- createPathTable(tablePathWithFormat)
- val (txn, _) = executeTransaction {
- sql(s"INSERT INTO $tablePathWithFormat VALUES (1, 'a'), (2, 'b')")
- }
- assert(txn.currentState === Committed)
- assert(txn.isClosed)
- checkAnswer(spark.table(tablePathWithFormat), Row(1, "a") :: Row(2, "b")
:: Nil)
- }
-
- test("SQL insert with CTE into connector-prefixed path participates in
transaction") {
- createPathTable(tablePathWithFormat)
- val (txn, _) = executeTransaction {
- sql(s"""
- |WITH cte AS (SELECT 1 AS id, 'a' AS data)
- |INSERT INTO $tablePathWithFormat SELECT * FROM cte
- |""".stripMargin)
- }
- assert(txn.currentState === Committed)
- assert(txn.isClosed)
- checkAnswer(spark.table(tablePathWithFormat), Row(1, "a") :: Nil)
- }
-
- test("session-config catalog controls which catalog is enrolled in
transaction") {
- withSQLConf(
- "spark.sql.catalog.txncat" ->
classOf[InMemoryRowLevelOperationTableCatalog].getName,
- "spark.sql.catalog.nontxncat" ->
classOf[InMemoryTableCatalog].getName) {
- val txnCat = spark.sessionState.catalogManager.catalog("txncat")
- .asInstanceOf[InMemoryRowLevelOperationTableCatalog]
-
- // Non-transactional catalog configured.
- withSQLConf("spark.datasource.pathformat2.catalog" -> "nontxncat") {
- createPathTable("pathformat2.`/path/to/t1`")
- sql("INSERT INTO pathformat2.`/path/to/t1` VALUES (1, 'a')")
- // The transaction was not routed to any of the transactional catalogs.
- assert(catalog.lastTransaction == null)
- assert(txnCat.lastTransaction == null)
- }
-
- // Transactional catalog configured: pathBased resolves txncat as a
- // TransactionalCatalogPlugin and opens the transaction there instead.
- withSQLConf("spark.datasource.pathformat2.catalog" -> "txncat") {
- createPathTable("pathformat2.`/path/to/t2`")
- sql("INSERT INTO pathformat2.`/path/to/t2` VALUES (1, 'a')")
- assert(txnCat.lastTransaction.currentState === Committed)
- assert(txnCat.lastTransaction.isClosed)
- }
- }
- }
-
- test("streaming write to path-based table participates in transaction") {
- sql(s"CREATE TABLE $tablePathWithFormat (value INT)")
-
- withTempDir { checkpointDir =>
- val inputData = MemoryStream[Int]
- val query = inputData.toDF()
- .writeStream
- .option("checkpointLocation", checkpointDir.getAbsolutePath)
- .toTable(tablePathWithFormat)
-
- inputData.addData(1, 2, 3)
- query.processAllAvailable()
- query.stop()
-
- val streamCat = streamSessionCatalog(query)
- val txn = streamCat.lastTransaction
- assert(txn != null, "expected a transaction to have been committed")
- assert(txn.currentState === Committed)
- assert(txn.isClosed)
- // Streaming must not add transactions to the main session catalog.
- assert(catalog.observedTransactions.isEmpty)
- checkAnswer(spark.table(tablePathWithFormat), Row(1) :: Row(2) :: Row(3)
:: Nil)
- }
- }
-
- test("streaming self-join on path-based table is tracked as a scan event") {
- sql(s"CREATE TABLE $tablePathWithFormat (value INT)")
- sql(s"INSERT INTO $tablePathWithFormat VALUES (1), (2), (3)")
-
- withTempDir { checkpointDir =>
- val inputData = MemoryStream[Int]
- val staticData = spark.read.table(tablePathWithFormat)
-
- val query = inputData.toDF()
- .join(staticData, "value")
- .writeStream
- .option("checkpointLocation", checkpointDir.getAbsolutePath)
- .toTable(tablePathWithFormat)
-
- inputData.addData(1, 2, 3)
- query.processAllAvailable()
- query.stop()
-
- val streamCat = streamSessionCatalog(query)
- val txn = streamCat.lastTransaction
- assert(txn != null, "expected a transaction to have been committed")
- assert(txn.currentState === Committed)
- assert(txn.isClosed)
- // The path-based table is both the write target and a batch source in
the same transaction.
- assert(txn.catalog.txnTables.size === 1)
- val txnTable = txn.catalog.txnTables.values.head
- assert(txnTable.scanEvents.size === 1)
- // Streaming must not add transactions to the main session catalog
beyond the pre-existing
- // INSERT transaction.
- assert(catalog.observedTransactions.size === 1)
- }
- }
-
- test("SQL insert with unregistered format produces analysis error and aborts
transaction") {
- createPathTable(tablePathWithFormat)
- // "Unregistered" is not a known catalog and not registered data source.
- // So Spark falls back to treating it as a namespace in spark_catalog. The
table
- // does not exist, causing an AnalysisException. The transaction is
started (because
- // spark_catalog IS a TransactionalCatalogPlugin) and then aborted on
failure.
- checkError(
- exception = intercept[AnalysisException] {
- sql("INSERT INTO unregistered.`/path/to/t` VALUES (1, 'a'), (2, 'b')")
- },
- condition = "TABLE_OR_VIEW_NOT_FOUND",
- parameters = Map("relationName" -> "`unregistered`.`/path/to/t`"),
- context = ExpectedContext(
- fragment = "unregistered.`/path/to/t`",
- start = -1,
- stop = -1))
- val txn = catalog.lastTransaction
- assert(txn.currentState === Aborted)
- assert(txn.isClosed)
- }
-}
-
-/**
- * Simulates a path-based connector (e.g. Delta) that implements
[[SupportsCatalogOptions]]
- * to route `pathformat.\`/path/to/t\`` SQL identifiers to the session
catalog. Returning
- * null from [[extractCatalog]] signals that the session catalog
(`spark_catalog`) owns the
- * table, matching Delta's behavior where DeltaCatalog is registered as
spark_catalog.
- */
-class FakePathBasedSource
- extends FakeV2ProviderWithCustomSchema
- with SupportsCatalogOptions
- with DataSourceRegister {
-
- override def shortName(): String = "pathformat"
-
- // Use the session catalog.
- override def extractCatalog(options: CaseInsensitiveStringMap): String = null
-
- // Not used in the transactional path.
- override def extractIdentifier(options: CaseInsensitiveStringMap):
Identifier = null
-}
-
-/**
- * Like [[FakePathBasedSource]] but resolves the owning catalog from the
session config
- * `spark.datasource.pathformat2.catalog` instead of always returning null.
This simulates
- * a connector that lets users configure the target catalog.
- */
-class FakePathBasedSourceWithSessionConfig
- extends FakeV2ProviderWithCustomSchema
- with SupportsCatalogOptions
- with SessionConfigSupport
- with DataSourceRegister {
-
- override def shortName(): String = "pathformat2"
-
- override def keyPrefix: String = "pathformat2"
-
- override def extractCatalog(options: CaseInsensitiveStringMap): String =
options.get("catalog")
-
- // Not used in the transactional path.
- override def extractIdentifier(options: CaseInsensitiveStringMap):
Identifier = null
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]