This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 42db15229144 [SPARK-56695][SQL][DML] Remove Path Based Table support 
in SQL
42db15229144 is described below

commit 42db152291444138d720525931ee82013314a81f
Author: Andreas Chatzistergiou <[email protected]>
AuthorDate: Thu Jun 4 11:03:28 2026 -0700

    [SPARK-56695][SQL][DML] Remove Path Based Table support in SQL
    
    ### What changes were proposed in this pull request?
    
    The current implementation of path-based table support in SQL is only 
partial, so we are removing it for Spark 4.2.
    
    ### Why are the changes needed?
    
    The previous [attempt](https://github.com/apache/spark/pull/56039) to add 
path-based table support was halted due to concerns. For Spark 4.2, we are 
leaving the handling of path-based tables to the connector.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. Path-based table support in SQL has not been released yet.
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Claude Opus 4.7.
    
    Closes #56316 from andreaschat-db/dsv2TransactionRemoveSQLPathBasedSupport.
    
    Authored-by: Andreas Chatzistergiou <[email protected]>
    Signed-off-by: Anton Okolnychyi <[email protected]>
---
 .../spark/sql/execution/QueryExecution.scala       |  50 +----
 ...org.apache.spark.sql.sources.DataSourceRegister |   2 -
 .../connector/PathBasedTableTransactionSuite.scala | 249 ---------------------
 3 files changed, 6 insertions(+), 295 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 65bc57de907b..3ca3032be485 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -22,7 +22,6 @@ import java.util.UUID
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import javax.annotation.concurrent.GuardedBy
 
-import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
@@ -32,22 +31,21 @@ import 
org.apache.spark.internal.LogKeys.EXTENDED_EXPLAIN_GENERATOR
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, ExtendedExplainGenerator, Row}
 import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, 
EliminateSubqueryAliases, LazyExpression, NameParameterizedQuery, 
UnresolvedRelation, UnsupportedOperationChecker}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, LazyExpression, 
NameParameterizedQuery, UnsupportedOperationChecker}
 import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, 
CommandResult, CompoundBody, CreateTableAsSelect, LogicalPlan, 
OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, 
ReturnAnswer, TransactionalWrite => TransactionalWritePlan, Union, 
UnresolvedWith, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, 
CommandResult, CompoundBody, CreateTableAsSelect, LogicalPlan, 
OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, 
ReturnAnswer, Union, UnresolvedWith, WithCTE}
 import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
 import org.apache.spark.sql.catalyst.transactions.TransactionUtils
 import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.classic.SparkSession
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, LookupCatalog, 
SupportsCatalogOptions, TableCatalog, TransactionalCatalogPlugin}
+import org.apache.spark.sql.connector.catalog.LookupCatalog
 import org.apache.spark.sql.connector.catalog.transactions.Transaction
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ROOT_ID_KEY
 import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, 
InsertAdaptiveSparkPlan}
 import org.apache.spark.sql.execution.bucketing.{CoalesceBucketsInJoin, 
DisableUnnecessaryBucketedScan}
-import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, 
TransactionalExec, V2TableRefreshUtil}
+import org.apache.spark.sql.execution.datasources.v2.{TransactionalExec, 
V2TableRefreshUtil}
 import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
 import org.apache.spark.sql.execution.exchange.EnsureRequirements
 import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery
@@ -56,7 +54,6 @@ import 
org.apache.spark.sql.execution.streaming.runtime.{IncrementalExecution, W
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.scripting.SqlScriptingExecution
 import org.apache.spark.sql.streaming.OutputMode
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.{LazyTry, Utils, UUIDv7Generator}
 import org.apache.spark.util.ArrayImplicits._
 
@@ -119,16 +116,9 @@ class QueryExecution(
     analyzerOpt.flatMap(_.catalogManager.transaction).orElse {
       // Only begin a new transaction for outer QEs that lead to execution.
       if (mode != CommandExecutionMode.SKIP) {
-        def resolve(w: TransactionalWritePlan): 
Option[TransactionalCatalogPlugin] =
-          pathBased(w) match {
-            case Some(c: TransactionalCatalogPlugin) => Some(c)
-            case Some(_) => None
-            // If the path is not data source based, fallback to catalog based 
resolution.
-            case None => TransactionalWrite.unapply(w)
-          }
         val catalog = logical match {
-          case UnresolvedWith(w: TransactionalWritePlan, _, _) => resolve(w)
-          case w: TransactionalWritePlan => resolve(w)
+          case UnresolvedWith(TransactionalWrite(c), _, _) => Some(c)
+          case TransactionalWrite(c) => Some(c)
           case _ => None
         }
         catalog.map(TransactionUtils.beginTransaction)
@@ -139,34 +129,6 @@ class QueryExecution(
   }
   private def transactionOpt: Option[Transaction] = lazyTransactionOpt.get
 
-  // For path-based tables (e.g. `format.`/path/to/table``) the first 
identifier part is a
-  // connector name. SupportsCatalogOptions on the connector tells us which 
catalog actually
-  // owns the table. Returns Some(catalog) if parts.head is a recognized 
SupportsCatalogOptions
-  // data source (caller decides whether the catalog is transactional), or 
None to fall through
-  // to the catalog-based extractor.
-  private def pathBased(write: TransactionalWritePlan): Option[TableCatalog] =
-    EliminateSubqueryAliases(write.table) match {
-      case UnresolvedRelation(parts, _, _) if parts.length > 1 =>
-        try {
-          DataSource.lookupDataSourceV2(parts.head, 
sparkSession.sessionState.conf)
-            .collect { case sco: SupportsCatalogOptions => sco }
-            .map { sco =>
-              val sessionConfigs = DataSourceV2Utils.extractSessionConfigs(
-                sco, sparkSession.sessionState.conf)
-              // Pass the entire identifier as option. The connector can 
decide how to parse it
-              // if needed.
-              val options = sessionConfigs + ("identifier" -> 
parts.mkString("."))
-              CatalogV2Util.getTableProviderCatalog(
-                sco, catalogManager, new 
CaseInsensitiveStringMap(options.asJava))
-            }
-        } catch {
-          // The head of the multipart identifier is not a registered data 
source.
-          // Fallback to catalog-based detection.
-          case _: ClassNotFoundException => None
-        }
-      case _ => None
-    }
-
   // Per-query analyzer: uses a transaction-aware CatalogManager when a 
transaction is active,
   // so that all catalog lookups and rule applications during analysis see the 
correct state
   // without relying on thread-local context. Any nested QueryExecution that 
is created during
diff --git 
a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 
b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 0354e545aa90..c1fc7234d7c1 100644
--- 
a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ 
b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -18,8 +18,6 @@
 org.apache.spark.sql.sources.FakeSourceOne
 org.apache.spark.sql.sources.FakeSourceTwo
 org.apache.spark.sql.sources.FakeSourceThree
-org.apache.spark.sql.connector.FakePathBasedSource
-org.apache.spark.sql.connector.FakePathBasedSourceWithSessionConfig
 org.apache.spark.sql.sources.FakeSourceFour
 org.apache.fakesource.FakeExternalSourceOne
 org.apache.fakesource.FakeExternalSourceTwo
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala
deleted file mode 100644
index c81f53673af3..000000000000
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.connector
-
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.connector.catalog.{Aborted, Committed, Identifier, 
InMemoryRowLevelOperationTableCatalog, InMemoryTableCatalog, 
SessionConfigSupport, SharedTablesInMemoryRowLevelOperationTableCatalog, 
SupportsCatalogOptions}
-import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream, 
StreamingQueryWrapper}
-import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
-import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.streaming.StreamingQuery
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-
-/**
- * Tests for transactional writes to path-based tables, where the table is 
identified by a
- * bare path with no catalog prefix (e.g. `/path/to/t`), or a 
connector-prefixed path
- * (e.g. `pathformat.`/path/to/t``). The transactional catalog is registered 
as the session
- * catalog (`spark_catalog`).
- */
-class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase {
-
-  import testImplicits._
-
-  private val tablePath = "`/path/to/t`"
-  private val tablePathWithFormat = "pathformat.`/path/to/t`"
-
-  override def beforeEach(): Unit = {
-    super.beforeEach()
-    spark.conf.set(
-      V2_SESSION_CATALOG_IMPLEMENTATION.key,
-      classOf[SharedTablesInMemoryRowLevelOperationTableCatalog].getName)
-  }
-
-  override def afterEach(): Unit = {
-    SharedTablesInMemoryRowLevelOperationTableCatalog.reset()
-    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
-    super.afterEach()
-  }
-
-  override protected def catalog: InMemoryRowLevelOperationTableCatalog = {
-    spark.sessionState.catalogManager.v2SessionCatalog
-      .asInstanceOf[InMemoryRowLevelOperationTableCatalog]
-  }
-
-  private def streamSessionCatalog(query: StreamingQuery): 
InMemoryRowLevelOperationTableCatalog = {
-    val session = 
query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sparkSessionForStream
-    session.sessionState.catalogManager.v2SessionCatalog
-      .asInstanceOf[InMemoryRowLevelOperationTableCatalog]
-  }
-
-  private def createPathTable(name: String): Unit = {
-    sql(s"CREATE TABLE $name (id INT, data STRING)")
-  }
-
-  test("SQL insert into bare path-based table participates in transaction") {
-    createPathTable(tablePath)
-    val (txn, _) = executeTransaction {
-      sql(s"INSERT INTO $tablePath VALUES (1, 'a'), (2, 'b')")
-    }
-    assert(txn.currentState === Committed)
-    assert(txn.isClosed)
-    checkAnswer(spark.table(tablePath), Row(1, "a") :: Row(2, "b") :: Nil)
-  }
-
-  test("SQL insert with connector-prefixed path participates in transaction") {
-    createPathTable(tablePathWithFormat)
-    val (txn, _) = executeTransaction {
-      sql(s"INSERT INTO $tablePathWithFormat VALUES (1, 'a'), (2, 'b')")
-    }
-    assert(txn.currentState === Committed)
-    assert(txn.isClosed)
-    checkAnswer(spark.table(tablePathWithFormat), Row(1, "a") :: Row(2, "b") 
:: Nil)
-  }
-
-  test("SQL insert with CTE into connector-prefixed path participates in 
transaction") {
-    createPathTable(tablePathWithFormat)
-    val (txn, _) = executeTransaction {
-      sql(s"""
-        |WITH cte AS (SELECT 1 AS id, 'a' AS data)
-        |INSERT INTO $tablePathWithFormat SELECT * FROM cte
-        |""".stripMargin)
-    }
-    assert(txn.currentState === Committed)
-    assert(txn.isClosed)
-    checkAnswer(spark.table(tablePathWithFormat), Row(1, "a") :: Nil)
-  }
-
-  test("session-config catalog controls which catalog is enrolled in 
transaction") {
-    withSQLConf(
-        "spark.sql.catalog.txncat" -> 
classOf[InMemoryRowLevelOperationTableCatalog].getName,
-        "spark.sql.catalog.nontxncat" -> 
classOf[InMemoryTableCatalog].getName) {
-      val txnCat = spark.sessionState.catalogManager.catalog("txncat")
-        .asInstanceOf[InMemoryRowLevelOperationTableCatalog]
-
-      // Non-transactional catalog configured.
-      withSQLConf("spark.datasource.pathformat2.catalog" -> "nontxncat") {
-        createPathTable("pathformat2.`/path/to/t1`")
-        sql("INSERT INTO pathformat2.`/path/to/t1` VALUES (1, 'a')")
-        // The transaction was not routed to any of the transactional catalogs.
-        assert(catalog.lastTransaction == null)
-        assert(txnCat.lastTransaction == null)
-      }
-
-      // Transactional catalog configured: pathBased resolves txncat as a
-      // TransactionalCatalogPlugin and opens the transaction there instead.
-      withSQLConf("spark.datasource.pathformat2.catalog" -> "txncat") {
-        createPathTable("pathformat2.`/path/to/t2`")
-        sql("INSERT INTO pathformat2.`/path/to/t2` VALUES (1, 'a')")
-        assert(txnCat.lastTransaction.currentState === Committed)
-        assert(txnCat.lastTransaction.isClosed)
-      }
-    }
-  }
-
-  test("streaming write to path-based table participates in transaction") {
-    sql(s"CREATE TABLE $tablePathWithFormat (value INT)")
-
-    withTempDir { checkpointDir =>
-      val inputData = MemoryStream[Int]
-      val query = inputData.toDF()
-        .writeStream
-        .option("checkpointLocation", checkpointDir.getAbsolutePath)
-        .toTable(tablePathWithFormat)
-
-      inputData.addData(1, 2, 3)
-      query.processAllAvailable()
-      query.stop()
-
-      val streamCat = streamSessionCatalog(query)
-      val txn = streamCat.lastTransaction
-      assert(txn != null, "expected a transaction to have been committed")
-      assert(txn.currentState === Committed)
-      assert(txn.isClosed)
-      // Streaming must not add transactions to the main session catalog.
-      assert(catalog.observedTransactions.isEmpty)
-      checkAnswer(spark.table(tablePathWithFormat), Row(1) :: Row(2) :: Row(3) 
:: Nil)
-    }
-  }
-
-  test("streaming self-join on path-based table is tracked as a scan event") {
-    sql(s"CREATE TABLE $tablePathWithFormat (value INT)")
-    sql(s"INSERT INTO $tablePathWithFormat VALUES (1), (2), (3)")
-
-    withTempDir { checkpointDir =>
-      val inputData = MemoryStream[Int]
-      val staticData = spark.read.table(tablePathWithFormat)
-
-      val query = inputData.toDF()
-        .join(staticData, "value")
-        .writeStream
-        .option("checkpointLocation", checkpointDir.getAbsolutePath)
-        .toTable(tablePathWithFormat)
-
-      inputData.addData(1, 2, 3)
-      query.processAllAvailable()
-      query.stop()
-
-      val streamCat = streamSessionCatalog(query)
-      val txn = streamCat.lastTransaction
-      assert(txn != null, "expected a transaction to have been committed")
-      assert(txn.currentState === Committed)
-      assert(txn.isClosed)
-      // The path-based table is both the write target and a batch source in 
the same transaction.
-      assert(txn.catalog.txnTables.size === 1)
-      val txnTable = txn.catalog.txnTables.values.head
-      assert(txnTable.scanEvents.size === 1)
-      // Streaming must not add transactions to the main session catalog 
beyond the pre-existing
-      // INSERT transaction.
-      assert(catalog.observedTransactions.size === 1)
-    }
-  }
-
-  test("SQL insert with unregistered format produces analysis error and aborts 
transaction") {
-    createPathTable(tablePathWithFormat)
-    // "Unregistered" is not a known catalog and not registered data source.
-    // So Spark falls back to treating it as a namespace in spark_catalog. The 
table
-    // does not exist, causing an AnalysisException. The transaction is 
started (because
-    // spark_catalog IS a TransactionalCatalogPlugin) and then aborted on 
failure.
-    checkError(
-      exception = intercept[AnalysisException] {
-        sql("INSERT INTO unregistered.`/path/to/t` VALUES (1, 'a'), (2, 'b')")
-      },
-      condition = "TABLE_OR_VIEW_NOT_FOUND",
-      parameters = Map("relationName" -> "`unregistered`.`/path/to/t`"),
-      context = ExpectedContext(
-        fragment = "unregistered.`/path/to/t`",
-        start = -1,
-        stop = -1))
-    val txn = catalog.lastTransaction
-    assert(txn.currentState === Aborted)
-    assert(txn.isClosed)
-  }
-}
-
-/**
- * Simulates a path-based connector (e.g. Delta) that implements 
[[SupportsCatalogOptions]]
- * to route `pathformat.\`/path/to/t\`` SQL identifiers to the session 
catalog. Returning
- * null from [[extractCatalog]] signals that the session catalog 
(`spark_catalog`) owns the
- * table, matching Delta's behavior where DeltaCatalog is registered as 
spark_catalog.
- */
-class FakePathBasedSource
-    extends FakeV2ProviderWithCustomSchema
-    with SupportsCatalogOptions
-    with DataSourceRegister {
-
-  override def shortName(): String = "pathformat"
-
-  // Use the session catalog.
-  override def extractCatalog(options: CaseInsensitiveStringMap): String = null
-
-  // Not used in the transactional path.
-  override def extractIdentifier(options: CaseInsensitiveStringMap): 
Identifier = null
-}
-
-/**
- * Like [[FakePathBasedSource]] but resolves the owning catalog from the 
session config
- * `spark.datasource.pathformat2.catalog` instead of always returning null. 
This simulates
- * a connector that lets users configure the target catalog.
- */
-class FakePathBasedSourceWithSessionConfig
-    extends FakeV2ProviderWithCustomSchema
-    with SupportsCatalogOptions
-    with SessionConfigSupport
-    with DataSourceRegister {
-
-  override def shortName(): String = "pathformat2"
-
-  override def keyPrefix: String = "pathformat2"
-
-  override def extractCatalog(options: CaseInsensitiveStringMap): String = 
options.get("catalog")
-
-  // Not used in the transactional path.
-  override def extractIdentifier(options: CaseInsensitiveStringMap): 
Identifier = null
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to