Serge Rielau created SPARK-56939:
------------------------------------
Summary: Resolve deadlock between USE and function lookup
Key: SPARK-56939
URL: https://issues.apache.org/jira/browse/SPARK-56939
Project: Spark
Issue Type: Sub-task
Components: Spark Core
Affects Versions: 4.2.0
Reporter: Serge Rielau
h1. Summary
{{SessionCatalog}}/{{CatalogManager}} lock-order inversion can deadlock
concurrent {{USE SCHEMA}}/{{USE CATALOG}} and unqualified function
resolution
Issue Type: Bug
Component(s): SQL
Affects Version/s: 4.2.0 (introduced by SPARK-56750)
Priority: Major
Labels: correctness, concurrency
----
h1. Description
After SPARK-56750 wired {{CatalogManager}} into {{SessionCatalog}} as the live
source for the path-driven session function kinds, two
{{SessionCatalog}}/{{CatalogManager}} code paths form a lock-order
inversion that can deadlock when both run on different threads of the same
{{SparkSession}}.
h2. The cycle
Arm 1: {{SessionCatalog.synchronized}} → {{CatalogManager.synchronized}}
{{SessionCatalog.lookupFunctionInfo}} (still {{synchronized}} on the catalog)
runs unqualified function resolution, which evaluates
{{sessionFunctionKindsInResolutionOrder}}. With the live {{CatalogManager}}
bound (production), that derivation needs the catalog manager's lock to read
{{currentCatalog}}, {{currentNamespace}}, and the session path:
{code:scala}
private def sessionFunctionKindsInResolutionOrder:
    Seq[SessionCatalog.SessionFunctionKind] =
  sessionFunctionKindsTestOverride.getOrElse {
    catalogManagerForSessionFunctionKinds match {
      case Some(cm) =>
        CatalogManager.systemFunctionKindsFromPath(
          cm.sqlResolutionPathEntries(cm.currentCatalog.name(),
            cm.currentNamespace.toSeq))
      case None =>
        CatalogManager.systemFunctionKindsFromPath(conf.systemPathOrder)
    }
  }
{code}
So: lookup acquires the {{SessionCatalog}} intrinsic lock, then needs the
{{CatalogManager}} intrinsic lock.
Arm 2: {{CatalogManager.synchronized}} → {{SessionCatalog.synchronized}}
{{CatalogManager.setCurrentNamespace}}, {{setCurrentCatalog}},
{{setCurrentCatalogAndNamespace}}, and {{setCurrentCatalogWithoutCheck}}
all hold {{CatalogManager.synchronized}} and then call back into
{{v1SessionCatalog.setCurrentDatabase*}}, which re-acquires
{{SessionCatalog}}:
{code:scala}
def setCurrentNamespace(namespace: Array[String]): Unit = synchronized {
  if (isSessionCatalog(currentCatalog) && namespace.length == 1) {
    v1SessionCatalog.setCurrentDatabaseWithNameCheck(
      namespace.head,
      _ => assertNamespaceExist(namespace))
  } else {
    assertNamespaceExist(namespace)
  }
  _currentNamespace = Some(namespace)
}
{code}
So: {{USE SCHEMA}}/{{USE CATALOG}} acquire the {{CatalogManager}} lock,
then need the {{SessionCatalog}} lock.
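The two arms can be modelled without Spark at all. The following is a minimal sketch, assuming two plain objects stand in for the {{SessionCatalog}} and {{CatalogManager}} monitors; {{LockOrderInversionDemo}}, {{startBothArms}}, and the lock names are hypothetical and exist only for illustration. A latch forces the interleaving in which each thread already holds its first lock before requesting the second.

```scala
import java.util.concurrent.CountDownLatch

object LockOrderInversionDemo {
  // Hypothetical stand-ins for the two intrinsic locks; these are not Spark classes.
  private val sessionCatalogLock = new Object
  private val catalogManagerLock = new Object

  /** Starts both arms as daemon threads and returns them. */
  def startBothArms(): (Thread, Thread) = {
    val bothHoldFirst = new CountDownLatch(2)

    def arm(name: String, first: AnyRef, second: AnyRef): Thread = {
      val t = new Thread(() => {
        first.synchronized {
          bothHoldFirst.countDown()
          bothHoldFirst.await()      // wait until the other arm holds its first lock
          second.synchronized { () } // never entered: the other arm owns this monitor
        }
      }, name)
      t.setDaemon(true)              // let the JVM exit even though the threads hang
      t.start()
      t
    }

    // Arm 1: SessionCatalog lock, then CatalogManager lock.
    val lookup = arm("lookup", sessionCatalogLock, catalogManagerLock)
    // Arm 2: CatalogManager lock, then SessionCatalog lock.
    val use = arm("use-schema", catalogManagerLock, sessionCatalogLock)
    (lookup, use)
  }

  def main(args: Array[String]): Unit = {
    val (lookup, use) = startBothArms()
    Thread.sleep(500) // give both threads time to reach the second lock
    println(s"${lookup.getName}: ${lookup.getState}, ${use.getName}: ${use.getState}")
  }
}
```

In the real code the interleaving is not forced, so the hang is probabilistic; the latch here only makes the bad schedule deterministic.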
h2. Triggering conditions
The deadlock requires two threads sharing one {{SparkSession}} such that:
# One thread is executing any SQL that contains an unqualified function
reference (extremely common) so it runs {{lookupFunctionInfo}}.
# The other thread is executing {{USE SCHEMA …}}, {{USE CATALOG …}},
or {{USE catalog.schema}} so it runs the matching
{{setCurrentNamespace}}/{{setCurrentCatalog}} path.
Both can also be the _same_ thread interleaved across futures if any rule
invoked from inside an analyzer scope triggers a {{USE}}-equivalent.
h2. Independence from {{spark.sql.functionResolution.sessionOrder}}
The hazard is independent of the function-resolution order setting
({{first}} / {{second}} / {{last}}):
* Arm 1 acquires {{CatalogManager.synchronized}} to _read what the order
is_. Whatever the order value, the lock is taken.
* Arm 2 is a property of {{USE}}-style state mutation and has nothing to
do with order at all.
h2. Why this hasn't been caught earlier
The in-source comment on {{SessionCatalog.lookupBuiltinOrTempFunction}} is
explicit about Arm 1 — it is intentionally _not_ {{synchronized}} to avoid this
very cycle. That partial mitigation only covers
{{lookupBuiltinOrTempFunction}}; {{lookupFunctionInfo}},
{{lookupFunctionInfoByIdentifier}}, and other {{synchronized}} callers that
also fall into
{{lookupBuiltinOrTempFunction}}/{{lookupPersistentFunction}} retain the
hazard.
The concurrent-{{SET PATH}} smoke test in {{SetPathSuite}} added by
SPARK-56853 surfaces only the contention version of this hazard: {{SET PATH}}
doesn't itself call back into {{v1SessionCatalog}}, so it produces lock
contention but no true deadlock, and the test fails by _timeout under load_,
not by a hang. A targeted test with {{USE SCHEMA}} on the setter thread
reliably reproduces the full hang.
h2. Proposed fix
Two minimal changes:
# Drop {{synchronized}} on {{SessionCatalog.lookupFunctionInfo}} (and any
peers that share the hazard). Document that unqualified resolution must not
hold the catalog's intrinsic lock while consulting the bound
{{CatalogManager}}. This kills Arm 1.
# In
{{CatalogManager.setCurrentNamespace}}/{{setCurrentCatalog}}/{{setCurrentCatalogAndNamespace}}/{{setCurrentCatalogWithoutCheck}},
narrow the {{synchronized}} region so the v1-catalog callbacks run _outside_
the manager's intrinsic lock. The state read needed for the decision can be
snapshotted under the lock, the callback run outside it, and the
namespace/catalog write done under the lock. This kills Arm 2.
Either change alone breaks the cycle; both make the locking discipline robust
against future regressions.
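The snapshot / callback / write sequence in the second change could look roughly like the following. This is a sketch under stated assumptions, not the proposed patch: {{ToySessionCatalog}} and {{ToyCatalogManager}} are hypothetical stand-ins, and the existence check that the real setter performs is omitted.

```scala
// Hypothetical toy classes; names and signatures are illustrative, not Spark's.
class ToySessionCatalog {
  private var currentDb = "default"
  def setCurrentDatabase(db: String): Unit = synchronized { currentDb = db }
  def currentDatabase: String = synchronized { currentDb }
}

class ToyCatalogManager(v1: ToySessionCatalog) {
  private var usingSessionCatalog = true
  private var currentNamespace: Array[String] = Array("default")

  def setCurrentNamespace(namespace: Array[String]): Unit = {
    // 1. Snapshot the state the decision depends on, under the manager's lock.
    val callV1 = synchronized { usingSessionCatalog && namespace.length == 1 }

    // 2. Run the v1 callback with the manager's lock released, so this thread
    //    never holds the manager's monitor while requesting the catalog's.
    if (callV1) {
      assert(!Thread.holdsLock(this), "v1 callback must run outside the manager lock")
      v1.setCurrentDatabase(namespace.head)
    }

    // 3. Publish the new namespace under the lock again.
    synchronized { currentNamespace = namespace }
  }

  def namespaceSeq: Seq[String] = synchronized { currentNamespace.toSeq }
}
```

One trade-off of this shape is that the snapshot taken in step 1 can be stale by step 3 if another thread switches catalogs in between; whether that window needs re-validation is a design decision the sketch does not settle.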
For the path-derived kinds, introduce a
{{CatalogManager.sessionFunctionKindsForUnqualifiedResolution}} helper that
snapshots {{currentCatalog}}, {{currentNamespace}}, and the path
entries together under a single critical section; then {{SessionCatalog}} and
{{FunctionResolution.isSessionBeforeBuiltinInPath}} consult that one helper
instead of taking two separate reads, eliminating torn-state observations under
racing path updates.
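As an illustration of the single-critical-section idea, here is a minimal sketch. {{ResolutionSnapshot}}, {{ToyPathState}}, and the placeholder path entries {{"builtin"}} and {{"session"}} are assumptions made for the example; the shape of the actual helper is not specified here.

```scala
// Hypothetical sketch: the snapshot type, field names, and path entries are illustrative.
final case class ResolutionSnapshot(
    catalog: String,
    namespace: Seq[String],
    pathEntries: Seq[String])

class ToyPathState {
  private var catalog = "spark_catalog"
  private var namespace: Seq[String] = Seq("default")
  private var pathEntries: Seq[String] = Seq("builtin", "session")

  /** Writers change catalog and namespace together in one critical section. */
  def use(newCatalog: String, newNamespace: Seq[String]): Unit = synchronized {
    catalog = newCatalog
    namespace = newNamespace
  }

  def setPath(entries: Seq[String]): Unit = synchronized { pathEntries = entries }

  /** Readers get all three values from one critical section, so they agree with each other. */
  def snapshot(): ResolutionSnapshot = synchronized {
    ResolutionSnapshot(catalog, namespace, pathEntries)
  }
}

object ToyResolution {
  /** Derived purely from an immutable snapshot, so no lock is held while computing it. */
  def sessionBeforeBuiltin(s: ResolutionSnapshot): Boolean = {
    val session = s.pathEntries.indexOf("session")
    val builtin = s.pathEntries.indexOf("builtin")
    session >= 0 && (builtin < 0 || session < builtin)
  }
}
```

A caller would take one {{snapshot()}} and derive every answer it needs from that value, rather than reading the catalog, namespace, and path in separate lock acquisitions.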
h2. Repro
A standalone unit suite using {{SparkSession}} (no DBR-specific catalog
wrappers needed):
{code:scala}
import java.util.concurrent.{ConcurrentLinkedQueue, CyclicBarrier}
import org.apache.spark.sql.SparkSession

val session = SparkSession.builder().master("local[*]").getOrCreate()
val barrier = new CyclicBarrier(2)
val errors = new ConcurrentLinkedQueue[Throwable]()

// Arm 1: count(*) is an unqualified function reference, so each query runs function lookup.
val lookupThread = new Thread(() => {
  try {
    barrier.await()
    for (_ <- 0 until 200) {
      session.sql("SELECT count(*) FROM VALUES (1), (2), (3) AS t(a)").head()
    }
  } catch { case t: Throwable => errors.add(t) }
}, "lookup")

// Arm 2: alternate USE SCHEMA so the CatalogManager setters run concurrently with lookups.
val useThread = new Thread(() => {
  try {
    session.sql("CREATE SCHEMA IF NOT EXISTS s1")
    session.sql("CREATE SCHEMA IF NOT EXISTS s2")
    barrier.await()
    for (i <- 0 until 200) {
      session.sql(if ((i % 2) == 0) "USE SCHEMA s1" else "USE SCHEMA s2")
    }
  } catch { case t: Throwable => errors.add(t) }
}, "use-schema")

lookupThread.start(); useThread.start()
// Bounded joins: a thread still alive after 30 seconds is treated as hung.
lookupThread.join(30000); useThread.join(30000)
require(!lookupThread.isAlive && !useThread.isAlive,
  "deadlock detected: SessionCatalog/CatalogManager lock-order inversion")
{code}
Under contention this hangs both threads, each in state {{BLOCKED}} waiting to
enter the monitor the other holds; a thread dump shows the cycle directly.
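Instead of reading a thread dump by hand, a test could ask the JVM for the cycle. The sketch below uses the standard {{java.lang.management.ThreadMXBean}} API; the {{DeadlockProbe}} object and its method name are hypothetical helpers, not part of Spark or of the repro above.

```scala
import java.lang.management.ManagementFactory

object DeadlockProbe {
  /**
   * Describes every thread the JVM reports as deadlocked on object monitors
   * (intrinsic locks). Returns an empty Seq when no monitor deadlock exists.
   */
  def describeMonitorDeadlocks(): Seq[String] = {
    val mx = ManagementFactory.getThreadMXBean
    // findMonitorDeadlockedThreads returns null when there is no deadlock.
    val ids = Option(mx.findMonitorDeadlockedThreads()).getOrElse(Array.empty[Long])
    if (ids.isEmpty) {
      Seq.empty
    } else {
      // Entries can be null if a thread terminated between the two calls.
      mx.getThreadInfo(ids).toSeq.filter(_ != null).map { info =>
        s"${info.getThreadName} blocked on ${info.getLockName} " +
          s"held by ${info.getLockOwnerName}"
      }
    }
  }
}
```

Called after the bounded joins in the repro, a non-empty result would name both threads and the monitor each is waiting for, which makes the failure message more specific than an {{isAlive}} check alone.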