This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git

commit f975bac03a7431c813283f9816aa9021d5580dfd
Author: Serge Rielau <[email protected]>
AuthorDate: Wed May 20 20:48:55 2026 +0800

    [SPARK-56939][SQL] Resolve deadlock between USE and function lookup
    
    ### What changes were proposed in this pull request?
    
    Break the `SessionCatalog`/`CatalogManager` lock-order inversion that can 
deadlock concurrent `USE SCHEMA` / `USE CATALOG` and unqualified function 
resolution on the same `SparkSession`.
    
    - `CatalogManager.setCurrentNamespace` / `setCurrentCatalog`: snapshot the 
dispatch decision under the manager's intrinsic lock, run the 
`v1SessionCatalog` callbacks **outside** the lock, then publish the new state 
under the lock again. This stops the "manager lock then catalog lock" arm of 
the cycle.
    - Add `CatalogManager.sessionFunctionKindsForUnqualifiedResolution()` that 
snapshots `(currentCatalog, currentNamespace, sessionPath)` in a single 
critical section. The `v1SessionCatalog.getCurrentDatabase` read needed for the 
default-namespace fallback is taken **before** the manager lock, so the helper 
never re-introduces the deadlock cycle while still avoiding torn-state 
observations under racing path updates.
    - Route `SessionCatalog.sessionFunctionKindsInResolutionOrder` and 
`FunctionResolution.isSessionBeforeBuiltinInPath` through that single helper, 
so the lookup loop and the `session-before-builtin` predicate share one 
consistent snapshot.
    - Tighten the doc comments on the affected methods to document the locking 
contract and prevent future regressions.
    
    ### Why are the changes needed?
    
    After SPARK-56750 wired `CatalogManager` into `SessionCatalog` as the live 
source for path-driven session function kinds, two paths form a lock-order 
inversion:
    
    - **Arm 1** (`SessionCatalog.synchronized` -> 
`CatalogManager.synchronized`): unqualified function resolution evaluating the 
live PATH reaches into `CatalogManager.sqlResolutionPathEntries` (which 
synchronizes on the manager) while holding the catalog's intrinsic lock at peer 
call sites.
    - **Arm 2** (`CatalogManager.synchronized` -> 
`SessionCatalog.synchronized`): `setCurrentNamespace` / `setCurrentCatalog` 
hold the manager's lock and then call back into 
`v1SessionCatalog.setCurrentDatabase*`, which synchronizes on `SessionCatalog`.
    
    Two threads sharing a `SparkSession` -- one running any SQL with an 
unqualified function reference, the other running `USE SCHEMA` / `USE CATALOG` 
-- can wedge on each other's intrinsic locks. The hazard is independent of 
`spark.sql.functionResolution.sessionOrder`: Arm 1 still acquires the manager 
lock just to read what the order is, and Arm 2 has nothing to do with order at 
all. See [SPARK-56939](https://issues.apache.org/jira/browse/SPARK-56939) for a 
standalone repro.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This is a concurrency fix; serial behavior is unchanged. The only 
observable difference under contention is that the session no longer deadlocks.
    
    ### How was this patch tested?
    
    - New regression test in `SetPathSuite`, `SPARK-56939: concurrent USE 
SCHEMA and unqualified function lookups do not deadlock`, that follows the 
JIRA's exact repro: one thread alternates `USE SCHEMA s1` / `USE SCHEMA s2`, 
another runs unqualified `count(*)` queries. Without the fix the threads hang 
on each other's intrinsic locks; with the fix the test completes within the 30s 
budget.
    - ``build/sbt 'sql/testOnly *SetPathSuite'`` -- 60/60 pass.
    - ``build/sbt 'catalyst/testOnly *SessionCatalogSuite *CatalogManagerSuite 
*FunctionResolution*'`` -- 119/119 pass.
    - ``build/sbt 'sql/testOnly *SQLFunctionSuite *SQLViewSuite'`` -- 70/70 
pass.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Cursor / Claude Opus 4.7
    
    Closes #55977 from srielau/SPARK-56939-use-func-deadlock.
    
    Authored-by: Serge Rielau <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../sql/catalyst/analysis/FunctionResolution.scala |   8 +-
 .../sql/catalyst/catalog/SessionCatalog.scala      |  27 ++--
 .../sql/connector/catalog/CatalogManager.scala     | 147 ++++++++++++++++++---
 .../scala/org/apache/spark/sql/SetPathSuite.scala  | 119 +++++++++++++++++
 4 files changed, 273 insertions(+), 28 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
index e3dbcc4b6ef7..4f6aee03967c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
@@ -83,12 +83,12 @@ class FunctionResolution(
    * `count` shadows the builtin) and the `SessionCatalog` security check that 
blocks creating
    * a temp function with a builtin's name. Reads the live PATH via 
`CatalogManager` and
    * applies the same kinds extraction that drives `SessionCatalog`'s 
fast-path provider, so
-   * the predicate stays in sync with the lookup loop's actual order.
+   * the predicate stays in sync with the lookup loop's actual order. Uses the 
consolidated
+   * snapshot helper (SPARK-56939) so the (catalog, namespace, path) triple is 
observed
+   * atomically.
    */
   def isSessionBeforeBuiltinInPath: Boolean = {
-    val path = catalogManager.sqlResolutionPathEntries(
-      catalogManager.currentCatalog.name(), 
catalogManager.currentNamespace.toSeq)
-    CatalogManager.systemFunctionKindsFromPath(path).headOption
+    catalogManager.sessionFunctionKindsForUnqualifiedResolution().headOption
       .contains(org.apache.spark.sql.catalyst.catalog.SessionCatalog.Temp)
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 71653eec139b..9e5a2176612c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -154,13 +154,19 @@ class SessionCatalog(
    * Session function kinds in resolution order for unqualified lookups: test 
override if set,
    * else live PATH from [[catalogManagerForSessionFunctionKinds]], else
    * [[SQLConf.systemPathOrder]].
+   *
+   * MUST NOT be called while holding [[SessionCatalog]]'s intrinsic lock (see 
SPARK-56939):
+   * the path-driven branch delegates to [[CatalogManager]], which has its own 
intrinsic lock
+   * and re-enters this catalog through `USE` paths, so nesting the two locks 
here would
+   * deadlock.
    */
   private def sessionFunctionKindsInResolutionOrder: Seq[SessionFunctionKind] =
     sessionFunctionKindsTestOverride.getOrElse {
       catalogManagerForSessionFunctionKinds match {
         case Some(cm) =>
-          CatalogManager.systemFunctionKindsFromPath(
-            cm.sqlResolutionPathEntries(cm.currentCatalog.name(), 
cm.currentNamespace.toSeq))
+          // Use the consolidated helper so unqualified resolution observes a 
consistent
+          // (currentCatalog, currentNamespace, path) triple in a single 
critical section.
+          cm.sessionFunctionKindsForUnqualifiedResolution()
         case None =>
           CatalogManager.systemFunctionKindsFromPath(conf.systemPathOrder)
       }
@@ -2565,11 +2571,13 @@ class SessionCatalog(
    * Resolution order follows the configured path (e.g. builtin then session).
    */
   def lookupBuiltinOrTempTableFunction(name: String): Option[ExpressionInfo] = 
{
-    // Intentionally not `synchronized` on this [[SessionCatalog]]. Resolution 
order may call
-    // into [[CatalogManager]] (e.g. 
[[CatalogManager.sqlResolutionPathEntries]]), which can
-    // synchronize on the manager; another
-    // thread can hold that lock and call into this catalog (e.g. via 
`setCurrentNamespace`),
-    // which would deadlock if this method also synchronized on `this`.
+    // Intentionally not `synchronized` on this [[SessionCatalog]]: resolution 
order may call
+    // into [[CatalogManager]] (e.g. 
[[CatalogManager.sqlResolutionPathEntries]] via
+    // [[sessionFunctionKindsInResolutionOrder]]), which synchronizes on the 
manager. The
+    // SPARK-56939 fix removed the reverse `CatalogManager -> SessionCatalog` 
nest from the
+    // `USE`-style mutators that previously closed the deadlock cycle; keeping 
this method
+    // un-synchronized preserves the `SessionCatalog -> CatalogManager` 
direction as the
+    // single allowed ordering, so the invariant survives future regressions.
     lookupFunctionWithShadowing(name, tableFunctionRegistry, 
checkBuiltinOperators = false)
   }
 
@@ -2724,8 +2732,9 @@ class SessionCatalog(
   def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = {
     // Intentionally not `synchronized` on this [[SessionCatalog]] (see
     // [[lookupBuiltinOrTempTableFunction]]): unqualified builtin/temp 
resolution uses
-    // [[sessionFunctionKindsInResolutionOrder]] / [[CatalogManager]] and must 
not run under
-    // this catalog's intrinsic lock.
+    // [[sessionFunctionKindsInResolutionOrder]] / [[CatalogManager]], and 
SPARK-56939
+    // requires this catalog's intrinsic lock to NEVER be held when reaching 
into
+    // [[CatalogManager]] from a function-resolution path.
     if (name.database.isEmpty) {
       lookupBuiltinOrTempFunction(name.funcName)
         .orElse(lookupBuiltinOrTempTableFunction(name.funcName))
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
index 9a39e4ebdd27..3aad52dbd1d0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
@@ -132,15 +132,47 @@ class CatalogManager(
     }
   }
 
-  def setCurrentNamespace(namespace: Array[String]): Unit = synchronized {
-    if (isSessionCatalog(currentCatalog) && namespace.length == 1) {
+  def setCurrentNamespace(namespace: Array[String]): Unit = {
+    // SPARK-56939: do NOT hold [[CatalogManager]]'s intrinsic lock across the 
callbacks below.
+    // [[v1SessionCatalog.setCurrentDatabaseWithNameCheck]] briefly 
synchronizes on
+    // [[SessionCatalog]], and concurrent unqualified function resolution 
acquires the
+    // [[SessionCatalog]] lock and then reaches into [[CatalogManager]] via
+    // [[sqlResolutionPathEntries]]; nesting the manager lock outside the 
catalog lock here
+    // would invert that order and deadlock. Snapshot the dispatch decision 
under the lock,
+    // run callbacks outside it, then publish the new namespace under the lock 
again.
+    //
+    // Concurrency trade-offs versus the pre-SPARK-56939 atomic version 
(v1-side and
+    // CM-side drift modes):
+    //
+    // (a) v1-side drift. The `isSession` snapshot can drift if a concurrent
+    //     [[setCurrentCatalog]] switches to a v2 catalog between this read 
and the v1
+    //     callback below -- the callback would still touch `v1.currentDb` 
even though
+    //     the active catalog is no longer the session catalog. A later switch 
back to
+    //     the session catalog resets `v1.currentDb` to `default` (see
+    //     [[setCurrentCatalog]]), so long-term state remains consistent; only 
the
+    //     intermediate observation is novel.
+    //
+    // (b) CM-side publish-overwrite drift (sticky). Between the v1 callback 
returning
+    //     and the publish below, a concurrent [[setCurrentCatalog]] can 
complete fully
+    //     -- switching `_currentCatalogName` to (say) a v2 catalog and 
clearing
+    //     `_currentNamespace = None` -- before this method's publish 
overwrites that
+    //     with `Some(namespace)`. End state: `_currentNamespace = 
Some(namespace)` is
+    //     published under a different `_currentCatalogName` than the one 
observed when
+    //     [[isSession]] was snapshotted at the top. Unlike (a) there is no 
analogous
+    //     auto-recovery; the mismatch sticks until the next `USE`. This is 
still
+    //     last-writer-wins for two racing `USE` commands, which is the 
conventional
+    //     expectation, so it is accepted as a trade-off against the deadlock 
alternative.
+    val isSession = synchronized(isSessionCatalog(currentCatalog))
+    if (isSession && namespace.length == 1) {
       v1SessionCatalog.setCurrentDatabaseWithNameCheck(
         namespace.head,
         _ => assertNamespaceExist(namespace))
     } else {
       assertNamespaceExist(namespace)
     }
-    _currentNamespace = Some(namespace)
+    synchronized {
+      _currentNamespace = Some(namespace)
+    }
   }
 
   import CatalogManager.SessionPathEntry
@@ -221,6 +253,15 @@ class CatalogManager(
    * When PATH is enabled and a session path is in effect (stored or via
    * [[SQLConf#DEFAULT_PATH]]), formats the resolved entries. Otherwise falls 
back to the legacy
    * resolutionSearchPath.
+   *
+   * SPARK-56939 note: this is currently the only intentional 
`CatalogManager.synchronized ->
+   * SessionCatalog.synchronized` nest left in this class. The transitive call 
into
+   * [[v1SessionCatalog.getCurrentDatabase]] happens via [[currentNamespace]], 
which fetches
+   * the v1 current database under the CM lock. It is safe today because no 
code path holds
+   * [[SessionCatalog]]'s intrinsic lock while waiting on [[CatalogManager]]'s 
-- the
+   * SPARK-56939 fix removed every such SC->CM ordering. Any future change 
that introduces a
+   * new SC->CM ordering must take `currentPathString` (or any other CM->SC 
nest) into
+   * account to avoid resurrecting the deadlock.
    */
   def currentPathString: String = synchronized {
     import CatalogV2Implicits._
@@ -265,6 +306,48 @@ class CatalogManager(
       currentCatalog, currentNamespace,
       currentCatalog, currentNamespace)
 
+  /**
+   * Snapshot the live PATH-derived [[SessionCatalog.SessionFunctionKind]] 
order used by
+   * unqualified function/table-function resolution.
+   *
+   * The `(currentCatalog, _currentNamespace, sessionPath)` triple is read 
together inside a
+   * single CM critical section so a concurrent `USE` / `SET PATH` cannot 
return a torn
+   * snapshot for those three fields (e.g. catalog from one observation, 
explicit namespace
+   * from another).
+   *
+   * The `v1SessionCatalog.getCurrentDatabase` read needed for the 
default-namespace fallback
+   * is taken OUTSIDE the CM lock and is therefore intentionally racy w.r.t. a 
concurrent
+   * `USE SCHEMA`. That staleness is harmless for this helper's output: this 
method consumes
+   * `effectiveNs` only to expand `CURRENT_SCHEMA` markers in the SQL path, and
+   * [[CatalogManager.systemFunctionKindsFromPath]] only retains literal 
`system.builtin` /
+   * `system.session` entries from the resolved path -- it never inspects any
+   * `(catalog, namespace)` derived from `v1`. So if `v1CurrentDb` lags by one 
`USE SCHEMA`,
+   * a `CURRENT_SCHEMA` entry might briefly resolve to the previous database, 
but the kinds
+   * list (the only thing returned here) is unaffected. Moving the read inside 
the CM lock
+   * would re-introduce the SPARK-56939 lock-order inversion this helper 
exists to avoid.
+   *
+   * Callers (e.g. [[SessionCatalog.sessionFunctionKindsInResolutionOrder]],
+   * 
[[org.apache.spark.sql.catalyst.analysis.FunctionResolution.isSessionBeforeBuiltinInPath]])
+   * MUST NOT hold [[SessionCatalog]]'s intrinsic lock when invoking this 
method.
+   */
+  def sessionFunctionKindsForUnqualifiedResolution(): 
Seq[SessionCatalog.SessionFunctionKind] = {
+    // SPARK-56939: read v1's current database before taking the CM lock; see 
the method
+    // doc for why the resulting staleness is harmless for the kinds list.
+    val v1CurrentDb = v1SessionCatalog.getCurrentDatabase
+    val pathEntries = synchronized {
+      val catName = currentCatalog.name()
+      val effectiveNs: Seq[String] = _currentNamespace.map(_.toSeq).getOrElse {
+        if (catName == SESSION_CATALOG_NAME) {
+          Seq(v1CurrentDb)
+        } else {
+          currentCatalog.defaultNamespace().toSeq
+        }
+      }
+      sqlResolutionPathEntries(catName, effectiveNs)
+    }
+    CatalogManager.systemFunctionKindsFromPath(pathEntries)
+  }
+
   /**
    * True if `system.session` is on the SQL path. Only literal path entries 
can match: the
    * 
[[org.apache.spark.sql.connector.catalog.CatalogManager.CurrentSchemaEntry$]] 
marker expands to
@@ -330,15 +413,41 @@ class CatalogManager(
     
catalog(_currentCatalogName.getOrElse(conf.getConf(SQLConf.DEFAULT_CATALOG)))
   }
 
-  def setCurrentCatalog(catalogName: String): Unit = synchronized {
-    // `setCurrentCatalog` is noop if it doesn't switch to a different catalog.
-    if (currentCatalog.name() != catalogName) {
-      catalog(catalogName)
-      _currentCatalogName = Some(catalogName)
-      _currentNamespace = None
+  def setCurrentCatalog(catalogName: String): Unit = {
+    // SPARK-56939: see [[setCurrentNamespace]]. Avoid nesting 
[[CatalogManager]]'s lock
+    // across [[v1SessionCatalog.setCurrentDatabase]] (which synchronizes on
+    // [[SessionCatalog]]) to prevent a lock-order inversion with concurrent 
unqualified
+    // function resolution.
+    val needsSwitch = synchronized {
+      // `setCurrentCatalog` is noop if it doesn't switch to a different 
catalog.
+      if (currentCatalog.name() != catalogName) {
+        // Force-load the named catalog while holding the manager lock to keep 
the
+        // not-found error semantics; if loading fails, throw before mutating 
state.
+        catalog(catalogName)
+        true
+      } else {
+        false
+      }
+    }
+    if (needsSwitch) {
       // Reset the current database of v1 `SessionCatalog` when switching 
current catalog, so that
       // when we switch back to session catalog, the current namespace 
definitely is ["default"].
+      // Run this BEFORE publishing the new catalog name so that if a reader 
observes the new
+      // catalog, the v1 state is already consistent with it.
+      //
+      // Concurrency trade-off versus the pre-SPARK-56939 atomic version: 
between this v1 write
+      // and the publish below, a concurrent reader of `currentNamespace` sees
+      // `(oldCatalog, v1.currentDb = default)`. When the old catalog is the 
session catalog
+      // (the common case for `USE CATALOG`), the user's previous namespace is 
briefly invisible
+      // to that reader until the new name is published. The opposite torn 
observation
+      // (`newCatalog`, stale `v1.currentDb`) is avoided by this ordering. 
This trade-off
+      // (transient invisibility instead of transient inconsistency, exchanged 
for breaking the
+      // deadlock cycle) is accepted; the long-term post-switch state is the 
same as before.
       v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase)
+      synchronized {
+        _currentCatalogName = Some(catalogName)
+        _currentNamespace = None
+      }
     }
   }
 
@@ -348,12 +457,20 @@ class CatalogManager(
   }
 
   // Clear all the registered catalogs. Only used in tests.
-  private[sql] def reset(): Unit = synchronized {
-    catalogs.clear()
-    _currentNamespace = None
-    _currentCatalogName = None
-    _sessionPath = None
-    confDefaultPathCache.set(None)
+  //
+  // SPARK-56939: apply the same split-lock pattern as [[setCurrentNamespace]] 
/
+  // [[setCurrentCatalog]] so the locking contract is uniform across every CM 
mutator that
+  // calls back into [[v1SessionCatalog]]. Test-only callers don't race 
against unqualified
+  // function resolution today, but keeping the contract symmetric prevents 
future test
+  // helpers (e.g. session reset in a concurrent harness) from reintroducing 
the cycle.
+  private[sql] def reset(): Unit = {
+    synchronized {
+      catalogs.clear()
+      _currentNamespace = None
+      _currentCatalogName = None
+      _sessionPath = None
+      confDefaultPathCache.set(None)
+    }
     v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala
index 245398a4694e..238b52ab7cd9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql
 
 import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException
+import org.apache.spark.sql.connector.catalog.InMemoryCatalog
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, LongType}
@@ -977,6 +979,123 @@ class SetPathSuite extends SharedSparkSession {
     }
   }
 
+  test("SPARK-56939: concurrent USE SCHEMA / USE CATALOG and unqualified 
function lookups " +
+    "do not deadlock") {
+    // Regression for SPARK-56939. Prior to the fix, 
[[CatalogManager.setCurrentNamespace]]
+    // (driven by `USE SCHEMA`) and [[CatalogManager.setCurrentCatalog]] 
(driven by
+    // `USE CATALOG`) both held the manager's intrinsic lock while calling into
+    // [[SessionCatalog.setCurrentDatabase*]] (which takes the catalog's 
intrinsic lock),
+    // while concurrent unqualified function resolution acquired the catalog's 
intrinsic lock
+    // and then reached back into the manager via
+    // [[CatalogManager.sqlResolutionPathEntries]]. That lock-order inversion 
deadlocked the
+    // session whenever a `USE`-style command raced with any unqualified 
function reference.
+    //
+    // The hazard is independent of [[SQLConf.PATH_ENABLED]] and the 
resolution-order setting,
+    // so this test exercises the default configuration. Both 
`setCurrentNamespace` and
+    // `setCurrentCatalog` were rewritten with the same split-lock pattern, so 
the test
+    // exercises both arms symmetrically: one thread toggles `USE SCHEMA`, 
another toggles
+    // `USE CATALOG` between the session catalog and a registered v2 catalog.
+    val v2Catalog = "spark_56939_testcat"
+    spark.conf.set(s"spark.sql.catalog.$v2Catalog", 
classOf[InMemoryCatalog].getName)
+    sql("CREATE SCHEMA IF NOT EXISTS spark_56939_s1")
+    sql("CREATE SCHEMA IF NOT EXISTS spark_56939_s2")
+    try {
+      val budget = 200
+      val iterations = new java.util.concurrent.atomic.AtomicInteger(0)
+      val barrier = new java.util.concurrent.CyclicBarrier(3)
+      val errors = new java.util.concurrent.ConcurrentLinkedQueue[Throwable]()
+
+      val useSchemaThread = new Thread(() => {
+        try {
+          barrier.await()
+          var i = 0
+          while (i < budget && errors.isEmpty) {
+            try {
+              sql(if ((i % 2) == 0) "USE SCHEMA spark_56939_s1" else "USE 
SCHEMA spark_56939_s2")
+            } catch {
+              // A concurrent `USE` from `useCatalogThread` may switch the 
current catalog
+              // to the v2 testcat, where these schemas don't exist; the 
resulting
+              // SCHEMA_NOT_FOUND is an expected interleaving and is unrelated 
to the
+              // deadlock this test guards against.
+              case _: NoSuchNamespaceException => ()
+            }
+            i += 1
+          }
+        } catch {
+          case t: Throwable => errors.add(t)
+        }
+      }, "SPARK-56939-use-schema")
+
+      val useCatalogThread = new Thread(() => {
+        try {
+          barrier.await()
+          var i = 0
+          while (i < budget && errors.isEmpty) {
+            // Toggle between the session catalog and a v2 catalog so each 
iteration
+            // exercises `setCurrentCatalog` -- the arm that previously held 
the manager
+            // lock across `v1SessionCatalog.setCurrentDatabase(default)`. The 
grammar
+            // accepts `USE identifierReference`; a single identifier resolves 
to a
+            // catalog when one is registered under that name.
+            sql(if ((i % 2) == 0) s"USE $v2Catalog" else "USE spark_catalog")
+            i += 1
+          }
+        } catch {
+          case t: Throwable => errors.add(t)
+        }
+      }, "SPARK-56939-use-catalog")
+
+      val lookupThread = new Thread(() => {
+        try {
+          barrier.await()
+          var i = 0
+          while (i < budget && errors.isEmpty) {
+            // Unqualified `count(*)` exercises the kinds-order provider that 
resolves
+            // against the live PATH via [[CatalogManager]] -- the side of the 
cycle
+            // that previously acquired the catalog lock first and then the 
manager lock.
+            val n = sql("SELECT count(*) FROM VALUES (1), (2), (3) AS t(a)")
+              .head().getLong(0)
+            assert(n == 3L, s"unexpected count: $n at iteration $i")
+            iterations.incrementAndGet()
+            i += 1
+          }
+        } catch {
+          case t: Throwable => errors.add(t)
+        }
+      }, "SPARK-56939-lookup")
+
+      useSchemaThread.start()
+      useCatalogThread.start()
+      lookupThread.start()
+
+      // Generous join: 30s is plenty for 200 cheap queries per thread and 
gives a
+      // clear failure signal if the implementation regresses into a deadlock.
+      val joinMillis = 30000L
+      useSchemaThread.join(joinMillis)
+      useCatalogThread.join(joinMillis)
+      lookupThread.join(joinMillis)
+
+      assert(!useSchemaThread.isAlive,
+        "USE SCHEMA thread did not finish; lock-order inversion between 
SessionCatalog and " +
+          "CatalogManager likely regressed (SPARK-56939).")
+      assert(!useCatalogThread.isAlive,
+        "USE CATALOG thread did not finish; lock-order inversion between 
SessionCatalog and " +
+          "CatalogManager likely regressed (SPARK-56939).")
+      assert(!lookupThread.isAlive,
+        "Lookup thread did not finish; lock-order inversion between 
SessionCatalog and " +
+          "CatalogManager likely regressed (SPARK-56939).")
+      assert(errors.isEmpty,
+        s"Concurrent lookups raised unexpected errors: 
${errors.toArray.mkString("; ")}")
+      assert(iterations.get() > 0,
+        "Lookup thread never completed a query; suspect contention or 
deadlock.")
+    } finally {
+      sql("USE spark_catalog")
+      sql("USE SCHEMA default")
+      sql("DROP SCHEMA IF EXISTS spark_56939_s1 CASCADE")
+      sql("DROP SCHEMA IF EXISTS spark_56939_s2 CASCADE")
+      spark.conf.unset(s"spark.sql.catalog.$v2Catalog")
+    }
+  }
+
   test("PATH enabled: concurrent SET PATH and unqualified lookups do not 
deadlock") {
     // SessionCatalog.lookupBuiltinOrTempFunction is intentionally NOT
     // synchronized on SessionCatalog because the path-driven kinds provider 
acquires


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to