This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6ecd76f8345b [SPARK-56939][SQL] Resolve deadlock between USE and
function lookup
6ecd76f8345b is described below
commit 6ecd76f8345b3379d952153a46732c223bd21d7f
Author: Serge Rielau <[email protected]>
AuthorDate: Wed May 20 20:48:55 2026 +0800
[SPARK-56939][SQL] Resolve deadlock between USE and function lookup
### What changes were proposed in this pull request?
Break the `SessionCatalog`/`CatalogManager` lock-order inversion that can
deadlock concurrent `USE SCHEMA` / `USE CATALOG` and unqualified function
resolution on the same `SparkSession`.
- `CatalogManager.setCurrentNamespace` / `setCurrentCatalog`: snapshot the
dispatch decision under the manager's intrinsic lock, run the
`v1SessionCatalog` callbacks **outside** the lock, then publish the new state
under the lock again. This stops the "manager lock then catalog lock" arm of
the cycle.
- Add `CatalogManager.sessionFunctionKindsForUnqualifiedResolution()` that
snapshots `(currentCatalog, currentNamespace, sessionPath)` in a single
critical section. The `v1SessionCatalog.getCurrentDatabase` read needed for the
default-namespace fallback is taken **before** the manager lock, so the helper
never re-introduces the deadlock cycle while still avoiding torn-state
observations under racing path updates.
- Route `SessionCatalog.sessionFunctionKindsInResolutionOrder` and
`FunctionResolution.isSessionBeforeBuiltinInPath` through that single helper,
so the lookup loop and the `session-before-builtin` predicate share one
consistent snapshot.
- Tighten the doc comments on the affected methods to document the locking
contract and prevent future regressions.
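
The split-lock shape described in the bullets above can be sketched in isolation. This is a minimal illustration, not Spark's code: `ToyCatalog`, `ToyManager`, `switchToV2` and `effectiveNamespace` are hypothetical names, and the real methods carry validation and path handling that is omitted here.

```scala
// Hypothetical stand-ins; these are NOT Spark's SessionCatalog / CatalogManager.
final class ToyCatalog {
  private var currentDb: String = "default"
  def setCurrentDatabase(db: String): Unit = synchronized { currentDb = db }
  def getCurrentDatabase: String = synchronized { currentDb }
}

final class ToyManager(catalog: ToyCatalog) {
  private var usingSessionCatalog: Boolean = true
  private var explicitNamespace: Option[Seq[String]] = None

  def switchToV2(): Unit = synchronized {
    usingSessionCatalog = false
    explicitNamespace = None
  }

  def setCurrentNamespace(namespace: Seq[String]): Unit = {
    // 1. Snapshot the dispatch decision under the manager lock.
    val isSession = synchronized { usingSessionCatalog }
    // 2. Call back into the catalog with NO manager lock held.
    if (isSession && namespace.length == 1) {
      catalog.setCurrentDatabase(namespace.head)
    }
    // 3. Publish the new state under the manager lock again.
    synchronized { explicitNamespace = Some(namespace) }
  }

  /** Reads the catalog first, then snapshots manager fields in one critical section. */
  def effectiveNamespace(): Seq[String] = {
    val catalogDb = catalog.getCurrentDatabase // catalog lock is taken and released here
    synchronized {
      explicitNamespace.getOrElse(
        if (usingSessionCatalog) Seq(catalogDb) else Seq.empty)
    }
  }
}
```

In this sketch neither method ever holds both locks at once, which is the property the change relies on. The cost is that steps 1 to 3 are no longer atomic as a group.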
### Why are the changes needed?
After SPARK-56750 wired `CatalogManager` into `SessionCatalog` as the live
source for path-driven session function kinds, two paths form a lock-order
inversion:
- **Arm 1** (`SessionCatalog.synchronized` ->
`CatalogManager.synchronized`): unqualified function resolution, when evaluating
the live PATH, calls `CatalogManager.sqlResolutionPathEntries` (which
synchronizes on the manager) from call sites that already hold the catalog's
intrinsic lock.
- **Arm 2** (`CatalogManager.synchronized` ->
`SessionCatalog.synchronized`): `setCurrentNamespace` / `setCurrentCatalog`
hold the manager's lock and then call back into
`v1SessionCatalog.setCurrentDatabase*`, which synchronizes on `SessionCatalog`.
Two threads sharing a `SparkSession` -- one running any SQL with an
unqualified function reference, the other running `USE SCHEMA` / `USE CATALOG`
-- can each block waiting for the intrinsic lock the other holds. The hazard is
independent of `spark.sql.functionResolution.sessionOrder`: Arm 1 acquires the
manager lock merely to read the configured order, and Arm 2 does not depend on
the order at all. See
[SPARK-56939](https://issues.apache.org/jira/browse/SPARK-56939) for a
standalone repro.
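
The two arms can be reproduced with plain JVM monitors. The following is a minimal sketch under stated assumptions: `catalogLock` and `managerLock` are hypothetical objects standing in for the two intrinsic locks, and a latch forces the unlucky interleaving that is only occasional in a real session.

```scala
import java.util.concurrent.CountDownLatch

object LockOrderInversionSketch {
  // Hypothetical lock objects standing in for the two intrinsic locks.
  private val catalogLock = new Object
  private val managerLock = new Object

  /** Returns true if both threads are still blocked after waiting `waitMillis` on each. */
  def bothArmsStuck(waitMillis: Long): Boolean = {
    val bothHoldFirstLock = new CountDownLatch(2)

    def arm(first: Object, second: Object, name: String): Thread = {
      val t = new Thread(() => {
        first.synchronized {
          bothHoldFirstLock.countDown()
          bothHoldFirstLock.await() // wait until the other arm holds its first lock
          second.synchronized { () } // never acquired: the other arm holds it
        }
      }, name)
      t.setDaemon(true) // let the JVM exit even though the threads stay blocked
      t
    }

    val arm1 = arm(catalogLock, managerLock, "arm-1") // catalog lock, then manager lock
    val arm2 = arm(managerLock, catalogLock, "arm-2") // manager lock, then catalog lock
    arm1.start()
    arm2.start()
    arm1.join(waitMillis)
    arm2.join(waitMillis)
    arm1.isAlive && arm2.isAlive
  }

  def main(args: Array[String]): Unit =
    println(s"both arms stuck: ${bothArmsStuck(500L)}")
}
```

Removing either nesting direction breaks the cycle, and this change removes the manager-then-catalog direction from the `USE` mutators.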
### Does this PR introduce _any_ user-facing change?
No. This is a concurrency fix; serial behavior is unchanged. The only
observable difference under contention is that the session no longer deadlocks.
### How was this patch tested?
- New regression test in `SetPathSuite`, `SPARK-56939: concurrent USE SCHEMA /
USE CATALOG and unqualified function lookups do not deadlock`, based on the
JIRA repro: one thread alternates `USE SCHEMA spark_56939_s1` /
`USE SCHEMA spark_56939_s2`, a second toggles `USE` between a registered v2
catalog and `spark_catalog`, and a third runs unqualified `count(*)` queries.
Without the fix the threads hang on each other's intrinsic locks; with the fix
every thread finishes within its 30s join timeout.
- ``build/sbt 'sql/testOnly *SetPathSuite'`` -- 60/60 pass.
- ``build/sbt 'catalyst/testOnly *SessionCatalogSuite *CatalogManagerSuite *FunctionResolution*'`` -- 119/119 pass.
- ``build/sbt 'sql/testOnly *SQLFunctionSuite *SQLViewSuite'`` -- 70/70 pass.
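
The regression test relies on a common liveness pattern: start all workers behind a barrier, join each with a timeout, then assert that none is still alive. A minimal sketch of that pattern follows. `DeadlockHarnessSketch` and `runAll` are hypothetical names and are not part of `SetPathSuite`; the sketch assumes Scala 2.13+ and at least one task.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CyclicBarrier}
import scala.jdk.CollectionConverters._

object DeadlockHarnessSketch {
  /** Hypothetical helper: runs each task on its own thread; returns (allFinished, errors). */
  def runAll(tasks: Seq[() => Unit], joinMillis: Long): (Boolean, List[Throwable]) = {
    val barrier = new CyclicBarrier(tasks.size) // assumes tasks is non-empty
    val errors = new ConcurrentLinkedQueue[Throwable]()
    val threads = tasks.zipWithIndex.map { case (task, i) =>
      val t = new Thread(() => {
        try {
          barrier.await() // release all workers together to maximize interleaving
          task()
        } catch {
          case e: Throwable => errors.add(e)
        }
      }, s"worker-$i")
      t.setDaemon(true) // a stuck worker must not keep the JVM alive
      t
    }
    threads.foreach(_.start())
    // Each join gets its own timeout, so the worst case is tasks.size * joinMillis.
    threads.foreach(_.join(joinMillis))
    (threads.forall(t => !t.isAlive), errors.asScala.toList)
  }
}
```

A caller would assert that the first element is `true` and the error list is empty. A thread still alive after its join timeout is the signal that the lock-order inversion has returned.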
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Cursor / Claude Opus 4.7
Closes #55977 from srielau/SPARK-56939-use-func-deadlock.
Authored-by: Serge Rielau <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/catalyst/analysis/FunctionResolution.scala | 8 +-
.../sql/catalyst/catalog/SessionCatalog.scala | 27 ++--
.../sql/connector/catalog/CatalogManager.scala | 147 ++++++++++++++++++---
.../scala/org/apache/spark/sql/SetPathSuite.scala | 119 +++++++++++++++++
4 files changed, 273 insertions(+), 28 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
index e3dbcc4b6ef7..4f6aee03967c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
@@ -83,12 +83,12 @@ class FunctionResolution(
* `count` shadows the builtin) and the `SessionCatalog` security check that blocks creating
* a temp function with a builtin's name. Reads the live PATH via `CatalogManager` and
* applies the same kinds extraction that drives `SessionCatalog`'s fast-path provider, so
- * the predicate stays in sync with the lookup loop's actual order.
+ * the predicate stays in sync with the lookup loop's actual order. Uses the consolidated
+ * snapshot helper (SPARK-56939) so the (catalog, namespace, path) triple is observed
+ * atomically.
*/
def isSessionBeforeBuiltinInPath: Boolean = {
- val path = catalogManager.sqlResolutionPathEntries(
- catalogManager.currentCatalog.name(), catalogManager.currentNamespace.toSeq)
- CatalogManager.systemFunctionKindsFromPath(path).headOption
+ catalogManager.sessionFunctionKindsForUnqualifiedResolution().headOption
.contains(org.apache.spark.sql.catalyst.catalog.SessionCatalog.Temp)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 71653eec139b..9e5a2176612c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -154,13 +154,19 @@ class SessionCatalog(
* Session function kinds in resolution order for unqualified lookups: test override if set,
* else live PATH from [[catalogManagerForSessionFunctionKinds]], else
* [[SQLConf.systemPathOrder]].
+ *
+ * MUST NOT be called while holding [[SessionCatalog]]'s intrinsic lock (see SPARK-56939):
+ * the path-driven branch delegates to [[CatalogManager]], which has its own intrinsic lock
+ * and re-enters this catalog through `USE` paths, so nesting the two locks here would
+ * deadlock.
*/
private def sessionFunctionKindsInResolutionOrder: Seq[SessionFunctionKind] =
sessionFunctionKindsTestOverride.getOrElse {
catalogManagerForSessionFunctionKinds match {
case Some(cm) =>
- CatalogManager.systemFunctionKindsFromPath(
- cm.sqlResolutionPathEntries(cm.currentCatalog.name(), cm.currentNamespace.toSeq))
+ // Use the consolidated helper so unqualified resolution observes a consistent
+ // (currentCatalog, currentNamespace, path) triple in a single critical section.
+ cm.sessionFunctionKindsForUnqualifiedResolution()
case None =>
CatalogManager.systemFunctionKindsFromPath(conf.systemPathOrder)
}
@@ -2565,11 +2571,13 @@ class SessionCatalog(
* Resolution order follows the configured path (e.g. builtin then session).
*/
def lookupBuiltinOrTempTableFunction(name: String): Option[ExpressionInfo] = {
- // Intentionally not `synchronized` on this [[SessionCatalog]]. Resolution order may call
- // into [[CatalogManager]] (e.g. [[CatalogManager.sqlResolutionPathEntries]]), which can
- // synchronize on the manager; another
- // thread can hold that lock and call into this catalog (e.g. via `setCurrentNamespace`),
- // which would deadlock if this method also synchronized on `this`.
+ // Intentionally not `synchronized` on this [[SessionCatalog]]: resolution order may call
+ // into [[CatalogManager]] (e.g. [[CatalogManager.sqlResolutionPathEntries]] via
+ // [[sessionFunctionKindsInResolutionOrder]]), which synchronizes on the manager. The
+ // SPARK-56939 fix removed the reverse `CatalogManager -> SessionCatalog` nest from the
+ // `USE`-style mutators that previously closed the deadlock cycle; keeping this method
+ // un-synchronized preserves the `SessionCatalog -> CatalogManager` direction as the
+ // single allowed ordering, so the invariant survives future regressions.
lookupFunctionWithShadowing(name, tableFunctionRegistry, checkBuiltinOperators = false)
}
@@ -2724,8 +2732,9 @@ class SessionCatalog(
def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = {
// Intentionally not `synchronized` on this [[SessionCatalog]] (see
// [[lookupBuiltinOrTempTableFunction]]): unqualified builtin/temp resolution uses
- // [[sessionFunctionKindsInResolutionOrder]] / [[CatalogManager]] and must not run under
- // this catalog's intrinsic lock.
+ // [[sessionFunctionKindsInResolutionOrder]] / [[CatalogManager]], and SPARK-56939
+ // requires this catalog's intrinsic lock to NEVER be held when reaching into
+ // [[CatalogManager]] from a function-resolution path.
if (name.database.isEmpty) {
lookupBuiltinOrTempFunction(name.funcName)
.orElse(lookupBuiltinOrTempTableFunction(name.funcName))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
index 9a39e4ebdd27..3aad52dbd1d0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
@@ -132,15 +132,47 @@ class CatalogManager(
}
}
- def setCurrentNamespace(namespace: Array[String]): Unit = synchronized {
- if (isSessionCatalog(currentCatalog) && namespace.length == 1) {
+ def setCurrentNamespace(namespace: Array[String]): Unit = {
+ // SPARK-56939: do NOT hold [[CatalogManager]]'s intrinsic lock across the callbacks below.
+ // [[v1SessionCatalog.setCurrentDatabaseWithNameCheck]] briefly synchronizes on
+ // [[SessionCatalog]], and concurrent unqualified function resolution acquires the
+ // [[SessionCatalog]] lock and then reaches into [[CatalogManager]] via
+ // [[sqlResolutionPathEntries]]; nesting the manager lock outside the catalog lock here
+ // would invert that order and deadlock. Snapshot the dispatch decision under the lock,
+ // run callbacks outside it, then publish the new namespace under the lock again.
+ //
+ // Concurrency trade-offs versus the pre-SPARK-56939 atomic version (v1-side and
+ // CM-side drift modes):
+ //
+ // (a) v1-side drift. The `isSession` snapshot can drift if a concurrent
+ // [[setCurrentCatalog]] switches to a v2 catalog between this read and the v1
+ // callback below -- the callback would still touch `v1.currentDb` even though
+ // the active catalog is no longer the session catalog. A later switch back to
+ // the session catalog resets `v1.currentDb` to `default` (see
+ // [[setCurrentCatalog]]), so long-term state remains consistent; only the
+ // intermediate observation is novel.
+ //
+ // (b) CM-side publish-overwrite drift (sticky). Between the v1 callback returning
+ // and the publish below, a concurrent [[setCurrentCatalog]] can complete fully
+ // -- switching `_currentCatalogName` to (say) a v2 catalog and clearing
+ // `_currentNamespace = None` -- before this method's publish overwrites that
+ // with `Some(namespace)`. End state: `_currentNamespace = Some(namespace)` is
+ // published under a different `_currentCatalogName` than the one observed when
+ // [[isSession]] was snapshotted at the top. Unlike (a) there is no analogous
+ // auto-recovery; the mismatch sticks until the next `USE`. This is still
+ // last-writer-wins for two racing `USE` commands, which is the conventional
+ // expectation, so it is accepted as a trade-off against the deadlock alternative.
+ val isSession = synchronized(isSessionCatalog(currentCatalog))
+ if (isSession && namespace.length == 1) {
v1SessionCatalog.setCurrentDatabaseWithNameCheck(
namespace.head,
_ => assertNamespaceExist(namespace))
} else {
assertNamespaceExist(namespace)
}
- _currentNamespace = Some(namespace)
+ synchronized {
+ _currentNamespace = Some(namespace)
+ }
}
import CatalogManager.SessionPathEntry
@@ -221,6 +253,15 @@ class CatalogManager(
* When PATH is enabled and a session path is in effect (stored or via
* [[SQLConf#DEFAULT_PATH]]), formats the resolved entries. Otherwise falls back to the legacy
* resolutionSearchPath.
+ *
+ * SPARK-56939 note: this is currently the only intentional `CatalogManager.synchronized ->
+ * SessionCatalog.synchronized` nest left in this class. The transitive call into
+ * [[v1SessionCatalog.getCurrentDatabase]] happens via [[currentNamespace]], which fetches
+ * the v1 current database under the CM lock. It is safe today because no code path holds
+ * [[SessionCatalog]]'s intrinsic lock while waiting on [[CatalogManager]]'s -- the
+ * SPARK-56939 fix removed every such SC->CM ordering. Any future change that introduces a
+ * new SC->CM ordering must take `currentPathString` (or any other CM->SC nest) into
+ * account to avoid resurrecting the deadlock.
*/
def currentPathString: String = synchronized {
import CatalogV2Implicits._
@@ -265,6 +306,48 @@ class CatalogManager(
currentCatalog, currentNamespace,
currentCatalog, currentNamespace)
+ /**
+ * Snapshot the live PATH-derived [[SessionCatalog.SessionFunctionKind]] order used by
+ * unqualified function/table-function resolution.
+ *
+ * The `(currentCatalog, _currentNamespace, sessionPath)` triple is read together inside a
+ * single CM critical section so a concurrent `USE` / `SET PATH` cannot return a torn
+ * snapshot for those three fields (e.g. catalog from one observation, explicit namespace
+ * from another).
+ *
+ * The `v1SessionCatalog.getCurrentDatabase` read needed for the default-namespace fallback
+ * is taken OUTSIDE the CM lock and is therefore intentionally racy w.r.t. a concurrent
+ * `USE SCHEMA`. That staleness is harmless for this helper's output: this method consumes
+ * `effectiveNs` only to expand `CURRENT_SCHEMA` markers in the SQL path, and
+ * [[CatalogManager.systemFunctionKindsFromPath]] only retains literal `system.builtin` /
+ * `system.session` entries from the resolved path -- it never inspects any
+ * `(catalog, namespace)` derived from `v1`. So if `v1CurrentDb` lags by one `USE SCHEMA`,
+ * a `CURRENT_SCHEMA` entry might briefly resolve to the previous database, but the kinds
+ * list (the only thing returned here) is unaffected. Moving the read inside the CM lock
+ * would re-introduce the SPARK-56939 lock-order inversion this helper exists to avoid.
+ *
+ * Callers (e.g. [[SessionCatalog.sessionFunctionKindsInResolutionOrder]],
+ * [[org.apache.spark.sql.catalyst.analysis.FunctionResolution.isSessionBeforeBuiltinInPath]])
+ * MUST NOT hold [[SessionCatalog]]'s intrinsic lock when invoking this method.
+ */
+ def sessionFunctionKindsForUnqualifiedResolution(): Seq[SessionCatalog.SessionFunctionKind] = {
+ // SPARK-56939: read v1's current database before taking the CM lock; see the method
+ // doc for why the resulting staleness is harmless for the kinds list.
+ val v1CurrentDb = v1SessionCatalog.getCurrentDatabase
+ val pathEntries = synchronized {
+ val catName = currentCatalog.name()
+ val effectiveNs: Seq[String] = _currentNamespace.map(_.toSeq).getOrElse {
+ if (catName == SESSION_CATALOG_NAME) {
+ Seq(v1CurrentDb)
+ } else {
+ currentCatalog.defaultNamespace().toSeq
+ }
+ }
+ sqlResolutionPathEntries(catName, effectiveNs)
+ }
+ CatalogManager.systemFunctionKindsFromPath(pathEntries)
+ }
+
/**
* True if `system.session` is on the SQL path. Only literal path entries can match: the
* [[org.apache.spark.sql.connector.catalog.CatalogManager.CurrentSchemaEntry$]] marker expands to
@@ -330,15 +413,41 @@ class CatalogManager(
catalog(_currentCatalogName.getOrElse(conf.getConf(SQLConf.DEFAULT_CATALOG)))
}
- def setCurrentCatalog(catalogName: String): Unit = synchronized {
- // `setCurrentCatalog` is noop if it doesn't switch to a different catalog.
- if (currentCatalog.name() != catalogName) {
- catalog(catalogName)
- _currentCatalogName = Some(catalogName)
- _currentNamespace = None
+ def setCurrentCatalog(catalogName: String): Unit = {
+ // SPARK-56939: see [[setCurrentNamespace]]. Avoid nesting [[CatalogManager]]'s lock
+ // across [[v1SessionCatalog.setCurrentDatabase]] (which synchronizes on
+ // [[SessionCatalog]]) to prevent a lock-order inversion with concurrent unqualified
+ // function resolution.
+ val needsSwitch = synchronized {
+ // `setCurrentCatalog` is noop if it doesn't switch to a different catalog.
+ if (currentCatalog.name() != catalogName) {
+ // Force-load the named catalog while holding the manager lock to keep the
+ // not-found error semantics; if loading fails, throw before mutating state.
+ catalog(catalogName)
+ true
+ } else {
+ false
+ }
+ }
+ if (needsSwitch) {
// Reset the current database of v1 `SessionCatalog` when switching current catalog, so that
// when we switch back to session catalog, the current namespace definitely is ["default"].
+ // Run this BEFORE publishing the new catalog name so that if a reader observes the new
+ // catalog, the v1 state is already consistent with it.
+ //
+ // Concurrency trade-off versus the pre-SPARK-56939 atomic version: between this v1 write
+ // and the publish below, a concurrent reader of `currentNamespace` sees
+ // `(oldCatalog, v1.currentDb = default)`. When the old catalog is the session catalog
+ // (the common case for `USE CATALOG`), the user's previous namespace is briefly invisible
+ // to that reader until the new name is published. The opposite torn observation
+ // (`newCatalog`, stale `v1.currentDb`) is avoided by this ordering. This trade-off
+ // (transient invisibility instead of transient inconsistency, exchanged for breaking the
+ // deadlock cycle) is accepted; the long-term post-switch state is the same as before.
v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase)
+ synchronized {
+ _currentCatalogName = Some(catalogName)
+ _currentNamespace = None
+ }
}
}
@@ -348,12 +457,20 @@ class CatalogManager(
}
// Clear all the registered catalogs. Only used in tests.
- private[sql] def reset(): Unit = synchronized {
- catalogs.clear()
- _currentNamespace = None
- _currentCatalogName = None
- _sessionPath = None
- confDefaultPathCache.set(None)
+ //
+ // SPARK-56939: apply the same split-lock pattern as [[setCurrentNamespace]] /
+ // [[setCurrentCatalog]] so the locking contract is uniform across every CM mutator that
+ // calls back into [[v1SessionCatalog]]. Test-only callers don't race against unqualified
+ // function resolution today, but keeping the contract symmetric prevents future test
+ // helpers (e.g. session reset in a concurrent harness) from reintroducing the cycle.
+ private[sql] def reset(): Unit = {
+ synchronized {
+ catalogs.clear()
+ _currentNamespace = None
+ _currentCatalogName = None
+ _sessionPath = None
+ confDefaultPathCache.set(None)
+ }
v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala
index 245398a4694e..238b52ab7cd9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala
@@ -18,6 +18,8 @@
package org.apache.spark.sql
import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException
+import org.apache.spark.sql.connector.catalog.InMemoryCatalog
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{IntegerType, LongType}
@@ -977,6 +979,123 @@ class SetPathSuite extends SharedSparkSession {
}
}
+ test("SPARK-56939: concurrent USE SCHEMA / USE CATALOG and unqualified function lookups " +
+ "do not deadlock") {
+ // Regression for SPARK-56939. Prior to the fix, [[CatalogManager.setCurrentNamespace]]
+ // (driven by `USE SCHEMA`) and [[CatalogManager.setCurrentCatalog]] (driven by
+ // `USE CATALOG`) both held the manager's intrinsic lock while calling into
+ // [[SessionCatalog.setCurrentDatabase*]] (which takes the catalog's intrinsic lock),
+ // while concurrent unqualified function resolution acquired the catalog's intrinsic lock
+ // and then reached back into the manager via
+ // [[CatalogManager.sqlResolutionPathEntries]]. That lock-order inversion deadlocked the
+ // session whenever a `USE`-style command raced with any unqualified function reference.
+ //
+ // The hazard is independent of [[SQLConf.PATH_ENABLED]] and the resolution-order setting,
+ // so this test exercises the default configuration. Both `setCurrentNamespace` and
+ // `setCurrentCatalog` were rewritten with the same split-lock pattern, so the test
+ // exercises both arms symmetrically: one thread toggles `USE SCHEMA`, another toggles
+ // `USE CATALOG` between the session catalog and a registered v2 catalog.
+ val v2Catalog = "spark_56939_testcat"
+ spark.conf.set(s"spark.sql.catalog.$v2Catalog", classOf[InMemoryCatalog].getName)
+ sql("CREATE SCHEMA IF NOT EXISTS spark_56939_s1")
+ sql("CREATE SCHEMA IF NOT EXISTS spark_56939_s2")
+ try {
+ val budget = 200
+ val iterations = new java.util.concurrent.atomic.AtomicInteger(0)
+ val barrier = new java.util.concurrent.CyclicBarrier(3)
+ val errors = new java.util.concurrent.ConcurrentLinkedQueue[Throwable]()
+
+ val useSchemaThread = new Thread(() => {
+ try {
+ barrier.await()
+ var i = 0
+ while (i < budget && errors.isEmpty) {
+ try {
+ sql(if ((i % 2) == 0) "USE SCHEMA spark_56939_s1" else "USE SCHEMA spark_56939_s2")
+ } catch {
+ // A concurrent `USE` from `useCatalogThread` may switch the current catalog
+ // to the v2 testcat, where these schemas don't exist; the resulting
+ // SCHEMA_NOT_FOUND is an expected interleaving and is unrelated to the
+ // deadlock this test guards against.
+ case _: NoSuchNamespaceException => ()
+ }
+ i += 1
+ }
+ } catch {
+ case t: Throwable => errors.add(t)
+ }
+ }, "SPARK-56939-use-schema")
+
+ val useCatalogThread = new Thread(() => {
+ try {
+ barrier.await()
+ var i = 0
+ while (i < budget && errors.isEmpty) {
+ // Toggle between the session catalog and a v2 catalog so each iteration
+ // exercises `setCurrentCatalog` -- the arm that previously held the manager
+ // lock across `v1SessionCatalog.setCurrentDatabase(default)`. The grammar
+ // accepts `USE identifierReference`; a single identifier resolves to a
+ // catalog when one is registered under that name.
+ sql(if ((i % 2) == 0) s"USE $v2Catalog" else "USE spark_catalog")
+ i += 1
+ }
+ } catch {
+ case t: Throwable => errors.add(t)
+ }
+ }, "SPARK-56939-use-catalog")
+
+ val lookupThread = new Thread(() => {
+ try {
+ barrier.await()
+ var i = 0
+ while (i < budget && errors.isEmpty) {
+ // Unqualified `count(*)` exercises the kinds-order provider that resolves
+ // against the live PATH via [[CatalogManager]] -- the side of the cycle
+ // that previously acquired the catalog lock first and then the manager lock.
+ val n = sql("SELECT count(*) FROM VALUES (1), (2), (3) AS t(a)")
+ .head().getLong(0)
+ assert(n == 3L, s"unexpected count: $n at iteration $i")
+ iterations.incrementAndGet()
+ i += 1
+ }
+ } catch {
+ case t: Throwable => errors.add(t)
+ }
+ }, "SPARK-56939-lookup")
+
+ useSchemaThread.start()
+ useCatalogThread.start()
+ lookupThread.start()
+
+ // Generous join: 30s is plenty for 200 cheap queries per thread and gives a
+ // clear failure signal if the implementation regresses into a deadlock.
+ val joinMillis = 30000L
+ useSchemaThread.join(joinMillis)
+ useCatalogThread.join(joinMillis)
+ lookupThread.join(joinMillis)
+
+ assert(!useSchemaThread.isAlive,
+ "USE SCHEMA thread did not finish; lock-order inversion between SessionCatalog and " +
+ "CatalogManager likely regressed (SPARK-56939).")
+ assert(!useCatalogThread.isAlive,
+ "USE CATALOG thread did not finish; lock-order inversion between SessionCatalog and " +
+ "CatalogManager likely regressed (SPARK-56939).")
+ assert(!lookupThread.isAlive,
+ "Lookup thread did not finish; lock-order inversion between SessionCatalog and " +
+ "CatalogManager likely regressed (SPARK-56939).")
+ assert(errors.isEmpty,
+ s"Concurrent lookups raised unexpected errors: ${errors.toArray.mkString("; ")}")
+ assert(iterations.get() > 0,
+ "Lookup thread never completed a query; suspect contention or deadlock.")
+ } finally {
+ sql("USE spark_catalog")
+ sql("USE SCHEMA default")
+ sql("DROP SCHEMA IF EXISTS spark_56939_s1 CASCADE")
+ sql("DROP SCHEMA IF EXISTS spark_56939_s2 CASCADE")
+ spark.conf.unset(s"spark.sql.catalog.$v2Catalog")
+ }
+ }
+
test("PATH enabled: concurrent SET PATH and unqualified lookups do not deadlock") {
// SessionCatalog.lookupBuiltinOrTempFunction is intentionally NOT
// synchronized on SessionCatalog because the path-driven kinds provider acquires